Skip to content
This repository was archived by the owner on Nov 4, 2021. It is now read-only.

Install Plugin Step: Plugin Server side #456

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"benchmark:vm:worker": "node --expose-gc node_modules/.bin/jest --runInBand benchmarks/vm/worker.benchmark.ts",
"start": "yarn start:dev",
"start:dist": "node dist/index.js --base-dir ../posthog",
"start:dev": "ts-node-dev --exit-child src/index.ts --base-dir ../posthog",
"start:dev": "ts-node-dev --exit-child --debug src/index.ts --base-dir ../posthog",
"start:dev:ee": "KAFKA_ENABLED=true KAFKA_HOSTS=localhost:9092 yarn start:dev",
"build": "yarn clean && yarn compile",
"clean": "rimraf dist/*",
Expand Down
3 changes: 2 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { defaultConfig, formatConfigHelp } from './config/config'
import { initApp } from './init'
import { makePiscina as makeInstallPiscina } from './install_worker/piscina'
import { GraphileQueue } from './main/job-queues/concurrent/graphile-queue'
import { startPluginsServer } from './main/pluginsServer'
import { Status } from './utils/status'
Expand Down Expand Up @@ -71,6 +72,6 @@ switch (alternativeMode) {

default:
initApp(defaultConfig)
void startPluginsServer(defaultConfig, makePiscina) // void the returned promise
void startPluginsServer(defaultConfig, makePiscina, makeInstallPiscina) // void the returned promise
break
}
40 changes: 40 additions & 0 deletions src/install_worker/config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { TaskQueue } from '@posthog/piscina/src/common'

import { PluginsServerConfig } from '../types'
import { status } from '../utils/status'

// Copy From: node_modules/piscina/src/index.ts -- copied because it's not exported
export interface PiscinaOptions {
filename?: string | null
minThreads?: number
maxThreads?: number
idleTimeout?: number
maxQueue?: number | 'auto'
concurrentTasksPerWorker?: number
useAtomics?: boolean
resourceLimits?: any
argv?: string[]
execArgv?: string[]
env?: any
workerData?: any
taskQueue?: TaskQueue
niceIncrement?: number
trackUnmanagedFds?: boolean
}

export function createConfig(serverConfig: PluginsServerConfig, filename: string): PiscinaOptions {
const config: PiscinaOptions = {
filename,
workerData: { serverConfig },
resourceLimits: {
stackSizeMb: 10,
},
}

status.info('Creating config for 2nd piscina pool', filename)

config.minThreads = 1
config.maxThreads = 1

return config
}
4 changes: 4 additions & 0 deletions src/install_worker/piscina.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import Piscina from '@posthog/piscina'

import { PluginsServerConfig } from '../types'
export const makePiscina: (config: PluginsServerConfig) => Piscina
27 changes: 27 additions & 0 deletions src/install_worker/piscina.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
const Sentry = require('@sentry/node')
const { isMainThread, threadId } = require('worker_threads')

if (isMainThread) {
const Piscina = require('@posthog/piscina')
const { createConfig } = require('./config')
module.exports = {
makePiscina: (serverConfig) => {
const piscina = new Piscina(createConfig(serverConfig, __filename))
piscina.on('error', (error) => {
Sentry.captureException(error)
console.error('⚠️', 'Piscina worker thread error:\n', error)
})
console.log('🧵🧵 - building 2nd piscina pool')
return piscina
},
}
} else {
if (process.env.NODE_ENV === 'test') {
require('ts-node').register()
}

console.log('🧵🧵 - building worker thread')
const { createWorker } = require('./worker')
const { workerData } = require('@posthog/piscina')
module.exports = createWorker(workerData.serverConfig, threadId)
}
102 changes: 102 additions & 0 deletions src/install_worker/plugins/setup.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import { PluginAttachment } from '@posthog/plugin-scaffold'

import { Hub, Plugin, PluginConfig, PluginConfigId, PluginId, PluginTaskType, TeamId } from '../../types'
import { status } from '../../utils/status'
import { loadPlugin } from '../../worker/plugins/loadPlugin'
import { loadSchedule } from '../../worker/plugins/setup'
import { teardownPlugins } from '../../worker/plugins/teardown'
import { LazyPluginVM } from '../../worker/vm/lazy'

export async function setupPlugin(hub: Hub, pluginConfig: PluginConfig): Promise<void> {
const { plugins, pluginConfigs, pluginConfigsPerTeam } = await loadPluginFromDB(hub, pluginConfig)
const pluginVMLoadPromises: Array<Promise<any>> = []
for (const [id, pluginConfig] of pluginConfigs) {
const plugin = plugins.get(pluginConfig.plugin_id)
const prevConfig = hub.pluginConfigs.get(id)
const prevPlugin = prevConfig ? hub.plugins.get(pluginConfig.plugin_id) : null

status.info('Setting up plugin: ', pluginConfig)
if (
prevConfig &&
pluginConfig.updated_at === prevConfig.updated_at &&
plugin?.updated_at == prevPlugin?.updated_at
) {
pluginConfig.vm = prevConfig.vm
} else {
pluginConfig.vm = new LazyPluginVM()
pluginVMLoadPromises.push(loadPlugin(hub, pluginConfig))

if (prevConfig) {
void teardownPlugins(hub, prevConfig)
}
}
}

await Promise.all(pluginVMLoadPromises)

hub.plugins = plugins
hub.pluginConfigs = pluginConfigs
hub.pluginConfigsPerTeam = pluginConfigsPerTeam

for (const teamId of hub.pluginConfigsPerTeam.keys()) {
hub.pluginConfigsPerTeam.get(teamId)?.sort((a, b) => a.order - b.order)
}

void loadSchedule(hub)
}

async function loadPluginFromDB(
server: Hub,
pluginConfig: PluginConfig
): Promise<Pick<Hub, 'plugins' | 'pluginConfigs' | 'pluginConfigsPerTeam'>> {
const plugin = await server.db.fetchPlugin(pluginConfig.plugin_id)
const plugins = new Map<PluginId, Plugin>()

if (plugin) {
plugins.set(plugin.id, plugin)
}

const pluginAttachmentRows = await server.db.fetchPluginAttachments(pluginConfig.id)
const attachmentsPerConfig = new Map<TeamId, Record<string, PluginAttachment>>()
for (const row of pluginAttachmentRows) {
let attachments = attachmentsPerConfig.get(row.plugin_config_id!)
if (!attachments) {
attachments = {}
attachmentsPerConfig.set(row.plugin_config_id!, attachments)
}
attachments[row.key] = {
content_type: row.content_type,
file_name: row.file_name,
contents: row.contents,
}
}

const row = await server.db.fetchPluginConfig(pluginConfig.id)

const pluginConfigs = new Map<PluginConfigId, PluginConfig>()
const pluginConfigsPerTeam = new Map<TeamId, PluginConfig[]>()

if (row) {
const plugin = plugins.get(row.plugin_id)
const pluginConfig: PluginConfig = {
...row,
plugin: plugin,
attachments: attachmentsPerConfig.get(row.id) || {},
vm: null,
}
pluginConfigs.set(row.id, pluginConfig)

if (!row.team_id) {
console.error(`🔴 PluginConfig(id=${row.id}) without team_id!`)
}

let teamConfigs = pluginConfigsPerTeam.get(row.team_id)
if (!teamConfigs) {
teamConfigs = []
pluginConfigsPerTeam.set(row.team_id, teamConfigs)
}
teamConfigs.push(pluginConfig)
}

return { plugins, pluginConfigs, pluginConfigsPerTeam }
}
30 changes: 30 additions & 0 deletions src/install_worker/tasks.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import {
Action,
EnqueuedJob,
Hub,
InstallConfig,
PluginConfig,
PluginConfigId,
PluginId,
PluginTaskType,
Team,
} from '../types'
import { status } from '../utils/status'
import { workerTasks as originalWorkerTasks } from '../worker/tasks'

type TaskRunner = (hub: Hub, args: any) => Promise<any> | any

export const workerTasks: Record<string, TaskRunner> = {
...originalWorkerTasks,
installPlugin: (hub, args: { installConfig: InstallConfig }) => {
status.info('Piscina worker finished task')
return {}
},
setupPlugin: (hub, args: { installConfig: InstallConfig }) => {
return {}
},
reloadPlugins: (hub) => {
return {} // TODO(nk): figure how this should be setup
},
// override other tasks here
}
Loading