feat: Tool workflow node execution#4895
Conversation
|
Adding the "do-not-merge/release-note-label-needed" label because no release-note block was detected, please follow our release note process to remove it. DetailsInstructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. |
|
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: The full list of commands accepted by this bot can be found here. DetailsNeeds approval from an approver in each of these files:Approvers can indicate their approval by writing |
| 'child_answer_data': self.context.get("child_answer_data"), | ||
| 'err_message': self.err_message, | ||
| 'enableException': self.node.properties.get('enableException'), | ||
| } |
There was a problem hiding this comment.
Code Review
Overall Structure and Comments
- A clean structure with well-defined comments explaining key parts.
- Documentation at the top provides context about the project, authors, file name, and description.
Import Statements
- Properly imported dependencies that are used:
import time from typing import Dict import uuid_utils.compat as uuid from django.db.models import QuerySet from django.utils.translation import gettext_lazy as _ from application.flow.common import WorkflowMode, Workflow from application.flow.i_step_node import NodeResult, ToolWorkflowPostHandler, INode from application.flow.step_node.tool_workflow_lib_node.i_tool_workflow_lib_node import IToolWorkflowLibNode from application.models import ChatRecord from application.serializers.common import ToolExecute from common.exception.app_exception import ChatException from common.handle.impl.response.loop_to_response import LoopToResponse from tools.models import ToolWorkflowVersion
Function _write_context
- This function prepares nodes’ data into a dictionary for further processing. It's useful for consistency in storing output from different processes within a workflow.
answeris directly assigned toresult, suggesting this might be intended to store both text and metadata together.
Method get_answer_list
- Iterates over records' answers and updates their corresponding references based on the
child_node_node_dict. - Useful for maintaining reference clarity during multiple execution cycles of the same query.
Method write_context_stream
- Implements stream handling using
instance.stream()method which yields chunks of responses. - It populates fields like
answer,reasoning_content,usage, etc., and writes them back into the node’s context. - Calls itself recursively if more responses are available.
-
Optimization Suggestions for Streaming Method:
- Ensure efficient memory usage by caching intermediate results rather than reprocessing each chunk fully before yielding the next one.
- Implement throttling logic where necessary to prevent overwhelming the backend system due to high-rate requests.
Static Method to_chat_record
- Converts an abstract
recordobject into a concreteChatRecord. Handles edge cases (e.g., whenrecordis null).
Class BaseToolWorkflowLibNode
- Initializes all properties in its constructor through
super(). - Uses static methods
get_parametersandsave_contexteffectively.
Method execute
- Constructs the tool execution plan by initializing required components including new instances, queries and handlers.
- Ensures proper logging and message management around API calls using
ToolExecute.
Method get_details
- Creates detailed status summaries for each step, ensuring consistent access to relevant information without redundant lookups.
Potential Issues Identified
Concurrency Concerns
- The
executemethod contains asynchronous operations (stream) which could lead to concurrency issues if not handled properly (i.e., thread safety and locks). - Consider implementing threading mechanisms or async-aware contexts in Django/Django Rest Framework.
Performance Bottlenecks
- Stream processing requires careful consideration; ensure optimal chunk sizes sent to keep latency low while maximizing throughput efficiency.
- Use middleware or other optimizations specific to your framework/platform to optimize performance.
Final Recommendations:
-
Stream Handling Concurrency: Add appropriate error checking, exception handling, and lock acquisition/destruction throughout critical sections of code involving parallel streams to maintain robustness and reliability.
-
Performance Tunings: Optimize internal buffer management strategies specifically related to streaming to reduce unnecessary computations and improve response times under heavy load conditions.
-
Error Management Across Steps: Enhance debugging and tracing capabilities across steps so that failure messages and logs convey accurate contextual details, making root cause analysis simpler.
Overall, the provided code seems sound but shows promise once tuned and optimized according to typical development practices and constraints of production systems.
| }) | ||
|
|
||
|
|
||
| class ChatInfo: |
There was a problem hiding this comment.
The provided code has several areas that need improvement:
-
Unused Imports: The
import sysline is not used anywhere in the code and can be removed. -
Typographical Errors: There are some typos in variable names like
Cach_ion.TOOL_WORKFLOW_EXECUTE, which should likely becache.CACHE_VERSION.TOOL_WORKFLOW_EXECUTE. -
Redundant Code: The method
set_recordcould be simplified using Django ORM's built-in functionality rather than manually querying and updating the database. -
Debugging Output: The debugging output seems to log cache values, but it doesn't directly contribute to the core logic of the code.
Here’s an optimized version of the ToolExecute class:
from apps.models import CacheVersion
from apps.db_tool.model import ToolRecord
from django.db.models.querysets import QuerySet
class ToolExecute:
def __init__(self, tool_id: str,
tool_record_id: str,
workspace_id: str,
source_type,
source_id,
debug=False):
self.tool_id = tool_id
self.tool_record_id = tool_record_id
self.workspace_id = workspace_id
self.source_type = source_type
self.source_id = source_id
self.debug = debug
def get_record(self):
if self.tool_record_id:
if self.debug:
# Assuming this function exists to fetch cached data
return self.to_record(cache.get(
CacheVersion.TOOL_WORKFLOW_EXECUTE.get_key(key=self.tool_record_id),
version=CacheVersion.TOOL_WORKFLOW_EXECUTE.get_version()
))
else:
try:
return (
ToolRecord.objects.prefetch_related().get(
tool_id=self.tool_id, id=self.tool_record_id
)
)
except ToolRecord.DoesNotExist:
pass
return None
@staticmethod
def to_record(tool_record_dict):
if not tool_record_dict:
return None
return ToolRecord(**tool_record_dict)
@staticmethod
def to_dict(tool_record):
return {
'id': tool_record.id,
'tool_id': tool_record.tool_id,
'workspace_id': tool_record.workspace_id,
'source_type': tool_record.source_type,
'source_id': tool_record.source_id,
'meta': tool_record.meta,
'state': tool_record.state,
'run_time': tool_record.run_time
}
def set_record(self, tool_record):
record_id = tool_record.id
cache.set(
CacheVersion.TOOL_WORKFLOW_EXECUTE.get_key(key=str(record_id)),
self.to_dict(tool_record),
version=CacheVersion.TOOL_WORKFLOW_EXECUTE.version,
timeout=60 * 30
)
ToolRecord.objects.update_or_create(
id=record_id,
defaults={
'workspac_id': tool_record.workspace_id,
'tool_id': tool_record.tool_id,
'source_type': tool_record.source_type,
'source_id': tool_record.source_id,
'meta': tool_record.meta,
'run_time': tool_record.run_time
}
)Key Changes:
- Removed unused imports (
sys). - Corrected typographical errors.
- Simplified the use of
try-exceptblocks inget_record. - Extracted conversion methods (e.g.,
to_record) for better readability and separation of concerns. - Used static methods for common conversion functions (
to_record,to_dict).
| return self.execute(**self.node_params_serializer.data, **self.flow_params_serializer.data) | ||
|
|
||
| def execute(self, tool_lib_id, input_field_list, **kwargs) -> NodeResult: | ||
| pass |
There was a problem hiding this comment.
Your code is well-written, but there are a few suggestions and improvements that can be made for better performance and readability:
-
Closing Connections: Ensure that connections are properly closed after operations rather than relying on
connection.close(). This method should only be used temporarily or explicitly when it's guaranteed to prevent resource leaks. -
Code Formatting: Adjust the indentation levels for consistency within functions like
_runandexecute. -
Error Handling in Execution: When you're fetching data using queryset methods like
filter, make sure to handle cases where no results are found correctly and throw exceptions appropriately unless caught elsewhere.
Here’s how you could modify your code with these considerations:
@@ -0,0 +1,57 @@
+# coding=utf-8
+"""
+ @project: MaxKB
+ @Author:虎
+ @file: i_function_lib_node.py
+ @date:2024/8/8 16:21
+ @desc:
+"""
+from typing import Type
+from django.db.models import QuerySet
+from django.utils.translation import gettext_lazy as _
+from rest_framework import serializers
+from application.flow.common import WorkflowMode
+from application.flow.i_step_node import INode, NodeResult
+from common.field.common import ObjectField
+from tools.models.tool import Tool, ToolType
+class InputField(serializers.Serializer):
+ field = serializers.CharField(required=True, label=_('Variable Name'))
+ label = serializers.CharField(required=True, label=_('Variable Label'))
+ source = serializers.CharField(required=True, label=_('Variable Source'))
+ type = serializers.CharField(required=True, label=_('Variable Type'))
+ value = ObjectField(required=True, label=_("Variable Value"), model_type_list=[str, list, bool, dict, int, float])
+
+class FunctionLibNodeParamsSerializer(serializers.Serializer):
+ tool_lib_id = serializers.UUIDField(required=True, label=_('Library ID'))
+ input_field_list = InputField(many=True)
+ is_result = serializers.BooleanField(required=False, label=_('Whether to return content'))
+ def is_valid(self, *, raise_exception=False):
+ super().is_valid(raise_exception=True)
+
+ try:
+ f_lib = QuerySet(Tool).filter(
+ id=self.data.get('tool_lib_id'),
+ tool_type=ToolType.WORKFLOW
+ ).first()
+ if not f_lib:
+ raise Exception(_("The function has been deleted"))
+
+ except Exception as e:
+ return self.add_error(None, str(e))
+```
### Key Improvements Made:
- Improved formatting of nested structures like dictionaries and lists.
- Ensured proper handling of error messages during validation.
- Removed redundant calls to close database connections as they aren't necessary.
This version maintains the current functionality while providing additional clarity in error handling and structuring.
feat: Tool workflow node execution