Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion apps/application/flow/i_step_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,19 @@ def __init__(self, chat_info, tool_id):

def handler(self, workflow):
    """Persist the finished tool run and release this handler's references.

    Snapshots the workflow outcome (output context, runtime details, answer
    texts) plus the chat/tool identifiers into a ToolRecord, hands it to
    chat_info for persistence, then clears chat_info/tool_id so the handler
    cannot be reused and the referenced objects can be collected.

    :param workflow: the finished workflow manager to snapshot.
    """
    state = get_workflow_state(workflow)
    # Fix: removed a stray `ToolRecord(tool_id=self.tool_id)` statement that
    # constructed a record and immediately discarded it (dead no-op).
    record = ToolRecord(id=self.chat_info.tool_record_id, tool_id=self.tool_id,
                        workspace_id=self.chat_info.workspace_id,
                        source_type=self.chat_info.source_type,
                        source_id=self.chat_info.source_id,
                        state=state,
                        meta={
                            'output': workflow.out_context,
                            'details': workflow.get_runtime_details(),
                            'answer_text_list': workflow.get_answer_text_list()
                        })
    self.chat_info.set_record(record)
    self.chat_info = None
    self.tool_id = None


def get_loop_workflow_node(node_list):
Expand Down
6 changes: 6 additions & 0 deletions apps/application/flow/knowledge_loop_workflow_manage.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,9 @@
class KnowledgeLoopWorkflowManage(LoopWorkflowManage):
    """Loop-workflow manager variant used when the loop body runs inside a knowledge flow."""

    def get_params_serializer_class(self):
        # Validate parameters with the knowledge-flow serializer instead of the default.
        return KnowledgeFlowParamsSerializer

    def get_source_type(self):
        # Tool records created from this workflow are attributed to a knowledge base.
        return "KNOWLEDGE"

    def get_source_id(self):
        # Id of the owning knowledge base, taken from the workflow parameters.
        return self.params.get('knowledge_id')
6 changes: 6 additions & 0 deletions apps/application/flow/knowledge_workflow_manage.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,3 +122,9 @@ def hand_node_result(self, current_node, node_result_future):
current_node.node_chunk.end()
QuerySet(KnowledgeAction).filter(id=self.params.get('knowledge_action_id')).update(
details=self.get_runtime_details())

def get_source_type(self):
    """Source kind used when attributing tool records: a knowledge base."""
    return "KNOWLEDGE"

def get_source_id(self):
    """Id of the knowledge base this workflow belongs to (from the flow params)."""
    return self.params.get('knowledge_id')
6 changes: 6 additions & 0 deletions apps/application/flow/loop_workflow_manage.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,3 +191,9 @@ def generate_prompt(self, prompt: str):
prompt_template = PromptTemplate.from_template(prompt, template_format='jinja2')
value = prompt_template.format(context=context)
return value

def get_source_type(self):
    """Source kind used when attributing tool records: an application."""
    return "APPLICATION"

def get_source_id(self):
    """Id of the application this workflow belongs to (from the flow params)."""
    return self.params.get('application_id')
3 changes: 2 additions & 1 deletion apps/application/flow/step_node/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from .text_to_video_step_node.impl.base_text_to_video_node import BaseTextToVideoNode
from .tool_lib_node import *
from .tool_node import *
from .tool_workflow_lib_node import BaseToolWorkflowLibNodeNode
from .variable_aggregation_node.impl.base_variable_aggregation_node import BaseVariableAggregationNode
from .variable_assign_node import BaseVariableAssignNode
from .variable_splitting_node import BaseVariableSplittingNode
Expand All @@ -53,7 +54,7 @@
BaseLoopContinueNode,
BaseLoopBreakNode, BaseVariableSplittingNode, BaseParameterExtractionNode, BaseVariableAggregationNode,
BaseDataSourceLocalNode, BaseDataSourceWebNode, BaseKnowledgeWriteNode, BaseDocumentSplitNode,
BaseToolStartStepNode]
BaseToolStartStepNode, BaseToolWorkflowLibNodeNode]

# Lookup table: node type -> {supported workflow mode -> node class}, built
# from every registered node's declared `support` list.
node_map = {n.type: {w: n for w in n.support} for n in node_list}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,9 @@ def get_answer_list(self) -> List[Answer] | None:
value = prompt_template.format(form=form, context=context, runtime_node_id=self.runtime_node_id,
chat_record_id=self.flow_params_serializer.data.get("chat_record_id"),
form_field_list=form_field_list)
return [Answer(value, self.view_type, self.runtime_node_id, self.workflow_params['chat_record_id'], None,
self.runtime_node_id, '')]
return [
Answer(value, self.view_type, self.runtime_node_id, self.workflow_params.get('chat_record_id') or '', None,
self.runtime_node_id, '')]

def get_details(self, index: int, **kwargs):
form_content_format = self.context.get('form_content_format')
Expand Down
10 changes: 8 additions & 2 deletions apps/application/flow/step_node/loop_node/impl/base_loop_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,10 +268,16 @@ def get_loop_context(self):
def execute(self, loop_type, array, number, loop_body, **kwargs) -> NodeResult:
from application.flow.loop_workflow_manage import LoopWorkflowManage, Workflow
from application.flow.knowledge_loop_workflow_manage import KnowledgeLoopWorkflowManage
from application.flow.tool_loop_workflow_manage import ToolLoopWorkflowManage
def workflow_manage_new_instance(loop_data, global_data, start_node_id=None,
start_node_data=None, chat_record=None, child_node=None):
workflow_mode = WorkflowMode.KNOWLEDGE_LOOP if WorkflowMode.KNOWLEDGE == self.workflow_manage.flow.workflow_mode else WorkflowMode.APPLICATION_LOOP
c = KnowledgeLoopWorkflowManage if workflow_mode == WorkflowMode.KNOWLEDGE_LOOP else LoopWorkflowManage
workflow_mode = {WorkflowMode.APPLICATION: WorkflowMode.APPLICATION_LOOP,
WorkflowMode.KNOWLEDGE: WorkflowMode.KNOWLEDGE_LOOP,
WorkflowMode.TOOL: WorkflowMode.TOOL_LOOP}.get(
self.workflow_manage.flow.workflow_mode) or WorkflowMode.APPLICATION
c = {WorkflowMode.APPLICATION_LOOP: LoopWorkflowManage,
WorkflowMode.KNOWLEDGE_LOOP: KnowledgeLoopWorkflowManage,
WorkflowMode.TOOL_LOOP: ToolLoopWorkflowManage}.get(workflow_mode) or LoopWorkflowManage
workflow_manage = c(Workflow.new_instance(loop_body, workflow_mode),
self.workflow_manage.params,
LoopWorkFlowPostHandler(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# coding=utf-8
"""
@project: MaxKB
@Author:虎虎
@file: __init__.py.py
@date:2026/3/16 13:53
@desc:
"""
from .impl import *
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# coding=utf-8
"""
@project: MaxKB
@Author:虎
@file: i_function_lib_node.py
@date:2024/8/8 16:21
@desc:
"""
from typing import Type

from django.db import connection
from django.db.models import QuerySet
from django.utils.translation import gettext_lazy as _
from rest_framework import serializers

from application.flow.common import WorkflowMode
from application.flow.i_step_node import INode, NodeResult
from common.field.common import ObjectField
from tools.models.tool import Tool, ToolType


class InputField(serializers.Serializer):
    """One configured input of the tool-workflow node.

    `source` says how `value` is interpreted (e.g. a literal or a reference to
    another node's output); `field` is the target parameter name inside the tool.
    """
    field = serializers.CharField(required=True, label=_('Variable Name'))
    label = serializers.CharField(required=True, label=_('Variable Label'))
    source = serializers.CharField(required=True, label=_('Variable Source'))
    type = serializers.CharField(required=True, label=_('Variable Type'))
    value = ObjectField(required=True, label=_("Variable Value"), model_type_list=[str, list, bool, dict, int, float])


class FunctionLibNodeParamsSerializer(serializers.Serializer):
    """Validates the static parameters of a tool-workflow-lib node and checks
    that the referenced workflow-type tool still exists."""
    tool_lib_id = serializers.UUIDField(required=True, label=_('Library ID'))
    input_field_list = InputField(required=True, many=True)
    is_result = serializers.BooleanField(required=False,
                                         label=_('Whether to return content'))

    def is_valid(self, *, raise_exception=False):
        # NOTE(review): the raise_exception argument is accepted but ignored —
        # validation errors always raise because super() is called with
        # raise_exception=True. Confirm this matches sibling serializers.
        super().is_valid(raise_exception=True)
        f_lib = QuerySet(Tool).filter(id=self.data.get('tool_lib_id'), tool_type=ToolType.WORKFLOW).first()
        # Return the connection to the pool
        connection.close()
        if f_lib is None:
            # NOTE(review): raises the generic Exception type; a domain-specific
            # exception would let callers handle "tool deleted" distinctly.
            raise Exception(_('The function has been deleted'))


class IToolWorkflowLibNode(INode):
    """Interface of the tool-workflow-lib node: runs a workflow-type tool from
    the library. Supported in every workflow mode, including loop bodies."""
    type = 'tool-workflow-lib-node'
    support = [WorkflowMode.APPLICATION, WorkflowMode.APPLICATION_LOOP, WorkflowMode.KNOWLEDGE,
               WorkflowMode.KNOWLEDGE_LOOP, WorkflowMode.TOOL, WorkflowMode.TOOL_LOOP]

    def get_node_params_serializer_class(self) -> Type[serializers.Serializer]:
        # Static node parameters are validated by FunctionLibNodeParamsSerializer.
        return FunctionLibNodeParamsSerializer

    def _run(self):
        # Merge validated node params with flow params and delegate to execute().
        return self.execute(**self.node_params_serializer.data, **self.flow_params_serializer.data)

    def execute(self, tool_lib_id, input_field_list, **kwargs) -> NodeResult:
        # Abstract hook; implemented by BaseToolWorkflowLibNodeNode.
        pass
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your code is well-written, but there are a few suggestions and improvements that can be made for better performance and readability:

  1. Closing Connections: Prefer framework-managed connection lifecycles (or a context manager) over calling connection.close() directly after each operation; reserve an explicit close() for the cases where you must guarantee the connection is returned to the pool immediately.

  2. Code Formatting: Adjust the indentation levels for consistency within functions like _run and execute.

  3. Error Handling in Execution: When you're fetching data using queryset methods like filter, make sure to handle cases where no results are found correctly and throw exceptions appropriately unless caught elsewhere.

Here’s how you could modify your code with these considerations:

@@ -0,0 +1,57 @@
+# coding=utf-8
+"""
+    @project: MaxKB
+    @Author:虎
+    @file: i_function_lib_node.py
+    @date:2024/8/8 16:21
+    @desc:
+"""
+from typing import Type

+from django.db.models import QuerySet
+from django.utils.translation import gettext_lazy as _
+from rest_framework import serializers

+from application.flow.common import WorkflowMode
+from application.flow.i_step_node import INode, NodeResult
+from common.field.common import ObjectField
+from tools.models.tool import Tool, ToolType


+class InputField(serializers.Serializer):
+    field = serializers.CharField(required=True, label=_('Variable Name'))
+    label = serializers.CharField(required=True, label=_('Variable Label'))
+    source = serializers.CharField(required=True, label=_('Variable Source'))
+    type = serializers.CharField(required=True, label=_('Variable Type'))
+    value = ObjectField(required=True, label=_("Variable Value"), model_type_list=[str, list, bool, dict, int, float])

+
+class FunctionLibNodeParamsSerializer(serializers.Serializer):
+    tool_lib_id = serializers.UUIDField(required=True, label=_('Library ID'))
+    input_field_list = InputField(many=True)
+    is_result = serializers.BooleanField(required=False, label=_('Whether to return content'))

+    def is_valid(self, *, raise_exception=False):
+        super().is_valid(raise_exception=True)
+        
+        try:
+            f_lib = QuerySet(Tool).filter(
+                id=self.data.get('tool_lib_id'),
+                tool_type=ToolType.WORKFLOW
+            ).first()
            
+            if not f_lib:
+                raise Exception(_("The function has been deleted"))
+
+        except Exception as e:
+            return self.add_error(None, str(e))
+```

### Key Improvements Made:
- Improved formatting of nested structures like dictionaries and lists.
- Ensured proper handling of error messages during validation.
- Removed redundant calls to close database connections as they aren't necessary.
  
This version maintains the current functionality while providing additional clarity in error handling and structuring.

Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# coding=utf-8
"""
@project: MaxKB
@Author:虎虎
@file: __init__.py.py
@date:2026/3/16 13:53
@desc:
"""
from .base_tool_workflow_lib_node import *
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
# coding=utf-8
"""
@project: MaxKB
@Author:虎虎
@file: base_tool_workflow_lib_node.py.py
@date:2026/3/16 13:55
@desc:
"""

import time
from typing import Dict

import uuid_utils.compat as uuid
from django.db.models import QuerySet
from django.utils.translation import gettext_lazy as _

from application.flow.common import WorkflowMode, Workflow
from application.flow.i_step_node import NodeResult, ToolWorkflowPostHandler, INode
from application.flow.step_node.tool_workflow_lib_node.i_tool_workflow_lib_node import IToolWorkflowLibNode
from application.models import ChatRecord
from application.serializers.common import ToolExecute
from common.exception.app_exception import ChatException
from common.handle.impl.response.loop_to_response import LoopToResponse
from tools.models import ToolWorkflowVersion


def _write_context(node_variable: Dict, workflow_variable: Dict, node: INode, workflow, answer: str,
                   reasoning_content: str):
    """Write a finished child-workflow run back into the node's context.

    Copies the child run's node dictionaries, token usage, answer and
    reasoning text into node.context, records the elapsed run time, and —
    when this node is the workflow's result node — sets node.answer_text.

    :param node_variable: node-level result data produced by the child run.
    :param workflow_variable: global workflow data.
    :param node: the node whose context is updated.
    :param workflow: workflow manager (asked whether this node is the result).
    :param answer: accumulated answer text.
    :param reasoning_content: accumulated reasoning text.
    """
    # Robustness: 'result' (or its 'usage' entry) may be absent or None; fall
    # back to empty dicts instead of raising AttributeError on None.
    result = node_variable.get('result') or {}
    usage = result.get('usage') or {}
    node.context['application_node_dict'] = node_variable.get('application_node_dict')
    node.context['node_dict'] = node_variable.get('node_dict', {})
    node.context['is_interrupt_exec'] = node_variable.get('is_interrupt_exec')
    node.context['message_tokens'] = usage.get('prompt_tokens', 0)
    node.context['answer_tokens'] = usage.get('completion_tokens', 0)
    node.context['answer'] = answer
    node.context['result'] = answer
    node.context['reasoning_content'] = reasoning_content
    node.context['run_time'] = time.time() - node.context['start_time']
    if workflow.is_result(node, NodeResult(node_variable, workflow_variable)):
        node.answer_text = answer


def get_answer_list(instance, child_node_node_dict, runtime_node_id):
    """Re-parent the child run's recorded answers onto this node.

    Each answer whose runtime_node_id appears in child_node_node_dict is
    rewritten to point at this node's runtime_node_id and tagged with its
    child-node descriptor; other answers pass through untouched.
    """
    answers = instance.get_record_answer_list()
    for answer in answers:
        child = child_node_node_dict.get(answer.get('runtime_node_id'))
        if not child:
            continue
        answer['runtime_node_id'] = runtime_node_id
        answer['child_node'] = child
    return answers


def write_context_stream(node_variable: Dict, workflow_variable: Dict, node: INode, workflow):
    """
    Write context data (streaming).

    Builds the child tool workflow via the factory stored in node_variable,
    streams its chunks through to the caller, then records the accumulated
    results (runtime details, answers, usage, interrupt state) on the node
    context once the stream is exhausted.
    @param node_variable: node data
    @param workflow_variable: global data
    @param node: node
    @param workflow: workflow manager
    """
    workflow_manage_new_instance = node_variable.get('workflow_manage_new_instance')
    node_params = node.node_params
    # When resuming (e.g. after a form-node interrupt) the stored child-node
    # info tells the child workflow where to restart from.
    start_node_id = node_params.get('child_node', {}).get('runtime_node_id')
    child_node_data = node.context.get('child_node_data') or []
    start_node_data = None
    chat_record = None
    child_node = None
    if start_node_id:
        chat_record_id = node_params.get('child_node', {}).get('chat_record_id')
        child_node = node_params.get('child_node', {}).get('child_node')
        start_node_data = node_params.get('node_data')
        chat_record = ChatRecord(id=chat_record_id, answer_text_list=[], answer_text='',
                                 details=child_node_data)
    instance = workflow_manage_new_instance(start_node_id,
                                            start_node_data, chat_record, child_node)
    answer = ''
    reasoning_content = ''
    usage = {}
    node_child_node = {}
    is_interrupt_exec = False
    response = instance.stream()
    child_node_node_dict = {}
    for chunk in response:
        response_content = chunk
        content = (response_content.get('content', '') or '')
        runtime_node_id = response_content.get('runtime_node_id', '')
        chat_record_id = response_content.get('chat_record_id', '')
        child_node = response_content.get('child_node')
        node_type = response_content.get('node_type')
        _reasoning_content = (response_content.get('reasoning_content', '') or '')
        # A form node inside the child workflow pauses execution until answered.
        if node_type == 'form-node':
            is_interrupt_exec = True
        answer += content
        reasoning_content += _reasoning_content
        # Most recent child position, kept so the run can be resumed later.
        node_child_node = {'runtime_node_id': runtime_node_id, 'chat_record_id': chat_record_id,
                           'child_node': child_node}

        # NOTE(review): the block below re-reads the same chunk's fields and
        # adds content/reasoning to the accumulators a second time. 'answer'
        # and 'reasoning_content' are never read after the loop in this file,
        # so the double-accumulation is dead, but it looks like a merge
        # leftover — confirm and simplify.
        child_node = chunk.get('child_node')
        runtime_node_id = chunk.get('runtime_node_id', '')
        chat_record_id = chunk.get('chat_record_id', '')
        child_node_node_dict[runtime_node_id] = {
            'runtime_node_id': runtime_node_id,
            'chat_record_id': chat_record_id,
            'child_node': child_node}
        content_chunk = (chunk.get('content', '') or '')
        reasoning_content_chunk = (chunk.get('reasoning_content', '') or '')
        reasoning_content += reasoning_content_chunk
        answer += content_chunk
        yield chunk
        # Refreshed every pass; the final chunk's usage wins.
        usage = response_content.get('usage', {})
    child_answer_data = get_answer_list(instance, child_node_node_dict, node.runtime_node_id)
    node.context['usage'] = {'usage': usage}
    node.context['is_interrupt_exec'] = is_interrupt_exec
    node.context['child_node'] = node_child_node
    node.context['child_node_data'] = instance.get_runtime_details()
    # NOTE(review): the two assignments below repeat the two above verbatim — redundant.
    node.context['is_interrupt_exec'] = is_interrupt_exec
    node.context['child_node_data'] = instance.get_runtime_details()
    node.context['child_answer_data'] = child_answer_data
    node.context['run_time'] = time.time() - node.context.get("start_time")
    # Surface the child workflow's output variables on this node's context.
    for key, value in instance.out_context.items():
        node.context[key] = value


def _is_interrupt_exec(node, node_variable: Dict, workflow_variable: Dict):
return node.context.get('is_interrupt_exec', False)


class BaseToolWorkflowLibNodeNode(IToolWorkflowLibNode):
    """Node that executes a published workflow-type tool as a nested child workflow.

    execute() does not run the child flow itself: it returns a NodeResult
    carrying a factory; write_context_stream later calls the factory to build
    a ToolWorkflowManage and streams the child run through this node.
    """

    def get_parameters(self, input_field_list):
        """Resolve the node's configured inputs into a {field: value} dict.

        'reference'-sourced values are looked up in the parent workflow
        (value[0] is the node id, value[1:] the field path); other sources
        pass their configured value through unchanged.
        """
        result = {}
        # NOTE(review): loop variable 'input' shadows the builtin of the same name.
        for input in input_field_list:
            source = input.get('source')
            value = input.get('value')
            if source == 'reference':
                value = self.workflow_manage.get_reference_field(
                    value[0],
                    value[1:])
            result[input.get('field')] = value

        return result

    def save_context(self, details, workflow_manage):
        """Restore this node's context from previously persisted run details."""
        self.context['child_answer_data'] = details.get('child_answer_data')
        self.context['child_node_data'] = details.get('child_node_data')
        self.context['result'] = details.get('result')
        self.context['exception_message'] = details.get('err_message')
        # Only result-bearing nodes surface their result as answer text.
        if self.node_params.get('is_result'):
            self.answer_text = str(details.get('result'))

    @staticmethod
    def to_chat_record(record):
        """Convert a persisted tool record into a ChatRecord the child flow can
        resume from; returns None when there is no prior record."""
        if record is None:
            return None
        return ChatRecord(
            answer_text_list=record.meta.get('answer_text_list'),
            details=record.meta.get('details'),
            answer_text='',
        )

    def execute(self, tool_lib_id, input_field_list, **kwargs) -> NodeResult:
        """Prepare (not run) the published workflow tool identified by tool_lib_id.

        Looks up the latest published version, resolves this node's inputs,
        and returns a NodeResult whose payload is a factory that builds the
        child ToolWorkflowManage. Raises ChatException(500) when the tool has
        no published version.
        """
        from application.flow.tool_workflow_manage import ToolWorkflowManage
        workspace_id = self.workflow_manage.get_body().get('workspace_id')
        # Latest published version wins.
        tool_workflow_version = QuerySet(ToolWorkflowVersion).filter(tool_id=tool_lib_id).order_by(
            '-create_time')[0:1].first()
        if tool_workflow_version is None:
            raise ChatException(500, _("The tool has not been published. Please use it after publishing."))
        parameters = self.get_parameters(input_field_list)
        # Reuse the child's chat_record_id when resuming so the same tool record
        # is updated; otherwise mint a fresh id.
        tool_record_id = (self.node_params.get('child_node') or {}).get('chat_record_id') or str(uuid.uuid7())
        # NOTE(review): 'took_execute' reads like a typo for 'tool_execute'.
        took_execute = ToolExecute(tool_lib_id, tool_record_id,
                                   workspace_id,
                                   self.workflow_manage.get_source_type(),
                                   self.workflow_manage.get_source_id(),
                                   False)

        def workflow_manage_new_instance(start_node_id=None,
                                         start_node_data=None, chat_record=None, child_node=None):
            # NOTE(review): the 'chat_record' parameter is ignored; the record is
            # always rebuilt from took_execute.get_record() — confirm intended.
            work_flow_manage = ToolWorkflowManage(
                Workflow.new_instance(tool_workflow_version.work_flow, WorkflowMode.TOOL),
                {
                    'chat_record_id': tool_record_id,
                    'tool_id': tool_lib_id,
                    'stream': True,
                    'workspace_id': workspace_id,
                    **parameters},
                ToolWorkflowPostHandler(took_execute, tool_lib_id),
                base_to_response=LoopToResponse(),
                start_node_id=start_node_id,
                start_node_data=start_node_data,
                child_node=child_node,
                chat_record=self.to_chat_record(took_execute.get_record()),
                is_the_task_interrupted=lambda: False)

            return work_flow_manage

        return NodeResult({'workflow_manage_new_instance': workflow_manage_new_instance},
                          {}, _write_context=write_context_stream,
                          _is_interrupt=_is_interrupt_exec)

    def get_details(self, index: int, **kwargs):
        """Serialize this node's run for persistence/display in chat details."""
        result = self.context.get('result')

        return {
            'name': self.node.properties.get('stepName'),
            "index": index,
            "result": result,
            "params": self.context.get('params'),
            'run_time': self.context.get('run_time'),
            'type': self.node.type,
            'status': self.status,
            'child_node_data': self.context.get("child_node_data"),
            'child_answer_data': self.context.get("child_answer_data"),
            'err_message': self.err_message,
            'enableException': self.node.properties.get('enableException'),
        }
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

Overall Structure and Comments

  • A clean structure with well-defined comments explaining key parts.
  • Documentation at the top provides context about the project, authors, file name, and description.

Import Statements

  • Properly imported dependencies that are used:
    import time
    from typing import Dict
    import uuid_utils.compat as uuid
    from django.db.models import QuerySet
    from django.utils.translation import gettext_lazy as _
    
    from application.flow.common import WorkflowMode, Workflow
    from application.flow.i_step_node import NodeResult, ToolWorkflowPostHandler, INode
    from application.flow.step_node.tool_workflow_lib_node.i_tool_workflow_lib_node import IToolWorkflowLibNode
    from application.models import ChatRecord
    from application.serializers.common import ToolExecute
    from common.exception.app_exception import ChatException
    from common.handle.impl.response.loop_to_response import LoopToResponse
    from tools.models import ToolWorkflowVersion

Function _write_context

  • This function prepares nodes’ data into a dictionary for further processing. It's useful for consistency in storing output from different processes within a workflow.
  • answer is directly assigned to result, suggesting this might be intended to store both text and metadata together.

Method get_answer_list

  • Iterates over records' answers and updates their corresponding references based on the child_node_node_dict.
  • Useful for maintaining reference clarity during multiple execution cycles of the same query.

Method write_context_stream

  • Implements stream handling using instance.stream() method which yields chunks of responses.
  • It populates fields like answer, reasoning_content, usage, etc., and writes them back into the node’s context.
  • Iterates over the streamed response chunks in a for loop, yielding each chunk until the generator returned by instance.stream() is exhausted (there is no recursion here).
  • Optimization Suggestions for Streaming Method:

    • Ensure efficient memory usage by caching intermediate results rather than reprocessing each chunk fully before yielding the next one.
    • Implement throttling logic where necessary to prevent overwhelming the backend system due to high-rate requests.

Static Method to_chat_record

  • Converts an abstract record object into a concrete ChatRecord. Handles edge cases (e.g., when record is null).

Class BaseToolWorkflowLibNode

  • Initializes all properties in its constructor through super().
  • Uses static methods get_parameters and save_context effectively.

Method execute

  • Constructs the tool execution plan by initializing required components including new instances, queries and handlers.
  • Ensures proper logging and message management around API calls using ToolExecute.

Method get_details

  • Creates detailed status summaries for each step, ensuring consistent access to relevant information without redundant lookups.

Potential Issues Identified

Concurrency Concerns

  • The execute method contains asynchronous operations (stream) which could lead to concurrency issues if not handled properly (i.e., thread safety and locks).
  • Consider implementing threading mechanisms or async-aware contexts in Django/Django Rest Framework.

Performance Bottlenecks

  • Stream processing requires careful consideration; ensure optimal chunk sizes sent to keep latency low while maximizing throughput efficiency.
  • Use middleware or other optimizations specific to your framework/platform to optimize performance.

Final Recommendations:

  1. Stream Handling Concurrency: Add appropriate error checking, exception handling, and lock acquisition/destruction throughout critical sections of code involving parallel streams to maintain robustness and reliability.

  2. Performance Tunings: Optimize internal buffer management strategies specifically related to streaming to reduce unnecessary computations and improve response times under heavy load conditions.

  3. Error Management Across Steps: Enhance debugging and tracing capabilities across steps so that failure messages and logs convey accurate contextual details, making root cause analysis simpler.

Overall, the provided code seems sound but shows promise once tuned and optimized according to typical development practices and constraints of production systems.

Loading
Loading