diff --git a/python/packages/ag-ui/agent_framework_ag_ui/_run_common.py b/python/packages/ag-ui/agent_framework_ag_ui/_run_common.py index cec86bdcf3..997c375ed1 100644 --- a/python/packages/ag-ui/agent_framework_ag_ui/_run_common.py +++ b/python/packages/ag-ui/agent_framework_ag_ui/_run_common.py @@ -195,6 +195,23 @@ def _emit_tool_call( delta = ( content.arguments if isinstance(content.arguments, str) else json.dumps(make_json_safe(content.arguments)) ) + + if tool_call_id in flow.tool_calls_by_id: + accumulated = flow.tool_calls_by_id[tool_call_id]["function"]["arguments"] + # Guard against full-argument replay: if the accumulated arguments + # already equal the incoming delta, this is a non-delta replay of + # the complete arguments string (some providers send the full + # arguments again after streaming deltas). Skip the event emission + # and accumulation to prevent doubling in MESSAGES_SNAPSHOT. + # This mirrors the early-return behaviour of _emit_text(). + # (Fixes #4194) + if accumulated and delta == accumulated: + logger.debug( + "Skipping duplicate full-arguments replay for tool_call_id=%s", + tool_call_id, + ) + return events + events.append(ToolCallArgsEvent(tool_call_id=tool_call_id, delta=delta)) if tool_call_id in flow.tool_calls_by_id: diff --git a/python/packages/ag-ui/tests/ag_ui/test_run.py b/python/packages/ag-ui/tests/ag_ui/test_run.py index 4abd63c799..73c9648c02 100644 --- a/python/packages/ag-ui/tests/ag_ui/test_run.py +++ b/python/packages/ag-ui/tests/ag_ui/test_run.py @@ -6,6 +6,7 @@ from ag_ui.core import ( TextMessageEndEvent, TextMessageStartEvent, + ToolCallArgsEvent, ) from agent_framework import AgentResponseUpdate, Content, Message, ResponseStream from agent_framework.exceptions import AgentInvalidResponseException @@ -416,6 +417,42 @@ def test_emit_tool_call_generates_id(): assert flow.tool_call_id is not None # ID should be generated +def test_emit_tool_call_skips_duplicate_full_arguments_replay(): + """Test _emit_tool_call skips replayed full-arguments on an existing tool call. + + This is a regression test for issue #4194 where some streaming providers + send the full arguments string again after streaming deltas, causing the + arguments to be doubled in MESSAGES_SNAPSHOT events. + + Mirrors test_emit_text_skips_duplicate_full_message_delta for consistency. + """ + flow = FlowState() + full_args = '{"city": "Seattle"}' + + # Step 1: Initial tool call with name + arguments (normal start) + content_start = Content.from_function_call( + call_id="call_dup", + name="get_weather", + arguments=full_args, + ) + events_start = _emit_tool_call(content_start, flow) + + # Should emit ToolCallStartEvent + ToolCallArgsEvent + assert any(isinstance(e, ToolCallArgsEvent) for e in events_start) + assert flow.tool_calls_by_id["call_dup"]["function"]["arguments"] == full_args + + # Step 2: Provider replays the full arguments (duplicate) + content_replay = Content(type="function_call", call_id="call_dup", arguments=full_args) + events_replay = _emit_tool_call(content_replay, flow) + + # Should NOT emit any ToolCallArgsEvent (early return on replay) + args_events = [e for e in events_replay if isinstance(e, ToolCallArgsEvent)] + assert args_events == [], "Duplicate full-arguments replay should not emit ToolCallArgsEvent" + + # Accumulated arguments should remain unchanged + assert flow.tool_calls_by_id["call_dup"]["function"]["arguments"] == full_args + + def test_emit_tool_result_closes_open_message(): """Test _emit_tool_result emits TextMessageEndEvent for open text message.