feat(codex): event-stream parser tap for the unified harness surface #421
Merged
danielmillerp
approved these changes
Jun 22, 2026
    ) as turn_span:
        start_ms = int(time.monotonic() * 1000)

        process = await _spawn_codex(MODEL, thread_id=state.codex_thread_id)
Contributor
is this a new integration?
Base automatically changed from declan-scale/agx1-373-conformance-equivalence to next on June 22, 2026 20:09
…(convert_codex_to_agentex_events + CodexTurn) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…surface via local CLI subprocess
Adds three tutorial agents that demonstrate the full
convert_codex_to_agentex_events tap + CodexTurn + UnifiedEmitter
pipeline using a plain local asyncio subprocess (no Scale sandbox):

  examples/tutorials/00_sync/harness_codex/ (sync HTTP-yield)
  examples/tutorials/10_async/00_base/harness_codex/ (async Redis-streaming)
  examples/tutorials/10_async/10_temporal/harness_codex/ (Temporal-durable)

Each agent:
- Spawns `codex exec --json` via asyncio.create_subprocess_exec with an
  injectable `_spawn` seam for offline testing.
- Wraps the stdout line iterator in CodexTurn.
- Delivers via the unified surface:
    sync → emitter.yield_turn(turn)
    async → emitter.auto_send_turn(turn)
    temporal → emitter.auto_send_turn(turn, created_at=workflow.now())

Each agent ships:
- project/acp.py (+ workflow.py + run_worker.py for temporal)
- manifest.yaml / pyproject.toml / Dockerfile / README.md
- tests/test_agent.py with offline unit tests (fake async-iterator, no
  real CLI) + live-gated integration tests (CODEX_LIVE_TESTS=1)
- conftest.py that wires sys.path + minimal env vars for offline runs

The three manifest.yaml files are auto-discovered by the CI
agentex-tutorials-test.yml `find . -name "manifest.yaml"` sweep.
Live codex runs need `codex` CLI + OPENAI_API_KEY on the test runner.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
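The injectable `_spawn` seam described in this commit can be sketched roughly as below. This is a minimal illustration under assumptions, not the tutorial's code: `SpawnFn`, `spawn_codex`, `collect_lines`, and `fake_spawn` are hypothetical names, passing the prompt as a positional argument is assumed, and the real agents hand the line iterator to `CodexTurn` rather than collecting it into a list.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable

# Hypothetical seam type: given a prompt, return an async iterator of
# stdout lines. Tests can swap in a fake that never touches the CLI.
SpawnFn = Callable[[str], Awaitable[AsyncIterator[str]]]


async def spawn_codex(prompt: str) -> AsyncIterator[str]:
    """Default seam: run `codex exec --json` and expose its stdout lines."""
    process = await asyncio.create_subprocess_exec(
        "codex", "exec", "--json", prompt,  # prompt-as-argument is an assumption
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.DEVNULL,
    )

    async def lines() -> AsyncIterator[str]:
        assert process.stdout is not None
        async for raw in process.stdout:  # StreamReader iterates line by line
            yield raw.decode("utf-8", errors="replace").rstrip("\n")
        await process.wait()

    return lines()


async def collect_lines(prompt: str, _spawn: SpawnFn = spawn_codex) -> list[str]:
    """Consume the stream through the seam, dropping blank lines."""
    stream = await _spawn(prompt)
    return [line async for line in stream if line.strip()]


async def fake_spawn(prompt: str) -> AsyncIterator[str]:
    """Offline fake: a canned async iterator instead of a real subprocess."""
    async def lines() -> AsyncIterator[str]:
        yield '{"type": "turn.started"}'
        yield ""
        yield '{"type": "turn.completed"}'

    return lines()
```

An offline test would call `asyncio.run(collect_lines("hi", _spawn=fake_spawn))`, which exercises the consumer without the `codex` CLI or an API key.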
Three correctness fixes flagged by greptile:
- CodexTurn.events corrupted usage on a second access: _raw_events is a
single-consumption AsyncIterator, so re-wrapping it produced an already-
exhausted stream that fired on_result with zeros and clobbered _result. Cache
the generator (created at most once); on_result now fires exactly once.
- _codex_sync fallback tool_call_id diverged between request and response when
item_id was empty: it was recomputed from tool_call_count, which advances
between item.started and item.completed (codex_tool_1 vs codex_tool_2),
breaking request/response pairing. Remember the id per item and reuse it;
free the mapping on completion so a later empty-id item gets a fresh id.
- _tool_args_for dropped collab_tool_call arguments (fell through to {}). Extract
an arguments dict when the payload carries one (mirrors mcp_tool_call),
otherwise {} (no regression for payloads without arguments).
Adds regression tests for the events double-access and the empty-item_id pairing.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The sync and async-base harness_codex tutorials computed duration_ms before the stream was consumed, so TurnUsage.duration_ms captured only subprocess spawn overhead, not inference time — monitoring on that field would show wrong data. Make CodexTurn.duration_ms / cost_usd public mutable attributes (the true wall-clock is only known after the stream ends), and set turn.duration_ms after process.wait() in both tutorials so it measures the full turn. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
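The timing bug above comes down to where the clock is read. A minimal sketch, assuming a hypothetical `FakeTurn` with a public `duration_ms` attribute and a fake slow stream in place of the subprocess:

```python
import asyncio
import time
from typing import AsyncIterator, Optional


class FakeTurn:
    """Hypothetical stand-in for a turn with a public, mutable duration_ms."""

    def __init__(self) -> None:
        self.duration_ms: Optional[int] = None


async def slow_lines() -> AsyncIterator[str]:
    """Simulates inference time: each line takes a while to arrive."""
    for i in range(3):
        await asyncio.sleep(0.02)
        yield f"line {i}"


async def run_turn() -> tuple:
    start = time.monotonic()
    turn = FakeTurn()
    stream = slow_lines()

    # Reading the clock here captures only setup overhead (the old behavior).
    spawn_only_ms = int((time.monotonic() - start) * 1000)

    async for _line in stream:
        pass

    # Reading it after the stream is drained covers the full turn.
    turn.duration_ms = int((time.monotonic() - start) * 1000)
    return spawn_only_ms, turn
```

In the tutorials the equivalent assignment happens after `process.wait()`, per the commit message; the fake stream here only stands in for that wait.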
…eptile]
- Add CodexTurn.session_id property so multi-turn callers resume sessions without reaching into the private _result dict; update the async-base and temporal tutorials to use it.
- Fix the CI pyright failure in test_codex_sync.py: pull tool_call_id inside the isinstance comprehension so the content-union narrowing holds (a later attribute access on req[0].content did not narrow, tripping reportAttribute).
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
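The pyright fix relies on narrowing being scoped to the expression guarded by `isinstance`. The sketch below shows the shape of that change with hypothetical types (`TextContent`, `ToolRequestContent`, `Message`); the real content union lives in the SDK and is not reproduced here.

```python
from dataclasses import dataclass
from typing import Union


@dataclass
class TextContent:
    text: str


@dataclass
class ToolRequestContent:
    tool_call_id: str
    name: str


# Hypothetical content union; only one member has tool_call_id.
Content = Union[TextContent, ToolRequestContent]


@dataclass
class Message:
    content: Content


def request_ids(messages: list) -> list:
    # The attribute is read inside the comprehension, where the isinstance
    # guard has narrowed m.content to ToolRequestContent. Filtering first
    # and reading req[0].content.tool_call_id later loses that narrowing.
    return [
        m.content.tool_call_id
        for m in messages
        if isinstance(m.content, ToolRequestContent)
    ]
```

The runtime behavior is the same either way; the difference is only in what a type checker can prove about the attribute access.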
conftest set AGENT_NAME to "<name>-test" while the manifest registers "<name>", so the live test queried a non-existent agent and got 404. Drop the override for the sync and async-base tutorials (the test's own default already matches the manifest). For the temporal tutorial set it to the manifest name instead of removing it, since project/workflow.py reads AGENT_NAME at import time. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…rrors
- The async-base and temporal live tests used the sync send_message pattern, which returns an empty result for async (event-driven) agents (assert 0 >= 1). Rewrite to create_task + send_event + poll task messages, matching how these agents actually handle events (on_task_event_send / RECEIVE_EVENT signal).
- _tool_output_for: route collab_tool_call through the same error/result handling as mcp_tool_call so a failed collab call is reported with is_error=True instead of a generic success dump [greptile].
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The temporal tutorial spawned codex exec directly in the workflow signal handler. Temporal runs signal-handler bodies on its deterministic sandbox event loop, which does not implement asyncio.create_subprocess_exec — so the worker crashed with NotImplementedError. (Replay was never the issue, contrary to the old docstring; the sandboxed loop is.) Move the subprocess + CodexTurn + UnifiedEmitter.auto_send_turn into a new run_codex_turn activity (project/activities.py), register it on the worker, and have the signal handler delegate via workflow.execute_activity. The activity returns final_text + session_id so codex-thread resume still works. Offline tests now cover both the workflow's state update and the activity directly. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
… [greptile]
Greptile flagged a deadlock: all three codex tutorials spawned `codex exec` with stderr=PIPE but never read it, so once codex filled the OS pipe buffer (~64 KB) the subprocess stalled and the agent hung. Redirect stderr to DEVNULL (codex --json emits events on stdout; stderr is progress noise).
Also fix UTF-8 corruption in _process_stdout: decoding each 4 KB read independently splits multibyte characters at chunk boundaries. Use an incremental UTF-8 decoder so boundary-split characters decode correctly.
Applied to 00_sync, 10_async/00_base (acp.py) and 10_async/10_temporal (activities.py).
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
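The chunk-boundary problem can be reproduced and fixed with the standard library alone. The sketch below covers only the decoding half of this commit; `decode_chunks` is a hypothetical helper, not the tutorials' `_process_stdout`, and `errors="replace"` is an assumed policy for invalid bytes.

```python
import codecs
from typing import Iterable, Iterator


def decode_chunks(chunks: Iterable[bytes]) -> Iterator[str]:
    """Yield complete text lines from arbitrarily split UTF-8 byte chunks."""
    # The incremental decoder buffers a trailing partial character and
    # completes it when the next chunk arrives.
    decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
    buffer = ""
    for chunk in chunks:
        buffer += decoder.decode(chunk)
        while "\n" in buffer:
            line, buffer = buffer.split("\n", 1)
            yield line
    # Flush whatever the decoder still holds, then any unterminated line.
    buffer += decoder.decode(b"", final=True)
    if buffer:
        yield buffer


# A JSON line whose two-byte "é" is split across two reads.
data = '{"text": "héllo"}\n'.encode("utf-8")
cut = data.index("é".encode("utf-8")) + 1
chunks = [data[:cut], data[cut:]]
```

Calling `data[:cut].decode("utf-8")` on the first chunk alone raises `UnicodeDecodeError`, which is the per-chunk failure mode the commit describes, while `decode_chunks(chunks)` reassembles the character.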
TurnUsage.num_llm_calls is Optional on next (None = "not reported"). The openai / pydantic-ai turn tests still asserted == 0 for the default (no-usage / pre-exhaustion) path, which now yields None. Update those three assertions to `is None`; real-zero cases are unchanged. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
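The distinction between `None` ("not reported") and an explicit `0` is easy to lose with truthiness checks or `or 0` defaults. A minimal sketch, assuming a hypothetical `Usage` dataclass and field names in place of the real `TurnUsage`:

```python
from dataclasses import dataclass
from typing import Any, Mapping, Optional


@dataclass
class Usage:
    """Hypothetical stand-in for TurnUsage; field names are assumptions."""

    input_tokens: Optional[int] = None
    output_tokens: Optional[int] = None
    num_llm_calls: Optional[int] = None


def _opt_int(raw: Mapping[str, Any], key: str) -> Optional[int]:
    # `raw.get(key) or 0` would turn a missing value into 0; compare with
    # None explicitly so a reported zero stays distinguishable.
    value = raw.get(key)
    return None if value is None else int(value)


def to_usage(raw: Optional[Mapping[str, Any]]) -> Usage:
    if raw is None:
        return Usage()  # nothing reported: every field stays None
    return Usage(
        input_tokens=_opt_int(raw, "input_tokens"),
        output_tokens=_opt_int(raw, "output_tokens"),
        num_llm_calls=_opt_int(raw, "num_llm_calls"),
    )
```

This is why the default-path assertions move from `== 0` to `is None` while the real-zero cases keep comparing against `0`.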
Summary
- Ports the golden agent's `_CodexEventProcessor` to a pure SDK tap: `convert_codex_to_agentex_events` + `CodexTurn` + usage normalization (`codex_usage_to_turn_usage`)
- `_CodexStreamProcessor` in `_codex_sync.py` maps all codex exec JSON event types (`agent_message`, `reasoning`, `command_execution`, `file_change`, `mcp_tool_call`, `web_search`, `todo_list`, `collab_tool_call`, error events) to canonical `StreamTaskMessage*` events
- `CodexTurn` in `_codex_turn.py` wraps the parser as a `HarnessTurn` so callers can pass it directly to `UnifiedEmitter.yield_turn` / `UnifiedEmitter.auto_send_turn`
- `test_codex_conformance.py` (4 fixtures: text, tool, reasoning, multi-step)

🤖 Generated with Claude Code
Greptile Summary
This PR ports the golden agent's `_CodexEventProcessor` into a reusable SDK tap: `_CodexStreamProcessor` / `convert_codex_to_agentex_events` maps every `codex exec --json` event type to canonical `StreamTaskMessage*` events, `CodexTurn` wraps it as a `HarnessTurn`, and `codex_usage_to_turn_usage` normalises OpenAI-style token counts. Three tutorial variants (sync, async-base, Temporal) demonstrate the tap against a live subprocess, and a cross-channel conformance suite verifies deterministic span derivation alongside other harness taps.

- `_codex_sync.py`: stateful `_CodexStreamProcessor` handles `agent_message` (delta streaming), `reasoning`, all tool types (`command_execution`, `file_change`, `mcp_tool_call`, `web_search`, `todo_list`, `collab_tool_call`), and error events; previous review issues with `tool_call_id` consistency and `collab_tool_call` args/errors are now resolved.
- `_codex_turn.py`: `CodexTurn` caches the events generator so `_on_result` fires exactly once; `codex_usage_to_turn_usage` defensively distinguishes `None` ("not reported") from explicit `0`.
- Assertions on `num_llm_calls` in `test_openai_turn.py` and `test_pydantic_ai_turn.py` are updated from `== 0` to `is None` for the pre-exhaustion case, aligning with `TurnUsage`'s `None`-default semantics.

Confidence Score: 5/5
Safe to merge; the core parser and turn wrapper are well-tested and the three previous review issues are all addressed.
The parser correctly handles all documented codex event types, the generator-caching fix prevents stale usage data, and the tool_call_id consistency fix prevents mismatched request/response pairs. The only gaps are the web_search/todo_list error-path question (needs clarification, not a known breakage) and the missing try/finally in the Temporal activity (tutorial code that degrades gracefully in practice).
src/agentex/lib/adk/_modules/_codex_sync.py for the web_search/todo_list error payload question; examples/tutorials/10_async/10_temporal/harness_codex/project/activities.py for the missing try/finally around auto_send_turn.
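The `tool_call_id` consistency fix mentioned above (remember the fallback id at `item.started`, reuse it at `item.completed`, then free it) can be illustrated with a small allocator. This is a sketch under assumptions: `ToolCallIds` and its method names are hypothetical, and keying the mapping by the raw `item_id` string is one possible reading of "remember the id per item".

```python
class ToolCallIds:
    """Hypothetical allocator pairing request and response ids per item."""

    def __init__(self) -> None:
        self._tool_call_count = 0
        self._pending: dict = {}  # item_id -> id handed out at item.started

    def _fresh(self) -> str:
        self._tool_call_count += 1
        return f"codex_tool_{self._tool_call_count}"

    def for_started(self, item_id: str) -> str:
        # Use the real id when present, otherwise mint a fallback, and
        # remember the choice so completion does not recompute it.
        tool_call_id = item_id or self._fresh()
        self._pending[item_id] = tool_call_id
        return tool_call_id

    def for_completed(self, item_id: str) -> str:
        # Pop frees the mapping, so a later empty-id item gets a fresh id.
        remembered = self._pending.pop(item_id, None)
        if remembered is not None:
            return remembered
        return item_id or self._fresh()
```

Recomputing the fallback from the counter at completion time is what produced `codex_tool_1` for the request and `codex_tool_2` for the response before the fix.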
Sequence Diagram
%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
    participant Caller as Caller (ACP handler / Activity)
    participant CT as CodexTurn
    participant CS as convert_codex_to_agentex_events
    participant CSP as _CodexStreamProcessor
    participant UE as UnifiedEmitter
    Caller->>CT: "CodexTurn(events=stdout_lines, model=)"
    Caller->>UE: yield_turn(turn) / auto_send_turn(turn)
    UE->>CT: async for msg in turn.events
    CT->>CS: create generator (once, cached)
    loop per raw event line
        CS->>CS: JSON parse / pass-through dict
        CS->>CSP: process(evt)
        CSP-->>CS: "list[StreamTaskMessage*]"
        CS-->>UE: "yield StreamTaskMessage*"
        UE-->>Caller: forward / push to Redis
    end
    Note over CS,CSP: turn.completed - _pending_usage captured
    CS->>CT: "on_result({usage, session_id, counts})"
    CT->>CT: "self._result = result"
    Caller->>CT: turn.usage()
    CT->>CT: codex_usage_to_turn_usage(_result)
    CT-->>Caller: TurnUsage

Reviews (12): Last reviewed commit: "test(harness): num_llm_calls is None (no..."