diff --git a/apps/application/flow/i_step_node.py b/apps/application/flow/i_step_node.py index aa1c5266ec1..cb13aa1769a 100644 --- a/apps/application/flow/i_step_node.py +++ b/apps/application/flow/i_step_node.py @@ -123,7 +123,19 @@ def __init__(self, chat_info, tool_id): def handler(self, workflow): state = get_workflow_state(workflow) - ToolRecord(tool_id=self.tool_id) + record = ToolRecord(id=self.chat_info.tool_record_id, tool_id=self.tool_id, + workspace_id=self.chat_info.workspace_id, + source_type=self.chat_info.source_type, + source_id=self.chat_info.source_id, + state=state, + meta={ + 'output': workflow.out_context, + 'details': workflow.get_runtime_details(), + 'answer_text_list': workflow.get_answer_text_list() + }) + self.chat_info.set_record(record) + self.chat_info = None + self.tool_id = None def get_loop_workflow_node(node_list): diff --git a/apps/application/flow/knowledge_loop_workflow_manage.py b/apps/application/flow/knowledge_loop_workflow_manage.py index fbda319c1ad..31d3ab4df25 100644 --- a/apps/application/flow/knowledge_loop_workflow_manage.py +++ b/apps/application/flow/knowledge_loop_workflow_manage.py @@ -13,3 +13,9 @@ class KnowledgeLoopWorkflowManage(LoopWorkflowManage): def get_params_serializer_class(self): return KnowledgeFlowParamsSerializer + + def get_source_type(self): + return "KNOWLEDGE" + + def get_source_id(self): + return self.params.get('knowledge_id') diff --git a/apps/application/flow/knowledge_workflow_manage.py b/apps/application/flow/knowledge_workflow_manage.py index 4a19803a367..98212c9ee5a 100644 --- a/apps/application/flow/knowledge_workflow_manage.py +++ b/apps/application/flow/knowledge_workflow_manage.py @@ -122,3 +122,9 @@ def hand_node_result(self, current_node, node_result_future): current_node.node_chunk.end() QuerySet(KnowledgeAction).filter(id=self.params.get('knowledge_action_id')).update( details=self.get_runtime_details()) + + def get_source_type(self): + return "KNOWLEDGE" + + def get_source_id(self): + return self.params.get('knowledge_id') diff --git a/apps/application/flow/loop_workflow_manage.py b/apps/application/flow/loop_workflow_manage.py index 27c84f4dc66..c236b15dcc5 100644 --- a/apps/application/flow/loop_workflow_manage.py +++ b/apps/application/flow/loop_workflow_manage.py @@ -191,3 +191,9 @@ def generate_prompt(self, prompt: str): prompt_template = PromptTemplate.from_template(prompt, template_format='jinja2') value = prompt_template.format(context=context) return value + + def get_source_type(self): + return "APPLICATION" + + def get_source_id(self): + return self.params.get('application_id') diff --git a/apps/application/flow/step_node/__init__.py b/apps/application/flow/step_node/__init__.py index 01b9a83570a..4c38020771e 100644 --- a/apps/application/flow/step_node/__init__.py +++ b/apps/application/flow/step_node/__init__.py @@ -35,6 +35,7 @@ from .text_to_video_step_node.impl.base_text_to_video_node import BaseTextToVideoNode from .tool_lib_node import * from .tool_node import * +from .tool_workflow_lib_node import BaseToolWorkflowLibNodeNode from .variable_aggregation_node.impl.base_variable_aggregation_node import BaseVariableAggregationNode from .variable_assign_node import BaseVariableAssignNode from .variable_splitting_node import BaseVariableSplittingNode @@ -53,7 +54,7 @@ BaseLoopContinueNode, BaseLoopBreakNode, BaseVariableSplittingNode, BaseParameterExtractionNode, BaseVariableAggregationNode, BaseDataSourceLocalNode, BaseDataSourceWebNode, BaseKnowledgeWriteNode, BaseDocumentSplitNode, - BaseToolStartStepNode] + BaseToolStartStepNode, BaseToolWorkflowLibNodeNode] node_map = {n.type: {w: n for w in n.support} for n in node_list} diff --git a/apps/application/flow/step_node/form_node/impl/base_form_node.py b/apps/application/flow/step_node/form_node/impl/base_form_node.py index 930125124fa..30dfa97229b 100644 --- a/apps/application/flow/step_node/form_node/impl/base_form_node.py +++ b/apps/application/flow/step_node/form_node/impl/base_form_node.py @@ -144,8 +144,9 @@ def get_answer_list(self) -> List[Answer] | None: value = prompt_template.format(form=form, context=context, runtime_node_id=self.runtime_node_id, chat_record_id=self.flow_params_serializer.data.get("chat_record_id"), form_field_list=form_field_list) - return [Answer(value, self.view_type, self.runtime_node_id, self.workflow_params['chat_record_id'], None, - self.runtime_node_id, '')] + return [ + Answer(value, self.view_type, self.runtime_node_id, self.workflow_params.get('chat_record_id') or '', None, + self.runtime_node_id, '')] def get_details(self, index: int, **kwargs): form_content_format = self.context.get('form_content_format') diff --git a/apps/application/flow/step_node/loop_node/impl/base_loop_node.py b/apps/application/flow/step_node/loop_node/impl/base_loop_node.py index 7ffbaf26d58..6b03d628664 100644 --- a/apps/application/flow/step_node/loop_node/impl/base_loop_node.py +++ b/apps/application/flow/step_node/loop_node/impl/base_loop_node.py @@ -268,10 +268,16 @@ def get_loop_context(self): def execute(self, loop_type, array, number, loop_body, **kwargs) -> NodeResult: from application.flow.loop_workflow_manage import LoopWorkflowManage, Workflow from application.flow.knowledge_loop_workflow_manage import KnowledgeLoopWorkflowManage + from application.flow.tool_loop_workflow_manage import ToolLoopWorkflowManage def workflow_manage_new_instance(loop_data, global_data, start_node_id=None, start_node_data=None, chat_record=None, child_node=None): - workflow_mode = WorkflowMode.KNOWLEDGE_LOOP if WorkflowMode.KNOWLEDGE == self.workflow_manage.flow.workflow_mode else WorkflowMode.APPLICATION_LOOP - c = KnowledgeLoopWorkflowManage if workflow_mode == WorkflowMode.KNOWLEDGE_LOOP else LoopWorkflowManage + workflow_mode = {WorkflowMode.APPLICATION: WorkflowMode.APPLICATION_LOOP, + WorkflowMode.KNOWLEDGE: WorkflowMode.KNOWLEDGE_LOOP, + WorkflowMode.TOOL: WorkflowMode.TOOL_LOOP}.get( + self.workflow_manage.flow.workflow_mode) or WorkflowMode.APPLICATION + c = {WorkflowMode.APPLICATION_LOOP: LoopWorkflowManage, + WorkflowMode.KNOWLEDGE_LOOP: KnowledgeLoopWorkflowManage, + WorkflowMode.TOOL_LOOP: ToolLoopWorkflowManage}.get(workflow_mode) or LoopWorkflowManage workflow_manage = c(Workflow.new_instance(loop_body, workflow_mode), self.workflow_manage.params, LoopWorkFlowPostHandler( diff --git a/apps/application/flow/step_node/tool_workflow_lib_node/__init__.py b/apps/application/flow/step_node/tool_workflow_lib_node/__init__.py new file mode 100644 index 00000000000..d417d531251 --- /dev/null +++ b/apps/application/flow/step_node/tool_workflow_lib_node/__init__.py @@ -0,0 +1,9 @@ +# coding=utf-8 +""" + @project: MaxKB + @Author:虎虎 + @file: __init__.py.py + @date:2026/3/16 13:53 + @desc: +""" +from .impl import * \ No newline at end of file diff --git a/apps/application/flow/step_node/tool_workflow_lib_node/i_tool_workflow_lib_node.py b/apps/application/flow/step_node/tool_workflow_lib_node/i_tool_workflow_lib_node.py new file mode 100644 index 00000000000..82b73d0904b --- /dev/null +++ b/apps/application/flow/step_node/tool_workflow_lib_node/i_tool_workflow_lib_node.py @@ -0,0 +1,57 @@ +# coding=utf-8 +""" + @project: MaxKB + @Author:虎 + @file: i_function_lib_node.py + @date:2024/8/8 16:21 + @desc: +""" +from typing import Type + +from django.db import connection +from django.db.models import QuerySet +from django.utils.translation import gettext_lazy as _ +from rest_framework import serializers + +from application.flow.common import WorkflowMode +from application.flow.i_step_node import INode, NodeResult +from common.field.common import ObjectField +from tools.models.tool import Tool, ToolType + + +class InputField(serializers.Serializer): + field = serializers.CharField(required=True, label=_('Variable Name')) + label = serializers.CharField(required=True, label=_('Variable Label')) + source = serializers.CharField(required=True, label=_('Variable Source')) + type = serializers.CharField(required=True, label=_('Variable Type')) + value = ObjectField(required=True, label=_("Variable Value"), model_type_list=[str, list, bool, dict, int, float]) + + +class FunctionLibNodeParamsSerializer(serializers.Serializer): + tool_lib_id = serializers.UUIDField(required=True, label=_('Library ID')) + input_field_list = InputField(required=True, many=True) + is_result = serializers.BooleanField(required=False, + label=_('Whether to return content')) + + def is_valid(self, *, raise_exception=False): + super().is_valid(raise_exception=True) + f_lib = QuerySet(Tool).filter(id=self.data.get('tool_lib_id'), tool_type=ToolType.WORKFLOW).first() + # 归还链接到连接池 + connection.close() + if f_lib is None: + raise Exception(_('The function has been deleted')) + + +class IToolWorkflowLibNode(INode): + type = 'tool-workflow-lib-node' + support = [WorkflowMode.APPLICATION, WorkflowMode.APPLICATION_LOOP, WorkflowMode.KNOWLEDGE, + WorkflowMode.KNOWLEDGE_LOOP, WorkflowMode.TOOL, WorkflowMode.TOOL_LOOP] + + def get_node_params_serializer_class(self) -> Type[serializers.Serializer]: + return FunctionLibNodeParamsSerializer + + def _run(self): + return self.execute(**self.node_params_serializer.data, **self.flow_params_serializer.data) + + def execute(self, tool_lib_id, input_field_list, **kwargs) -> NodeResult: + pass diff --git a/apps/application/flow/step_node/tool_workflow_lib_node/impl/__init__.py b/apps/application/flow/step_node/tool_workflow_lib_node/impl/__init__.py new file mode 100644 index 00000000000..0b593554784 --- /dev/null +++ b/apps/application/flow/step_node/tool_workflow_lib_node/impl/__init__.py @@ -0,0 +1,9 @@ +# coding=utf-8 +""" + @project: MaxKB + @Author:虎虎 + @file: __init__.py.py + @date:2026/3/16 13:53 + @desc: +""" +from .base_tool_workflow_lib_node import * diff --git a/apps/application/flow/step_node/tool_workflow_lib_node/impl/base_tool_workflow_lib_node.py b/apps/application/flow/step_node/tool_workflow_lib_node/impl/base_tool_workflow_lib_node.py new file mode 100644 index 00000000000..49b1501ffef --- /dev/null +++ b/apps/application/flow/step_node/tool_workflow_lib_node/impl/base_tool_workflow_lib_node.py @@ -0,0 +1,214 @@ +# coding=utf-8 +""" + @project: MaxKB + @Author:虎虎 + @file: base_tool_workflow_lib_node.py.py + @date:2026/3/16 13:55 + @desc: +""" + +import time +from typing import Dict + +import uuid_utils.compat as uuid +from django.db.models import QuerySet +from django.utils.translation import gettext_lazy as _ + +from application.flow.common import WorkflowMode, Workflow +from application.flow.i_step_node import NodeResult, ToolWorkflowPostHandler, INode +from application.flow.step_node.tool_workflow_lib_node.i_tool_workflow_lib_node import IToolWorkflowLibNode +from application.models import ChatRecord +from application.serializers.common import ToolExecute +from common.exception.app_exception import ChatException +from common.handle.impl.response.loop_to_response import LoopToResponse +from tools.models import ToolWorkflowVersion + + +def _write_context(node_variable: Dict, workflow_variable: Dict, node: INode, workflow, answer: str, + reasoning_content: str): + result = node_variable.get('result') + node.context['application_node_dict'] = node_variable.get('application_node_dict') + node.context['node_dict'] = node_variable.get('node_dict', {}) + node.context['is_interrupt_exec'] = node_variable.get('is_interrupt_exec') + node.context['message_tokens'] = result.get('usage', {}).get('prompt_tokens', 0) + node.context['answer_tokens'] = result.get('usage', {}).get('completion_tokens', 0) + node.context['answer'] = answer + node.context['result'] = answer + node.context['reasoning_content'] = reasoning_content + node.context['run_time'] = time.time() - node.context['start_time'] + if workflow.is_result(node, NodeResult(node_variable, workflow_variable)): + node.answer_text = answer + + +def get_answer_list(instance, child_node_node_dict, runtime_node_id): + answer_list = instance.get_record_answer_list() + for a in answer_list: + _v = child_node_node_dict.get(a.get('runtime_node_id')) + if _v: + a['runtime_node_id'] = runtime_node_id + a['child_node'] = _v + return answer_list + + +def write_context_stream(node_variable: Dict, workflow_variable: Dict, node: INode, workflow): + """ + 写入上下文数据 (流式) + @param node_variable: 节点数据 + @param workflow_variable: 全局数据 + @param node: 节点 + @param workflow: 工作流管理器 + """ + workflow_manage_new_instance = node_variable.get('workflow_manage_new_instance') + node_params = node.node_params + start_node_id = node_params.get('child_node', {}).get('runtime_node_id') + child_node_data = node.context.get('child_node_data') or [] + start_node_data = None + chat_record = None + child_node = None + if start_node_id: + chat_record_id = node_params.get('child_node', {}).get('chat_record_id') + child_node = node_params.get('child_node', {}).get('child_node') + start_node_data = node_params.get('node_data') + chat_record = ChatRecord(id=chat_record_id, answer_text_list=[], answer_text='', + details=child_node_data) + instance = workflow_manage_new_instance(start_node_id, + start_node_data, chat_record, child_node) + answer = '' + reasoning_content = '' + usage = {} + node_child_node = {} + is_interrupt_exec = False + response = instance.stream() + child_node_node_dict = {} + for chunk in response: + response_content = chunk + content = (response_content.get('content', '') or '') + runtime_node_id = response_content.get('runtime_node_id', '') + chat_record_id = response_content.get('chat_record_id', '') + child_node = response_content.get('child_node') + node_type = response_content.get('node_type') + _reasoning_content = (response_content.get('reasoning_content', '') or '') + if node_type == 'form-node': + is_interrupt_exec = True + answer += content + reasoning_content += _reasoning_content + node_child_node = {'runtime_node_id': runtime_node_id, 'chat_record_id': chat_record_id, + 'child_node': child_node} + + child_node = chunk.get('child_node') + runtime_node_id = chunk.get('runtime_node_id', '') + chat_record_id = chunk.get('chat_record_id', '') + child_node_node_dict[runtime_node_id] = { + 'runtime_node_id': runtime_node_id, + 'chat_record_id': chat_record_id, + 'child_node': child_node} + content_chunk = (chunk.get('content', '') or '') + reasoning_content_chunk = (chunk.get('reasoning_content', '') or '') + reasoning_content += reasoning_content_chunk + answer += content_chunk + yield chunk + usage = response_content.get('usage', {}) + child_answer_data = get_answer_list(instance, child_node_node_dict, node.runtime_node_id) + node.context['usage'] = {'usage': usage} + node.context['is_interrupt_exec'] = is_interrupt_exec + node.context['child_node'] = node_child_node + node.context['child_node_data'] = instance.get_runtime_details() + node.context['is_interrupt_exec'] = is_interrupt_exec + node.context['child_node_data'] = instance.get_runtime_details() + node.context['child_answer_data'] = child_answer_data + node.context['run_time'] = time.time() - node.context.get("start_time") + for key, value in instance.out_context.items(): + node.context[key] = value + + +def _is_interrupt_exec(node, node_variable: Dict, workflow_variable: Dict): + return node.context.get('is_interrupt_exec', False) + + +class BaseToolWorkflowLibNodeNode(IToolWorkflowLibNode): + def get_parameters(self, input_field_list): + result = {} + for input in input_field_list: + source = input.get('source') + value = input.get('value') + if source == 'reference': + value = self.workflow_manage.get_reference_field( + value[0], + value[1:]) + result[input.get('field')] = value + + return result + + def save_context(self, details, workflow_manage): + self.context['child_answer_data'] = details.get('child_answer_data') + self.context['child_node_data'] = details.get('child_node_data') + self.context['result'] = details.get('result') + self.context['exception_message'] = details.get('err_message') + if self.node_params.get('is_result'): + self.answer_text = str(details.get('result')) + + @staticmethod + def to_chat_record(record): + if record is None: + return None + return ChatRecord( + answer_text_list=record.meta.get('answer_text_list'), + details=record.meta.get('details'), + answer_text='', + ) + + def execute(self, tool_lib_id, input_field_list, **kwargs) -> NodeResult: + from application.flow.tool_workflow_manage import ToolWorkflowManage + workspace_id = self.workflow_manage.get_body().get('workspace_id') + tool_workflow_version = QuerySet(ToolWorkflowVersion).filter(tool_id=tool_lib_id).order_by( + '-create_time')[0:1].first() + if tool_workflow_version is None: + raise ChatException(500, _("The tool has not been published. Please use it after publishing.")) + parameters = self.get_parameters(input_field_list) + tool_record_id = (self.node_params.get('child_node') or {}).get('chat_record_id') or str(uuid.uuid7()) + took_execute = ToolExecute(tool_lib_id, tool_record_id, + workspace_id, + self.workflow_manage.get_source_type(), + self.workflow_manage.get_source_id(), + False) + + def workflow_manage_new_instance(start_node_id=None, + start_node_data=None, chat_record=None, child_node=None): + work_flow_manage = ToolWorkflowManage( + Workflow.new_instance(tool_workflow_version.work_flow, WorkflowMode.TOOL), + { + 'chat_record_id': tool_record_id, + 'tool_id': tool_lib_id, + 'stream': True, + 'workspace_id': workspace_id, + **parameters}, + ToolWorkflowPostHandler(took_execute, tool_lib_id), + base_to_response=LoopToResponse(), + start_node_id=start_node_id, + start_node_data=start_node_data, + child_node=child_node, + chat_record=self.to_chat_record(took_execute.get_record()), + is_the_task_interrupted=lambda: False) + + return work_flow_manage + + return NodeResult({'workflow_manage_new_instance': workflow_manage_new_instance}, + {}, _write_context=write_context_stream, + _is_interrupt=_is_interrupt_exec) + + def get_details(self, index: int, **kwargs): + result = self.context.get('result') + + return { + 'name': self.node.properties.get('stepName'), + "index": index, + "result": result, + "params": self.context.get('params'), + 'run_time': self.context.get('run_time'), + 'type': self.node.type, + 'status': self.status, + 'child_node_data': self.context.get("child_node_data"), + 'child_answer_data': self.context.get("child_answer_data"), + 'err_message': self.err_message, + 'enableException': self.node.properties.get('enableException'), + } diff --git a/apps/application/flow/tool_loop_workflow_manage.py b/apps/application/flow/tool_loop_workflow_manage.py new file mode 100644 index 00000000000..9fc2425f014 --- /dev/null +++ b/apps/application/flow/tool_loop_workflow_manage.py @@ -0,0 +1,21 @@ +# coding=utf-8 +""" + @project: maxkb + @Author:虎 + @file: workflow_manage.py + @date:2024/1/9 17:40 + @desc: +""" +from application.flow.i_step_node import ToolFlowParamsSerializer +from application.flow.loop_workflow_manage import LoopWorkflowManage + + +class ToolLoopWorkflowManage(LoopWorkflowManage): + def get_params_serializer_class(self): + return ToolFlowParamsSerializer + + def get_source_type(self): + return "TOOL" + + def get_source_id(self): + return self.params.get('tool_id') diff --git a/apps/application/flow/tool_workflow_manage.py b/apps/application/flow/tool_workflow_manage.py index 431045d716a..4b42983e592 100644 --- a/apps/application/flow/tool_workflow_manage.py +++ b/apps/application/flow/tool_workflow_manage.py @@ -8,6 +8,9 @@ """ from concurrent.futures import ThreadPoolExecutor +from django.db import close_old_connections +from django.utils.translation import get_language + from application.flow.common import Workflow from application.flow.i_step_node import WorkFlowPostHandler, ToolFlowParamsSerializer from application.flow.workflow_manage import WorkflowManage @@ -29,6 +32,12 @@ def __init__(self, flow: Workflow, params, work_flow_post_handler: WorkFlowPostH def get_params_serializer_class(self): return ToolFlowParamsSerializer + def stream(self): + close_old_connections() + language = get_language() + self.run_chain_async(self.start_node, None, language) + return self.await_result(is_cleanup=False) + def get_start_node(self): return self.flow.get_node('tool-start-node') @@ -38,3 +47,9 @@ def get_base_node(self): @return: """ return self.flow.get_node('tool-base-node') + + def get_source_type(self): + return "TOOL" + + def get_source_id(self): + return self.params.get('tool_id') diff --git a/apps/application/flow/workflow_manage.py b/apps/application/flow/workflow_manage.py index 0bffd0c5b3d..3d5ebdd7ec6 100644 --- a/apps/application/flow/workflow_manage.py +++ b/apps/application/flow/workflow_manage.py @@ -192,9 +192,7 @@ def load_node(self, chat_record, start_node_id, start_node_data): if node_details.get('runtime_node_id') == start_node_id: def get_node_params(n): is_result = False - if n.type == 'application-node': - is_result = True - if n.type == 'loop-node': + if ['application-node', 'loop-node', 'tool-workflow-lib-node'].__contains__(n.type): is_result = True return {**n.properties.get('node_data'), 'form_data': start_node_data, 'node_data': start_node_data, 'child_node': self.child_node, 'is_result': is_result} @@ -798,3 +796,9 @@ def get_node_reference(self, reference_address: Dict): def get_params_serializer_class(self): return FlowParamsSerializer + + def get_source_type(self): + return "APPLICATION" + + def get_source_id(self): + return self.params.get('application_id') diff --git a/apps/application/serializers/common.py b/apps/application/serializers/common.py index 68f1a9e7de0..a4d193cdacc 100644 --- a/apps/application/serializers/common.py +++ b/apps/application/serializers/common.py @@ -22,6 +22,75 @@ from models_provider.models import Model from models_provider.tools import get_model_credential from system_manage.models.resource_mapping import ResourceMapping +from tools.models import ToolRecord + + +class ToolExecute: + def __init__(self, tool_id: str, + tool_record_id: str, + workspace_id: str, + source_type, + source_id, + debug=False): + self.tool_id = tool_id + self.workspace_id = workspace_id + self.source_type = source_type + self.source_id = source_id + self.tool_record_id = tool_record_id + self.debug = debug + + def get_record(self): + if self.tool_record_id: + if self.debug: + return self.to_record(cache.get(Cache_Version.TOOL_WORKFLOW_EXECUTE.get_key(key=self.tool_record_id), + version=Cache_Version.TOOL_WORKFLOW_EXECUTE.get_version())) + else: + return QuerySet(ToolRecord).filter(tool_id=self.tool_id, id=self.tool_record_id).first() + return None + + def to_record(self, tool_record_dict): + if tool_record_dict is None: + return None + return ToolRecord(id=tool_record_dict.get('id'), + tool_id=tool_record_dict.get('tool_id'), + workspace_id=tool_record_dict.get('workspace_id'), + source_type=tool_record_dict.get('source_type'), + source_id=tool_record_dict.get('source_id'), + meta=tool_record_dict.get('meta'), + state=tool_record_dict.get('state'), + run_time=tool_record_dict.get('run_time')) + + def to_dict(self, tool_record): + return {'id': tool_record.id, + 'tool_id': tool_record.tool_id, + 'workspace_id': tool_record.workspace_id, + 'source_type': tool_record.source_type, + 'source_id': tool_record.source_id, + 'meta': tool_record.meta, + 'state': tool_record.state, + 'run_time': tool_record.run_time} + + def set_record(self, tool_record): + cache.set(Cache_Version.TOOL_WORKFLOW_EXECUTE.get_key(key=self.tool_record_id), self.to_dict(tool_record), + version=Cache_Version.TOOL_WORKFLOW_EXECUTE.get_version(), + timeout=60 * 30) + if not self.debug: + QuerySet(ToolRecord).update_or_create(id=tool_record.id, + create_defaults={'id': tool_record.id, + 'tool_id': tool_record.tool_id, + 'workspace_id': tool_record.workspace_id, + "source_type": tool_record.source_type, + 'source_id': tool_record.source_id, + 'meta': tool_record.meta, + 'run_time': tool_record.run_time}, + defaults={ + 'workspace_id': tool_record.workspace_id, + 'tool_id': tool_record.tool_id, + "source_type": tool_record.source_type, + 'source_id': tool_record.source_id, + 'meta': tool_record.meta, + 'run_time': tool_record.run_time + }) class ChatInfo: diff --git a/apps/common/constants/cache_version.py b/apps/common/constants/cache_version.py index 1b202dc8e07..0aed4715e2e 100644 --- a/apps/common/constants/cache_version.py +++ b/apps/common/constants/cache_version.py @@ -39,6 +39,8 @@ class Cache_Version(Enum): CHAT_USER_TOKEN = "CHAT_USER_TOKEN", lambda token: token + TOOL_WORKFLOW_EXECUTE = "TOOL_WORKFLOW_EXECUTE", lambda key: key + def get_version(self): return self.value[0] diff --git a/apps/tools/models/tool.py b/apps/tools/models/tool.py index 99ecfb08069..5f4d5bac334 100644 --- a/apps/tools/models/tool.py +++ b/apps/tools/models/tool.py @@ -42,6 +42,7 @@ class ToolType(models.TextChoices): class ToolTaskTypeChoices(models.TextChoices): APPLICATION = 'APPLICATION' KNOWLEDGE = 'KNOWLEDGE' + TOOL = 'TOOL' TRIGGER = 'TRIGGER' diff --git a/apps/tools/serializers/tool_workflow.py b/apps/tools/serializers/tool_workflow.py index 674cb375983..cffe8c2a6db 100644 --- a/apps/tools/serializers/tool_workflow.py +++ b/apps/tools/serializers/tool_workflow.py @@ -23,6 +23,8 @@ from application.flow.common import Workflow, WorkflowMode from application.flow.i_step_node import ToolWorkflowPostHandler from application.flow.tool_workflow_manage import ToolWorkflowManage +from application.models import ChatRecord +from application.serializers.common import ToolExecute from common.exception.app_exception import AppApiException from common.field.common import UploadedFileField from common.result import result @@ -220,19 +222,43 @@ def debug(self, instance: Dict, user, with_valid=True): if with_valid: self.is_valid(raise_exception=True) tool_workflow = QuerySet(ToolWorkflow).filter(tool_id=self.data.get("tool_id")).first() + tool_record_id = instance.get('chat_record_id') or str(uuid.uuid7()) + took_execute = ToolExecute(self.data.get("tool_id"), tool_record_id, + self.data.get("workspace_id"), + None, + None, + True) + record = took_execute.get_record() work_flow_manage = ToolWorkflowManage( Workflow.new_instance(tool_workflow.work_flow, WorkflowMode.TOOL), { + 'chat_record_id': tool_record_id, 'tool_id': self.data.get("tool_id"), 'stream': True, 'workspace_id': self.data.get("workspace_id"), **instance}, - ToolWorkflowPostHandler(None, self.data.get("tool_id")), - is_the_task_interrupted=lambda: False) + + ToolWorkflowPostHandler(took_execute, self.data.get("tool_id")), + is_the_task_interrupted=lambda: False, + child_node=instance.get('child_node'), + start_node_id=instance.get('runtime_node_id'), + start_node_data=instance.get('node_data'), + chat_record=self.to_chat_record(record) + ) r = work_flow_manage.run() return r + @staticmethod + def to_chat_record(record): + if record is None: + return None + return ChatRecord( + answer_text_list=record.meta.get('answer_text_list'), + details=record.meta.get('details'), + answer_text='', + ) + def publish(self, with_valid=True): if with_valid: self.is_valid() diff --git a/ui/src/views/tool-workflow/component/debug/result/index.vue b/ui/src/views/tool-workflow/component/debug/result/index.vue index 7a556be3a6e..891e20568e8 100644 --- a/ui/src/views/tool-workflow/component/debug/result/index.vue +++ b/ui/src/views/tool-workflow/component/debug/result/index.vue @@ -35,7 +35,12 @@ const details = { show_avatar: false, show_user_avatar: false, } +const currentToolId = ref() +const currentData = ref({}) const execute = (toolId: string, data: any) => { + console.log('execute') + currentToolId.value = toolId + currentData.value = data ChatManagement.addChatRecord(currentChat, 50, loading) ChatManagement.write(currentChat.id) return loadSharedApi({ type: 'tool', isShared: props.isShared, systemType: props.apiType }) @@ -158,25 +163,34 @@ const getWrite = (chat: any, reader: any, stream: boolean) => { return stream ? write_stream : write_json } -function chatMessage(toolId: string, chat?: any, other_params_data?: any) { - if (!chat) { - chat = reactive({}) - chatList.value.push(chat) - ChatManagement.addChatRecord(chat, 50, loading) - ChatManagement.write(chat.id) - } - if (chat.run_time) { - ChatManagement.addChatRecord(chat, 50, loading) - ChatManagement.write(chat.id) - } - const obj = { - ...other_params_data, - } - // 对话 - execute(toolId, obj) -} -const sendMessage = () => { - console.log('ss') + +const sendMessage = (val: string, other_params_data?: any, chat?: chatType) => { + loadSharedApi({ type: 'tool', isShared: props.isShared, systemType: props.apiType }) + .debugToolWorkflow(currentToolId.value, { ...other_params_data, ...currentData.value }) + .then((response: any) => { + if (response.status === 460) { + return Promise.reject(t('chat.tip.errorIdentifyMessage')) + } else if (response.status === 461) { + return Promise.reject(t('chat.tip.errorLimitMessage')) + } else { + const reader = response.body.getReader() + // 处理流数据 + const write = getWrite( + currentChat, + reader, + response.headers.get('Content-Type') !== 'application/json', + ) + return write() + } + }) + .finally(() => { + console.log('close') + ChatManagement.close(currentChat.id) + }) + .catch((e: any) => { + console.log(e) + }) + return Promise.resolve(true) } defineExpose({ execute, diff --git a/ui/src/workflow/nodes/tool-start-node/index.ts b/ui/src/workflow/nodes/tool-start-node/index.ts index 2ff937d0699..5191bf4ec34 100644 --- a/ui/src/workflow/nodes/tool-start-node/index.ts +++ b/ui/src/workflow/nodes/tool-start-node/index.ts @@ -15,7 +15,7 @@ class ToolBaseNode extends AppNode { }) const tbn = this.props.graphModel.getNodeModelById('tool-base-node') console.log(tbn) - const output = tbn.properties.user_output_field_list.map((i: any) => { + const output = tbn.properties?.user_output_field_list?.map((i: any) => { return { label: i.label || i.name, value: i.field } }) diff --git a/ui/src/workflow/nodes/tool-workflow-lib-node/index.vue b/ui/src/workflow/nodes/tool-workflow-lib-node/index.vue index 91933a99807..ad579d50461 100644 --- a/ui/src/workflow/nodes/tool-workflow-lib-node/index.vue +++ b/ui/src/workflow/nodes/tool-workflow-lib-node/index.vue @@ -195,6 +195,9 @@ const update_field = () => { } onMounted(() => { + if (props.nodeModel.properties.config?.fields?.length) { + set(props.nodeModel.properties.config, 'fields', props.nodeModel.properties.config.fields) + } if (typeof props.nodeModel.properties.node_data?.is_result === 'undefined') { if (isLastNode(props.nodeModel)) { set(props.nodeModel.properties.node_data, 'is_result', true)