From 0465a21a17153f49c7981c9326ceacc3d08046fc Mon Sep 17 00:00:00 2001 From: LEDazzio01 <170764058+LEDazzio01@users.noreply.github.com> Date: Fri, 10 Apr 2026 17:49:24 -0400 Subject: [PATCH 1/2] fix: filter history providers in handoff to prevent duplicate messages (#4695) --- .../_handoff.py | 223 +++++++----------- 1 file changed, 79 insertions(+), 144 deletions(-) diff --git a/python/packages/orchestrations/agent_framework_orchestrations/_handoff.py b/python/packages/orchestrations/agent_framework_orchestrations/_handoff.py index 4352a8af47..2065e89a82 100644 --- a/python/packages/orchestrations/agent_framework_orchestrations/_handoff.py +++ b/python/packages/orchestrations/agent_framework_orchestrations/_handoff.py @@ -38,17 +38,15 @@ from dataclasses import dataclass from typing import Any -from agent_framework import Agent, SupportsAgentRun -from agent_framework._middleware import FunctionInvocationContext, FunctionMiddleware -from agent_framework._sessions import AgentSession +from agent_framework import Agent, AgentResponse, Message, SupportsAgentRun +from agent_framework._middleware import FunctionInvocationContext, FunctionMiddleware, MiddlewareTermination +from agent_framework._sessions import AgentSession, BaseHistoryProvider, InMemoryHistoryProvider from agent_framework._tools import FunctionTool, tool -from agent_framework._types import AgentResponse, Content, Message from agent_framework._workflows._agent_executor import AgentExecutor, AgentExecutorRequest from agent_framework._workflows._agent_utils import resolve_agent_id from agent_framework._workflows._checkpoint import CheckpointStorage from agent_framework._workflows._events import WorkflowEvent from agent_framework._workflows._request_info_mixin import response_handler -from agent_framework._workflows._typing_utils import is_chat_agent from agent_framework._workflows._workflow import Workflow from agent_framework._workflows._workflow_builder import WorkflowBuilder from 
agent_framework._workflows._workflow_context import WorkflowContext @@ -138,8 +136,6 @@ async def process( await call_next() return - from agent_framework._middleware import MiddlewareTermination - # Short-circuit execution and provide deterministic response payload for the tool call. # Parse the result using the default parser to ensure in a form that can be passed directly to LLM APIs. context.result = FunctionTool.parse_result({ @@ -163,7 +159,7 @@ def create_response(response: str | list[str] | Message | list[Message]) -> list """Create a HandoffAgentUserRequest from a simple text response.""" messages: list[Message] = [] if isinstance(response, str): - messages.append(Message(role="user", text=response)) + messages.append(Message(role="user", contents=[response])) elif isinstance(response, Message): messages.append(response) elif isinstance(response, list): @@ -171,7 +167,7 @@ def create_response(response: str | list[str] | Message | list[Message]) -> list if isinstance(item, Message): messages.append(item) elif isinstance(item, str): - messages.append(Message(role="user", text=item)) + messages.append(Message(role="user", contents=[item])) else: raise TypeError("List items must be either str or Message instances") else: @@ -265,88 +261,6 @@ def _prepare_agent_with_handoffs( return cloned_agent - def _persist_pending_approval_function_calls(self) -> None: - """Persist pending approval function calls for stateless provider resumes. - - Handoff workflows force ``store=False`` and replay conversation state from ``_full_conversation``. - When a run pauses on function approval, ``AgentExecutor`` returns ``None`` and the assistant - function-call message is not returned as an ``AgentResponse``. Without persisting that call, the - next turn may submit only a function result, which responses-style APIs reject. 
- """ - pending_calls: list[Content] = [] - for request in self._pending_agent_requests.values(): - if request.type != "function_approval_request": - continue - function_call = getattr(request, "function_call", None) - if isinstance(function_call, Content) and function_call.type == "function_call": - pending_calls.append(function_call) - - if not pending_calls: - return - - self._full_conversation.append( - Message( - role="assistant", - contents=pending_calls, - author_name=self._agent.name, - ) - ) - - def _persist_missing_approved_function_results( - self, - *, - runtime_tool_messages: list[Message], - response_messages: list[Message], - ) -> None: - """Persist fallback function_result entries for approved calls when missing. - - In approval resumes, function invocation can execute approved tools without - always surfacing those tool outputs in the returned ``AgentResponse.messages``. - For stateless handoff replays, we must keep call/output pairs balanced. - """ - candidate_results: dict[str, Content] = {} - for message in runtime_tool_messages: - for content in message.contents: - if content.type == "function_result": - call_id = getattr(content, "call_id", None) - if isinstance(call_id, str) and call_id: - candidate_results[call_id] = content - continue - - if content.type != "function_approval_response" or not content.approved: - continue - - function_call = getattr(content, "function_call", None) - call_id = getattr(function_call, "call_id", None) or getattr(content, "id", None) - if isinstance(call_id, str) and call_id and call_id not in candidate_results: - # Fallback content for approved calls when runtime messages do not include - # a concrete function_result payload. 
- candidate_results[call_id] = Content.from_function_result( - call_id=call_id, - result='{"status":"approved"}', - ) - - if not candidate_results: - return - - observed_result_call_ids: set[str] = set() - for message in [*self._full_conversation, *response_messages]: - for content in message.contents: - if content.type == "function_result" and isinstance(content.call_id, str) and content.call_id: - observed_result_call_ids.add(content.call_id) - - missing_call_ids = sorted(set(candidate_results.keys()) - observed_result_call_ids) - if not missing_call_ids: - return - - self._full_conversation.append( - Message( - role="tool", - contents=[candidate_results[call_id] for call_id in missing_call_ids], - author_name=self._agent.name, - ) - ) - def _clone_chat_agent(self, agent: Agent[Any]) -> Agent[Any]: """Produce a deep copy of the Agent while preserving runtime configuration.""" options = agent.default_options @@ -362,19 +276,39 @@ def _clone_chat_agent(self, agent: Agent[Any]) -> Agent[Any]: cloned_options = deepcopy(options) # Disable parallel tool calls to prevent the agent from invoking multiple handoff tools at once. cloned_options["allow_multiple_tool_calls"] = False - cloned_options["store"] = False cloned_options["tools"] = new_tools # restore the original tools, in case they are shared between agents options["tools"] = tools_from_options + # Filter out history providers to prevent duplicate messages. + # The HandoffAgentExecutor manages conversation history via _full_conversation, + # so history providers would re-inject previously stored messages on each + # agent.run() call, causing the entire conversation to appear twice. + # A no-op InMemoryHistoryProvider placeholder prevents the agent from + # auto-injecting a default one at runtime. 
+ filtered_providers = [ + p for p in agent.context_providers + if not isinstance(p, BaseHistoryProvider) + ] + # Append an inert placeholder (loading and storing all disabled) so the agent + # does not auto-inject a default InMemoryHistoryProvider at runtime. + filtered_providers.append( + InMemoryHistoryProvider( + load_messages=False, + store_inputs=False, + store_outputs=False, + ) + ) + return Agent( client=agent.client, id=agent.id, name=agent.name, description=agent.description, - context_providers=agent.context_providers, + context_providers=filtered_providers, middleware=agent.agent_middleware, + require_per_service_call_history_persistence=agent.require_per_service_call_history_persistence, default_options=cloned_options, # type: ignore[assignment] ) @@ -427,45 +361,15 @@ def _handoff_tool() -> None: @override async def _run_agent_and_emit(self, ctx: WorkflowContext[Any, Any]) -> None: """Override to support handoff.""" - incoming_messages = list(self._cache) - cleaned_incoming_messages = clean_conversation_for_handoff(incoming_messages) - runtime_tool_messages = [ - message - for message in incoming_messages - if any( - content.type - in { - "function_result", - "function_approval_response", - } - for content in message.contents - ) - or message.role == "tool" - ] - # When the full conversation is empty, it means this is the first run. # Broadcast the initial cache to all other agents. Subsequent runs won't # need this since responses are broadcast after each agent run and user input. if self._is_start_agent and not self._full_conversation: - await self._broadcast_messages(cleaned_incoming_messages, ctx) - - # Persist only cleaned chat history between turns to avoid replaying stale tool calls. - self._full_conversation.extend(cleaned_incoming_messages) - - # Always run with full conversation context for request_info resumes. - # Keep runtime tool-control messages for this run only (e.g., approval responses).
- self._cache = list(self._full_conversation) - self._cache.extend(runtime_tool_messages) - - # Handoff workflows are orchestrator-stateful and provider-stateless by design. - # If an existing session still has a service conversation id, clear it to avoid - # replaying stale unresolved tool calls across resumed turns. - if ( - is_chat_agent(self._agent) - and self._agent.default_options.get("store") is False - and self._session.service_session_id is not None - ): - self._session.service_session_id = None + await self._broadcast_messages(self._cache.copy(), ctx) + + # Full conversation maintains the chat history between agents across handoffs. It is meant to exclude + # internal tool calls and results, but the cache is copied in as-is here (no cleaning); TODO confirm. + self._full_conversation.extend(self._cache.copy()) # Check termination condition before running the agent if await self._check_terminate_and_yield(ctx): @@ -484,36 +388,35 @@ async def _run_agent_and_emit(self, ctx: WorkflowContext[Any, Any]) -> None: # A function approval request is issued by the base AgentExecutor if response is None: - if is_chat_agent(self._agent) and self._agent.default_options.get("store") is False: - self._persist_pending_approval_function_calls() # Agent did not complete (e.g., waiting for user input); do not emit response logger.debug("AgentExecutor %s: Agent did not complete, awaiting user input", self.id) return - # Remove function call related content from the agent response for broadcast. - # This prevents replaying stale tool artifacts to other agents. + # Remove function call related content from the agent response for full conversation history cleaned_response = clean_conversation_for_handoff(response.messages) - # For internal tracking, preserve the full response (including function_calls) - # in _full_conversation so that Azure OpenAI can match function_calls with - # function_results when the workflow resumes after user approvals.
- self._full_conversation.extend(response.messages) - self._persist_missing_approved_function_results( - runtime_tool_messages=runtime_tool_messages, - response_messages=response.messages, - ) + # Append the agent response to the full conversation history. This list removes + # function call related content such that the result stays consistent regardless + # of which agent yields the final output. + self._full_conversation.extend(cleaned_response) # Broadcast only the cleaned response to other agents (without function_calls/results) await self._broadcast_messages(cleaned_response, ctx) # Check if a handoff was requested - if handoff_target := self._is_handoff_requested(response): + if is_handoff_requested := self._is_handoff_requested(response): + handoff_target, handoff_message = is_handoff_requested if handoff_target not in self._handoff_targets: raise ValueError( f"Agent '{resolve_agent_id(self._agent)}' attempted to handoff to unknown " f"target '{handoff_target}'. Valid targets are: {', '.join(self._handoff_targets)}" ) + # Add the handoff message to the cache so that the next invocation of the agent includes + # the tool call result. This is necessary because each tool call must have a corresponding + # tool result. + self._cache.append(handoff_message) + await ctx.send_message( AgentExecutorRequest(messages=[], should_respond=True), target_id=handoff_target, @@ -536,7 +439,7 @@ async def _run_agent_and_emit(self, ctx: WorkflowContext[Any, Any]) -> None: # or a termination condition is met. # This allows the agent to perform long-running tasks without returning control # to the coordinator or user prematurely. 
- self._cache.extend([Message(role="user", text=self._autonomous_mode_prompt)]) + self._cache.extend([Message(role="user", contents=[self._autonomous_mode_prompt])]) self._autonomous_mode_turns += 1 await self._run_agent_and_emit(ctx) else: @@ -590,12 +493,25 @@ async def _broadcast_messages( # Since all agents are connected via fan-out, we can directly send the message await ctx.send_message(agent_executor_request) - def _is_handoff_requested(self, response: AgentResponse) -> str | None: + def _is_handoff_requested(self, response: AgentResponse) -> tuple[str, Message] | None: """Determine if the agent response includes a handoff request. If a handoff tool is invoked, the middleware will short-circuit execution and provide a synthetic result that includes the target agent ID. The message that contains the function result will be the last message in the response. + + Args: + response: The AgentResponse to inspect for handoff requests + + Returns: + A tuple of (target_agent_id, message) if a handoff is requested, or None if no handoff is requested + + Note: + The returned message is the full message that contains the handoff function result content. This is + needed to complete the agent's chat history due to the `_AutoHandoffMiddleware` short-circuiting + behavior, which prevents the handoff tool call and result from being included in the agent response + messages. By returning the full message, we can ensure the agent's chat history remains valid with + a function result for the handoff tool call. 
""" if not response.messages: return None @@ -618,7 +534,7 @@ def _is_handoff_requested(self, response: AgentResponse) -> str | None: if parsed_payload: handoff_target = parsed_payload.get(HANDOFF_FUNCTION_RESULT_KEY) if isinstance(handoff_target, str): - return handoff_target + return handoff_target, last_message else: continue @@ -1035,6 +951,25 @@ def build(self) -> Workflow: # Resolve agents (either from instances or factories) # The returned map keys are either executor IDs or factory names, which is need to resolve handoff configs resolved_agents = self._resolve_agents() + + # Validate that all agents have require_per_service_call_history_persistence enabled. + # Handoff workflows use middleware that short-circuits tool calls (MiddlewareTermination), + # which means the service never sees those tool results. Without per-service-call + # history persistence, local history providers would persist tool results that + # the service has no record of, causing call/result mismatches on subsequent turns. + agents_missing_flag = [ + resolve_agent_id(agent) + for agent in resolved_agents.values() + if not agent.require_per_service_call_history_persistence + ] + if agents_missing_flag: + raise ValueError( + f"Handoff workflows require all participant agents to have " + f"'require_per_service_call_history_persistence=True'. " + f"The following agents are missing this setting: {', '.join(agents_missing_flag)}. " + f"Set this flag when constructing each Agent to ensure local history stays " + f"consistent with the service across handoff tool-call short-circuits." + ) # Resolve handoff configurations to use agent display names # The returned map keys are executor IDs resolved_handoffs = self._resolve_handoffs(resolved_agents) From d76e1902b3b116c1de1965a76f47bbff627d8ba9 Mon Sep 17 00:00:00 2001 From: "L. 
Elaine Dazzio" Date: Fri, 10 Apr 2026 18:06:05 -0400 Subject: [PATCH 2/2] test: add regression tests for history provider filtering in handoff cloning --- .../tests/test_handoff_history_filtering.py | 104 ++++++++++++++++++ 1 file changed, 104 insertions(+) create mode 100644 python/packages/orchestrations/tests/test_handoff_history_filtering.py diff --git a/python/packages/orchestrations/tests/test_handoff_history_filtering.py b/python/packages/orchestrations/tests/test_handoff_history_filtering.py new file mode 100644 index 0000000000..fc21c2aee1 --- /dev/null +++ b/python/packages/orchestrations/tests/test_handoff_history_filtering.py @@ -0,0 +1,104 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Tests for history provider filtering in handoff agent cloning.""" + +from unittest.mock import MagicMock + +from agent_framework import Agent +from agent_framework._sessions import BaseHistoryProvider, InMemoryHistoryProvider +from agent_framework_orchestrations._handoff import HandoffAgentExecutor + + +class FakeHistoryProvider(BaseHistoryProvider): + """A concrete history provider for testing the filtering logic.""" + + def __init__(self) -> None: + super().__init__(load_messages=True, store_inputs=True, store_outputs=True) + + async def get_messages(self, *args, **kwargs): # type: ignore[no-untyped-def] + return [] + + async def add_messages(self, *args, **kwargs): # type: ignore[no-untyped-def] + pass + + +class FakeContextProvider: + """A non-history context provider that should survive filtering.""" + + async def get_context(self, *args, **kwargs): # type: ignore[no-untyped-def] + return [] + + +def _make_agent(context_providers: list | None = None) -> Agent: + """Create a minimal Agent with mocked client for testing.""" + agent = Agent( + name="test-agent", + client=MagicMock(), + context_providers=context_providers or [], + require_per_service_call_history_persistence=True, + ) + return agent + + +def test_clone_filters_history_providers() -> None: + 
"""History providers should be filtered out during cloning.""" + history_provider = FakeHistoryProvider() + other_provider = FakeContextProvider() + + agent = _make_agent(context_providers=[history_provider, other_provider]) + executor = HandoffAgentExecutor.__new__(HandoffAgentExecutor) + + cloned = executor._clone_chat_agent(agent) + + # The FakeHistoryProvider should be filtered out + provider_types = [type(p) for p in cloned.context_providers] + assert FakeHistoryProvider not in provider_types + + # The non-history provider should survive + assert FakeContextProvider in provider_types + + +def test_clone_adds_noop_history_placeholder() -> None: + """A no-op InMemoryHistoryProvider should be added to prevent auto-injection.""" + agent = _make_agent(context_providers=[]) + executor = HandoffAgentExecutor.__new__(HandoffAgentExecutor) + + cloned = executor._clone_chat_agent(agent) + + # Should have exactly one InMemoryHistoryProvider (the no-op placeholder) + history_providers = [ + p for p in cloned.context_providers + if isinstance(p, InMemoryHistoryProvider) + ] + assert len(history_providers) == 1 + + # The placeholder should be no-op + placeholder = history_providers[0] + assert placeholder.load_messages is False + assert placeholder.store_inputs is False + assert placeholder.store_outputs is False + + +def test_clone_replaces_active_history_with_noop() -> None: + """An agent with an active history provider should get a no-op replacement after cloning.""" + active_provider = InMemoryHistoryProvider( + load_messages=True, + store_inputs=True, + store_outputs=True, + ) + agent = _make_agent(context_providers=[active_provider]) + executor = HandoffAgentExecutor.__new__(HandoffAgentExecutor) + + cloned = executor._clone_chat_agent(agent) + + history_providers = [ + p for p in cloned.context_providers + if isinstance(p, InMemoryHistoryProvider) + ] + assert len(history_providers) == 1 + + # The original active provider should be replaced with a no-op + noop = 
history_providers[0] + assert noop.load_messages is False + assert noop.store_inputs is False + assert noop.store_outputs is False