From d169d158229d1931015fae5c28c06a8171fe3453 Mon Sep 17 00:00:00 2001 From: Tao Chen Date: Fri, 12 Jun 2026 15:07:11 -0700 Subject: [PATCH] Foundry hosted agent responses emit failed events --- .../_responses.py | 387 +++++++++--------- .../foundry_hosting/tests/test_responses.py | 183 ++++++++- 2 files changed, 371 insertions(+), 199 deletions(-) diff --git a/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py b/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py index ec4a9d8533..3f9ae41b97 100644 --- a/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py +++ b/python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py @@ -461,59 +461,60 @@ async def _handle_inner_agent( context: ResponseContext, ) -> AsyncIterable[ResponseStreamEvent | dict[str, Any]]: """Handle the creation of a response for a regular (non-workflow) agent.""" - input_items = await context.get_input_items() - input_messages = await _items_to_messages(input_items, approval_storage=self._approval_storage) - - history = await context.get_history() - run_kwargs: dict[str, Any] = { - "messages": [ - *(await _output_items_to_messages(history, approval_storage=self._approval_storage)), - *input_messages, - ] - } - is_streaming_request = request.stream is not None and request.stream is True - - chat_options, are_options_set = _to_chat_options(request) - response_event_stream = ResponseEventStream(response_id=context.response_id, model=request.model) - yield response_event_stream.emit_created() yield response_event_stream.emit_in_progress() - if are_options_set and not isinstance(self._agent, RawAgent): - logger.warning("Agent doesn't support runtime options. They will be ignored.") - else: - run_kwargs["options"] = chat_options - - # Lazy-enter the agent (and any MCP tools it owns). The MCP client wraps gateway - # consent failures (and other connection-time errors) in AgentFrameworkException; if - # one of those is a consent error we surface the consent link to the client through - # the already-opened response stream instead of crashing the request. Other exception - # types propagate normally so the host can handle / log them. - try: - await self._ensure_agent_ready() - except AgentFrameworkException as ex: - consent_errors = consent_url_from_error(ex) - if consent_errors is None: - raise - for consent_error in consent_errors: - logger.warning("Consent URL for tool '%s': %s", consent_error.name, consent_error.consent_url) - oauth_item = OAuthConsentRequestOutputItem( - id=IdGenerator.new_id("oacr"), - consent_link=consent_error.consent_url, - server_label=consent_error.name, - ) - builder = response_event_stream.add_output_item(oauth_item.id) - yield builder.emit_added(oauth_item) - yield builder.emit_done(oauth_item) - yield response_event_stream.emit_completed() - return - # Track the current active output item builder for streaming; # lazily created on matching content, closed when a different type arrives. - tracker: _OutputItemTracker | None = _OutputItemTracker(response_event_stream) if is_streaming_request else None + tracker: _OutputItemTracker | None = None try: + input_items = await context.get_input_items() + input_messages = await _items_to_messages(input_items, approval_storage=self._approval_storage) + + history = await context.get_history() + run_kwargs: dict[str, Any] = { + "messages": [ + *(await _output_items_to_messages(history, approval_storage=self._approval_storage)), + *input_messages, + ] + } + is_streaming_request = request.stream is not None and request.stream is True + + chat_options, are_options_set = _to_chat_options(request) + + if are_options_set and not isinstance(self._agent, RawAgent): + logger.warning("Agent doesn't support runtime options. They will be ignored.") + else: + run_kwargs["options"] = chat_options + + # Lazy-enter the agent (and any MCP tools it owns). The MCP client wraps gateway + # consent failures (and other connection-time errors) in AgentFrameworkException; if + # one of those is a consent error we surface the consent link to the client through + # the already-opened response stream instead of failing the request. Other exception + # types fall through to the outer handler below and become ``response.failed``. + try: + await self._ensure_agent_ready() + except AgentFrameworkException as ex: + consent_errors = consent_url_from_error(ex) + if consent_errors is None: + raise + for consent_error in consent_errors: + logger.warning("Consent URL for tool '%s': %s", consent_error.name, consent_error.consent_url) + oauth_item = OAuthConsentRequestOutputItem( + id=IdGenerator.new_id("oacr"), + consent_link=consent_error.consent_url, + server_label=consent_error.name, + ) + builder = response_event_stream.add_output_item(oauth_item.id) + yield builder.emit_added(oauth_item) + yield builder.emit_done(oauth_item) + yield response_event_stream.emit_completed() + return + + tracker = _OutputItemTracker(response_event_stream) if is_streaming_request else None + if not is_streaming_request: # Run the agent in non-streaming mode response = await self._agent.run(stream=False, **run_kwargs) # type: ignore[reportUnknownMemberType] @@ -524,7 +525,6 @@ async def _handle_inner_agent( approval_storage=self._approval_storage, ): yield item - yield response_event_stream.emit_completed() else: if tracker is None: # pragma: no cover - defensive, set above raise RuntimeError("Streaming tracker was not initialized.") @@ -545,162 +545,158 @@ async def _handle_inner_agent( # Close any remaining active builder for event in tracker.close(): yield event - yield response_event_stream.emit_completed() - except Exception: - # Drain any in-progress streaming builder before emitting consent - # so the resulting stream stays well-formed. - if tracker is not None: - for event in tracker.close(): - yield event - yield response_event_stream.emit_completed() - raise + yield response_event_stream.emit_completed() + except Exception as ex: + logger.exception("Failed to produce response for agent") + for event in self._emit_failure(response_event_stream, tracker, ex): + yield event async def _handle_inner_workflow( self, request: CreateResponse, context: ResponseContext, ) -> AsyncIterable[ResponseStreamEvent | dict[str, Any]]: - """Handle the creation of a response for a workflow agent. + """Handle the creation of a response for a workflow agent.""" + response_event_stream = ResponseEventStream(response_id=context.response_id, model=request.model) + yield response_event_stream.emit_created() + yield response_event_stream.emit_in_progress() - Why this is required: - The sandbox may be deactivated after some period of inactivity, and only data managed - by the hosting infrastructure or files will be preserved upon deactivation. - """ - input_items = await context.get_input_items() - input_messages = await _items_to_messages(input_items, approval_storage=self._approval_storage) - is_streaming_request = request.stream is not None and request.stream is True - - _, are_options_set = _to_chat_options(request) - if are_options_set: - logger.warning("Workflow agent doesn't support runtime options. They will be ignored.") - - if request.previous_response_id is not None and context.conversation_id is not None: - raise RuntimeError("Previous response ID cannot be used in conjunction with conversation ID.") - context_id = request.previous_response_id or context.conversation_id - - # The following should never happen due to the checks above. - # This is for type safety and defensive programming. - if self._checkpoint_storage_path is None: - raise RuntimeError("Checkpoint storage path is not configured for workflow agent.") - if not isinstance(self._agent, WorkflowAgent): - raise RuntimeError("Agent is not a workflow agent.") - - # Workflow agents are not async context managers in any built-in path, - # but call _ensure_agent_ready for symmetry with the regular path so - # any future async resources owned by the workflow are entered here. - await self._ensure_agent_ready() - - # Determine the latest checkpoint (if any) so we can resume the - # workflow's prior state for this turn. The directory is keyed by - # the inbound context id (conversation_id when set, otherwise - # previous_response_id). Multi-turn declarative workflows need the - # workflow's internal state (e.g. Conversation.messages, - # intermediate Local.* variables) to survive across user turns; - # the only place that state lives is the workflow checkpoint, so - # on every turn we restore the latest checkpoint and feed the new - # input back into the start executor as a continuation rather than - # a fresh run. - latest_checkpoint_id: str | None = None - restore_storage: FileCheckpointStorage | None = None - if context_id is not None: - restore_storage = _checkpoint_storage_for_context(self._checkpoint_storage_path, context_id) - latest_checkpoint = await restore_storage.get_latest(workflow_name=self._agent.workflow.name) - if latest_checkpoint is not None: - latest_checkpoint_id = latest_checkpoint.checkpoint_id - - # Storage that will receive checkpoints written during this turn. - # When the caller chains with previous_response_id, the next turn - # will reference the current response_id as its previous_response_id, - # so new checkpoints must land under the current response_id (or the - # conversation_id when set). When conversation_id is set, this - # matches restore_storage; when only previous_response_id was - # supplied, restore_storage points at the *prior* response's - # directory and write_storage points at the *current* response's. - write_context_id = context.conversation_id or context.response_id - write_storage = _checkpoint_storage_for_context(self._checkpoint_storage_path, write_context_id) - - # Multi-turn pattern: when we have a prior checkpoint, restore it - # first (drive the workflow back to idle with prior state intact), - # then make a separate call that delivers the new user input. This - # depends on Workflow.run preserving shared state across calls. The - # restore-only call may yield events from any pending in-flight - # work in the checkpoint; we consume those internally here so they - # don't surface to the response stream as duplicates. - # - # If the restored checkpoint had pending request_info events, the - # restore-only call replays them through - # ``WorkflowAgent._convert_workflow_event_to_agent_response_updates`` - # and populates ``self._agent.pending_requests``. That is the correct - # state: those requests are genuinely outstanding, and the next - # ``run(input_messages, ...)`` call may contain ``function_call_output`` - # items (carried as FunctionResult/FunctionApprovalResponse content) - # that fulfill them via :meth:`WorkflowAgent._process_pending_requests`. - if latest_checkpoint_id is not None: - if is_streaming_request: - async for _ in self._agent.run( - stream=True, - checkpoint_id=latest_checkpoint_id, - checkpoint_storage=restore_storage, - ): - pass - else: - await self._agent.run( + # Track the current active output item builder for streaming; + # lazily created on matching content, closed when a different type arrives. + tracker: _OutputItemTracker | None = None + + try: + input_items = await context.get_input_items() + input_messages = await _items_to_messages(input_items, approval_storage=self._approval_storage) + is_streaming_request = request.stream is not None and request.stream is True + + _, are_options_set = _to_chat_options(request) + if are_options_set: + logger.warning("Workflow agent doesn't support runtime options. They will be ignored.") + + if request.previous_response_id is not None and context.conversation_id is not None: + raise RuntimeError("Previous response ID cannot be used in conjunction with conversation ID.") + context_id = request.previous_response_id or context.conversation_id + + # The following should never happen due to the checks above. + # This is for type safety and defensive programming. + if self._checkpoint_storage_path is None: + raise RuntimeError("Checkpoint storage path is not configured for workflow agent.") + if not isinstance(self._agent, WorkflowAgent): + raise RuntimeError("Agent is not a workflow agent.") + + # Workflow agents are not async context managers in any built-in path, + # but call _ensure_agent_ready for symmetry with the regular path so + # any future async resources owned by the workflow are entered here. + await self._ensure_agent_ready() + + # Determine the latest checkpoint (if any) so we can resume the + # workflow's prior state for this turn. The directory is keyed by + # the inbound context id (conversation_id when set, otherwise + # previous_response_id). Multi-turn declarative workflows need the + # workflow's internal state (e.g. Conversation.messages, + # intermediate Local.* variables) to survive across user turns; + # the only place that state lives is the workflow checkpoint, so + # on every turn we restore the latest checkpoint and feed the new + # input back into the start executor as a continuation rather than + # a fresh run. + latest_checkpoint_id: str | None = None + restore_storage: FileCheckpointStorage | None = None + if context_id is not None: + restore_storage = _checkpoint_storage_for_context(self._checkpoint_storage_path, context_id) + latest_checkpoint = await restore_storage.get_latest(workflow_name=self._agent.workflow.name) + if latest_checkpoint is not None: + latest_checkpoint_id = latest_checkpoint.checkpoint_id + + # Storage that will receive checkpoints written during this turn. + # When the caller chains with previous_response_id, the next turn + # will reference the current response_id as its previous_response_id, + # so new checkpoints must land under the current response_id (or the + # conversation_id when set). When conversation_id is set, this + # matches restore_storage; when only previous_response_id was + # supplied, restore_storage points at the *prior* response's + # directory and write_storage points at the *current* response's. + write_context_id = context.conversation_id or context.response_id + write_storage = _checkpoint_storage_for_context(self._checkpoint_storage_path, write_context_id) + + # Multi-turn pattern: when we have a prior checkpoint, restore it + # first (drive the workflow back to idle with prior state intact), + # then make a separate call that delivers the new user input. This + # depends on Workflow.run preserving shared state across calls. The + # restore-only call may yield events from any pending in-flight + # work in the checkpoint; we consume those internally here so they + # don't surface to the response stream as duplicates. + # + # If the restored checkpoint had pending request_info events, the + # restore-only call replays them through + # ``WorkflowAgent._convert_workflow_event_to_agent_response_updates`` + # and populates ``self._agent.pending_requests``. That is the correct + # state: those requests are genuinely outstanding, and the next + # ``run(input_messages, ...)`` call may contain ``function_call_output`` + # items (carried as FunctionResult/FunctionApprovalResponse content) + # that fulfill them via :meth:`WorkflowAgent._process_pending_requests`. + if latest_checkpoint_id is not None: + if is_streaming_request: + async for _ in self._agent.run( + stream=True, + checkpoint_id=latest_checkpoint_id, + checkpoint_storage=restore_storage, + ): + pass + else: + await self._agent.run( + stream=False, + checkpoint_id=latest_checkpoint_id, + checkpoint_storage=restore_storage, + ) + + if not is_streaming_request: + # Run the agent in non-streaming mode with the new user input. + response = await self._agent.run( + input_messages, stream=False, - checkpoint_id=latest_checkpoint_id, - checkpoint_storage=restore_storage, + checkpoint_storage=write_storage, ) - # Now run the agent with the latest input - response_event_stream = ResponseEventStream(response_id=context.response_id, model=request.model) + async for item in _to_outputs_for_messages( + response_event_stream, + response.messages, + approval_storage=self._approval_storage, + ): + yield item - yield response_event_stream.emit_created() - yield response_event_stream.emit_in_progress() + await self._delete_not_latest_checkpoints(write_storage, self._agent.workflow.name) + yield response_event_stream.emit_completed() + return - if not is_streaming_request: - # Run the agent in non-streaming mode with the new user input. - response = await self._agent.run( + tracker = _OutputItemTracker(response_event_stream) + + # Run the workflow agent in streaming mode with the new user input. + async for update in self._agent.run( input_messages, - stream=False, + stream=True, checkpoint_storage=write_storage, - ) - - async for item in _to_outputs_for_messages( - response_event_stream, - response.messages, - approval_storage=self._approval_storage, ): - yield item + for content in update.contents: + for event in tracker.handle(content): + yield event + if tracker.needs_async: + async for item in _to_outputs( + response_event_stream, content, approval_storage=self._approval_storage + ): + yield item + tracker.needs_async = False + + # Close any remaining active builder + for event in tracker.close(): + yield event await self._delete_not_latest_checkpoints(write_storage, self._agent.workflow.name) yield response_event_stream.emit_completed() - return - - # Track the current active output item builder for streaming; - # lazily created on matching content, closed when a different type arrives. - tracker = _OutputItemTracker(response_event_stream) - - # Run the workflow agent in streaming mode with the new user input. - async for update in self._agent.run( - input_messages, - stream=True, - checkpoint_storage=write_storage, - ): - for content in update.contents: - for event in tracker.handle(content): - yield event - if tracker.needs_async: - async for item in _to_outputs( - response_event_stream, content, approval_storage=self._approval_storage - ): - yield item - tracker.needs_async = False - - # Close any remaining active builder - for event in tracker.close(): - yield event - - await self._delete_not_latest_checkpoints(write_storage, self._agent.workflow.name) - yield response_event_stream.emit_completed() + except Exception as ex: + logger.exception("Failed to produce response for workflow agent") + for event in self._emit_failure(response_event_stream, tracker, ex): + yield event @staticmethod async def _delete_not_latest_checkpoints(checkpoint_storage: FileCheckpointStorage, workflow_name: str) -> None: @@ -715,6 +711,29 @@ async def _delete_not_latest_checkpoints(checkpoint_storage: FileCheckpointStora if checkpoint.checkpoint_id != latest_checkpoint.checkpoint_id: await checkpoint_storage.delete(checkpoint.checkpoint_id) + @staticmethod + def _emit_failure( + response_event_stream: ResponseEventStream, + tracker: _OutputItemTracker | None, + ex: BaseException, + ) -> Generator[ResponseStreamEvent]: + """Yield a terminal ``response.failed`` event for ``ex``. + + Drains any in-progress streaming output item first so the resulting + SSE stream stays well-formed, then emits ``response.failed`` carrying + the exception's message (falling back to the exception type name when + ``str(ex)`` is empty). Any error raised while draining the tracker is + logged and otherwise ignored so that the original failure is always + what the client sees. + """ + if tracker is not None: + try: + yield from tracker.close() + except Exception: + logger.exception("Error while closing streaming tracker after failure") + message = str(ex) or type(ex).__name__ + yield response_event_stream.emit_failed(message=message) + # endregion ResponsesHostServer diff --git a/python/packages/foundry_hosting/tests/test_responses.py b/python/packages/foundry_hosting/tests/test_responses.py index 8bc09c5d13..b2dd842ff9 100644 --- a/python/packages/foundry_hosting/tests/test_responses.py +++ b/python/packages/foundry_hosting/tests/test_responses.py @@ -2869,7 +2869,8 @@ async def test_round_trip_approval_response_rejected(self) -> None: async def test_approval_response_referencing_unknown_id_fails(self) -> None: """Sending an `mcp_approval_response` for a request id that was - never persisted must fail (storage raises KeyError).""" + never persisted must surface as a ``response.failed`` event whose + ``error.message`` contains the missing approval request id.""" agent = _make_agent( response=AgentResponse(messages=[Message(role="assistant", contents=[Content.from_text("ok")])]) ) @@ -2889,9 +2890,15 @@ async def test_approval_response_referencing_unknown_id_fails(self) -> None: "stream": False, }, ) - # The handler raises a KeyError when the storage lookup misses; - # the hosting layer surfaces this as a 5xx response. - assert resp.status_code >= 500 + # The handler converts the underlying KeyError into a terminal + # ``response.failed`` event, so non-streaming callers see HTTP 200 + # with status="failed" and a meaningful error message rather than + # a generic 5xx response. + assert resp.status_code == 200 + body = resp.json() + assert body["status"] == "failed" + error = body.get("error") or {} + assert "apr_unknown" in (error.get("message") or "") # endregion @@ -3185,11 +3192,21 @@ async def test_handle_inner_workflow_rejects_malicious_context_id( with patch.object(ResponseContext, "get_input_items", new=AsyncMock(return_value=[])): context = ResponseContext(**kwargs) before = sorted(p.name for p in tmp_path.iterdir()) - with pytest.raises(RuntimeError, match="Invalid checkpoint context id"): - async for _ in server._handle_inner_workflow(request, context): # pyright: ignore[reportPrivateUsage] - pass + # The handler converts the underlying ``RuntimeError`` into a + # terminal ``response.failed`` event whose error message names + # the rejected context id, so the SSE / non-streaming consumer + # observes a well-formed failure rather than a raw exception. + events = [event async for event in server._handle_inner_workflow(request, context)] # pyright: ignore[reportPrivateUsage] after = sorted(p.name for p in tmp_path.iterdir()) + failed = [e for e in events if getattr(e, "type", None) == "response.failed"] + assert len(failed) == 1, ( + f"Expected exactly one response.failed event, got types={[getattr(e, 'type', None) for e in events]}" + ) + response_obj = getattr(failed[0], "response", None) + error = getattr(response_obj, "error", None) if response_obj is not None else None + assert error is not None + assert "Invalid checkpoint context id" in (error.message or "") assert before == after, f"Unexpected filesystem artifacts created for {context_field}={bad_id!r}" assert list(root.iterdir()) == [], f"Checkpoint dir created inside root for {context_field}={bad_id!r}" @@ -3204,7 +3221,8 @@ async def test_handle_inner_workflow_rejects_malicious_context_id( ("previous_response_id", "caresp_x/../../service-data/api-made-dir" + "A" * 14), # Restore sink: server-issued conversation id (defense in depth). # Reaches the checkpoint code and is rejected there, surfacing as - # an HTTP 5xx without creating any filesystem artifacts. + # a terminal ``response.failed`` (HTTP 200, status="failed") + # without creating any filesystem artifacts. ("conversation", "../../escape"), ("conversation", "/tmp/escape-abs"), ], @@ -3254,12 +3272,20 @@ async def test_malicious_context_id_rejected_e2e(self, tmp_path: Any, context_fi resp = await client.post("/responses", json=payload) after = sorted(p.name for p in tmp_path.iterdir()) - # The request must not succeed; either request validation rejects it - # (4xx) or the checkpoint layer raises and the server returns 5xx. - # Either way, no successful response may be produced. - assert resp.status_code >= 400, ( - f"Expected non-2xx for {context_field}={bad_id!r}, got {resp.status_code}: {resp.text[:200]}" - ) + # The request must not succeed: either request validation rejects it + # (HTTP 4xx) before reaching the handler, or the checkpoint layer + # raises and the handler converts the failure into a + # ``response.failed`` terminal event (HTTP 200, status="failed"). + # Either way, no successful response and no filesystem artifacts. + if resp.status_code == 200: + body = resp.json() + assert body.get("status") == "failed", ( + f"Expected status='failed' for {context_field}={bad_id!r}, got {body.get('status')!r}" + ) + else: + assert resp.status_code >= 400, ( + f"Expected non-2xx for {context_field}={bad_id!r}, got {resp.status_code}: {resp.text[:200]}" + ) assert before == after, ( f"Unexpected filesystem artifacts under tmp_path for {context_field}={bad_id!r}: " f"before={before} after={after}" @@ -3445,11 +3471,14 @@ async def test_non_consent_error_during_entry_propagates(self) -> None: resp = await _post(server, input_text="hello", stream=False) # Non-consent errors are not swallowed: the response is marked failed - # and no `oauth_consent_request` item is emitted. + # and no `oauth_consent_request` item is emitted. The exception + # message is propagated to the client via ``error.message``. assert resp.status_code == 200 body = resp.json() assert body["status"] == "failed" assert not any(it["type"] == "oauth_consent_request" for it in body.get("output", [])) + error = body.get("error") or {} + assert error.get("message") == "boom" agent.run.assert_not_called() async def test_retry_after_consent_succeeds(self) -> None: @@ -3477,6 +3506,130 @@ async def test_retry_after_consent_succeeds(self) -> None: agent.run.assert_awaited_once() +# endregion + +# region Error handling (response.failed surfacing) + + +class TestResponseFailedSurfacing: + """Tests that exceptions raised by the hosted agent are converted into + terminal ``response.failed`` events carrying the exception message, + rather than propagating as 5xx HTTP errors or being replaced by the + orchestrator's generic ``"An internal server error occurred."`` + fallback. + """ + + async def test_non_streaming_run_failure_emits_response_failed(self) -> None: + agent = _make_agent( + response=AgentResponse(messages=[Message(role="assistant", contents=[Content.from_text("hi")])]) + ) + + async def _raise(*args: Any, **kwargs: Any) -> AgentResponse: + raise RuntimeError("non-stream kaboom") + + agent.run = AsyncMock(side_effect=_raise) + server = _make_server(agent) + + resp = await _post(server, input_text="hello", stream=False) + + assert resp.status_code == 200 + body = resp.json() + assert body["status"] == "failed" + error = body.get("error") or {} + assert error.get("message") == "non-stream kaboom" + + async def test_streaming_run_failure_emits_response_failed(self) -> None: + async def _raise_stream() -> AsyncIterator[AgentResponseUpdate]: + yield AgentResponseUpdate(contents=[Content.from_text("partial ")], role="assistant") + raise RuntimeError("stream kaboom") + + agent = MagicMock(spec=RawAgent) + agent.id = "test-agent" + agent.name = "Test Agent" + agent.description = "A mock agent for testing" + agent.context_providers = [] + + def run_streaming(*args: Any, **kwargs: Any) -> Any: + if kwargs.get("stream"): + return ResponseStream(_raise_stream()) # type: ignore[arg-type] + raise NotImplementedError("Only streaming is configured on this mock") + + agent.run = MagicMock(side_effect=run_streaming) + server = _make_server(agent) + + resp = await _post(server, input_text="hello", stream=True) + + assert resp.status_code == 200 + events = _parse_sse_events(resp.text) + types = _sse_event_types(events) + assert types[0] == "response.created" + assert types[1] == "response.in_progress" + # Last lifecycle event must be ``response.failed``, never ``response.completed``. + assert types[-1] == "response.failed" + assert "response.completed" not in types + + failed = [e for e in events if e["event"] == "response.failed"] + assert len(failed) == 1 + response_payload = failed[0]["data"].get("response") or {} + error = response_payload.get("error") or {} + assert error.get("message") == "stream kaboom" + + async def test_streaming_run_failure_drains_pending_output_item(self) -> None: + """If a streaming output item was open when the failure happens, the + handler must close it before emitting ``response.failed`` so the SSE + stream stays well-formed (every ``output_item.added`` has a matching + ``output_item.done``). + """ + + async def _raise_stream() -> AsyncIterator[AgentResponseUpdate]: + # Open a text output item, then blow up before it closes. + yield AgentResponseUpdate(contents=[Content.from_text("hello ")], role="assistant") + raise RuntimeError("mid-item kaboom") + + agent = MagicMock(spec=RawAgent) + agent.id = "test-agent" + agent.name = "Test Agent" + agent.description = "A mock agent for testing" + agent.context_providers = [] + + def run_streaming(*args: Any, **kwargs: Any) -> Any: + return ResponseStream(_raise_stream()) # type: ignore[arg-type] + + agent.run = MagicMock(side_effect=run_streaming) + server = _make_server(agent) + + resp = await _post(server, input_text="hello", stream=True) + + assert resp.status_code == 200 + events = _parse_sse_events(resp.text) + types = _sse_event_types(events) + assert types.count("response.output_item.added") == types.count("response.output_item.done") + assert types[-1] == "response.failed" + + async def test_workflow_agent_run_failure_emits_response_failed(self) -> None: + """Exceptions raised by a hosted ``WorkflowAgent`` are converted into a + terminal ``response.failed`` event in the same way as the regular + agent path. + """ + workflow_agent = _build_text_workflow_agent("ignored") + + async def _raise(*args: Any, **kwargs: Any) -> AgentResponse: + raise RuntimeError("workflow kaboom") + + # Patch the public ``run`` to fail. ``_handle_inner_workflow`` only + # invokes the agent once (no checkpoint to restore on a fresh + # request), so this is the call that will raise. + with patch.object(workflow_agent, "run", side_effect=_raise): + server = _make_server(workflow_agent) + resp = await _post(server, input_text="hello", stream=False) + + assert resp.status_code == 200 + body = resp.json() + assert body["status"] == "failed" + error = body.get("error") or {} + assert error.get("message") == "workflow kaboom" + + # endregion # region Workflow agent hosting (end-to-end)