diff --git a/package.json b/package.json index 015f3f50f..64a9a9586 100644 --- a/package.json +++ b/package.json @@ -17,7 +17,7 @@ "benchmark:vm:worker": "node --expose-gc node_modules/.bin/jest --runInBand benchmarks/vm/worker.benchmark.ts", "start": "yarn start:dev", "start:dist": "node dist/index.js --base-dir ../posthog", - "start:dev": "ts-node-dev --exit-child src/index.ts --base-dir ../posthog", + "start:dev": "ts-node-dev --exit-child --debug src/index.ts --base-dir ../posthog", "start:dev:ee": "KAFKA_ENABLED=true KAFKA_HOSTS=localhost:9092 yarn start:dev", "build": "yarn clean && yarn compile", "clean": "rimraf dist/*", diff --git a/src/index.ts b/src/index.ts index a2e42a9b4..e32c8feea 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,5 +1,6 @@ import { defaultConfig, formatConfigHelp } from './config/config' import { initApp } from './init' +import { makePiscina as makeInstallPiscina } from './install_worker/piscina' import { GraphileQueue } from './main/job-queues/concurrent/graphile-queue' import { startPluginsServer } from './main/pluginsServer' import { Status } from './utils/status' @@ -71,6 +72,6 @@ switch (alternativeMode) { default: initApp(defaultConfig) - void startPluginsServer(defaultConfig, makePiscina) // void the returned promise + void startPluginsServer(defaultConfig, makePiscina, makeInstallPiscina) // void the returned promise break } diff --git a/src/install_worker/config.ts b/src/install_worker/config.ts new file mode 100644 index 000000000..b1ba949b7 --- /dev/null +++ b/src/install_worker/config.ts @@ -0,0 +1,40 @@ +import { TaskQueue } from '@posthog/piscina/src/common' + +import { PluginsServerConfig } from '../types' +import { status } from '../utils/status' + +// Copy From: node_modules/piscina/src/index.ts -- copied because it's not exported +export interface PiscinaOptions { + filename?: string | null + minThreads?: number + maxThreads?: number + idleTimeout?: number + maxQueue?: number | 'auto' + concurrentTasksPerWorker?: number + useAtomics?: boolean + resourceLimits?: any + argv?: string[] + execArgv?: string[] + env?: any + workerData?: any + taskQueue?: TaskQueue + niceIncrement?: number + trackUnmanagedFds?: boolean +} + +export function createConfig(serverConfig: PluginsServerConfig, filename: string): PiscinaOptions { + const config: PiscinaOptions = { + filename, + workerData: { serverConfig }, + resourceLimits: { + stackSizeMb: 10, + }, + } + + status.info('Creating config for 2nd piscina pool', filename) + + config.minThreads = 1 + config.maxThreads = 1 + + return config +} diff --git a/src/install_worker/piscina.d.ts b/src/install_worker/piscina.d.ts new file mode 100644 index 000000000..2af6a717d --- /dev/null +++ b/src/install_worker/piscina.d.ts @@ -0,0 +1,4 @@ +import Piscina from '@posthog/piscina' + +import { PluginsServerConfig } from '../types' +export const makePiscina: (config: PluginsServerConfig) => Piscina diff --git a/src/install_worker/piscina.js b/src/install_worker/piscina.js new file mode 100644 index 000000000..2879c69f0 --- /dev/null +++ b/src/install_worker/piscina.js @@ -0,0 +1,27 @@ +const Sentry = require('@sentry/node') +const { isMainThread, threadId } = require('worker_threads') + +if (isMainThread) { + const Piscina = require('@posthog/piscina') + const { createConfig } = require('./config') + module.exports = { + makePiscina: (serverConfig) => { + const piscina = new Piscina(createConfig(serverConfig, __filename)) + piscina.on('error', (error) => { + Sentry.captureException(error) + console.error('⚠️', 'Piscina worker thread error:\n', error) + }) + console.log('🧵🧵 - building 2nd piscina pool') + return piscina + }, + } +} else { + if (process.env.NODE_ENV === 'test') { + require('ts-node').register() + } + + console.log('🧵🧵 - building worker thread') + const { createWorker } = require('./worker') + const { workerData } = require('@posthog/piscina') + module.exports = createWorker(workerData.serverConfig, threadId) +} diff --git a/src/install_worker/plugins/setup.ts b/src/install_worker/plugins/setup.ts new file mode 100644 index 000000000..e91cb982b --- /dev/null +++ b/src/install_worker/plugins/setup.ts @@ -0,0 +1,102 @@ +import { PluginAttachment } from '@posthog/plugin-scaffold' + +import { Hub, Plugin, PluginConfig, PluginConfigId, PluginId, PluginTaskType, TeamId } from '../../types' +import { status } from '../../utils/status' +import { loadPlugin } from '../../worker/plugins/loadPlugin' +import { loadSchedule } from '../../worker/plugins/setup' +import { teardownPlugins } from '../../worker/plugins/teardown' +import { LazyPluginVM } from '../../worker/vm/lazy' + +export async function setupPlugin(hub: Hub, pluginConfig: PluginConfig): Promise { + const { plugins, pluginConfigs, pluginConfigsPerTeam } = await loadPluginFromDB(hub, pluginConfig) + const pluginVMLoadPromises: Array> = [] + for (const [id, pluginConfig] of pluginConfigs) { + const plugin = plugins.get(pluginConfig.plugin_id) + const prevConfig = hub.pluginConfigs.get(id) + const prevPlugin = prevConfig ? hub.plugins.get(pluginConfig.plugin_id) : null + + status.info('Setting up plugin: ', pluginConfig) + if ( + prevConfig && + pluginConfig.updated_at === prevConfig.updated_at && + plugin?.updated_at == prevPlugin?.updated_at + ) { + pluginConfig.vm = prevConfig.vm + } else { + pluginConfig.vm = new LazyPluginVM() + pluginVMLoadPromises.push(loadPlugin(hub, pluginConfig)) + + if (prevConfig) { + void teardownPlugins(hub, prevConfig) + } + } + } + + await Promise.all(pluginVMLoadPromises) + + hub.plugins = plugins + hub.pluginConfigs = pluginConfigs + hub.pluginConfigsPerTeam = pluginConfigsPerTeam + + for (const teamId of hub.pluginConfigsPerTeam.keys()) { + hub.pluginConfigsPerTeam.get(teamId)?.sort((a, b) => a.order - b.order) + } + + void loadSchedule(hub) +} + +async function loadPluginFromDB( + server: Hub, + pluginConfig: PluginConfig +): Promise> { + const plugin = await server.db.fetchPlugin(pluginConfig.plugin_id) + const plugins = new Map() + + if (plugin) { + plugins.set(plugin.id, plugin) + } + + const pluginAttachmentRows = await server.db.fetchPluginAttachments(pluginConfig.id) + const attachmentsPerConfig = new Map>() + for (const row of pluginAttachmentRows) { + let attachments = attachmentsPerConfig.get(row.plugin_config_id!) + if (!attachments) { + attachments = {} + attachmentsPerConfig.set(row.plugin_config_id!, attachments) + } + attachments[row.key] = { + content_type: row.content_type, + file_name: row.file_name, + contents: row.contents, + } + } + + const row = await server.db.fetchPluginConfig(pluginConfig.id) + + const pluginConfigs = new Map() + const pluginConfigsPerTeam = new Map() + + if (row) { + const plugin = plugins.get(row.plugin_id) + const pluginConfig: PluginConfig = { + ...row, + plugin: plugin, + attachments: attachmentsPerConfig.get(row.id) || {}, + vm: null, + } + pluginConfigs.set(row.id, pluginConfig) + + if (!row.team_id) { + console.error(`🔴 PluginConfig(id=${row.id}) without team_id!`) + } + + let teamConfigs = pluginConfigsPerTeam.get(row.team_id) + if (!teamConfigs) { + teamConfigs = [] + pluginConfigsPerTeam.set(row.team_id, teamConfigs) + } + teamConfigs.push(pluginConfig) + } + + return { plugins, pluginConfigs, pluginConfigsPerTeam } +} diff --git a/src/install_worker/tasks.ts b/src/install_worker/tasks.ts new file mode 100644 index 000000000..60865ef15 --- /dev/null +++ b/src/install_worker/tasks.ts @@ -0,0 +1,30 @@ +import { + Action, + EnqueuedJob, + Hub, + InstallConfig, + PluginConfig, + PluginConfigId, + PluginId, + PluginTaskType, + Team, +} from '../types' +import { status } from '../utils/status' +import { workerTasks as originalWorkerTasks } from '../worker/tasks' + +type TaskRunner = (hub: Hub, args: any) => Promise | any + +export const workerTasks: Record = { + ...originalWorkerTasks, + installPlugin: (hub, args: { installConfig: InstallConfig }) => { + status.info('Piscina worker finished task') + return {} + }, + setupPlugin: (hub, args: { installConfig: InstallConfig }) => { + return {} + }, + reloadPlugins: (hub) => { + return {} // TODO(nk): figure how this should be setup + }, + // override other tasks here +} diff --git a/src/install_worker/vm/vm.ts b/src/install_worker/vm/vm.ts new file mode 100644 index 000000000..c5bbb5158 --- /dev/null +++ b/src/install_worker/vm/vm.ts @@ -0,0 +1,204 @@ +import { RetryError } from '@posthog/plugin-scaffold' +import { randomBytes } from 'crypto' +import { VM } from 'vm2' + +import { Hub, PluginConfig, PluginConfigVMResponse } from '../../types' +import { createJobs } from '../../worker/vm//extensions/jobs' +import { createCache } from '../../worker/vm/extensions/cache' +import { createConsole } from '../../worker/vm/extensions/console' +import { createGeoIp } from '../../worker/vm/extensions/geoip' +import { createGoogle } from '../../worker/vm/extensions/google' +import { createPosthog } from '../../worker/vm/extensions/posthog' +import { createStorage } from '../../worker/vm/extensions/storage' +import { imports } from '../../worker/vm/imports' +import { transformCode } from '../../worker/vm/transforms' +import { upgradeExportEvents } from '../../worker/vm/upgrades/export-events' + +// TODO(nk): Find a way to repeat myself less - can add param to run setup. Much better than this shit. + +export function createPluginConfigVM( + hub: Hub, + pluginConfig: PluginConfig, // NB! might have team_id = 0 + indexJs: string +): Promise { + const transformedCode = transformCode(indexJs, hub, imports) + + // Create virtual machine + const vm = new VM({ + timeout: hub.TASK_TIMEOUT * 1000 + 1, + sandbox: {}, + }) + + // Add PostHog utilities to virtual machine + vm.freeze(createConsole(hub, pluginConfig), 'console') + vm.freeze(createPosthog(hub, pluginConfig), 'posthog') + + // Add non-PostHog utilities to virtual machine + vm.freeze(imports['node-fetch'], 'fetch') + vm.freeze(createGoogle(), 'google') + + vm.freeze(imports, '__pluginHostImports') + + if (process.env.NODE_ENV === 'test') { + vm.freeze(setTimeout, '__jestSetTimeout') + } + + vm.freeze(RetryError, 'RetryError') + + // Creating this outside the vm (so not in a babel plugin for example) + // because `setTimeout` is not available inside the vm... and we don't want to + // make it available for now, as it makes it easier to create malicious code + const asyncGuard = async (promise: () => Promise) => { + const timeout = hub.TASK_TIMEOUT + return await Promise.race([ + promise, + new Promise((resolve, reject) => + setTimeout(() => { + const message = `Script execution timed out after promise waited for ${timeout} second${ + timeout === 1 ? '' : 's' + }` + reject(new Error(message)) + }, timeout * 1000) + ), + ]) + } + + vm.freeze(asyncGuard, '__asyncGuard') + + vm.freeze( + { + cache: createCache(hub, pluginConfig.plugin_id, pluginConfig.team_id), + config: pluginConfig.config, + attachments: pluginConfig.attachments, + storage: createStorage(hub, pluginConfig), + geoip: createGeoIp(hub), + jobs: createJobs(hub, pluginConfig), + }, + '__pluginHostMeta' + ) + + vm.run(` + // two ways packages could export themselves (plus "global") + const module = { exports: {} }; + let exports = {}; + + // the plugin JS code + ${transformedCode}; + `) + + // Add a secret hash to the end of some function names, so that we can (sometimes) identify + // the crashed plugin if it throws an uncaught exception in a promise. + if (!hub.pluginConfigSecrets.has(pluginConfig.id)) { + const secret = randomBytes(16).toString('hex') + hub.pluginConfigSecrets.set(pluginConfig.id, secret) + hub.pluginConfigSecretLookup.set(secret, pluginConfig.id) + } + + // Keep the format of this in sync with `pluginConfigIdFromStack` in utils.ts + // Only place this after functions whose names match /^__[a-zA-Z0-9]+$/ + const pluginConfigIdentifier = `__PluginConfig_${pluginConfig.id}_${hub.pluginConfigSecrets.get(pluginConfig.id)}` + const responseVar = `__pluginDetails${randomBytes(64).toString('hex')}` + + // Explicitly passing __asyncGuard to the returned function from `vm.run` in order + // to make it harder to override the global `__asyncGuard = noop` inside plugins. + // This way even if promises inside plugins are unbounded, the `processEvent` function + // itself will still terminate after TASK_TIMEOUT seconds, not clogging the entire ingestion. + vm.run(` + if (typeof global.${responseVar} !== 'undefined') { + throw new Error("Plugin created variable ${responseVar} that is reserved for the VM.") + } + let ${responseVar} = undefined; + ((__asyncGuard) => { + // where to find exports + let exportDestinations = [ + exports, + exports.default, + module.exports + ].filter(d => typeof d === 'object'); // filters out exports.default if not there + + // add "global" only if nothing exported at all + if (!exportDestinations.find(d => Object.keys(d).length > 0)) { + // we can't set it to just [global], as abstractions may add exports later + exportDestinations.push(global) + } + + // export helpers + function __getExported (key) { return exportDestinations.find(a => a[key])?.[key] }; + function __asyncFunctionGuard (func) { + return func ? function __innerAsyncGuard${pluginConfigIdentifier}(...args) { return __asyncGuard(func(...args)) } : func + }; + + // inject the meta object + shareable 'global' to the end of each exported function + const __pluginMeta = { + ...__pluginHostMeta, + global: {} + }; + function __bindMeta (keyOrFunc) { + const func = typeof keyOrFunc === 'function' ? keyOrFunc : __getExported(keyOrFunc); + if (func) return function __inBindMeta${pluginConfigIdentifier} (...args) { return func(...args, __pluginMeta) }; + } + function __callWithMeta (keyOrFunc, ...args) { + const func = __bindMeta(keyOrFunc); + if (func) return func(...args); + } + + // we have processEventBatch, but not processEvent + if (!__getExported('processEvent') && __getExported('processEventBatch')) { + exports.processEvent = async function __processEvent${pluginConfigIdentifier} (event, meta) { + return (await (__getExported('processEventBatch'))([event], meta))?.[0] + } + } + + // export various functions + const __methods = { + setupPlugin: __asyncFunctionGuard(__bindMeta('setupPlugin')), + teardownPlugin: __asyncFunctionGuard(__bindMeta('teardownPlugin')), + exportEvents: __asyncFunctionGuard(__bindMeta('exportEvents')), + onEvent: __asyncFunctionGuard(__bindMeta('onEvent')), + onSnapshot: __asyncFunctionGuard(__bindMeta('onSnapshot')), + processEvent: __asyncFunctionGuard(__bindMeta('processEvent')), + }; + + const __tasks = { + schedule: {}, + job: {}, + }; + + for (const exportDestination of exportDestinations.reverse()) { + // gather the runEveryX commands and export in __tasks + for (const [name, value] of Object.entries(exportDestination)) { + if (name.startsWith("runEvery") && typeof value === 'function') { + __tasks.schedule[name] = { + name: name, + type: 'schedule', + exec: __bindMeta(value) + } + } + } + + // gather all jobs + if (typeof exportDestination['jobs'] === 'object') { + for (const [key, value] of Object.entries(exportDestination['jobs'])) { + __tasks.job[key] = { + name: key, + type: 'job', + exec: __bindMeta(value) + } + } + } + } + + ${responseVar} = { methods: __methods, tasks: __tasks, meta: __pluginMeta, } + }) + `)(asyncGuard) + + upgradeExportEvents(hub, pluginConfig, vm.run(responseVar)) + + // await vm.run(`${responseVar}.methods.setupPlugin?.()`) + + return { + vm, + methods: vm.run(`${responseVar}.methods`), + tasks: vm.run(`${responseVar}.tasks`), + } +} diff --git a/src/install_worker/worker.ts b/src/install_worker/worker.ts new file mode 100644 index 000000000..84268bc7b --- /dev/null +++ b/src/install_worker/worker.ts @@ -0,0 +1,82 @@ +import * as Sentry from '@sentry/node' + +import { initApp } from '../init' +import { Hub, PluginsServerConfig } from '../types' +import { processError } from '../utils/db/error' +import { createHub } from '../utils/db/hub' +import { status } from '../utils/status' +import { cloneObject, pluginConfigIdFromStack } from '../utils/utils' +import { teardownPlugins } from '../worker/plugins/teardown' +import { setupPlugin } from './plugins/setup' +import { workerTasks } from './tasks' + +export type PiscinaTaskWorker = ({ task, args }: { task: string; args: any }) => Promise + +export async function createWorker(config: PluginsServerConfig, threadId: number): Promise { + initApp(config) + + status.info('🧵🧵🧵', `Starting Install Piscina worker thread ${threadId}…`) + + const [hub, closeHub] = await createHub(config, threadId) + // await setupPlugins(hub) + + for (const signal of ['SIGINT', 'SIGTERM', 'SIGHUP']) { + process.on(signal, closeHub) + } + + process.on('unhandledRejection', (error: Error) => processUnhandledRejections(error, hub)) + + return createTaskRunner(hub) +} + +export const createTaskRunner = (hub: Hub): PiscinaTaskWorker => async ({ task, args }) => { + const timer = new Date() + let response + + Sentry.setContext('task', { task, args }) + + if (task in workerTasks) { + //TODO(nk): add setup task here, which sets server.plugin* + try { + status.info('Task and args: ', task, args) + await setupPlugin(hub, args.installConfig) // do custom setup before running the task + + // TODO(nk): replace with setup from plugin when testing time comes + // for now, maybe include in setupPlugin? + // don't have to be lazy for install + await hub.pluginConfigs.get(args.installConfig.plugin_config_id)?.vm?.resolveInternalVm + // must clone the object, as we may get from VM2 something like { ..., properties: Proxy {} } + response = cloneObject(await workerTasks[task](hub, args)) + } catch (e) { + status.info('🔔', e) + Sentry.captureException(e) + response = { error: e.message } + } finally { + await teardownPlugins(hub) + } + } else { + response = { error: `Worker task "${task}" not found in: ${Object.keys(workerTasks).join(', ')}` } + } + + hub.statsd?.timing(`piscina_task.${task}`, timer) + return response +} + +export function processUnhandledRejections(error: Error, server: Hub): void { + const pluginConfigId = pluginConfigIdFromStack(error.stack || '', server.pluginConfigSecretLookup) + const pluginConfig = pluginConfigId ? server.pluginConfigs.get(pluginConfigId) : null + + if (pluginConfig) { + void processError(server, pluginConfig, error) + return + } + + Sentry.captureException(error, { + extra: { + type: 'Unhandled promise error in worker', + }, + }) + + status.error('🤮', `Unhandled Promise Error!`) + status.error('🤮', error) +} diff --git a/src/main/ingestion-queues/install-plugin.ts b/src/main/ingestion-queues/install-plugin.ts new file mode 100644 index 000000000..bd0f0f834 --- /dev/null +++ b/src/main/ingestion-queues/install-plugin.ts @@ -0,0 +1,77 @@ +import Piscina from '@posthog/piscina' +import * as Sentry from '@sentry/node' + +import { Hub, InstallConfig } from '../../types' +import { timeoutGuard } from '../../utils/db/utils' +import { status } from '../../utils/status' + +export async function installPlugin( + server: Hub, + piscina: Piscina, + installConfig: InstallConfig, + checkAndPause?: () => void // pause incoming messages if we are slow in getting them out again +): Promise { + const eachEventStartTimer = new Date() + + function sendInstallTask(installConfig: InstallConfig) { + status.info('sending task to piscina', installConfig) + return piscina.runTask({ task: 'installPlugin', args: { installConfig } }) + } + + status.info('Running install plugin task!', installConfig) + checkAndPause?.() + + // TODO(nk): leaving things here for now, but next steps: + /* + 1. [x] Installation id as part of pluginConfig + 2. [x] setup new piscina pool to be used here for installPlugin task + 3. [x] Trickle down, fix everything on those "test_worker"s + 4. [x] Trickle back up, figure out how to send results back via Celery + 4.1: consider making the queue separate, if very different things + 5. [ ] Do the Django Tango + */ + // const response = await sendInstallTask(pluginConfig) + + const response = await runInstrumentedFunction({ + server, + installConfig, + func: sendInstallTask, + statsKey: 'install_queue.single_plugin', + timeoutMessage: 'Still installing plugin. Timeout warning after 30 sec!', + }) + + checkAndPause?.() + status.info('Response for task: ', response) + return response + // TODO(nk): what should be the return type? PluginConfig? or PluginError? or just success / {error}? +} + +// TODO(nk): No need to dupe this? +async function runInstrumentedFunction({ + server, + timeoutMessage, + installConfig, + func, + statsKey, +}: { + server: Hub + installConfig: InstallConfig + timeoutMessage: string + statsKey: string + func: (installConfig: InstallConfig) => Promise +}): Promise { + const timeout = timeoutGuard(timeoutMessage, { + installConfig: JSON.stringify(installConfig), + }) + const timer = new Date() + try { + return await func(installConfig) + } catch (error) { + status.info('🔔', error) + Sentry.captureException(error) + throw error + } finally { + server.statsd?.timing(statsKey, timer) + clearTimeout(timeout) + } +} diff --git a/src/main/ingestion-queues/queue.ts b/src/main/ingestion-queues/queue.ts index d4f869db7..2a6c0f31a 100644 --- a/src/main/ingestion-queues/queue.ts +++ b/src/main/ingestion-queues/queue.ts @@ -2,11 +2,12 @@ import Piscina from '@posthog/piscina' import { PluginEvent } from '@posthog/plugin-scaffold' import * as Sentry from '@sentry/node' -import { Hub, Queue, WorkerMethods } from '../../types' +import { Hub, PluginId, Queue, WorkerMethods } from '../../types' import { status } from '../../utils/status' import { sanitizeEvent, UUIDT } from '../../utils/utils' import { CeleryQueue } from './celery-queue' import { ingestEvent } from './ingest-event' +import { installPlugin } from './install-plugin' import { KafkaQueue } from './kafka-queue' export function pauseQueueIfWorkerFull( @@ -22,6 +23,7 @@ export function pauseQueueIfWorkerFull( export async function startQueue( server: Hub, piscina: Piscina, + installPiscinaPool: Piscina, workerMethods: Partial = {} ): Promise { const mergedWorkerMethods = { @@ -52,7 +54,8 @@ export async function startQueue( if (server.KAFKA_ENABLED) { return await startQueueKafka(server, piscina, mergedWorkerMethods) } else { - return startQueueRedis(server, piscina, mergedWorkerMethods) + // TODO(nk): start redis for install plugin step, always + return startQueueRedis(server, piscina, installPiscinaPool, mergedWorkerMethods) } } catch (error) { status.error('💥', 'Failed to start event queue:\n', error) @@ -60,7 +63,12 @@ export async function startQueue( } } -function startQueueRedis(server: Hub, piscina: Piscina | undefined, workerMethods: WorkerMethods): Queue { +function startQueueRedis( + server: Hub, + piscina: Piscina, + installPiscinaPool: Piscina, + workerMethods: WorkerMethods +): Queue { const celeryQueue = new CeleryQueue(server.db, server.PLUGINS_CELERY_QUEUE) celeryQueue.register( @@ -93,6 +101,28 @@ function startQueueRedis(server: Hub, piscina: Piscina | undefined, workerMethod } ) + status.info('registering install plugin task') + // same queue => consumed based on plugin events. Perhaps worth switching out to its own queue? + // Makes everything around this cleaner, too. + celeryQueue.register( + 'posthog.tasks.test.install_plugin', + async (plugin_id: PluginId, plugin_config_id: number, plugin_installation_id?: number) => { + try { + console.info('Received args: ', plugin_id, plugin_config_id) + const checkAndPause = () => + pauseQueueIfWorkerFull(() => celeryQueue.pause(), server, installPiscinaPool) + return await installPlugin( + server, + installPiscinaPool, + { plugin_id, plugin_config_id, plugin_installation_id }, + checkAndPause + ) + } catch (e) { + Sentry.captureException(e) + } + } + ) + // run in the background void celeryQueue.start() diff --git a/src/main/pluginsServer.ts b/src/main/pluginsServer.ts index 0bfc65af8..3a0db028d 100644 --- a/src/main/pluginsServer.ts +++ b/src/main/pluginsServer.ts @@ -22,6 +22,7 @@ const { version } = require('../../package.json') export type ServerInstance = { hub: Hub piscina: Piscina + installPiscinaPool: Piscina queue: Queue mmdb?: ReaderModel mmdbUpdateJob?: schedule.Job @@ -30,7 +31,8 @@ export type ServerInstance = { export async function startPluginsServer( config: Partial, - makePiscina: (config: PluginsServerConfig) => Piscina + makePiscina: (config: PluginsServerConfig) => Piscina, + makeInstallPiscina: (config: PluginsServerConfig) => Piscina ): Promise { const serverConfig: PluginsServerConfig = { ...defaultConfig, @@ -46,6 +48,7 @@ export async function startPluginsServer( let piscinaStatsJob: schedule.Job | undefined let internalMetricsStatsJob: schedule.Job | undefined let piscina: Piscina | undefined + let installPiscinaPool: Piscina | undefined let queue: Queue | undefined let jobQueueConsumer: JobQueueConsumerControl | undefined let closeHub: () => Promise | undefined @@ -91,6 +94,9 @@ export async function startPluginsServer( if (piscina) { await stopPiscina(piscina) } + if (installPiscinaPool) { + await installPiscinaPool.destroy() + } await closeHub?.() status.info('👋', 'Over and out!') // wait an extra second for any misc async task to finish @@ -130,7 +136,10 @@ export async function startPluginsServer( scheduleControl = await startSchedule(hub, piscina) jobQueueConsumer = await startJobQueueConsumer(hub, piscina) - queue = await startQueue(hub, piscina) + installPiscinaPool = makeInstallPiscina(serverConfig) + + queue = await startQueue(hub, piscina, installPiscinaPool) + // TODO(nk): Separate queue for install tasks? piscina.on('drain', () => { void queue?.resume() void jobQueueConsumer?.resume() @@ -216,6 +225,7 @@ export async function startPluginsServer( } serverInstance.piscina = piscina + serverInstance.installPiscinaPool = installPiscinaPool serverInstance.queue = queue serverInstance.stop = closeJobs diff --git a/src/types.ts b/src/types.ts index 6c1377fb2..02a536641 100644 --- a/src/types.ts +++ b/src/types.ts @@ -211,6 +211,7 @@ export interface PluginConfig { team_id: TeamId plugin?: Plugin plugin_id: PluginId + plugin_installation_id?: number enabled: boolean order: number config: Record @@ -221,6 +222,12 @@ export interface PluginConfig { updated_at: string } +export type InstallConfig = { + plugin_id: PluginId + plugin_config_id: PluginConfigId + plugin_installation_id?: number +} + export interface PluginJsonConfig { name?: string description?: string diff --git a/src/utils/db/db.ts b/src/utils/db/db.ts index 0ac03844a..66617eff1 100644 --- a/src/utils/db/db.ts +++ b/src/utils/db/db.ts @@ -22,6 +22,8 @@ import { EventDefinitionType, Person, PersonDistinctId, + Plugin, + PluginAttachmentDB, PluginConfig, PluginLogEntry, PluginLogEntrySource, @@ -788,4 +790,39 @@ export class DB { return rows.length > 0 ? rows[0].team_id : null } + + // Plugin & PluginConfig + public async fetchPlugin(id: Plugin['id']): Promise { + const plugins: Plugin[] = ( + await this.postgresQuery('SELECT * FROM posthog_plugin WHERE id = $1', [id], 'fetchPlugin') + ).rows + + if (!plugins.length) { + return null + } + return plugins[0] + } + + public async fetchPluginConfig(id: PluginConfig['id']): Promise { + const pluginconfigs: PluginConfig[] = ( + await this.postgresQuery('SELECT * FROM posthog_pluginconfig WHERE id = $1', [id], 'fetchPluginConfig') + ).rows + + if (!pluginconfigs.length) { + return null + } + return pluginconfigs[0] + } + + public async fetchPluginAttachments(id: PluginConfig['id']): Promise { + const pluginattachments: PluginAttachmentDB[] = ( + await this.postgresQuery( + 'SELECT * FROM posthog_pluginattachment WHERE plugin_config_id = $1', + [id], + 'fetchPluginAttachment' + ) + ).rows + + return pluginattachments + } } diff --git a/src/utils/status.ts b/src/utils/status.ts index 4db19cef2..1c08f6b9a 100644 --- a/src/utils/status.ts +++ b/src/utils/status.ts @@ -16,6 +16,7 @@ export class Status implements StatusBlueprint { } determinePrefix(): string { + console.info('Writing status for thread: ', threadId) return `[${this.prefixOverride ?? (threadId ? threadId.toString().padStart(4, '_') : 'MAIN')}] ${ new Date().toTimeString().split(' ')[0] }` diff --git a/src/worker/vm/lazy.ts b/src/worker/vm/lazy.ts index 581a7f45f..0c575f336 100644 --- a/src/worker/vm/lazy.ts +++ b/src/worker/vm/lazy.ts @@ -72,6 +72,10 @@ export class LazyPluginVM { return (await this.resolveInternalVm)?.methods.processEvent || null } + async getSetupPlugin(): Promise { + return (await this.resolveInternalVm)?.methods.setupPlugin || null + } + async getTeardownPlugin(): Promise { return (await this.resolveInternalVm)?.methods.teardownPlugin || null } @@ -89,6 +93,7 @@ export class LazyPluginVM { pluginConfig: PluginConfig, vm: PluginConfigVMResponse ): Promise { + console.log('Inferring capabilities!!') if (!pluginConfig.plugin) { throw new Error(`'PluginConfig missing plugin: ${pluginConfig}`) }