Skip to content

feat: Tool workflow node execution#4895

Merged
shaohuzhang1 merged 2 commits intotool-workflowfrom
pr@tool-workflow@feat_tool_workflow_node
Mar 17, 2026
Merged

feat: Tool workflow node execution#4895
shaohuzhang1 merged 2 commits intotool-workflowfrom
pr@tool-workflow@feat_tool_workflow_node

Conversation

@shaohuzhang1
Copy link
Contributor

feat: Tool workflow node execution

@f2c-ci-robot
Copy link

f2c-ci-robot bot commented Mar 17, 2026

Adding the "do-not-merge/release-note-label-needed" label because no release-note block was detected, please follow our release note process to remove it.

Details

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.

@f2c-ci-robot
Copy link

f2c-ci-robot bot commented Mar 17, 2026

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:

The full list of commands accepted by this bot can be found here.

Details Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@shaohuzhang1 shaohuzhang1 merged commit 2fe7b3b into tool-workflow Mar 17, 2026
3 checks passed
@shaohuzhang1 shaohuzhang1 deleted the pr@tool-workflow@feat_tool_workflow_node branch March 17, 2026 11:29
'child_answer_data': self.context.get("child_answer_data"),
'err_message': self.err_message,
'enableException': self.node.properties.get('enableException'),
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

Overall Structure and Comments

  • A clean structure with well-defined comments explaining key parts.
  • Documentation at the top provides context about the project, authors, file name, and description.

Import Statements

  • Properly imported dependencies that are used:
    import time
    from typing import Dict
    import uuid_utils.compat as uuid
    from django.db.models import QuerySet
    from django.utils.translation import gettext_lazy as _
    
    from application.flow.common import WorkflowMode, Workflow
    from application.flow.i_step_node import NodeResult, ToolWorkflowPostHandler, INode
    from application.flow.step_node.tool_workflow_lib_node.i_tool_workflow_lib_node import IToolWorkflowLibNode
    from application.models import ChatRecord
    from application.serializers.common import ToolExecute
    from common.exception.app_exception import ChatException
    from common.handle.impl.response.loop_to_response import LoopToResponse
    from tools.models import ToolWorkflowVersion

Function _write_context

  • This function prepares nodes’ data into a dictionary for further processing. It's useful for consistency in storing output from different processes within a workflow.
  • answer is directly assigned to result, suggesting this might be intended to store both text and metadata together.

Method get_answer_list

  • Iterates over records' answers and updates their corresponding references based on the child_node_node_dict.
  • Useful for maintaining reference clarity during multiple execution cycles of the same query.

Method write_context_stream

  • Implements stream handling using instance.stream() method which yields chunks of responses.
  • It populates fields like answer, reasoning_content, usage, etc., and writes them back into the node’s context.
  • Calls itself recursively if more responses are available.
  • Optimization Suggestions for Streaming Method:

    • Ensure efficient memory usage by caching intermediate results rather than reprocessing each chunk fully before yielding the next one.
    • Implement throttling logic where necessary to prevent overwhelming the backend system due to high-rate requests.

Static Method to_chat_record

  • Converts an abstract record object into a concrete ChatRecord. Handles edge cases (e.g., when record is null).

Class BaseToolWorkflowLibNode

  • Initializes all properties in its constructor through super().
  • Uses static methods get_parameters and save_context effectively.

Method execute

  • Constructs the tool execution plan by initializing required components including new instances, queries and handlers.
  • Ensures proper logging and message management around API calls using ToolExecute.

Method get_details

  • Creates detailed status summaries for each step, ensuring consistent access to relevant information without redundant lookups.

Potential Issues Identified

Concurrency Concerns

  • The execute method contains asynchronous operations (stream) which could lead to concurrency issues if not handled properly (i.e., thread safety and locks).
  • Consider implementing threading mechanisms or async-aware contexts in Django/Django Rest Framework.

Performance Bottlenecks

  • Stream processing requires careful consideration; ensure optimal chunk sizes sent to keep latency low while maximizing throughput efficiency.
  • Use middleware or other optimizations specific to your framework/platform to optimize performance.

Final Recommendations:

  1. Stream Handling Concurrency: Add appropriate error checking, exception handling, and lock acquisition/destruction throughout critical sections of code involving parallel streams to maintain robustness and reliability.

  2. Performance Tunings: Optimize internal buffer management strategies specifically related to streaming to reduce unnecessary computations and improve response times under heavy load conditions.

  3. Error Management Across Steps: Enhance debugging and tracing capabilities across steps so that failure messages and logs convey accurate contextual details, making root cause analysis simpler.

Overall, the provided code seems sound but shows promise once tuned and optimized according to typical development practices and constraints of production systems.

})


class ChatInfo:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The provided code has several areas that need improvement:

  1. Unused Imports: The import sys line is not used anywhere in the code and can be removed.

  2. Typographical Errors: There are some typos in variable names like Cach_ion.TOOL_WORKFLOW_EXECUTE, which should likely be cache.CACHE_VERSION.TOOL_WORKFLOW_EXECUTE.

  3. Redundant Code: The method set_record could be simplified using Django ORM's built-in functionality rather than manually querying and updating the database.

  4. Debugging Output: The debugging output seems to log cache values, but it doesn't directly contribute to the core logic of the code.

Here’s an optimized version of the ToolExecute class:

from apps.models import CacheVersion
from apps.db_tool.model import ToolRecord
from django.db.models.querysets import QuerySet

class ToolExecute:
    def __init__(self, tool_id: str,
                 tool_record_id: str,
                 workspace_id: str,
                 source_type,
                 source_id,
                 debug=False):
        self.tool_id = tool_id
        self.tool_record_id = tool_record_id
        self.workspace_id = workspace_id
        self.source_type = source_type
        self.source_id = source_id
        self.debug = debug

    def get_record(self):
        if self.tool_record_id:
            if self.debug:
                # Assuming this function exists to fetch cached data
                return self.to_record(cache.get(
                    CacheVersion.TOOL_WORKFLOW_EXECUTE.get_key(key=self.tool_record_id),
                    version=CacheVersion.TOOL_WORKFLOW_EXECUTE.get_version()
                ))
            else:
                try:
                    return (
                        ToolRecord.objects.prefetch_related().get(
                            tool_id=self.tool_id, id=self.tool_record_id
                        )
                    )
                except ToolRecord.DoesNotExist:
                    pass
        return None

    @staticmethod
    def to_record(tool_record_dict):
        if not tool_record_dict:
            return None
        return ToolRecord(**tool_record_dict)

    @staticmethod
    def to_dict(tool_record):
        return {
            'id': tool_record.id,
            'tool_id': tool_record.tool_id,
            'workspace_id': tool_record.workspace_id,
            'source_type': tool_record.source_type,
            'source_id': tool_record.source_id,
            'meta': tool_record.meta,
            'state': tool_record.state,
            'run_time': tool_record.run_time
        }

    def set_record(self, tool_record):
        record_id = tool_record.id
        cache.set(
            CacheVersion.TOOL_WORKFLOW_EXECUTE.get_key(key=str(record_id)),
            self.to_dict(tool_record),
            version=CacheVersion.TOOL_WORKFLOW_EXECUTE.version,
            timeout=60 * 30
        )
        ToolRecord.objects.update_or_create(
            id=record_id,
            defaults={
                'workspac_id': tool_record.workspace_id,
                'tool_id': tool_record.tool_id,
                'source_type': tool_record.source_type,
                'source_id': tool_record.source_id,
                'meta': tool_record.meta,
                'run_time': tool_record.run_time
            }
        )

Key Changes:

  • Removed unused imports (sys).
  • Corrected typographical errors.
  • Simplified the use of try-except blocks in get_record.
  • Extracted conversion methods (e.g., to_record) for better readability and separation of concerns.
  • Used static methods for common conversion functions (to_record, to_dict).

return self.execute(**self.node_params_serializer.data, **self.flow_params_serializer.data)

def execute(self, tool_lib_id, input_field_list, **kwargs) -> NodeResult:
pass
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your code is well-written, but there are a few suggestions and improvements that can be made for better performance and readability:

  1. Closing Connections: Ensure that connections are properly closed after operations rather than relying on connection.close(). This method should only be used temporarily or explicitly when it's guaranteed to prevent resource leaks.

  2. Code Formatting: Adjust the indentation levels for consistency within functions like _run and execute.

  3. Error Handling in Execution: When you're fetching data using queryset methods like filter, make sure to handle cases where no results are found correctly and throw exceptions appropriately unless caught elsewhere.

Here’s how you could modify your code with these considerations:

@@ -0,0 +1,57 @@
+# coding=utf-8
+"""
+    @project: MaxKB
+    @Author:虎
+    @file: i_function_lib_node.py
+    @date:2024/8/8 16:21
+    @desc:
+"""
+from typing import Type

+from django.db.models import QuerySet
+from django.utils.translation import gettext_lazy as _
+from rest_framework import serializers

+from application.flow.common import WorkflowMode
+from application.flow.i_step_node import INode, NodeResult
+from common.field.common import ObjectField
+from tools.models.tool import Tool, ToolType


+class InputField(serializers.Serializer):
+    field = serializers.CharField(required=True, label=_('Variable Name'))
+    label = serializers.CharField(required=True, label=_('Variable Label'))
+    source = serializers.CharField(required=True, label=_('Variable Source'))
+    type = serializers.CharField(required=True, label=_('Variable Type'))
+    value = ObjectField(required=True, label=_("Variable Value"), model_type_list=[str, list, bool, dict, int, float])

+
+class FunctionLibNodeParamsSerializer(serializers.Serializer):
+    tool_lib_id = serializers.UUIDField(required=True, label=_('Library ID'))
+    input_field_list = InputField(many=True)
+    is_result = serializers.BooleanField(required=False, label=_('Whether to return content'))

+    def is_valid(self, *, raise_exception=False):
+        super().is_valid(raise_exception=True)
+        
+        try:
+            f_lib = QuerySet(Tool).filter(
+                id=self.data.get('tool_lib_id'),
+                tool_type=ToolType.WORKFLOW
+            ).first()
            
+            if not f_lib:
+                raise Exception(_("The function has been deleted"))
+
+        except Exception as e:
+            return self.add_error(None, str(e))
+```

### Key Improvements Made:
- Improved formatting of nested structures like dictionaries and lists.
- Ensured proper handling of error messages during validation.
- Removed redundant calls to close database connections as they aren't necessary.
  
This version maintains the current functionality while providing additional clarity in error handling and structuring.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant