Skip to content

Commit a80ba4b

Browse files
HenryHengZJ0xi4o
authored andcommitted
Bugfix/Prevent streaming of chatflow tool and chain tool (#3257)
prevent streaming of chatflow tool and chain tool
1 parent d2577e5 commit a80ba4b

File tree

3 files changed

+22
-40
lines changed

3 files changed

+22
-40
lines changed

packages/components/nodes/tools/ChainTool/core.ts

Lines changed: 5 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -14,41 +14,17 @@ export class ChainTool extends DynamicTool {
1414
super({
1515
...rest,
1616
func: async (input, runManager) => {
17-
// prevent sending SSE events of the sub-chain
18-
const sseStreamer = runManager?.handlers.find((handler) => handler instanceof CustomChainHandler)?.sseStreamer
19-
if (runManager) {
20-
const callbacks = runManager.handlers
21-
for (let i = 0; i < callbacks.length; i += 1) {
22-
if (callbacks[i] instanceof CustomChainHandler) {
23-
;(callbacks[i] as any).sseStreamer = undefined
24-
}
25-
}
26-
}
17+
const childManagers = runManager?.getChild()
18+
const handlers = childManagers?.handlers?.filter((handler) => !(handler instanceof CustomChainHandler)) || []
19+
if (childManagers) childManagers.handlers = handlers
2720

2821
if ((chain as any).prompt && (chain as any).prompt.promptValues) {
2922
const promptValues = handleEscapeCharacters((chain as any).prompt.promptValues, true)
30-
31-
const values = await chain.call(promptValues, runManager?.getChild())
32-
if (runManager && sseStreamer) {
33-
const callbacks = runManager.handlers
34-
for (let i = 0; i < callbacks.length; i += 1) {
35-
if (callbacks[i] instanceof CustomChainHandler) {
36-
;(callbacks[i] as any).sseStreamer = sseStreamer
37-
}
38-
}
39-
}
23+
const values = await chain.call(promptValues, childManagers)
4024
return values?.text
4125
}
4226

43-
const values = chain.run(input, runManager?.getChild())
44-
if (runManager && sseStreamer) {
45-
const callbacks = runManager.handlers
46-
for (let i = 0; i < callbacks.length; i += 1) {
47-
if (callbacks[i] instanceof CustomChainHandler) {
48-
;(callbacks[i] as any).sseStreamer = sseStreamer
49-
}
50-
}
51-
}
27+
const values = chain.run(input, childManagers)
5228
return values
5329
}
5430
})

packages/components/nodes/tools/ChatflowTool/ChatflowTool.ts

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import { StructuredTool } from '@langchain/core/tools'
77
import { ICommonObject, IDatabaseEntity, INode, INodeData, INodeOptionsValue, INodeParams } from '../../../src/Interface'
88
import { availableDependencies, defaultAllowBuiltInDep, getCredentialData, getCredentialParam } from '../../../src/utils'
99
import { v4 as uuidv4 } from 'uuid'
10-
import { CustomChainHandler } from '../../../src'
1110

1211
class ChatflowTool_Tools implements INode {
1312
label: string
@@ -24,7 +23,7 @@ class ChatflowTool_Tools implements INode {
2423
constructor() {
2524
this.label = 'Chatflow Tool'
2625
this.name = 'ChatflowTool'
27-
this.version = 4.0
26+
this.version = 5.0
2827
this.type = 'ChatflowTool'
2928
this.icon = 'chatflowTool.svg'
3029
this.category = 'Tools'
@@ -58,6 +57,12 @@ class ChatflowTool_Tools implements INode {
5857
placeholder:
5958
'State of the Union QA - useful for when you need to ask questions about the most recent state of the union address.'
6059
},
60+
{
61+
label: 'Return Direct',
62+
name: 'returnDirect',
63+
type: 'boolean',
64+
optional: true
65+
},
6166
{
6267
label: 'Override Config',
6368
name: 'overrideConfig',
@@ -135,6 +140,7 @@ class ChatflowTool_Tools implements INode {
135140
const _name = nodeData.inputs?.name as string
136141
const description = nodeData.inputs?.description as string
137142
const useQuestionFromChat = nodeData.inputs?.useQuestionFromChat as boolean
143+
const returnDirect = nodeData.inputs?.returnDirect as boolean
138144
const customInput = nodeData.inputs?.customInput as string
139145
const overrideConfig =
140146
typeof nodeData.inputs?.overrideConfig === 'string' &&
@@ -168,6 +174,7 @@ class ChatflowTool_Tools implements INode {
168174
name,
169175
baseURL,
170176
description,
177+
returnDirect,
171178
chatflowid: selectedChatflowId,
172179
startNewSession,
173180
headers,
@@ -206,6 +213,7 @@ class ChatflowTool extends StructuredTool {
206213
constructor({
207214
name,
208215
description,
216+
returnDirect,
209217
input,
210218
chatflowid,
211219
startNewSession,
@@ -215,6 +223,7 @@ class ChatflowTool extends StructuredTool {
215223
}: {
216224
name: string
217225
description: string
226+
returnDirect: boolean
218227
input: string
219228
chatflowid: string
220229
startNewSession: boolean
@@ -231,6 +240,7 @@ class ChatflowTool extends StructuredTool {
231240
this.headers = headers
232241
this.chatflowid = chatflowid
233242
this.overrideConfig = overrideConfig
243+
this.returnDirect = returnDirect
234244
}
235245

236246
async call(
@@ -249,15 +259,6 @@ class ChatflowTool extends StructuredTool {
249259
} catch (e) {
250260
throw new Error(`Received tool input did not match expected schema: ${JSON.stringify(arg)}`)
251261
}
252-
// iterate over the callbacks and the sse streamer
253-
if (config.callbacks instanceof CallbackManager) {
254-
const callbacks = config.callbacks.handlers
255-
for (let i = 0; i < callbacks.length; i += 1) {
256-
if (callbacks[i] instanceof CustomChainHandler) {
257-
;(callbacks[i] as any).sseStreamer = undefined
258-
}
259-
}
260-
}
261262
const callbackManager_ = await CallbackManager.configure(
262263
config.callbacks,
263264
this.callbacks,
@@ -283,6 +284,9 @@ class ChatflowTool extends StructuredTool {
283284
await runManager?.handleToolError(e)
284285
throw e
285286
}
287+
if (result && typeof result !== 'string') {
288+
result = JSON.stringify(result)
289+
}
286290
await runManager?.handleToolEnd(result)
287291
return result
288292
}

packages/server/src/utils/buildChatflow.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,8 @@ export const utilBuildChatflow = async (req: Request, isInternal: boolean = fals
360360
const nodeModule = await import(nodeInstanceFilePath)
361361
const nodeInstance = new nodeModule.nodeClass({ sessionId })
362362

363+
isStreamValid = (req.body.streaming === 'true' || req.body.streaming === true) && isStreamValid
364+
363365
let result = isStreamValid
364366
? await nodeInstance.run(nodeToExecuteData, incomingInput.question, {
365367
chatId,

0 commit comments

Comments
 (0)