From 056805e1ab5e4385868bb81de514569b227a55cd Mon Sep 17 00:00:00 2001 From: Amit Mukherjee Date: Fri, 20 Feb 2026 13:37:26 -0600 Subject: [PATCH 1/3] Fix structured_output propagation in ClaudeAgent Capture structured_output from ResultMessage in _get_stream() and propagate it to AgentResponse.value via a custom finalizer. Previously structured_output was silently discarded, making output_format unusable. Fixes #4095 --- .../claude/agent_framework_claude/_agent.py | 29 +++- .../claude/tests/test_claude_agent.py | 160 ++++++++++++++++++ 2 files changed, 188 insertions(+), 1 deletion(-) diff --git a/python/packages/claude/agent_framework_claude/_agent.py b/python/packages/claude/agent_framework_claude/_agent.py index 3d94888263..8e09c6b3b4 100644 --- a/python/packages/claude/agent_framework_claude/_agent.py +++ b/python/packages/claude/agent_framework_claude/_agent.py @@ -618,12 +618,30 @@ def run( """ response = ResponseStream( self._get_stream(messages, session=session, options=options, **kwargs), - finalizer=AgentResponse.from_updates, + finalizer=self._finalize_response, ) if stream: return response return response.get_final_response() + @staticmethod + def _finalize_response(updates: Sequence[AgentResponseUpdate]) -> AgentResponse[Any]: + """Build AgentResponse and propagate structured_output as value. + + Args: + updates: The collected stream updates. + + Returns: + An AgentResponse with structured_output set as value if present. + """ + response = AgentResponse.from_updates(updates) + for update in updates: + if update.additional_properties and "structured_output" in update.additional_properties: + response._value = update.additional_properties["structured_output"] + response._value_parsed = True + break + return response + async def _get_stream( self, messages: AgentRunInputs | None = None, @@ -647,6 +665,7 @@ async def _get_stream( await self._apply_runtime_options(dict(options) if options else None) session_id: str | None = None + structured_output: Any = None await self._client.query(prompt) async for message in self._client.receive_response(): @@ -700,6 +719,14 @@ async def _get_stream( error_msg = message.result or "Unknown error from Claude API" raise AgentException(f"Claude API error: {error_msg}") session_id = message.session_id + structured_output = message.structured_output + + # Yield structured output if present + if structured_output is not None: + yield AgentResponseUpdate( + role="assistant", + additional_properties={"structured_output": structured_output}, + ) # Update session with session ID if session_id: diff --git a/python/packages/claude/tests/test_claude_agent.py b/python/packages/claude/tests/test_claude_agent.py index 042e311fd3..092a179209 100644 --- a/python/packages/claude/tests/test_claude_agent.py +++ b/python/packages/claude/tests/test_claude_agent.py @@ -785,3 +785,163 @@ async def test_apply_runtime_options_none(self) -> None: await agent._apply_runtime_options(None) # type: ignore[reportPrivateUsage] mock_client.set_model.assert_not_called() mock_client.set_permission_mode.assert_not_called() + + +# region Test ClaudeAgent Structured Output + + +class TestClaudeAgentStructuredOutput: + """Tests for ClaudeAgent structured output propagation.""" + + @staticmethod + async def _create_async_generator(items: list[Any]) -> Any: + """Helper to create async generator from list.""" + for item in items: + yield item + + def _create_mock_client(self, messages: list[Any]) -> MagicMock: + """Create a mock ClaudeSDKClient that yields given messages.""" + mock_client = MagicMock() + mock_client.connect = AsyncMock() + mock_client.disconnect = AsyncMock() + mock_client.query = AsyncMock() + mock_client.set_model = AsyncMock() + mock_client.set_permission_mode = AsyncMock() + mock_client.receive_response = MagicMock(return_value=self._create_async_generator(messages)) + return mock_client + + async def test_structured_output_propagated_to_response(self) -> None: + """Test that structured_output from ResultMessage is propagated to response.value.""" + from claude_agent_sdk import AssistantMessage, ResultMessage, TextBlock + from claude_agent_sdk.types import StreamEvent + + structured_data = {"name": "Alice", "age": 30} + messages = [ + StreamEvent( + event={ + "type": "content_block_delta", + "delta": {"type": "text_delta", "text": '{"name": "Alice", "age": 30}'}, + }, + uuid="event-1", + session_id="session-123", + ), + AssistantMessage( + content=[TextBlock(text='{"name": "Alice", "age": 30}')], + model="claude-sonnet", + ), + ResultMessage( + subtype="success", + duration_ms=100, + duration_api_ms=50, + is_error=False, + num_turns=1, + session_id="session-123", + structured_output=structured_data, + ), + ] + mock_client = self._create_mock_client(messages) + + with patch("agent_framework_claude._agent.ClaudeSDKClient", return_value=mock_client): + agent = ClaudeAgent() + response = await agent.run("Return structured data") + assert response.value == structured_data + + async def test_structured_output_none_when_not_present(self) -> None: + """Test that response.value is None when structured_output is not present.""" + from claude_agent_sdk import AssistantMessage, ResultMessage, TextBlock + from claude_agent_sdk.types import StreamEvent + + messages = [ + StreamEvent( + event={ + "type": "content_block_delta", + "delta": {"type": "text_delta", "text": "Hello!"}, + }, + uuid="event-1", + session_id="session-123", + ), + AssistantMessage( + content=[TextBlock(text="Hello!")], + model="claude-sonnet", + ), + ResultMessage( + subtype="success", + duration_ms=100, + duration_api_ms=50, + is_error=False, + num_turns=1, + session_id="session-123", + ), + ] + mock_client = self._create_mock_client(messages) + + with patch("agent_framework_claude._agent.ClaudeSDKClient", return_value=mock_client): + agent = ClaudeAgent() + response = await agent.run("Hello") + assert response.value is None + + async def test_structured_output_with_streaming(self) -> None: + """Test that structured_output is available in streaming updates.""" + from claude_agent_sdk import AssistantMessage, ResultMessage, TextBlock + from claude_agent_sdk.types import StreamEvent + + structured_data = {"key": "value"} + messages = [ + StreamEvent( + event={ + "type": "content_block_delta", + "delta": {"type": "text_delta", "text": '{"key": "value"}'}, + }, + uuid="event-1", + session_id="session-123", + ), + AssistantMessage( + content=[TextBlock(text='{"key": "value"}')], + model="claude-sonnet", + ), + ResultMessage( + subtype="success", + duration_ms=100, + duration_api_ms=50, + is_error=False, + num_turns=1, + session_id="session-123", + structured_output=structured_data, + ), + ] + mock_client = self._create_mock_client(messages) + + with patch("agent_framework_claude._agent.ClaudeSDKClient", return_value=mock_client): + agent = ClaudeAgent() + updates: list[AgentResponseUpdate] = [] + async for update in agent.run("Return structured data", stream=True): + updates.append(update) + # Last update should carry structured_output in additional_properties + last_update = updates[-1] + assert last_update.additional_properties is not None + assert last_update.additional_properties.get("structured_output") == structured_data + + async def test_structured_output_with_error_does_not_propagate(self) -> None: + """Test that structured_output is not propagated when ResultMessage is an error.""" + from agent_framework.exceptions import AgentException + from claude_agent_sdk import ResultMessage + + messages = [ + ResultMessage( + subtype="error", + duration_ms=100, + duration_api_ms=50, + is_error=True, + num_turns=0, + session_id="error-session", + result="Something went wrong", + structured_output={"some": "data"}, + ), + ] + mock_client = self._create_mock_client(messages) + + with patch("agent_framework_claude._agent.ClaudeSDKClient", return_value=mock_client): + agent = ClaudeAgent() + with pytest.raises(AgentException) as exc_info: + await agent.run("Hello") + assert "Something went wrong" in str(exc_info.value) From 2b7abe24b46c1ec692dbb71de6bf97a8a5c3551a Mon Sep 17 00:00:00 2001 From: Amit Mukherjee Date: Mon, 23 Feb 2026 11:41:14 -0600 Subject: [PATCH 2/3] Address review feedback: use value parameter instead of private properties - Extend AgentResponse.from_updates() to accept optional value parameter - Remove structured_output yield from _get_stream() - Update _finalize_response() to pass value via public API - Update streaming test to use get_final_response() --- .../claude/agent_framework_claude/_agent.py | 22 +++++-------------- .../claude/tests/test_claude_agent.py | 16 +++++++------- .../packages/core/agent_framework/_types.py | 4 +++- 3 files changed, 17 insertions(+), 25 deletions(-) diff --git a/python/packages/claude/agent_framework_claude/_agent.py b/python/packages/claude/agent_framework_claude/_agent.py index 8e09c6b3b4..43f001b3db 100644 --- a/python/packages/claude/agent_framework_claude/_agent.py +++ b/python/packages/claude/agent_framework_claude/_agent.py @@ -624,8 +624,7 @@ def run( return response return response.get_final_response() - @staticmethod - def _finalize_response(updates: Sequence[AgentResponseUpdate]) -> AgentResponse[Any]: + def _finalize_response(self, updates: Sequence[AgentResponseUpdate]) -> AgentResponse[Any]: """Build AgentResponse and propagate structured_output as value. Args: @@ -634,13 +633,8 @@ def _finalize_response(updates: Sequence[AgentResponseUpdate]) -> AgentResponse[ Returns: An AgentResponse with structured_output set as value if present. """ - response = AgentResponse.from_updates(updates) - for update in updates: - if update.additional_properties and "structured_output" in update.additional_properties: - response._value = update.additional_properties["structured_output"] - response._value_parsed = True - break - return response + structured_output = getattr(self, "_structured_output", None) + return AgentResponse.from_updates(updates, value=structured_output) async def _get_stream( self, @@ -721,13 +715,9 @@ async def _get_stream( session_id = message.session_id structured_output = message.structured_output - # Yield structured output if present - if structured_output is not None: - yield AgentResponseUpdate( - role="assistant", - additional_properties={"structured_output": structured_output}, - ) - # Update session with session ID if session_id: session.service_session_id = session_id + + # Store structured output for the finalizer + self._structured_output = structured_output diff --git a/python/packages/claude/tests/test_claude_agent.py b/python/packages/claude/tests/test_claude_agent.py index 092a179209..0e126c36b9 100644 --- a/python/packages/claude/tests/test_claude_agent.py +++ b/python/packages/claude/tests/test_claude_agent.py @@ -881,7 +881,7 @@ async def test_structured_output_none_when_not_present(self) -> None: assert response.value is None async def test_structured_output_with_streaming(self) -> None: - """Test that structured_output is available in streaming updates.""" + """Test that structured_output is available via get_final_response after streaming.""" from claude_agent_sdk import AssistantMessage, ResultMessage, TextBlock from claude_agent_sdk.types import StreamEvent @@ -913,13 +913,13 @@ async def test_structured_output_with_streaming(self) -> None: with patch("agent_framework_claude._agent.ClaudeSDKClient", return_value=mock_client): agent = ClaudeAgent() - updates: list[AgentResponseUpdate] = [] - async for update in agent.run("Return structured data", stream=True): - updates.append(update) - # Last update should carry structured_output in additional_properties - last_update = updates[-1] - assert last_update.additional_properties is not None - assert last_update.additional_properties.get("structured_output") == structured_data + stream = agent.run("Return structured data", stream=True) + # Consume the stream + async for _ in stream: + pass + # Structured output should be available via get_final_response + response = await stream.get_final_response() + assert response.value == structured_data async def test_structured_output_with_error_does_not_propagate(self) -> None: """Test that structured_output is not propagated when ResultMessage is an error.""" diff --git a/python/packages/core/agent_framework/_types.py b/python/packages/core/agent_framework/_types.py index a699a30f5f..a419107358 100644 --- a/python/packages/core/agent_framework/_types.py +++ b/python/packages/core/agent_framework/_types.py @@ -2273,6 +2273,7 @@ def from_updates( updates: Sequence[AgentResponseUpdate], *, output_format_type: type[BaseModel] | None = None, + value: Any | None = None, ) -> AgentResponseT: """Joins multiple updates into a single AgentResponse. @@ -2281,8 +2282,9 @@ def from_updates( Keyword Args: output_format_type: Optional Pydantic model type to parse the response text into structured data. + value: Optional pre-parsed structured output value to set directly on the response. """ - msg = cls(messages=[], response_format=output_format_type) + msg = cls(messages=[], response_format=output_format_type, value=value) for update in updates: _process_update(msg, update) _finalize_response(msg) From b6bc61057fbb9a47747c6557bc856070ab59074a Mon Sep 17 00:00:00 2001 From: Amit Mukherjee Date: Mon, 23 Feb 2026 12:31:35 -0600 Subject: [PATCH 3/3] Fix mypy errors: add value parameter to from_updates overloads Add value parameter to both @overload signatures of AgentResponse.from_updates() so mypy recognizes the argument. --- python/packages/core/agent_framework/_types.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/packages/core/agent_framework/_types.py b/python/packages/core/agent_framework/_types.py index a419107358..c664fd3446 100644 --- a/python/packages/core/agent_framework/_types.py +++ b/python/packages/core/agent_framework/_types.py @@ -2256,6 +2256,7 @@ def from_updates( updates: Sequence[AgentResponseUpdate], *, output_format_type: type[ResponseModelBoundT], + value: Any | None = None, ) -> AgentResponse[ResponseModelBoundT]: ... @overload @@ -2265,6 +2266,7 @@ def from_updates( updates: Sequence[AgentResponseUpdate], *, output_format_type: None = None, + value: Any | None = None, ) -> AgentResponse[Any]: ... @classmethod