-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Python: fix: filter history providers in handoff cloning to prevent duplicate messages #5214
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -38,17 +38,15 @@ | |
| from dataclasses import dataclass | ||
| from typing import Any | ||
|
|
||
| from agent_framework import Agent, SupportsAgentRun | ||
| from agent_framework._middleware import FunctionInvocationContext, FunctionMiddleware | ||
| from agent_framework._sessions import AgentSession | ||
| from agent_framework import Agent, AgentResponse, Message, SupportsAgentRun | ||
| from agent_framework._middleware import FunctionInvocationContext, FunctionMiddleware, MiddlewareTermination | ||
| from agent_framework._sessions import AgentSession, BaseHistoryProvider, InMemoryHistoryProvider | ||
| from agent_framework._tools import FunctionTool, tool | ||
| from agent_framework._types import AgentResponse, Content, Message | ||
| from agent_framework._workflows._agent_executor import AgentExecutor, AgentExecutorRequest | ||
| from agent_framework._workflows._agent_utils import resolve_agent_id | ||
| from agent_framework._workflows._checkpoint import CheckpointStorage | ||
| from agent_framework._workflows._events import WorkflowEvent | ||
| from agent_framework._workflows._request_info_mixin import response_handler | ||
| from agent_framework._workflows._typing_utils import is_chat_agent | ||
| from agent_framework._workflows._workflow import Workflow | ||
| from agent_framework._workflows._workflow_builder import WorkflowBuilder | ||
| from agent_framework._workflows._workflow_context import WorkflowContext | ||
|
|
@@ -138,8 +136,6 @@ async def process( | |
| await call_next() | ||
| return | ||
|
|
||
| from agent_framework._middleware import MiddlewareTermination | ||
|
|
||
| # Short-circuit execution and provide deterministic response payload for the tool call. | ||
| # Parse the result using the default parser to ensure in a form that can be passed directly to LLM APIs. | ||
| context.result = FunctionTool.parse_result({ | ||
|
|
@@ -163,15 +159,15 @@ def create_response(response: str | list[str] | Message | list[Message]) -> list | |
| """Create a HandoffAgentUserRequest from a simple text response.""" | ||
| messages: list[Message] = [] | ||
| if isinstance(response, str): | ||
| messages.append(Message(role="user", text=response)) | ||
| messages.append(Message(role="user", contents=[response])) | ||
| elif isinstance(response, Message): | ||
| messages.append(response) | ||
| elif isinstance(response, list): | ||
| for item in response: | ||
| if isinstance(item, Message): | ||
| messages.append(item) | ||
| elif isinstance(item, str): | ||
| messages.append(Message(role="user", text=item)) | ||
| messages.append(Message(role="user", contents=[item])) | ||
| else: | ||
| raise TypeError("List items must be either str or Message instances") | ||
| else: | ||
|
|
@@ -265,88 +261,6 @@ def _prepare_agent_with_handoffs( | |
|
|
||
| return cloned_agent | ||
|
|
||
| def _persist_pending_approval_function_calls(self) -> None: | ||
| """Persist pending approval function calls for stateless provider resumes. | ||
|
|
||
| Handoff workflows force ``store=False`` and replay conversation state from ``_full_conversation``. | ||
| When a run pauses on function approval, ``AgentExecutor`` returns ``None`` and the assistant | ||
| function-call message is not returned as an ``AgentResponse``. Without persisting that call, the | ||
| next turn may submit only a function result, which responses-style APIs reject. | ||
| """ | ||
| pending_calls: list[Content] = [] | ||
| for request in self._pending_agent_requests.values(): | ||
| if request.type != "function_approval_request": | ||
| continue | ||
| function_call = getattr(request, "function_call", None) | ||
| if isinstance(function_call, Content) and function_call.type == "function_call": | ||
| pending_calls.append(function_call) | ||
|
|
||
| if not pending_calls: | ||
| return | ||
|
|
||
| self._full_conversation.append( | ||
| Message( | ||
| role="assistant", | ||
| contents=pending_calls, | ||
| author_name=self._agent.name, | ||
| ) | ||
| ) | ||
|
|
||
| def _persist_missing_approved_function_results( | ||
| self, | ||
| *, | ||
| runtime_tool_messages: list[Message], | ||
| response_messages: list[Message], | ||
| ) -> None: | ||
| """Persist fallback function_result entries for approved calls when missing. | ||
|
|
||
| In approval resumes, function invocation can execute approved tools without | ||
| always surfacing those tool outputs in the returned ``AgentResponse.messages``. | ||
| For stateless handoff replays, we must keep call/output pairs balanced. | ||
| """ | ||
| candidate_results: dict[str, Content] = {} | ||
| for message in runtime_tool_messages: | ||
| for content in message.contents: | ||
| if content.type == "function_result": | ||
| call_id = getattr(content, "call_id", None) | ||
| if isinstance(call_id, str) and call_id: | ||
| candidate_results[call_id] = content | ||
| continue | ||
|
|
||
| if content.type != "function_approval_response" or not content.approved: | ||
| continue | ||
|
|
||
| function_call = getattr(content, "function_call", None) | ||
| call_id = getattr(function_call, "call_id", None) or getattr(content, "id", None) | ||
| if isinstance(call_id, str) and call_id and call_id not in candidate_results: | ||
| # Fallback content for approved calls when runtime messages do not include | ||
| # a concrete function_result payload. | ||
| candidate_results[call_id] = Content.from_function_result( | ||
| call_id=call_id, | ||
| result='{"status":"approved"}', | ||
| ) | ||
|
|
||
| if not candidate_results: | ||
| return | ||
|
|
||
| observed_result_call_ids: set[str] = set() | ||
| for message in [*self._full_conversation, *response_messages]: | ||
| for content in message.contents: | ||
| if content.type == "function_result" and isinstance(content.call_id, str) and content.call_id: | ||
| observed_result_call_ids.add(content.call_id) | ||
|
|
||
| missing_call_ids = sorted(set(candidate_results.keys()) - observed_result_call_ids) | ||
| if not missing_call_ids: | ||
| return | ||
|
|
||
| self._full_conversation.append( | ||
| Message( | ||
| role="tool", | ||
| contents=[candidate_results[call_id] for call_id in missing_call_ids], | ||
| author_name=self._agent.name, | ||
| ) | ||
| ) | ||
|
|
||
| def _clone_chat_agent(self, agent: Agent[Any]) -> Agent[Any]: | ||
| """Produce a deep copy of the Agent while preserving runtime configuration.""" | ||
| options = agent.default_options | ||
|
|
@@ -362,19 +276,39 @@ def _clone_chat_agent(self, agent: Agent[Any]) -> Agent[Any]: | |
| cloned_options = deepcopy(options) | ||
| # Disable parallel tool calls to prevent the agent from invoking multiple handoff tools at once. | ||
| cloned_options["allow_multiple_tool_calls"] = False | ||
| cloned_options["store"] = False | ||
| cloned_options["tools"] = new_tools | ||
|
|
||
| # restore the original tools, in case they are shared between agents | ||
| options["tools"] = tools_from_options | ||
|
|
||
| # Filter out history providers to prevent duplicate messages. | ||
| # The HandoffAgentExecutor manages conversation history via _full_conversation, | ||
| # so history providers would re-inject previously stored messages on each | ||
| # agent.run() call, causing the entire conversation to appear twice. | ||
| # A no-op InMemoryHistoryProvider placeholder prevents the agent from | ||
| # auto-injecting a default one at runtime. | ||
| filtered_providers = [ | ||
| p for p in agent.context_providers | ||
| if not isinstance(p, BaseHistoryProvider) | ||
| ] | ||
| # Always add a no-op placeholder to prevent the agent from | ||
| # auto-injecting a default InMemoryHistoryProvider at runtime. | ||
| filtered_providers.append( | ||
| InMemoryHistoryProvider( | ||
| load_messages=False, | ||
| store_inputs=False, | ||
| store_outputs=False, | ||
| ) | ||
| ) | ||
|
|
||
| return Agent( | ||
| client=agent.client, | ||
| id=agent.id, | ||
| name=agent.name, | ||
| description=agent.description, | ||
| context_providers=agent.context_providers, | ||
| context_providers=filtered_providers, | ||
| middleware=agent.agent_middleware, | ||
| require_per_service_call_history_persistence=agent.require_per_service_call_history_persistence, | ||
| default_options=cloned_options, # type: ignore[assignment] | ||
|
Comment on lines
304
to
312
|
||
| ) | ||
|
|
||
|
|
@@ -427,45 +361,15 @@ def _handoff_tool() -> None: | |
| @override | ||
| async def _run_agent_and_emit(self, ctx: WorkflowContext[Any, Any]) -> None: | ||
| """Override to support handoff.""" | ||
| incoming_messages = list(self._cache) | ||
| cleaned_incoming_messages = clean_conversation_for_handoff(incoming_messages) | ||
| runtime_tool_messages = [ | ||
| message | ||
| for message in incoming_messages | ||
| if any( | ||
| content.type | ||
| in { | ||
| "function_result", | ||
| "function_approval_response", | ||
| } | ||
| for content in message.contents | ||
| ) | ||
| or message.role == "tool" | ||
| ] | ||
|
|
||
| # When the full conversation is empty, it means this is the first run. | ||
| # Broadcast the initial cache to all other agents. Subsequent runs won't | ||
| # need this since responses are broadcast after each agent run and user input. | ||
| if self._is_start_agent and not self._full_conversation: | ||
| await self._broadcast_messages(cleaned_incoming_messages, ctx) | ||
|
|
||
| # Persist only cleaned chat history between turns to avoid replaying stale tool calls. | ||
| self._full_conversation.extend(cleaned_incoming_messages) | ||
|
|
||
| # Always run with full conversation context for request_info resumes. | ||
| # Keep runtime tool-control messages for this run only (e.g., approval responses). | ||
| self._cache = list(self._full_conversation) | ||
| self._cache.extend(runtime_tool_messages) | ||
|
|
||
| # Handoff workflows are orchestrator-stateful and provider-stateless by design. | ||
| # If an existing session still has a service conversation id, clear it to avoid | ||
| # replaying stale unresolved tool calls across resumed turns. | ||
| if ( | ||
| is_chat_agent(self._agent) | ||
| and self._agent.default_options.get("store") is False | ||
| and self._session.service_session_id is not None | ||
| ): | ||
| self._session.service_session_id = None | ||
| await self._broadcast_messages(self._cache.copy(), ctx) | ||
|
|
||
| # Full conversation maintains the chat history between agents across handoffs, | ||
| # excluding internal agent messages such as tool calls and results. | ||
| self._full_conversation.extend(self._cache.copy()) | ||
|
|
||
| # Check termination condition before running the agent | ||
| if await self._check_terminate_and_yield(ctx): | ||
|
|
@@ -484,36 +388,35 @@ async def _run_agent_and_emit(self, ctx: WorkflowContext[Any, Any]) -> None: | |
|
|
||
| # A function approval request is issued by the base AgentExecutor | ||
| if response is None: | ||
| if is_chat_agent(self._agent) and self._agent.default_options.get("store") is False: | ||
| self._persist_pending_approval_function_calls() | ||
| # Agent did not complete (e.g., waiting for user input); do not emit response | ||
| logger.debug("AgentExecutor %s: Agent did not complete, awaiting user input", self.id) | ||
| return | ||
|
|
||
| # Remove function call related content from the agent response for broadcast. | ||
| # This prevents replaying stale tool artifacts to other agents. | ||
| # Remove function call related content from the agent response for full conversation history | ||
| cleaned_response = clean_conversation_for_handoff(response.messages) | ||
|
|
||
| # For internal tracking, preserve the full response (including function_calls) | ||
| # in _full_conversation so that Azure OpenAI can match function_calls with | ||
| # function_results when the workflow resumes after user approvals. | ||
| self._full_conversation.extend(response.messages) | ||
| self._persist_missing_approved_function_results( | ||
| runtime_tool_messages=runtime_tool_messages, | ||
| response_messages=response.messages, | ||
| ) | ||
| # Append the agent response to the full conversation history. This list removes | ||
| # function call related content such that the result stays consistent regardless | ||
| # of which agent yields the final output. | ||
| self._full_conversation.extend(cleaned_response) | ||
|
|
||
| # Broadcast only the cleaned response to other agents (without function_calls/results) | ||
| await self._broadcast_messages(cleaned_response, ctx) | ||
|
|
||
| # Check if a handoff was requested | ||
| if handoff_target := self._is_handoff_requested(response): | ||
| if is_handoff_requested := self._is_handoff_requested(response): | ||
| handoff_target, handoff_message = is_handoff_requested | ||
| if handoff_target not in self._handoff_targets: | ||
| raise ValueError( | ||
| f"Agent '{resolve_agent_id(self._agent)}' attempted to handoff to unknown " | ||
| f"target '{handoff_target}'. Valid targets are: {', '.join(self._handoff_targets)}" | ||
| ) | ||
|
|
||
| # Add the handoff message to the cache so that the next invocation of the agent includes | ||
| # the tool call result. This is necessary because each tool call must have a corresponding | ||
| # tool result. | ||
| self._cache.append(handoff_message) | ||
|
|
||
| await ctx.send_message( | ||
| AgentExecutorRequest(messages=[], should_respond=True), | ||
| target_id=handoff_target, | ||
|
|
@@ -536,7 +439,7 @@ async def _run_agent_and_emit(self, ctx: WorkflowContext[Any, Any]) -> None: | |
| # or a termination condition is met. | ||
| # This allows the agent to perform long-running tasks without returning control | ||
| # to the coordinator or user prematurely. | ||
| self._cache.extend([Message(role="user", text=self._autonomous_mode_prompt)]) | ||
| self._cache.extend([Message(role="user", contents=[self._autonomous_mode_prompt])]) | ||
| self._autonomous_mode_turns += 1 | ||
| await self._run_agent_and_emit(ctx) | ||
| else: | ||
|
|
@@ -590,12 +493,25 @@ async def _broadcast_messages( | |
| # Since all agents are connected via fan-out, we can directly send the message | ||
| await ctx.send_message(agent_executor_request) | ||
|
|
||
| def _is_handoff_requested(self, response: AgentResponse) -> str | None: | ||
| def _is_handoff_requested(self, response: AgentResponse) -> tuple[str, Message] | None: | ||
| """Determine if the agent response includes a handoff request. | ||
|
|
||
| If a handoff tool is invoked, the middleware will short-circuit execution | ||
| and provide a synthetic result that includes the target agent ID. The message | ||
| that contains the function result will be the last message in the response. | ||
|
|
||
| Args: | ||
| response: The AgentResponse to inspect for handoff requests | ||
|
|
||
| Returns: | ||
| A tuple of (target_agent_id, message) if a handoff is requested, or None if no handoff is requested | ||
|
|
||
| Note: | ||
| The returned message is the full message that contains the handoff function result content. This is | ||
| needed to complete the agent's chat history due to the `_AutoHandoffMiddleware` short-circuiting | ||
| behavior, which prevents the handoff tool call and result from being included in the agent response | ||
| messages. By returning the full message, we can ensure the agent's chat history remains valid with | ||
| a function result for the handoff tool call. | ||
| """ | ||
| if not response.messages: | ||
| return None | ||
|
|
@@ -618,7 +534,7 @@ def _is_handoff_requested(self, response: AgentResponse) -> str | None: | |
| if parsed_payload: | ||
| handoff_target = parsed_payload.get(HANDOFF_FUNCTION_RESULT_KEY) | ||
| if isinstance(handoff_target, str): | ||
| return handoff_target | ||
| return handoff_target, last_message | ||
| else: | ||
| continue | ||
|
|
||
|
|
@@ -1035,6 +951,25 @@ def build(self) -> Workflow: | |
| # Resolve agents (either from instances or factories) | ||
| # The returned map keys are either executor IDs or factory names, which is needed to resolve handoff configs | ||
| resolved_agents = self._resolve_agents() | ||
|
|
||
| # Validate that all agents have require_per_service_call_history_persistence enabled. | ||
| # Handoff workflows use middleware that short-circuits tool calls (MiddlewareTermination), | ||
| # which means the service never sees those tool results. Without per-service-call | ||
| # history persistence, local history providers would persist tool results that | ||
| # the service has no record of, causing call/result mismatches on subsequent turns. | ||
| agents_missing_flag = [ | ||
| resolve_agent_id(agent) | ||
| for agent in resolved_agents.values() | ||
| if not agent.require_per_service_call_history_persistence | ||
| ] | ||
| if agents_missing_flag: | ||
| raise ValueError( | ||
| f"Handoff workflows require all participant agents to have " | ||
| f"'require_per_service_call_history_persistence=True'. " | ||
| f"The following agents are missing this setting: {', '.join(agents_missing_flag)}. " | ||
| f"Set this flag when constructing each Agent to ensure local history stays " | ||
| f"consistent with the service across handoff tool-call short-circuits." | ||
| ) | ||
| # Resolve handoff configurations to use agent display names | ||
| # The returned map keys are executor IDs | ||
| resolved_handoffs = self._resolve_handoffs(resolved_agents) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Behavior change: cloning now filters out
`BaseHistoryProvider` instances and injects a no-op `InMemoryHistoryProvider` to avoid duplicate message injection. There are existing orchestration tests in this repo for handoff behavior; please add/adjust a regression test that asserts no duplicated messages are sent after handoff + resume when agents originally had history providers configured.