Skip to content
This repository was archived by the owner on Nov 4, 2021. It is now read-only.

Disallow plugins from changing the teamID #381

Merged
merged 4 commits into from
May 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 63 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ Let's get you developing the plugin server in no time:

1. Install dependencies and prepare for takeoff by running command `yarn`.

1. Start a development instance of [PostHog](/PostHog/posthog). After all, this is the _PostHog_ Plugin Server, and it works in conjuction with the main server. To avoid interference, disable the plugin server there with `PLUGIN_SERVER_IDLE=true`.
1. Start a development instance of [PostHog](/PostHog/posthog). After all, this is the _PostHog_ Plugin Server, and it works in conjuction with the main server. To avoid interference, disable the plugin server there with setting the PLUGIN_SERVER_IDLE env variable before running. `PLUGIN_SERVER_IDLE=true ./bin/start`

1. Make sure that the plugin server is configured correctly (see [Configuration](#Configuration)). Two settings that you MUST get right are DATABASE_URL and REDIS_URL - they need to be identical between the plugin server and the main server.

Expand Down Expand Up @@ -74,6 +74,68 @@ It's magic! Just bump up `version` in `package.json` on the main branch and the
You can also use a `bump patch/minor/major` label on a PR - this will do the above for you when the PR is merged.
Courtesy of GitHub Actions.

## Walkthrough

The story begins with `pluginServer.ts -> startPluginServer`, which is the main thread of the plugin server.

This main thread spawns 4 worker threads, managed using Piscina. Each worker thread runs 10 tasks.<sup>[1](#f1)</sup>

### The main thread

Let's talk about the main thread first. This has:

1. `pubSub`: a Redis powered pubSub mechanism for reloading plugins whenever a message is published by the main PostHog app.

2. `server`: sets up connections to required DBs and queues(clickhouse, Kafka, Postgres, Redis), via `server.ts -> createServer`. This is a shared setup between the main and worker threads

3. `fastifyInstance`: sets up a web server. Unused for now, but may be used for enabling webhooks in the future.

4. `piscina`: this is the thread manager. `makePiscina` creates the manager, while `createWorker` creates the worker threads.

5. `scheduleControl`: The scheduled job controller. Responsible for adding piscina tasks for scheduled jobs, when the time comes.
The schedule information makes it into `server.pluginSchedule` via `vm.ts -> createPluginConfigVM -> __tasks`, which parses for `runEvery*` tasks, and
then used in `src/workers/plugins/setup.ts -> loadSchedule`. More about the vm internals in a bit.

6. `jobQueueConsumer`: The internal job queue consumer. This enables retries, scheduling jobs in the future (once) (Note: this is the difference between `scheduleControl` and this internal `jobQueue`). While `scheduleControl` is triggered via `runEveryMinute`, `runEveryHour` tasks, the `jobQueueConsumer` deals with `meta.jobs.doX(event).runAt(new Date())`.

Enqueuing jobs is managed by `job-queue-manager.ts`, which is backed by a Graphile-worker (`graphile-queue.ts`)

7. `queue`: Wait, another queue?

Side Note about Queues:

Yes, there are a LOT of queues. Each of them serve a separate function. The one we've seen already is the graphile job queue. This is the internal one dealing with `job.runAt()` tasks.

Then, there's the main ingestion queue, which sends events from PostHog to the plugin server. This is a Celery (backed by Redis) or Kafka queue, depending on the setup (Enterprise/high event volume is Kafka). These are consumed by the `queue` above, and sent off to the Piscina workers (`src/main/ingestion-queues/queue.ts -> ingestEvent`). Since all of the "real" stuff happens inside the worker threads, you'll find the specific ingestion code there (`src/worker/ingestion/ingest-event.ts`). This finally writes things into Postgres.

It's also a good idea to see the producer side of this ingestion queue, which comes from `Posthog/posthog/api/capture.py`. There's several tasks in this queue, and our plugin server is only interested in one kind of task: `posthog.tasks.process_event.process_event_with_plugins`.

### Worker threads

That's all for the main thread. Onto the workers now: It all begins with `worker.ts` and `createWorker()`

`server` is the same DB connections setup as in the main thread.

What's new here is `setupPlugins` and `createTaskRunner`.

1. `setupPlugins`: Does `loadPluginsFromDB` and then `loadPlugins` (which creates VMs lazily for each plugin+team). TeamID represents a company using plugins, and each team can have it's own set of plugins enabled. The PluginConfig shows which team the config belongs to, the plugin to run, and the VM to run it in.

2. `createTaskRunner`: There's some excellent wizardry happening here. `makePiscina` of `piscina.js` sets up the workers to run the existing file itself (using `__filename` in the setup config, returning `createWorker()`. This `createWorker()` is a function returning `createTaskRunner`, which is a [curried function](https://javascript.info/currying-partials), which given `{task, args}`, returns `workerTasks[task](server, args)`. These worker tasks are available in `src/worker/tasks.ts`.

### Worker Lifecycle

TODO: what happens with getPLuginRows, getPluginConfigRows and SetupPlugins.

Q: Where is teamID populated? At event creation time? (in posthog/posthog? row.pk)

### VM Internals

TODO

### End Notes

<a name="f1">1</a>: What are tasks? - TASKS_PER_WORKER - a Piscina setting (https://github.com/piscinajs/piscina#constructor-new-piscinaoptions) -> concurrentTasksPerWorker

## Questions?

### [Join our Slack community.](posthog.com/slack)
9 changes: 7 additions & 2 deletions src/worker/plugins/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ export async function runOnSnapshot(server: PluginsServer, event: PluginEvent):
}

export async function runProcessEvent(server: PluginsServer, event: PluginEvent): Promise<PluginEvent | null> {
const pluginsToRun = getPluginsForTeam(server, event.team_id)
const teamId = event.team_id
const pluginsToRun = getPluginsForTeam(server, teamId)
let returnedEvent: PluginEvent | null = event

const pluginsSucceeded = []
Expand All @@ -57,6 +58,10 @@ export async function runProcessEvent(server: PluginsServer, event: PluginEvent)

try {
returnedEvent = (await processEvent(returnedEvent)) || null
if (returnedEvent.team_id != teamId) {
returnedEvent = null // don't try to ingest events with modified teamIDs
throw new Error('Illegal Operation: Plugin tried to change teamID')
}
pluginsSucceeded.push(`${pluginConfig.plugin?.name} (${pluginConfig.id})`)
} catch (error) {
await processError(server, pluginConfig, error, returnedEvent)
Expand All @@ -65,7 +70,7 @@ export async function runProcessEvent(server: PluginsServer, event: PluginEvent)
}
server.statsd?.timing(`plugin.process_event`, timer, {
plugin: pluginConfig.plugin?.name ?? '?',
teamId: event.team_id.toString(),
teamId: teamId.toString(),
})

if (!returnedEvent) {
Expand Down
48 changes: 48 additions & 0 deletions tests/plugins.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,54 @@ test('local plugin with broken index.js does not do much', async () => {
unlink()
})

test('plugin changing teamID throws error', async () => {
getPluginRows.mockReturnValueOnce([
mockPluginWithArchive(`
function processEvent (event, meta) {
event.team_id = 400
return event }
`),
])

getPluginConfigRows.mockReturnValueOnce([pluginConfig39])
getPluginAttachmentRows.mockReturnValueOnce([])

await setupPlugins(mockServer)
const { pluginConfigs } = mockServer

const event = { event: '$test', properties: {}, team_id: 2 } as PluginEvent
const returnedEvent = await runProcessEvent(mockServer, event)

expect(returnedEvent).toEqual(null)

expect(processError).toHaveBeenCalledWith(
mockServer,
pluginConfigs.get(39)!,
Error('Illegal Operation: Plugin tried to change teamID'),
null
)
})

test('plugin changing teamID prevents ingestion', async () => {
getPluginRows.mockReturnValueOnce([
mockPluginWithArchive(`
function processEvent (event, meta) {
event.team_id = 400
return event }
`),
])

getPluginConfigRows.mockReturnValueOnce([pluginConfig39])
getPluginAttachmentRows.mockReturnValueOnce([])

await setupPlugins(mockServer)

const event = { event: '$test', properties: {}, team_id: 2 } as PluginEvent
const returnedEvent = await runProcessEvent(mockServer, event)

expect(returnedEvent).toEqual(null)
})

test('plugin throwing error does not prevent ingestion and failure is noted in event', async () => {
// silence some spam
console.log = jest.fn()
Expand Down