Skip to content

Feature/Code Interpreter #3183

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 28 commits into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
6f2f182
Base changes for ServerSide Events (instead of socket.io)
vinodkiran Aug 25, 2024
183e951
lint fixes
vinodkiran Aug 25, 2024
f88694e
adding of interface and separate methods for streaming events
vinodkiran Aug 26, 2024
7ec4fa1
lint
vinodkiran Aug 26, 2024
c5991dd
Merge branch 'main' into feature/sse
vinodkiran Aug 26, 2024
d3c5438
first draft, handles both internal and external prediction end points.
vinodkiran Sep 1, 2024
f0c259f
lint fixes
vinodkiran Sep 1, 2024
ddc10b3
Merge branch 'main' into feature/sse
vinodkiran Sep 1, 2024
57d0a75
additional internal end point for streaming and associated changes
vinodkiran Sep 2, 2024
6487381
return streamresponse as true to build agent flow
HenryHengZJ Sep 3, 2024
768ec51
1) JSON formatting for internal events
vinodkiran Sep 3, 2024
1174c0d
1) convert internal event to metadata to maintain consistency with ex…
vinodkiran Sep 3, 2024
d24c9a4
fix action and metadata streaming
HenryHengZJ Sep 3, 2024
8e00eb2
fix for error when agent flow is aborted
vinodkiran Sep 4, 2024
77163b9
Merge branch 'main' into FEATURE/sse
vinodkiran Sep 4, 2024
1c936b9
prevent subflows from streaming and other code cleanup
vinodkiran Sep 4, 2024
3a66e2b
prevent streaming from enclosed tools
vinodkiran Sep 4, 2024
0a520d8
add fix for preventing chaintool streaming
HenryHengZJ Sep 4, 2024
6367d48
update lock file
HenryHengZJ Sep 5, 2024
88ec25e
add open when hidden to sse
HenryHengZJ Sep 7, 2024
f5a8939
Streaming errors
vinodkiran Sep 11, 2024
285ddaf
Streaming errors
vinodkiran Sep 11, 2024
1a00e94
add fix for showing error message
HenryHengZJ Sep 11, 2024
8a53556
Merge branch 'feature/sse' into feature/E2B
HenryHengZJ Sep 12, 2024
530cbb2
add code interpreter
HenryHengZJ Sep 13, 2024
a2cf396
add artifacts to view message dialog
HenryHengZJ Sep 15, 2024
348f762
Merge branch 'main' into feature/E2B
HenryHengZJ Sep 17, 2024
8dceb43
Update pnpm-lock.yaml
HenryHengZJ Sep 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 17 additions & 16 deletions packages/components/nodes/agents/OpenAIAssistant/OpenAIAssistant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ class OpenAIAssistant_Agents implements INode {

const usedTools: IUsedTool[] = []
const fileAnnotations = []
const artifacts = []

const assistant = await appDataSource.getRepository(databaseEntities['Assistant']).findOneBy({
id: selectedAssistantId
Expand Down Expand Up @@ -439,21 +440,23 @@ class OpenAIAssistant_Agents implements INode {
const fileId = chunk.image_file.file_id
const fileObj = await openai.files.retrieve(fileId)

const buffer = await downloadImg(openai, fileId, `${fileObj.filename}.png`, options.chatflowid, options.chatId)
const base64String = Buffer.from(buffer).toString('base64')

// TODO: Use a file path and retrieve image on the fly. Storing as base64 to localStorage and database will easily hit limits
const imgHTML = `<img src="data:image/png;base64,${base64String}" width="100%" height="max-content" alt="${fileObj.filename}" /><br/>`
text += imgHTML
const filePath = await downloadImg(
openai,
fileId,
`${fileObj.filename}.png`,
options.chatflowid,
options.chatId
)
artifacts.push({ type: 'png', data: filePath })

if (!isStreamingStarted) {
isStreamingStarted = true
if (sseStreamer) {
sseStreamer.streamStartEvent(chatId, imgHTML)
sseStreamer.streamStartEvent(chatId, ' ')
}
}
if (sseStreamer) {
sseStreamer.streamTokenEvent(chatId, imgHTML)
sseStreamer.streamArtifactsEvent(chatId, artifacts)
}
}
}
Expand Down Expand Up @@ -565,6 +568,7 @@ class OpenAIAssistant_Agents implements INode {
return {
text,
usedTools,
artifacts,
fileAnnotations,
assistant: { assistantId: openAIAssistantId, threadId, runId: runThreadId, messages: messageData }
}
Expand Down Expand Up @@ -769,12 +773,8 @@ class OpenAIAssistant_Agents implements INode {
const fileId = content.image_file.file_id
const fileObj = await openai.files.retrieve(fileId)

const buffer = await downloadImg(openai, fileId, `${fileObj.filename}.png`, options.chatflowid, options.chatId)
const base64String = Buffer.from(buffer).toString('base64')

// TODO: Use a file path and retrieve image on the fly. Storing as base64 to localStorage and database will easily hit limits
const imgHTML = `<img src="data:image/png;base64,${base64String}" width="100%" height="max-content" alt="${fileObj.filename}" /><br/>`
returnVal += imgHTML
const filePath = await downloadImg(openai, fileId, `${fileObj.filename}.png`, options.chatflowid, options.chatId)
artifacts.push({ type: 'png', data: filePath })
}
}

Expand All @@ -787,6 +787,7 @@ class OpenAIAssistant_Agents implements INode {
return {
text: returnVal,
usedTools,
artifacts,
fileAnnotations,
assistant: { assistantId: openAIAssistantId, threadId, runId: runThreadId, messages: messageData }
}
Expand All @@ -807,9 +808,9 @@ const downloadImg = async (openai: OpenAI, fileId: string, fileName: string, ...
const image_data_buffer = Buffer.from(image_data)
const mime = 'image/png'

await addSingleFileToStorage(mime, image_data_buffer, fileName, ...paths)
const res = await addSingleFileToStorage(mime, image_data_buffer, fileName, ...paths)

return image_data_buffer
return res
}

const downloadFile = async (openAIApiKey: string, fileObj: any, fileName: string, ...paths: string[]) => {
Expand Down
15 changes: 14 additions & 1 deletion packages/components/nodes/agents/ToolAgent/ToolAgent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ class ToolAgent_Agents implements INode {
let res: ChainValues = {}
let sourceDocuments: ICommonObject[] = []
let usedTools: IUsedTool[] = []
let artifacts = []

if (shouldStreamResponse) {
const handler = new CustomChainHandler(sseStreamer, chatId)
Expand All @@ -150,6 +151,12 @@ class ToolAgent_Agents implements INode {
}
usedTools = res.usedTools
}
if (res.artifacts) {
if (sseStreamer) {
sseStreamer.streamArtifactsEvent(chatId, flatten(res.artifacts))
}
artifacts = res.artifacts
}
// If the tool is set to returnDirect, stream the output to the client
if (res.usedTools && res.usedTools.length) {
let inputTools = nodeData.inputs?.tools
Expand All @@ -169,6 +176,9 @@ class ToolAgent_Agents implements INode {
if (res.usedTools) {
usedTools = res.usedTools
}
if (res.artifacts) {
artifacts = res.artifacts
}
}

let output = res?.output
Expand Down Expand Up @@ -203,14 +213,17 @@ class ToolAgent_Agents implements INode {

let finalRes = output

if (sourceDocuments.length || usedTools.length) {
if (sourceDocuments.length || usedTools.length || artifacts.length) {
const finalRes: ICommonObject = { text: output }
if (sourceDocuments.length) {
finalRes.sourceDocuments = flatten(sourceDocuments)
}
if (usedTools.length) {
finalRes.usedTools = usedTools
}
if (artifacts.length) {
finalRes.artifacts = artifacts
}
return finalRes
}

Expand Down
39 changes: 34 additions & 5 deletions packages/components/nodes/sequentialagents/Agent/Agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import {
IDocument,
IStateWithMessages
} from '../../../src/Interface'
import { ToolCallingAgentOutputParser, AgentExecutor, SOURCE_DOCUMENTS_PREFIX } from '../../../src/agents'
import { ToolCallingAgentOutputParser, AgentExecutor, SOURCE_DOCUMENTS_PREFIX, ARTIFACTS_PREFIX } from '../../../src/agents'
import { getInputVariables, getVars, handleEscapeCharacters, prepareSandboxVars } from '../../../src/utils'
import {
customGet,
Expand All @@ -35,7 +35,6 @@ import {
} from '../commonUtils'
import { END, StateGraph } from '@langchain/langgraph'
import { StructuredTool } from '@langchain/core/tools'
import { DynamicStructuredTool } from '../../tools/CustomTool/core'

const defaultApprovalPrompt = `You are about to execute tool: {tools}. Ask if user want to proceed`
const examplePrompt = 'You are a research assistant who can search for up-to-date info using search engine.'
Expand Down Expand Up @@ -739,14 +738,22 @@ async function agentNode(

// If the last message is a tool message and is an interrupted message, format output into standard agent output
if (lastMessage._getType() === 'tool' && lastMessage.additional_kwargs?.nodeId === nodeData.id) {
let formattedAgentResult: { output?: string; usedTools?: IUsedTool[]; sourceDocuments?: IDocument[] } = {}
let formattedAgentResult: {
output?: string
usedTools?: IUsedTool[]
sourceDocuments?: IDocument[]
artifacts?: ICommonObject[]
} = {}
formattedAgentResult.output = result.content
if (lastMessage.additional_kwargs?.usedTools) {
formattedAgentResult.usedTools = lastMessage.additional_kwargs.usedTools as IUsedTool[]
}
if (lastMessage.additional_kwargs?.sourceDocuments) {
formattedAgentResult.sourceDocuments = lastMessage.additional_kwargs.sourceDocuments as IDocument[]
}
if (lastMessage.additional_kwargs?.artifacts) {
formattedAgentResult.artifacts = lastMessage.additional_kwargs.artifacts as ICommonObject[]
}
result = formattedAgentResult
} else {
result.name = name
Expand All @@ -765,12 +772,18 @@ async function agentNode(
if (result.sourceDocuments) {
additional_kwargs.sourceDocuments = result.sourceDocuments
}
if (result.artifacts) {
additional_kwargs.artifacts = result.artifacts
}
if (result.output) {
result.content = result.output
delete result.output
}

const outputContent = typeof result === 'string' ? result : result.content || result.output
let outputContent = typeof result === 'string' ? result : result.content || result.output

// remove invalid markdown image pattern: ![<some-string>](<some-string>)
outputContent = typeof outputContent === 'string' ? outputContent.replace(/!\[.*?\]\(.*?\)/g, '') : outputContent

if (nodeData.inputs?.updateStateMemoryUI || nodeData.inputs?.updateStateMemoryCode) {
let formattedOutput = {
Expand Down Expand Up @@ -931,6 +944,9 @@ class ToolNode<T extends BaseMessage[] | MessagesState> extends RunnableCallable
// Extract all properties except messages for IStateWithMessages
const { messages: _, ...inputWithoutMessages } = Array.isArray(input) ? { messages: input } : input
const ChannelsWithoutMessages = {
chatId: this.options.chatId,
sessionId: this.options.sessionId,
input: this.inputQuery,
state: inputWithoutMessages
}

Expand All @@ -940,12 +956,14 @@ class ToolNode<T extends BaseMessage[] | MessagesState> extends RunnableCallable
if (tool === undefined) {
throw new Error(`Tool ${call.name} not found.`)
}
if (tool && tool instanceof DynamicStructuredTool) {
if (tool && (tool as any).setFlowObject) {
// @ts-ignore
tool.setFlowObject(ChannelsWithoutMessages)
}
let output = await tool.invoke(call.args, config)
let sourceDocuments: Document[] = []
let artifacts = []

if (output?.includes(SOURCE_DOCUMENTS_PREFIX)) {
const outputArray = output.split(SOURCE_DOCUMENTS_PREFIX)
output = outputArray[0]
Expand All @@ -956,12 +974,23 @@ class ToolNode<T extends BaseMessage[] | MessagesState> extends RunnableCallable
console.error('Error parsing source documents from tool')
}
}
if (output?.includes(ARTIFACTS_PREFIX)) {
const outputArray = output.split(ARTIFACTS_PREFIX)
output = outputArray[0]
try {
artifacts = JSON.parse(outputArray[1])
} catch (e) {
console.error('Error parsing artifacts from tool')
}
}

return new ToolMessage({
name: tool.name,
content: typeof output === 'string' ? output : JSON.stringify(output),
tool_call_id: call.id!,
additional_kwargs: {
sourceDocuments,
artifacts,
args: call.args,
usedTools: [
{
Expand Down
23 changes: 19 additions & 4 deletions packages/components/nodes/sequentialagents/ToolNode/ToolNode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,12 @@ import {
import { AIMessage, AIMessageChunk, BaseMessage, ToolMessage } from '@langchain/core/messages'
import { StructuredTool } from '@langchain/core/tools'
import { RunnableConfig } from '@langchain/core/runnables'
import { SOURCE_DOCUMENTS_PREFIX } from '../../../src/agents'
import { ARTIFACTS_PREFIX, SOURCE_DOCUMENTS_PREFIX } from '../../../src/agents'
import { Document } from '@langchain/core/documents'
import { DataSource } from 'typeorm'
import { MessagesState, RunnableCallable, customGet, getVM } from '../commonUtils'
import { getVars, prepareSandboxVars } from '../../../src/utils'
import { ChatPromptTemplate } from '@langchain/core/prompts'
import { DynamicStructuredTool } from '../../tools/CustomTool/core'

const defaultApprovalPrompt = `You are about to execute tool: {tools}. Ask if user want to proceed`

Expand Down Expand Up @@ -408,6 +407,9 @@ class ToolNode<T extends IStateWithMessages | BaseMessage[] | MessagesState> ext
// Extract all properties except messages for IStateWithMessages
const { messages: _, ...inputWithoutMessages } = Array.isArray(input) ? { messages: input } : input
const ChannelsWithoutMessages = {
chatId: this.options.chatId,
sessionId: this.options.sessionId,
input: this.inputQuery,
state: inputWithoutMessages
}

Expand All @@ -417,12 +419,13 @@ class ToolNode<T extends IStateWithMessages | BaseMessage[] | MessagesState> ext
if (tool === undefined) {
throw new Error(`Tool ${call.name} not found.`)
}
if (tool && tool instanceof DynamicStructuredTool) {
if (tool && (tool as any).setFlowObject) {
// @ts-ignore
tool.setFlowObject(ChannelsWithoutMessages)
}
let output = await tool.invoke(call.args, config)
let sourceDocuments: Document[] = []
let artifacts = []
if (output?.includes(SOURCE_DOCUMENTS_PREFIX)) {
const outputArray = output.split(SOURCE_DOCUMENTS_PREFIX)
output = outputArray[0]
Expand All @@ -433,12 +436,23 @@ class ToolNode<T extends IStateWithMessages | BaseMessage[] | MessagesState> ext
console.error('Error parsing source documents from tool')
}
}
if (output?.includes(ARTIFACTS_PREFIX)) {
const outputArray = output.split(ARTIFACTS_PREFIX)
output = outputArray[0]
try {
artifacts = JSON.parse(outputArray[1])
} catch (e) {
console.error('Error parsing artifacts from tool')
}
}

return new ToolMessage({
name: tool.name,
content: typeof output === 'string' ? output : JSON.stringify(output),
tool_call_id: call.id!,
additional_kwargs: {
sourceDocuments,
artifacts,
args: call.args,
usedTools: [
{
Expand Down Expand Up @@ -489,7 +503,8 @@ const getReturnOutput = async (
tool: output.name,
toolInput: output.additional_kwargs.args,
toolOutput: output.content,
sourceDocuments: output.additional_kwargs.sourceDocuments
sourceDocuments: output.additional_kwargs.sourceDocuments,
artifacts: output.additional_kwargs.artifacts
} as IUsedTool
})

Expand Down
Loading
Loading