
Several features for OpenAPI toolkit and OpenAI Assistants #3989

Merged
merged 3 commits into from Feb 28, 2025
243 changes: 216 additions & 27 deletions packages/components/nodes/agents/OpenAIAssistant/OpenAIAssistant.ts
@@ -18,6 +18,7 @@ import { AnalyticHandler } from '../../../src/handler'
import { Moderation, checkInputs, streamResponse } from '../../moderation/Moderation'
import { formatResponse } from '../../outputparsers/OutputParserHelpers'
import { addSingleFileToStorage } from '../../../src/storageUtils'
import { DynamicStructuredTool } from '../../tools/OpenAPIToolkit/core'

const lenticularBracketRegex = /【[^】]*】/g
const imageRegex = /<img[^>]*\/>/g
@@ -504,7 +505,6 @@ class OpenAIAssistant_Agents implements INode {
toolCallId: item.id
})
})

const submitToolOutputs = []
for (let i = 0; i < actions.length; i += 1) {
const tool = tools.find((tool: any) => tool.name === actions[i].tool)
@@ -539,30 +539,23 @@
}

try {
await handleToolSubmission({
openai,
threadId,
runThreadId,
submitToolOutputs,
tools,
analyticHandlers,
parentIds,
llmIds,
sseStreamer,
chatId,
options,
input,
usedTools,
text,
isStreamingStarted
})
} catch (error) {
console.error('Error submitting tool outputs:', error)
await openai.beta.threads.runs.cancel(threadId, runThreadId)
@@ -634,7 +627,6 @@ class OpenAIAssistant_Agents implements INode {
toolCallId: item.id
})
})

const submitToolOutputs = []
for (let i = 0; i < actions.length; i += 1) {
const tool = tools.find((tool: any) => tool.name === actions[i].tool)
@@ -895,15 +887,212 @@ const downloadFile = async (openAIApiKey: string, fileObj: any, fileName: string
}
}

interface ToolSubmissionParams {
openai: OpenAI
threadId: string
runThreadId: string
submitToolOutputs: any[]
tools: any[]
analyticHandlers: AnalyticHandler
parentIds: ICommonObject
llmIds: ICommonObject
sseStreamer: IServerSideEventStreamer
chatId: string
options: ICommonObject
input: string
usedTools: IUsedTool[]
text: string
isStreamingStarted: boolean
}

interface ToolSubmissionResult {
text: string
isStreamingStarted: boolean
}

async function handleToolSubmission(params: ToolSubmissionParams): Promise<ToolSubmissionResult> {
const {
openai,
threadId,
runThreadId,
submitToolOutputs,
tools,
analyticHandlers,
parentIds,
llmIds,
sseStreamer,
chatId,
options,
input,
usedTools
} = params

let updatedText = params.text
let updatedIsStreamingStarted = params.isStreamingStarted

const stream = openai.beta.threads.runs.submitToolOutputsStream(threadId, runThreadId, {
tool_outputs: submitToolOutputs
})

try {
for await (const event of stream) {
if (event.event === 'thread.message.delta') {
const chunk = event.data.delta.content?.[0]
if (chunk && 'text' in chunk && chunk.text?.value) {
updatedText += chunk.text.value
if (!updatedIsStreamingStarted) {
updatedIsStreamingStarted = true
if (sseStreamer) {
sseStreamer.streamStartEvent(chatId, chunk.text.value)
}
}
if (sseStreamer) {
sseStreamer.streamTokenEvent(chatId, chunk.text.value)
}
}
} else if (event.event === 'thread.run.requires_action') {
if (event.data.required_action?.submit_tool_outputs.tool_calls) {
const actions: ICommonObject[] = []

event.data.required_action.submit_tool_outputs.tool_calls.forEach((item) => {
const functionCall = item.function
let args = {}
try {
args = JSON.parse(functionCall.arguments)
} catch (e) {
console.error('Error parsing arguments, default to empty object')
}
actions.push({
tool: functionCall.name,
toolInput: args,
toolCallId: item.id
})
})

const nestedToolOutputs = []
for (let i = 0; i < actions.length; i += 1) {
const tool = tools.find((tool: any) => tool.name === actions[i].tool)
if (!tool) continue

const toolIds = await analyticHandlers.onToolStart(tool.name, actions[i].toolInput, parentIds)

try {
const toolOutput = await tool.call(actions[i].toolInput, undefined, undefined, {
sessionId: threadId,
chatId: options.chatId,
input
})
await analyticHandlers.onToolEnd(toolIds, toolOutput)
nestedToolOutputs.push({
tool_call_id: actions[i].toolCallId,
output: toolOutput
})
usedTools.push({
tool: tool.name,
toolInput: actions[i].toolInput,
toolOutput
})
} catch (e) {
await analyticHandlers.onToolEnd(toolIds, e)
console.error('Error executing tool', e)
throw new Error(`Error executing tool. Tool: ${tool.name}. Thread ID: ${threadId}. Run ID: ${runThreadId}`)
}
}

// Recursively handle nested tool submissions
const result = await handleToolSubmission({
openai,
threadId,
runThreadId,
submitToolOutputs: nestedToolOutputs,
tools,
analyticHandlers,
parentIds,
llmIds,
sseStreamer,
chatId,
options,
input,
usedTools,
text: updatedText,
isStreamingStarted: updatedIsStreamingStarted
})
updatedText = result.text
updatedIsStreamingStarted = result.isStreamingStarted
}
}
}

if (sseStreamer) {
sseStreamer.streamUsedToolsEvent(chatId, usedTools)
}

return {
text: updatedText,
isStreamingStarted: updatedIsStreamingStarted
}
} catch (error) {
console.error('Error submitting tool outputs:', error)
await openai.beta.threads.runs.cancel(threadId, runThreadId)

const errMsg = `Error submitting tool outputs. Thread ID: ${threadId}. Run ID: ${runThreadId}`

await analyticHandlers.onLLMError(llmIds, errMsg)
await analyticHandlers.onChainError(parentIds, errMsg, true)

throw new Error(errMsg)
}
}
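
For context, the payload that drives the recursive branch in `handleToolSubmission` is the Assistants streaming `thread.run.requires_action` event. Below is a rough sketch of its shape and of the outputs handed back via `submitToolOutputsStream`; the IDs, tool name, and argument values are illustrative and not part of this PR.

```ts
// Illustrative shape of a 'thread.run.requires_action' streaming event
// (IDs, tool name, and arguments are made-up values, not from this PR)
const exampleEvent = {
    event: 'thread.run.requires_action',
    data: {
        id: 'run_abc123',
        required_action: {
            type: 'submit_tool_outputs',
            submit_tool_outputs: {
                tool_calls: [
                    {
                        id: 'call_xyz789',
                        type: 'function',
                        function: { name: 'get_weather', arguments: '{"city":"Paris"}' }
                    }
                ]
            }
        }
    }
}

// handleToolSubmission parses each function.arguments string, executes the matching
// tool, and streams back outputs of this shape; if that new stream raises another
// requires_action event, the function recurses with the nested outputs.
const exampleToolOutputs = [{ tool_call_id: 'call_xyz789', output: '18°C and sunny' }]

console.log(exampleEvent, exampleToolOutputs)
```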

interface JSONSchema {
type?: string
properties?: Record<string, JSONSchema>
additionalProperties?: boolean
required?: string[]
[key: string]: any
}

const formatToOpenAIAssistantTool = (tool: any): OpenAI.Beta.FunctionTool => {
const parameters = zodToJsonSchema(tool.schema) as JSONSchema

// For strict tools, we need to:
// 1. Set additionalProperties to false
// 2. Make all parameters required
// 3. Set the strict flag
if (tool instanceof DynamicStructuredTool && tool.isStrict()) {
// Get all property names from the schema
const properties = parameters.properties || {}
const allPropertyNames = Object.keys(properties)

parameters.additionalProperties = false
parameters.required = allPropertyNames

// Handle nested objects
for (const [_, prop] of Object.entries(properties)) {
if (prop.type === 'object') {
prop.additionalProperties = false
if (prop.properties) {
prop.required = Object.keys(prop.properties)
}
}
}
}

const functionTool: OpenAI.Beta.FunctionTool = {
type: 'function',
function: {
name: tool.name,
description: tool.description,
parameters
}
}

// Add strict property if the tool is marked as strict
if (tool instanceof DynamicStructuredTool && tool.isStrict()) {
;(functionTool.function as any).strict = true
}

return functionTool
}

module.exports = { nodeClass: OpenAIAssistant_Agents }
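
As a usage illustration of the strict handling in `formatToOpenAIAssistantTool`, here is a minimal standalone sketch of the same transformation applied to a sample schema. The schema, tool name, and the local `makeStrict` helper are assumptions for illustration only; they are not part of this PR.

```ts
import { z } from 'zod'
import { zodToJsonSchema } from 'zod-to-json-schema'

// Hypothetical tool schema with optional fields and a nested object
const schema = z.object({
    city: z.string(),
    units: z.string().optional(),
    options: z.object({ verbose: z.boolean().optional() }).optional()
})

// Mirrors the strict-mode steps above: disallow extra properties and mark every
// property (including nested object properties) as required.
const makeStrict = (parameters: Record<string, any>) => {
    const properties = parameters.properties || {}
    parameters.additionalProperties = false
    parameters.required = Object.keys(properties)
    for (const prop of Object.values<any>(properties)) {
        if (prop.type === 'object') {
            prop.additionalProperties = false
            if (prop.properties) prop.required = Object.keys(prop.properties)
        }
    }
    return parameters
}

// Roughly what the assistant function tool definition looks like with strict mode on
const functionTool = {
    type: 'function',
    function: {
        name: 'get_weather',
        description: 'Get the current weather for a city',
        parameters: makeStrict(zodToJsonSchema(schema) as Record<string, any>),
        strict: true
    }
}

console.log(JSON.stringify(functionTool, null, 2))
```

With `strict: true`, OpenAI's structured-output validation expects every property to be listed in `required` and `additionalProperties` to be `false`, which is why the formatter rewrites the schema this way.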