App run node update (#2542)

* feat(workflow): allow apps to be invoked like plugins (#2521)

* feat(workflow): allow apps to be invoked like plugins

* fix type

* Encapsulate SSE response methods (#2530)

* perf: sse response fn

* perf: sse response

* fix: ts

* perf: not ssl copy

* perf: myselect auto scroll

* perf: run app code

* fix: app plugin (#2538)

---------

Co-authored-by: heheer <heheer@sealos.io>
This commit is contained in:
Archer
2024-08-27 16:43:19 +08:00
committed by GitHub
parent 67445b40bc
commit 450167c951
67 changed files with 706 additions and 4899 deletions

View File

@@ -0,0 +1,111 @@
import type { ChatItemType } from '@fastgpt/global/core/chat/type.d';
import type { ModuleDispatchProps } from '@fastgpt/global/core/workflow/runtime/type';
import { dispatchWorkFlow } from '../index';
import { ChatRoleEnum } from '@fastgpt/global/core/chat/constants';
import { SseResponseEventEnum } from '@fastgpt/global/core/workflow/runtime/constants';
import {
getWorkflowEntryNodeIds,
initWorkflowEdgeStatus,
storeNodes2RuntimeNodes,
textAdaptGptResponse
} from '@fastgpt/global/core/workflow/runtime/utils';
import { NodeInputKeyEnum, NodeOutputKeyEnum } from '@fastgpt/global/core/workflow/constants';
import { DispatchNodeResponseKeyEnum } from '@fastgpt/global/core/workflow/runtime/constants';
import { getHistories } from '../utils';
import { chatValue2RuntimePrompt, runtimePrompt2ChatsValue } from '@fastgpt/global/core/chat/adapt';
import { DispatchNodeResultType } from '@fastgpt/global/core/workflow/runtime/type';
import { authAppByTmbId } from '../../../../support/permission/app/auth';
import { ReadPermissionVal } from '@fastgpt/global/support/permission/constant';
// Inputs for the "run app" node: the user's chat input text, an optional
// history (either explicit chat items, or — presumably — a count of recent
// rounds to include; TODO confirm against getHistories), and optional file
// URLs forwarded to the invoked app.
type Props = ModuleDispatchProps<{
[NodeInputKeyEnum.userChatInput]: string;
[NodeInputKeyEnum.history]?: ChatItemType[] | number;
[NodeInputKeyEnum.fileUrlList]?: string[];
}>;
// Outputs of the "run app" node: the sub-app's plain-text answer and the
// chat history extended with this round's human query and AI response.
type Response = DispatchNodeResultType<{
[NodeOutputKeyEnum.answerText]: string;
[NodeOutputKeyEnum.history]: ChatItemType[];
}>;
/**
 * Dispatch handler that invokes another app as a workflow node ("app as
 * plugin"). It authorizes the target app against the workflow owner's
 * tmbId, streams a separating newline to the client, runs the target
 * app's workflow as a sub-flow, and returns the answer text, per-node
 * responses/usages, and the merged chat history.
 *
 * NOTE(review): rejects with plain strings (matching this module's
 * existing convention) rather than Error instances.
 */
export const dispatchRunAppNode = async (props: Props): Promise<Response> => {
  const {
    app: workflowApp,
    histories,
    query,
    node: { pluginId },
    workflowStreamResponse,
    params
  } = props;
  const { userChatInput, history, ...variables } = params;

  // Guard clauses: both the user input and the target app id are required.
  if (!userChatInput) return Promise.reject('Input is empty');
  if (!pluginId) return Promise.reject('pluginId is empty');

  // Auth the app by tmbId (the workflow owner's team member, not the chatting user).
  const { app: targetApp } = await authAppByTmbId({
    appId: pluginId,
    tmbId: workflowApp.tmbId,
    per: ReadPermissionVal
  });

  // Emit a newline so the sub-app's streamed answer starts on its own line.
  workflowStreamResponse?.({
    event: SseResponseEventEnum.answer,
    data: textAdaptGptResponse({ text: '\n' })
  });

  const chatHistories = getHistories(history, histories);
  const { files } = chatValue2RuntimePrompt(query);

  // Run the target app's workflow with freshly initialized runtime nodes/edges.
  const { flowResponses, flowUsages, assistantResponses } = await dispatchWorkFlow({
    ...props,
    app: targetApp,
    runtimeNodes: storeNodes2RuntimeNodes(
      targetApp.modules,
      getWorkflowEntryNodeIds(targetApp.modules)
    ),
    runtimeEdges: initWorkflowEdgeStatus(targetApp.edges),
    histories: chatHistories,
    query: runtimePrompt2ChatsValue({ files, text: userChatInput }),
    variables
  });

  // Append this round (human query + AI answer) to the history.
  const completeMessages = chatHistories.concat(
    { obj: ChatRoleEnum.Human, value: query },
    { obj: ChatRoleEnum.AI, value: assistantResponses }
  );

  const { text } = chatValue2RuntimePrompt(assistantResponses);

  // Sum the points charged across a list of sub-flow items (missing → 0).
  const sumPoints = (items: { totalPoints?: number }[]) =>
    items.reduce((acc, item) => acc + (item.totalPoints || 0), 0);

  return {
    [DispatchNodeResponseKeyEnum.nodeResponse]: {
      moduleLogo: targetApp.avatar,
      query: userChatInput,
      textOutput: text,
      totalPoints: sumPoints(flowResponses)
    },
    [DispatchNodeResponseKeyEnum.nodeDispatchUsages]: [
      {
        moduleName: targetApp.name,
        totalPoints: sumPoints(flowUsages)
      }
    ],
    answerText: text,
    history: completeMessages
  };
};

View File

@@ -11,18 +11,14 @@ import {
ChatCompletionAssistantMessageParam
} from '@fastgpt/global/core/ai/type.d';
import { NextApiResponse } from 'next';
import {
responseWrite,
responseWriteController,
responseWriteNodeStatus
} from '../../../../../common/response';
import { responseWriteController } from '../../../../../common/response';
import { SseResponseEventEnum } from '@fastgpt/global/core/workflow/runtime/constants';
import { textAdaptGptResponse } from '@fastgpt/global/core/workflow/runtime/utils';
import { ChatCompletionRequestMessageRoleEnum } from '@fastgpt/global/core/ai/constants';
import { dispatchWorkFlow } from '../../index';
import { DispatchToolModuleProps, RunToolResponse, ToolNodeItemType } from './type.d';
import json5 from 'json5';
import { DispatchFlowResponse } from '../../type';
import { DispatchFlowResponse, WorkflowResponseType } from '../../type';
import { countGptMessagesTokens } from '../../../../../common/string/tiktoken/index';
import { getNanoid, sliceStrStartEnd } from '@fastgpt/global/common/string/tools';
import { AIChatItemType } from '@fastgpt/global/core/chat/type';
@@ -50,9 +46,9 @@ export const runToolWithFunctionCall = async (
res,
requestOrigin,
runtimeNodes,
detail = false,
node,
stream,
workflowStreamResponse,
params: { temperature = 0, maxToken = 4000, aiChatVision }
} = props;
const assistantResponses = response?.assistantResponses || [];
@@ -143,9 +139,9 @@ export const runToolWithFunctionCall = async (
if (res && stream) {
return streamResponse({
res,
detail,
toolNodes,
stream: aiResponse
stream: aiResponse,
workflowStreamResponse
});
} else {
const result = aiResponse as ChatCompletion;
@@ -216,21 +212,18 @@ export const runToolWithFunctionCall = async (
content: stringToolResponse
};
if (stream && detail) {
responseWrite({
res,
event: SseResponseEventEnum.toolResponse,
data: JSON.stringify({
tool: {
id: tool.id,
toolName: '',
toolAvatar: '',
params: '',
response: sliceStrStartEnd(stringToolResponse, 500, 500)
}
})
});
}
workflowStreamResponse?.({
event: SseResponseEventEnum.toolResponse,
data: {
tool: {
id: tool.id,
toolName: '',
toolAvatar: '',
params: '',
response: sliceStrStartEnd(stringToolResponse, 500, 500)
}
}
});
return {
toolRunResponse,
@@ -260,12 +253,14 @@ export const runToolWithFunctionCall = async (
];
// console.log(tokens, 'tool');
if (stream && detail) {
responseWriteNodeStatus({
res,
// Run tool status
workflowStreamResponse?.({
event: SseResponseEventEnum.flowNodeStatus,
data: {
status: 'running',
name: node.name
});
}
}
});
// tool assistant
const toolAssistants = toolsRunResponse
@@ -337,14 +332,14 @@ export const runToolWithFunctionCall = async (
async function streamResponse({
res,
detail,
toolNodes,
stream
stream,
workflowStreamResponse
}: {
res: NextApiResponse;
detail: boolean;
toolNodes: ToolNodeItemType[];
stream: StreamChatType;
workflowStreamResponse?: WorkflowResponseType;
}) {
const write = responseWriteController({
res,
@@ -367,9 +362,9 @@ async function streamResponse({
const content = responseChoice?.content || '';
textAnswer += content;
responseWrite({
workflowStreamResponse?.({
write,
event: detail ? SseResponseEventEnum.answer : undefined,
event: SseResponseEventEnum.answer,
data: textAdaptGptResponse({
text: content
})
@@ -397,22 +392,20 @@ async function streamResponse({
toolAvatar: toolNode.avatar
});
if (detail) {
responseWrite({
write,
event: SseResponseEventEnum.toolCall,
data: JSON.stringify({
tool: {
id: functionId,
toolName: toolNode.name,
toolAvatar: toolNode.avatar,
functionName: functionCall.name,
params: functionCall.arguments,
response: ''
}
})
});
}
workflowStreamResponse?.({
write,
event: SseResponseEventEnum.toolCall,
data: {
tool: {
id: functionId,
toolName: toolNode.name,
toolAvatar: toolNode.avatar,
functionName: functionCall.name,
params: functionCall.arguments,
response: ''
}
}
});
}
continue;
@@ -424,21 +417,19 @@ async function streamResponse({
if (currentTool) {
currentTool.arguments += arg;
if (detail) {
responseWrite({
write,
event: SseResponseEventEnum.toolParams,
data: JSON.stringify({
tool: {
id: functionId,
toolName: '',
toolAvatar: '',
params: arg,
response: ''
}
})
});
}
workflowStreamResponse?.({
write,
event: SseResponseEventEnum.toolParams,
data: {
tool: {
id: functionId,
toolName: '',
toolAvatar: '',
params: arg,
response: ''
}
}
});
}
}
}

View File

@@ -8,11 +8,7 @@ import {
ChatCompletionAssistantMessageParam
} from '@fastgpt/global/core/ai/type';
import { NextApiResponse } from 'next';
import {
responseWrite,
responseWriteController,
responseWriteNodeStatus
} from '../../../../../common/response';
import { responseWriteController } from '../../../../../common/response';
import { SseResponseEventEnum } from '@fastgpt/global/core/workflow/runtime/constants';
import { textAdaptGptResponse } from '@fastgpt/global/core/workflow/runtime/utils';
import { ChatCompletionRequestMessageRoleEnum } from '@fastgpt/global/core/ai/constants';
@@ -30,6 +26,7 @@ import { AIChatItemType } from '@fastgpt/global/core/chat/type';
import { GPTMessages2Chats } from '@fastgpt/global/core/chat/adapt';
import { updateToolInputValue } from './utils';
import { computedMaxToken, computedTemperature } from '../../../../ai/utils';
import { WorkflowResponseType } from '../../type';
type FunctionCallCompletion = {
id: string;
@@ -56,9 +53,9 @@ export const runToolWithPromptCall = async (
res,
requestOrigin,
runtimeNodes,
detail = false,
node,
stream,
workflowStreamResponse,
params: { temperature = 0, maxToken = 4000, aiChatVision }
} = props;
const assistantResponses = response?.assistantResponses || [];
@@ -143,9 +140,9 @@ export const runToolWithPromptCall = async (
if (res && stream) {
const { answer } = await streamResponse({
res,
detail,
toolNodes,
stream: aiResponse
stream: aiResponse,
workflowStreamResponse
});
return answer;
@@ -159,9 +156,8 @@ export const runToolWithPromptCall = async (
const { answer: replaceAnswer, toolJson } = parseAnswer(answer);
// No tools
if (!toolJson) {
if (replaceAnswer === ERROR_TEXT && stream && detail) {
responseWrite({
res,
if (replaceAnswer === ERROR_TEXT) {
workflowStreamResponse?.({
event: SseResponseEventEnum.answer,
data: textAdaptGptResponse({
text: replaceAnswer
@@ -206,22 +202,19 @@ export const runToolWithPromptCall = async (
})();
// SSE response to client
if (stream && detail) {
responseWrite({
res,
event: SseResponseEventEnum.toolCall,
data: JSON.stringify({
tool: {
id: toolJson.id,
toolName: toolNode.name,
toolAvatar: toolNode.avatar,
functionName: toolJson.name,
params: toolJson.arguments,
response: ''
}
})
});
}
workflowStreamResponse?.({
event: SseResponseEventEnum.toolCall,
data: {
tool: {
id: toolJson.id,
toolName: toolNode.name,
toolAvatar: toolNode.avatar,
functionName: toolJson.name,
params: toolJson.arguments,
response: ''
}
}
});
const moduleRunResponse = await dispatchWorkFlow({
...props,
@@ -245,21 +238,18 @@ export const runToolWithPromptCall = async (
return moduleRunResponse.toolResponses ? String(moduleRunResponse.toolResponses) : 'none';
})();
if (stream && detail) {
responseWrite({
res,
event: SseResponseEventEnum.toolResponse,
data: JSON.stringify({
tool: {
id: toolJson.id,
toolName: '',
toolAvatar: '',
params: '',
response: sliceStrStartEnd(stringToolResponse, 500, 500)
}
})
});
}
workflowStreamResponse?.({
event: SseResponseEventEnum.toolResponse,
data: {
tool: {
id: toolJson.id,
toolName: '',
toolAvatar: '',
params: '',
response: sliceStrStartEnd(stringToolResponse, 500, 500)
}
}
});
return {
moduleRunResponse,
@@ -267,12 +257,14 @@ export const runToolWithPromptCall = async (
};
})();
if (stream && detail) {
responseWriteNodeStatus({
res,
// Run tool status
workflowStreamResponse?.({
event: SseResponseEventEnum.flowNodeStatus,
data: {
status: 'running',
name: node.name
});
}
}
});
// 合并工具调用的结果,使用 functionCall 格式存储。
const assistantToolMsgParams: ChatCompletionAssistantMessageParam = {
@@ -340,13 +332,13 @@ ANSWER: `;
async function streamResponse({
res,
detail,
stream
stream,
workflowStreamResponse
}: {
res: NextApiResponse;
detail: boolean;
toolNodes: ToolNodeItemType[];
stream: StreamChatType;
workflowStreamResponse?: WorkflowResponseType;
}) {
const write = responseWriteController({
res,
@@ -370,9 +362,9 @@ async function streamResponse({
textAnswer += content;
if (startResponseWrite) {
responseWrite({
workflowStreamResponse?.({
write,
event: detail ? SseResponseEventEnum.answer : undefined,
event: SseResponseEventEnum.answer,
data: textAdaptGptResponse({
text: content
})
@@ -384,9 +376,9 @@ async function streamResponse({
// find first : index
const firstIndex = textAnswer.indexOf(':');
textAnswer = textAnswer.substring(firstIndex + 1).trim();
responseWrite({
workflowStreamResponse?.({
write,
event: detail ? SseResponseEventEnum.answer : undefined,
event: SseResponseEventEnum.answer,
data: textAdaptGptResponse({
text: textAnswer
})

View File

@@ -12,24 +12,21 @@ import {
ChatCompletionAssistantMessageParam
} from '@fastgpt/global/core/ai/type';
import { NextApiResponse } from 'next';
import {
responseWrite,
responseWriteController,
responseWriteNodeStatus
} from '../../../../../common/response';
import { responseWriteController } from '../../../../../common/response';
import { SseResponseEventEnum } from '@fastgpt/global/core/workflow/runtime/constants';
import { textAdaptGptResponse } from '@fastgpt/global/core/workflow/runtime/utils';
import { ChatCompletionRequestMessageRoleEnum } from '@fastgpt/global/core/ai/constants';
import { dispatchWorkFlow } from '../../index';
import { DispatchToolModuleProps, RunToolResponse, ToolNodeItemType } from './type.d';
import json5 from 'json5';
import { DispatchFlowResponse } from '../../type';
import { DispatchFlowResponse, WorkflowResponseType } from '../../type';
import { countGptMessagesTokens } from '../../../../../common/string/tiktoken/index';
import { GPTMessages2Chats } from '@fastgpt/global/core/chat/adapt';
import { AIChatItemType } from '@fastgpt/global/core/chat/type';
import { updateToolInputValue } from './utils';
import { computedMaxToken, computedTemperature } from '../../../../ai/utils';
import { sliceStrStartEnd } from '@fastgpt/global/common/string/tools';
import { addLog } from '../../../../../common/system/log';
type ToolRunResponseType = {
toolRunResponse: DispatchFlowResponse;
@@ -58,9 +55,9 @@ export const runToolWithToolChoice = async (
res,
requestOrigin,
runtimeNodes,
detail = false,
node,
stream,
workflowStreamResponse,
params: { temperature = 0, maxToken = 4000, aiChatVision }
} = props;
const assistantResponses = response?.assistantResponses || [];
@@ -145,91 +142,91 @@ export const runToolWithToolChoice = async (
const ai = getAIApi({
timeout: 480000
});
const aiResponse = await ai.chat.completions.create(requestBody, {
headers: {
Accept: 'application/json, text/plain, */*'
}
});
const { answer, toolCalls } = await (async () => {
if (res && stream) {
return streamResponse({
res,
detail,
toolNodes,
stream: aiResponse
});
} else {
const result = aiResponse as ChatCompletion;
const calls = result.choices?.[0]?.message?.tool_calls || [];
try {
const aiResponse = await ai.chat.completions.create(requestBody, {
headers: {
Accept: 'application/json, text/plain, */*'
}
});
// 加上name和avatar
const toolCalls = calls.map((tool) => {
const toolNode = toolNodes.find((item) => item.nodeId === tool.function?.name);
return {
...tool,
toolName: toolNode?.name || '',
toolAvatar: toolNode?.avatar || ''
};
});
const { answer, toolCalls } = await (async () => {
if (res && stream) {
return streamResponse({
res,
workflowStreamResponse,
toolNodes,
stream: aiResponse
});
} else {
const result = aiResponse as ChatCompletion;
const calls = result.choices?.[0]?.message?.tool_calls || [];
return {
answer: result.choices?.[0]?.message?.content || '',
toolCalls: toolCalls
};
}
})();
// Run the selected tool by LLM.
const toolsRunResponse = (
await Promise.all(
toolCalls.map(async (tool) => {
const toolNode = toolNodes.find((item) => item.nodeId === tool.function?.name);
if (!toolNode) return;
const startParams = (() => {
try {
return json5.parse(tool.function.arguments);
} catch (error) {
return {};
}
})();
const toolRunResponse = await dispatchWorkFlow({
...props,
isToolCall: true,
runtimeNodes: runtimeNodes.map((item) =>
item.nodeId === toolNode.nodeId
? {
...item,
isEntry: true,
inputs: updateToolInputValue({ params: startParams, inputs: item.inputs })
}
: item
)
// 加上name和avatar
const toolCalls = calls.map((tool) => {
const toolNode = toolNodes.find((item) => item.nodeId === tool.function?.name);
return {
...tool,
toolName: toolNode?.name || '',
toolAvatar: toolNode?.avatar || ''
};
});
const stringToolResponse = (() => {
if (typeof toolRunResponse.toolResponses === 'object') {
return JSON.stringify(toolRunResponse.toolResponses, null, 2);
}
return toolRunResponse.toolResponses ? String(toolRunResponse.toolResponses) : 'none';
})();
const toolMsgParams: ChatCompletionToolMessageParam = {
tool_call_id: tool.id,
role: ChatCompletionRequestMessageRoleEnum.Tool,
name: tool.function.name,
content: stringToolResponse
return {
answer: result.choices?.[0]?.message?.content || '',
toolCalls: toolCalls
};
}
})();
if (stream && detail) {
responseWrite({
res,
// Run the selected tool by LLM.
const toolsRunResponse = (
await Promise.all(
toolCalls.map(async (tool) => {
const toolNode = toolNodes.find((item) => item.nodeId === tool.function?.name);
if (!toolNode) return;
const startParams = (() => {
try {
return json5.parse(tool.function.arguments);
} catch (error) {
return {};
}
})();
const toolRunResponse = await dispatchWorkFlow({
...props,
isToolCall: true,
runtimeNodes: runtimeNodes.map((item) =>
item.nodeId === toolNode.nodeId
? {
...item,
isEntry: true,
inputs: updateToolInputValue({ params: startParams, inputs: item.inputs })
}
: item
)
});
const stringToolResponse = (() => {
if (typeof toolRunResponse.toolResponses === 'object') {
return JSON.stringify(toolRunResponse.toolResponses, null, 2);
}
return toolRunResponse.toolResponses ? String(toolRunResponse.toolResponses) : 'none';
})();
const toolMsgParams: ChatCompletionToolMessageParam = {
tool_call_id: tool.id,
role: ChatCompletionRequestMessageRoleEnum.Tool,
name: tool.function.name,
content: stringToolResponse
};
workflowStreamResponse?.({
event: SseResponseEventEnum.toolResponse,
data: JSON.stringify({
data: {
tool: {
id: tool.id,
toolName: '',
@@ -237,123 +234,130 @@ export const runToolWithToolChoice = async (
params: '',
response: sliceStrStartEnd(stringToolResponse, 500, 500)
}
})
}
});
return {
toolRunResponse,
toolMsgParams
};
})
)
).filter(Boolean) as ToolRunResponseType;
const flatToolsResponseData = toolsRunResponse.map((item) => item.toolRunResponse).flat();
if (toolCalls.length > 0 && !res?.closed) {
// Run the tool, combine its results, and perform another round of AI calls
const assistantToolMsgParams: ChatCompletionAssistantToolParam = {
role: ChatCompletionRequestMessageRoleEnum.Assistant,
tool_calls: toolCalls
};
const concatToolMessages = [
...requestMessages,
assistantToolMsgParams
] as ChatCompletionMessageParam[];
const tokens = await countGptMessagesTokens(concatToolMessages, tools);
const completeMessages = [
...concatToolMessages,
...toolsRunResponse.map((item) => item?.toolMsgParams)
];
// console.log(tokens, 'tool');
// Run tool status
workflowStreamResponse?.({
event: SseResponseEventEnum.flowNodeStatus,
data: {
status: 'running',
name: node.name
}
return {
toolRunResponse,
toolMsgParams
};
})
)
).filter(Boolean) as ToolRunResponseType;
const flatToolsResponseData = toolsRunResponse.map((item) => item.toolRunResponse).flat();
if (toolCalls.length > 0 && !res?.closed) {
// Run the tool, combine its results, and perform another round of AI calls
const assistantToolMsgParams: ChatCompletionAssistantToolParam = {
role: ChatCompletionRequestMessageRoleEnum.Assistant,
tool_calls: toolCalls
};
const concatToolMessages = [
...requestMessages,
assistantToolMsgParams
] as ChatCompletionMessageParam[];
const tokens = await countGptMessagesTokens(concatToolMessages, tools);
const completeMessages = [
...concatToolMessages,
...toolsRunResponse.map((item) => item?.toolMsgParams)
];
// console.log(tokens, 'tool');
if (stream && detail) {
responseWriteNodeStatus({
res,
name: node.name
});
}
// tool assistant
const toolAssistants = toolsRunResponse
.map((item) => {
const assistantResponses = item.toolRunResponse.assistantResponses || [];
return assistantResponses;
})
.flat();
// tool assistant
const toolAssistants = toolsRunResponse
.map((item) => {
const assistantResponses = item.toolRunResponse.assistantResponses || [];
return assistantResponses;
})
.flat();
// tool node assistant
const adaptChatMessages = GPTMessages2Chats(completeMessages);
const toolNodeAssistant = adaptChatMessages.pop() as AIChatItemType;
// tool node assistant
const adaptChatMessages = GPTMessages2Chats(completeMessages);
const toolNodeAssistant = adaptChatMessages.pop() as AIChatItemType;
const toolNodeAssistants = [
...assistantResponses,
...toolAssistants,
...toolNodeAssistant.value
];
const toolNodeAssistants = [
...assistantResponses,
...toolAssistants,
...toolNodeAssistant.value
];
// concat tool responses
const dispatchFlowResponse = response
? response.dispatchFlowResponse.concat(flatToolsResponseData)
: flatToolsResponseData;
// concat tool responses
const dispatchFlowResponse = response
? response.dispatchFlowResponse.concat(flatToolsResponseData)
: flatToolsResponseData;
/* check stop signal */
const hasStopSignal = flatToolsResponseData.some(
(item) => !!item.flowResponses?.find((item) => item.toolStop)
);
if (hasStopSignal) {
return {
dispatchFlowResponse,
totalTokens: response?.totalTokens ? response.totalTokens + tokens : tokens,
completeMessages,
assistantResponses: toolNodeAssistants
};
}
return runToolWithToolChoice(
{
...props,
messages: completeMessages
},
{
dispatchFlowResponse,
totalTokens: response?.totalTokens ? response.totalTokens + tokens : tokens,
assistantResponses: toolNodeAssistants
}
);
} else {
// No tool is invoked, indicating that the process is over
const gptAssistantResponse: ChatCompletionAssistantMessageParam = {
role: ChatCompletionRequestMessageRoleEnum.Assistant,
content: answer
};
const completeMessages = filterMessages.concat(gptAssistantResponse);
const tokens = await countGptMessagesTokens(completeMessages, tools);
// console.log(tokens, 'response token');
// concat tool assistant
const toolNodeAssistant = GPTMessages2Chats([gptAssistantResponse])[0] as AIChatItemType;
/* check stop signal */
const hasStopSignal = flatToolsResponseData.some(
(item) => !!item.flowResponses?.find((item) => item.toolStop)
);
if (hasStopSignal) {
return {
dispatchFlowResponse,
dispatchFlowResponse: response?.dispatchFlowResponse || [],
totalTokens: response?.totalTokens ? response.totalTokens + tokens : tokens,
completeMessages,
assistantResponses: toolNodeAssistants
assistantResponses: [...assistantResponses, ...toolNodeAssistant.value]
};
}
return runToolWithToolChoice(
{
...props,
messages: completeMessages
},
{
dispatchFlowResponse,
totalTokens: response?.totalTokens ? response.totalTokens + tokens : tokens,
assistantResponses: toolNodeAssistants
}
);
} else {
// No tool is invoked, indicating that the process is over
const gptAssistantResponse: ChatCompletionAssistantMessageParam = {
role: ChatCompletionRequestMessageRoleEnum.Assistant,
content: answer
};
const completeMessages = filterMessages.concat(gptAssistantResponse);
const tokens = await countGptMessagesTokens(completeMessages, tools);
// console.log(tokens, 'response token');
// concat tool assistant
const toolNodeAssistant = GPTMessages2Chats([gptAssistantResponse])[0] as AIChatItemType;
return {
dispatchFlowResponse: response?.dispatchFlowResponse || [],
totalTokens: response?.totalTokens ? response.totalTokens + tokens : tokens,
completeMessages,
assistantResponses: [...assistantResponses, ...toolNodeAssistant.value]
};
} catch (error) {
addLog.warn(`LLM response error`, {
requestBody
});
return Promise.reject(error);
}
};
async function streamResponse({
res,
detail,
toolNodes,
stream
stream,
workflowStreamResponse
}: {
res: NextApiResponse;
detail: boolean;
toolNodes: ToolNodeItemType[];
stream: StreamChatType;
workflowStreamResponse?: WorkflowResponseType;
}) {
const write = responseWriteController({
res,
@@ -375,9 +379,9 @@ async function streamResponse({
const content = responseChoice.content || '';
textAnswer += content;
responseWrite({
workflowStreamResponse?.({
write,
event: detail ? SseResponseEventEnum.answer : undefined,
event: SseResponseEventEnum.answer,
data: textAdaptGptResponse({
text: content
})
@@ -405,22 +409,19 @@ async function streamResponse({
toolAvatar: toolNode.avatar
});
if (detail) {
responseWrite({
write,
event: SseResponseEventEnum.toolCall,
data: JSON.stringify({
tool: {
id: toolCall.id,
toolName: toolNode.name,
toolAvatar: toolNode.avatar,
functionName: toolCall.function.name,
params: toolCall.function.arguments,
response: ''
}
})
});
}
workflowStreamResponse?.({
event: SseResponseEventEnum.toolCall,
data: {
tool: {
id: toolCall.id,
toolName: toolNode.name,
toolAvatar: toolNode.avatar,
functionName: toolCall.function.name,
params: toolCall.function.arguments,
response: ''
}
}
});
continue;
}
@@ -437,21 +438,19 @@ async function streamResponse({
if (currentTool) {
currentTool.function.arguments += arg;
if (detail) {
responseWrite({
write,
event: SseResponseEventEnum.toolParams,
data: JSON.stringify({
tool: {
id: currentTool.id,
toolName: '',
toolAvatar: '',
params: arg,
response: ''
}
})
});
}
workflowStreamResponse?.({
write,
event: SseResponseEventEnum.toolParams,
data: {
tool: {
id: currentTool.id,
toolName: '',
toolAvatar: '',
params: arg,
response: ''
}
}
});
}
}
}

View File

@@ -31,7 +31,7 @@ import {
import type { AIChatNodeProps } from '@fastgpt/global/core/workflow/runtime/type.d';
import { replaceVariable } from '@fastgpt/global/common/string/tools';
import type { ModuleDispatchProps } from '@fastgpt/global/core/workflow/runtime/type';
import { responseWrite, responseWriteController } from '../../../../common/response';
import { responseWriteController } from '../../../../common/response';
import { getLLMModel, ModelTypeEnum } from '../../../ai/model';
import type { SearchDataResponseItemType } from '@fastgpt/global/core/dataset/type';
import { NodeInputKeyEnum, NodeOutputKeyEnum } from '@fastgpt/global/core/workflow/constants';
@@ -41,6 +41,7 @@ import { filterSearchResultsByMaxChars } from '../../utils';
import { getHistoryPreview } from '@fastgpt/global/core/chat/utils';
import { addLog } from '../../../../common/system/log';
import { computedMaxToken, computedTemperature } from '../../../ai/utils';
import { WorkflowResponseType } from '../type';
export type ChatProps = ModuleDispatchProps<
AIChatNodeProps & {
@@ -60,11 +61,11 @@ export const dispatchChatCompletion = async (props: ChatProps): Promise<ChatResp
res,
requestOrigin,
stream = false,
detail = false,
user,
histories,
node: { name },
query,
workflowStreamResponse,
params: {
model,
temperature = 0,
@@ -179,8 +180,8 @@ export const dispatchChatCompletion = async (props: ChatProps): Promise<ChatResp
// sse response
const { answer } = await streamResponse({
res,
detail,
stream: response
stream: response,
workflowStreamResponse
});
if (!answer) {
@@ -340,12 +341,12 @@ async function getChatMessages({
async function streamResponse({
res,
detail,
stream
stream,
workflowStreamResponse
}: {
res: NextApiResponse;
detail: boolean;
stream: StreamChatType;
workflowStreamResponse?: WorkflowResponseType;
}) {
const write = responseWriteController({
res,
@@ -360,9 +361,9 @@ async function streamResponse({
const content = part.choices?.[0]?.delta?.content || '';
answer += content;
responseWrite({
workflowStreamResponse?.({
write,
event: detail ? SseResponseEventEnum.answer : undefined,
event: SseResponseEventEnum.answer,
data: textAdaptGptResponse({
text: content
})

View File

@@ -1,4 +1,3 @@
import { NextApiResponse } from 'next';
import { NodeInputKeyEnum } from '@fastgpt/global/core/workflow/constants';
import {
DispatchNodeResponseKeyEnum,
@@ -21,7 +20,6 @@ import {
FlowNodeTypeEnum
} from '@fastgpt/global/core/workflow/node/constant';
import { replaceVariable } from '@fastgpt/global/common/string/tools';
import { responseWrite, responseWriteNodeStatus } from '../../../common/response';
import { getSystemTime } from '@fastgpt/global/common/time/timezone';
import { replaceVariableLabel } from '@fastgpt/global/core/workflow/utils';
@@ -41,8 +39,7 @@ import { dispatchPluginOutput } from './plugin/runOutput';
import { removeSystemVariable, valueTypeFormat } from './utils';
import {
filterWorkflowEdges,
checkNodeRunStatus,
getLastInteractiveValue
checkNodeRunStatus
} from '@fastgpt/global/core/workflow/runtime/utils';
import { ChatNodeUsageType } from '@fastgpt/global/support/wallet/bill/type';
import { dispatchRunTools } from './agent/runTool/index';
@@ -62,12 +59,11 @@ import { dispatchTextEditor } from './tools/textEditor';
import { dispatchCustomFeedback } from './tools/customFeedback';
import { dispatchReadFiles } from './tools/readFiles';
import { dispatchUserSelect } from './interactive/userSelect';
import { FlowNodeOutputItemType } from '@fastgpt/global/core/workflow/type/io';
import {
InteractiveNodeResponseItemType,
UserInteractiveType,
UserSelectInteractive
} from '@fastgpt/global/core/workflow/template/system/userSelect/type';
import { dispatchRunAppNode } from './agent/runAppModule';
const callbackMap: Record<FlowNodeTypeEnum, Function> = {
[FlowNodeTypeEnum.workflowStart]: dispatchWorkflowStart,
@@ -79,6 +75,7 @@ const callbackMap: Record<FlowNodeTypeEnum, Function> = {
[FlowNodeTypeEnum.contentExtract]: dispatchContentExtract,
[FlowNodeTypeEnum.httpRequest468]: dispatchHttp468Request,
[FlowNodeTypeEnum.runApp]: dispatchAppRequest,
[FlowNodeTypeEnum.appModule]: dispatchRunAppNode,
[FlowNodeTypeEnum.pluginModule]: dispatchRunPlugin,
[FlowNodeTypeEnum.pluginInput]: dispatchPluginInput,
[FlowNodeTypeEnum.pluginOutput]: dispatchPluginOutput,
@@ -115,7 +112,6 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
variables = {},
user,
stream = false,
detail = false,
...props
} = data;
@@ -261,13 +257,10 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
nodeOutputs
};
if (stream && res) {
responseWrite({
res,
event: SseResponseEventEnum.interactive,
data: JSON.stringify({ interactive: interactiveResult })
});
}
props.workflowStreamResponse?.({
event: SseResponseEventEnum.interactive,
data: { interactive: interactiveResult }
});
return {
type: ChatItemValueTypeEnum.interactive,
@@ -401,11 +394,13 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
}
async function nodeRunWithActive(node: RuntimeNodeItemType) {
// push run status messages
if (res && stream && detail && node.showStatus) {
responseStatus({
res,
name: node.name,
status: 'running'
if (node.showStatus) {
props.workflowStreamResponse?.({
event: SseResponseEventEnum.flowNodeStatus,
data: {
status: 'running',
name: node.name
}
});
}
const startTime = Date.now();
@@ -420,7 +415,6 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
histories,
user,
stream,
detail,
node,
runtimeNodes,
runtimeEdges,
@@ -510,23 +504,6 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
};
}
/* sse response modules staus */
export function responseStatus({
res,
status,
name
}: {
res: NextApiResponse;
status?: 'running' | 'finish';
name?: string;
}) {
if (!name) return;
responseWriteNodeStatus({
res,
name
});
}
/* get system variable */
export function getSystemVariable({
user,

View File

@@ -14,7 +14,6 @@ import type {
} from '@fastgpt/global/core/workflow/template/system/userSelect/type';
import { updateUserSelectedResult } from '../../../chat/controller';
import { textAdaptGptResponse } from '@fastgpt/global/core/workflow/runtime/utils';
import { responseWrite } from '../../../../common/response';
import { chatValue2RuntimePrompt } from '@fastgpt/global/core/chat/adapt';
type Props = ModuleDispatchProps<{
@@ -29,10 +28,7 @@ type UserSelectResponse = DispatchNodeResultType<{
export const dispatchUserSelect = async (props: Props): Promise<UserSelectResponse> => {
const {
res,
detail,
histories,
stream,
workflowStreamResponse,
app: { _id: appId },
chatId,
node: { nodeId, isEntry },
@@ -43,10 +39,9 @@ export const dispatchUserSelect = async (props: Props): Promise<UserSelectRespon
// Interactive node is not the entry node, return interactive result
if (!isEntry) {
const answerText = description ? `\n${description}` : undefined;
if (res && stream && answerText) {
responseWrite({
res,
event: detail ? SseResponseEventEnum.fastAnswer : undefined,
if (answerText) {
workflowStreamResponse?.({
event: SseResponseEventEnum.fastAnswer,
data: textAdaptGptResponse({
text: answerText
})

View File

@@ -2,7 +2,6 @@ import {
DispatchNodeResponseKeyEnum,
SseResponseEventEnum
} from '@fastgpt/global/core/workflow/runtime/constants';
import { responseWrite } from '../../../../common/response';
import { textAdaptGptResponse } from '@fastgpt/global/core/workflow/runtime/utils';
import type { ModuleDispatchProps } from '@fastgpt/global/core/workflow/runtime/type';
import { NodeOutputKeyEnum } from '@fastgpt/global/core/workflow/constants';
@@ -16,24 +15,19 @@ export type AnswerResponse = DispatchNodeResultType<{
export const dispatchAnswer = (props: Record<string, any>): AnswerResponse => {
const {
res,
detail,
stream,
workflowStreamResponse,
params: { text = '' }
} = props as AnswerProps;
const formatText = typeof text === 'string' ? text : JSON.stringify(text, null, 2);
const responseText = `\n${formatText}`;
if (res && stream) {
responseWrite({
res,
event: detail ? SseResponseEventEnum.fastAnswer : undefined,
data: textAdaptGptResponse({
text: responseText
})
});
}
workflowStreamResponse?.({
event: SseResponseEventEnum.fastAnswer,
data: textAdaptGptResponse({
text: responseText
})
});
return {
[NodeOutputKeyEnum.answerText]: responseText,

View File

@@ -6,7 +6,6 @@ import type { ModuleDispatchProps } from '@fastgpt/global/core/workflow/runtime/
import { NodeInputKeyEnum } from '@fastgpt/global/core/workflow/constants';
import { DispatchNodeResultType } from '@fastgpt/global/core/workflow/runtime/type';
import { addCustomFeedbacks } from '../../../chat/controller';
import { responseWrite } from '../../../../common/response';
import { textAdaptGptResponse } from '@fastgpt/global/core/workflow/runtime/utils';
type Props = ModuleDispatchProps<{
@@ -16,12 +15,11 @@ type Response = DispatchNodeResultType<{}>;
export const dispatchCustomFeedback = (props: Record<string, any>): Response => {
const {
res,
app: { _id: appId },
chatId,
responseChatItemId: chatItemId,
stream,
detail,
workflowStreamResponse,
params: { system_textareaInput: feedbackText = '' }
} = props as Props;
@@ -36,9 +34,8 @@ export const dispatchCustomFeedback = (props: Record<string, any>): Response =>
if (stream) {
if (!chatId || !chatItemId) {
responseWrite({
res,
event: detail ? SseResponseEventEnum.fastAnswer : undefined,
workflowStreamResponse?.({
event: SseResponseEventEnum.fastAnswer,
data: textAdaptGptResponse({
text: `\n\n**自定义反馈成功: (仅调试模式下展示该内容)**: "${feedbackText}"\n\n`
})

View File

@@ -14,7 +14,6 @@ import { SERVICE_LOCAL_HOST } from '../../../../common/system/tools';
import { addLog } from '../../../../common/system/log';
import { DispatchNodeResultType } from '@fastgpt/global/core/workflow/runtime/type';
import { getErrText } from '@fastgpt/global/common/error/utils';
import { responseWrite } from '../../../../common/response';
import { textAdaptGptResponse } from '@fastgpt/global/core/workflow/runtime/utils';
import { getSystemPluginCb } from '../../../../../plugins/register';
@@ -43,15 +42,13 @@ const UNDEFINED_SIGN = 'UNDEFINED_SIGN';
export const dispatchHttp468Request = async (props: HttpRequestProps): Promise<HttpResponse> => {
let {
res,
detail,
app: { _id: appId },
chatId,
stream,
responseChatItemId,
variables,
node: { outputs },
histories,
workflowStreamResponse,
params: {
system_httpMethod: httpMethod = 'POST',
system_httpReqUrl: httpReqUrl,
@@ -158,10 +155,9 @@ export const dispatchHttp468Request = async (props: HttpRequestProps): Promise<H
results[key] = valueTypeFormat(formatResponse[key], output.valueType);
}
if (stream && typeof formatResponse[NodeOutputKeyEnum.answerText] === 'string') {
responseWrite({
res,
event: detail ? SseResponseEventEnum.fastAnswer : undefined,
if (typeof formatResponse[NodeOutputKeyEnum.answerText] === 'string') {
workflowStreamResponse?.({
event: SseResponseEventEnum.fastAnswer,
data: textAdaptGptResponse({
text: formatResponse[NodeOutputKeyEnum.answerText]
})

View File

@@ -2,7 +2,6 @@ import type { ChatItemType } from '@fastgpt/global/core/chat/type.d';
import type { ModuleDispatchProps } from '@fastgpt/global/core/workflow/runtime/type';
import { SelectAppItemType } from '@fastgpt/global/core/workflow/template/system/runApp/type';
import { dispatchWorkFlow } from '../index';
import { responseWrite } from '../../../../common/response';
import { ChatRoleEnum } from '@fastgpt/global/core/chat/constants';
import { SseResponseEventEnum } from '@fastgpt/global/core/workflow/runtime/constants';
import {
@@ -31,10 +30,8 @@ type Response = DispatchNodeResultType<{
export const dispatchAppRequest = async (props: Props): Promise<Response> => {
const {
res,
app: workflowApp,
stream,
detail,
workflowStreamResponse,
histories,
query,
params: { userChatInput, history, app }
@@ -51,15 +48,12 @@ export const dispatchAppRequest = async (props: Props): Promise<Response> => {
per: ReadPermissionVal
});
if (res && stream) {
responseWrite({
res,
event: detail ? SseResponseEventEnum.answer : undefined,
data: textAdaptGptResponse({
text: '\n'
})
});
}
workflowStreamResponse?.({
event: SseResponseEventEnum.fastAnswer,
data: textAdaptGptResponse({
text: '\n'
})
});
const chatHistories = getHistories(history, histories);
const { files } = chatValue2RuntimePrompt(query);

View File

@@ -8,7 +8,6 @@ import { getReferenceVariableValue } from '@fastgpt/global/core/workflow/runtime
import { TUpdateListItem } from '@fastgpt/global/core/workflow/template/system/variableUpdate/type';
import { ModuleDispatchProps } from '@fastgpt/global/core/workflow/runtime/type';
import { removeSystemVariable, valueTypeFormat } from '../utils';
import { responseWrite } from '../../../../common/response';
type Props = ModuleDispatchProps<{
[NodeInputKeyEnum.updateList]: TUpdateListItem[];
@@ -16,7 +15,7 @@ type Props = ModuleDispatchProps<{
type Response = DispatchNodeResultType<{}>;
export const dispatchUpdateVariable = async (props: Props): Promise<Response> => {
const { res, detail, stream, params, variables, runtimeNodes } = props;
const { params, variables, runtimeNodes, workflowStreamResponse } = props;
const { updateList } = params;
updateList.forEach((item) => {
@@ -54,13 +53,10 @@ export const dispatchUpdateVariable = async (props: Props): Promise<Response> =>
}
});
if (detail && stream) {
responseWrite({
res,
event: SseResponseEventEnum.updateVariables,
data: JSON.stringify(removeSystemVariable(variables))
});
}
workflowStreamResponse?.({
event: SseResponseEventEnum.updateVariables,
data: removeSystemVariable(variables)
});
return {
[DispatchNodeResponseKeyEnum.nodeResponse]: {

View File

@@ -4,7 +4,10 @@ import {
ChatItemValueItemType,
ToolRunResponseItemType
} from '@fastgpt/global/core/chat/type';
import { DispatchNodeResponseKeyEnum } from '@fastgpt/global/core/workflow/runtime/constants';
import {
DispatchNodeResponseKeyEnum,
SseResponseEventEnum
} from '@fastgpt/global/core/workflow/runtime/constants';
import { RuntimeNodeItemType } from '@fastgpt/global/core/workflow/runtime/type';
import { RuntimeEdgeItemType } from '@fastgpt/global/core/workflow/type/edge';
import { ChatNodeUsageType } from '@fastgpt/global/support/wallet/bill/type';
@@ -21,3 +24,15 @@ export type DispatchFlowResponse = {
[DispatchNodeResponseKeyEnum.assistantResponses]: AIChatItemValueItemType[];
newVariables: Record<string, string>;
};
/**
 * Signature of the workflow SSE writer produced by `getWorkflowResponseWrite`.
 *
 * @param write  Optional raw text writer that overrides the default
 *               response-based writer.
 * @param event  SSE event name for this chunk.
 * @param data   JSON-serializable payload; stringified before being written.
 * @param stream Optional per-call override of the stream-response setting.
 *
 * Note: `prop?: T` already admits `undefined` under standard strict settings,
 * so the previous explicit `| undefined` unions were redundant and removed.
 */
export type WorkflowResponseType = ({
  write,
  event,
  data,
  stream
}: {
  write?: (text: string) => void;
  event: SseResponseEventEnum;
  data: Record<string, any>;
  stream?: boolean;
}) => void;

View File

@@ -6,6 +6,56 @@ import {
NodeOutputKeyEnum
} from '@fastgpt/global/core/workflow/constants';
import { RuntimeEdgeItemType } from '@fastgpt/global/core/workflow/runtime/type';
import { responseWrite } from '../../../common/response';
import { NextApiResponse } from 'next';
import { SseResponseEventEnum } from '@fastgpt/global/core/workflow/runtime/constants';
/**
 * SSE events that carry debugging/inspection detail. They are forwarded only
 * when the client asked for `detail` mode; for plain clients they are dropped.
 * Built once at module load as a Set (O(1) membership test) instead of being
 * rebuilt as an array on every single write call, as the previous code did.
 */
const DETAIL_ONLY_EVENTS = new Set<SseResponseEventEnum>([
  SseResponseEventEnum.error,
  SseResponseEventEnum.flowNodeStatus,
  SseResponseEventEnum.flowResponses,
  SseResponseEventEnum.interactive,
  SseResponseEventEnum.toolCall,
  SseResponseEventEnum.toolParams,
  SseResponseEventEnum.toolResponse,
  SseResponseEventEnum.updateVariables
]);

/**
 * Build a workflow SSE writer bound to one response stream.
 *
 * @param res            Next.js response backing the SSE stream. May be absent
 *                       (e.g. non-HTTP runs) — then every write is a no-op.
 * @param detail         Whether the client requested detailed events.
 * @param streamResponse Default on/off switch for streaming responses.
 * @param id             Identifier of this workflow run.
 *                       NOTE(review): currently unused inside the writer —
 *                       confirm whether it should be attached to the payload.
 * @returns A writer that serializes `data` as JSON and emits it as an SSE
 *          chunk, honouring the per-call `stream` override and dropping
 *          detail-only events for non-detail clients.
 */
export const getWorkflowResponseWrite = ({
  res,
  detail,
  streamResponse,
  id
}: {
  res?: NextApiResponse;
  detail: boolean;
  streamResponse: boolean;
  id: string;
}) => {
  return ({
    write,
    event,
    data,
    stream
  }: {
    write?: (text: string) => void;
    event: SseResponseEventEnum;
    data: Record<string, any>;
    stream?: boolean; // Force-set stream response for this single call
  }) => {
    // Per-call override wins over the factory-level default.
    const useStreamResponse = stream ?? streamResponse;
    if (!res || res.closed || !useStreamResponse) return;

    // Detail-only events are silently skipped for non-detail clients.
    if (!detail && DETAIL_ONLY_EVENTS.has(event)) return;

    responseWrite({
      res,
      write,
      // Non-detail clients receive untagged (default) SSE messages.
      event: detail ? event : undefined,
      data: JSON.stringify(data)
    });
  };
};
export const filterToolNodeIdByEdges = ({
nodeId,