Skip to content

feat(codex): event-stream parser tap for the unified harness surface#421

Merged
declan-scale merged 10 commits into
nextfrom
declan-scale/pr8-codex
Jun 22, 2026
Merged

feat(codex): event-stream parser tap for the unified harness surface#421
declan-scale merged 10 commits into
nextfrom
declan-scale/pr8-codex

Conversation

@declan-scale

@declan-scale declan-scale commented Jun 22, 2026

Copy link
Copy Markdown
Contributor

Summary

  • Ports the golden agent's _CodexEventProcessor to a pure SDK tap: convert_codex_to_agentex_events + CodexTurn + usage normalization (codex_usage_to_turn_usage)
  • _CodexStreamProcessor in _codex_sync.py maps all codex exec JSON event types (agent_message, reasoning, command_execution, file_change, mcp_tool_call, web_search, todo_list, collab_tool_call, error events) to canonical StreamTaskMessage* events
  • CodexTurn in _codex_turn.py wraps the parser as a HarnessTurn so callers can pass it directly to UnifiedEmitter.yield_turn / UnifiedEmitter.auto_send_turn
  • Cross-channel conformance fixtures added in test_codex_conformance.py (4 fixtures: text, tool, reasoning, multi-step)
  • Orchestration (subprocess management, sandbox provisioning, secret injection, MCP config) remains in the golden agent — this module is a pure library tap with no deployable agent

🤖 Generated with Claude Code

Greptile Summary

This PR ports the golden agent's _CodexEventProcessor into a reusable SDK tap: _CodexStreamProcessor / convert_codex_to_agentex_events maps every codex exec --json event type to canonical StreamTaskMessage* events, CodexTurn wraps it as a HarnessTurn, and codex_usage_to_turn_usage normalises OpenAI-style token counts. Three tutorial variants (sync, async-base, Temporal) demonstrate the tap against a live subprocess, and a cross-channel conformance suite verifies deterministic span derivation alongside other harness taps.

  • _codex_sync.py: stateful _CodexStreamProcessor handles agent_message (delta streaming), reasoning, all tool types (command_execution, file_change, mcp_tool_call, web_search, todo_list, collab_tool_call), and error events; previous review issues with tool_call_id consistency and collab_tool_call args/errors are now resolved.
  • _codex_turn.py: CodexTurn caches the events generator so _on_result fires exactly once; codex_usage_to_turn_usage defensively distinguishes None ("not reported") from explicit 0.
  • Test assertions for num_llm_calls in test_openai_turn.py and test_pydantic_ai_turn.py are updated from == 0 to is None for the pre-exhaustion case, aligning with TurnUsage's None-default semantics.

Confidence Score: 5/5

Safe to merge; the core parser and turn wrapper are well-tested and the three previous review issues are all addressed.

The parser correctly handles all documented codex event types, the generator-caching fix prevents stale usage data, and the tool_call_id consistency fix prevents mismatched request/response pairs. The only gaps are the web_search/todo_list error-path question (needs clarification, not a known breakage) and the missing try/finally in the Temporal activity (tutorial code that degrades gracefully in practice).

src/agentex/lib/adk/_modules/_codex_sync.py for the web_search/todo_list error payload question; examples/tutorials/10_async/10_temporal/harness_codex/project/activities.py for the missing try/finally around auto_send_turn.

Important Files Changed

Filename Overview
src/agentex/lib/adk/_modules/_codex_sync.py Core event-stream parser tap: maps all codex exec JSON event types to canonical StreamTaskMessage* events; previous review issues (tool_call_id drift, collab_tool_call args/errors) are now fixed; web_search/todo_list error handling falls through to generic is_error=False path.
src/agentex/lib/adk/_modules/_codex_turn.py CodexTurn HarnessTurn implementation: generator caching, on_result wiring, and codex_usage_to_turn_usage normalization all look correct; previous reviewer's events-property caching issue is resolved.
examples/tutorials/10_async/10_temporal/harness_codex/project/activities.py Temporal activity for codex turn: process spawning and CodexTurn wiring look correct but process.wait() is not guarded in try/finally, which could orphan subprocesses on activity failure/retry.
tests/lib/adk/test_codex_sync.py Comprehensive offline tests covering text, tool, reasoning, error, edge-case, on_result, and multi-step scenarios including the new empty-item-id request/response ID match test.
tests/lib/adk/test_codex_turn.py Thorough tests for CodexTurn protocol compliance, usage normalization, generator stability, and on_result wiring; events_property_stable_across_accesses explicitly validates the caching fix.

Sequence Diagram

%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
    participant Caller as Caller (ACP handler / Activity)
    participant CT as CodexTurn
    participant CS as convert_codex_to_agentex_events
    participant CSP as _CodexStreamProcessor
    participant UE as UnifiedEmitter

    Caller->>CT: "CodexTurn(events=stdout_lines, model=)"
    Caller->>UE: yield_turn(turn) / auto_send_turn(turn)
    UE->>CT: async for msg in turn.events
    CT->>CS: create generator (once, cached)
    loop per raw event line
        CS->>CS: JSON parse / pass-through dict
        CS->>CSP: process(evt)
        CSP-->>CS: "list[StreamTaskMessage*]"
        CS-->>UE: "yield StreamTaskMessage*"
        UE-->>Caller: forward / push to Redis
    end
    Note over CS,CSP: turn.completed - _pending_usage captured
    CS->>CT: "on_result({usage, session_id, counts})"
    CT->>CT: "self._result = result"
    Caller->>CT: turn.usage()
    CT->>CT: codex_usage_to_turn_usage(_result)
    CT-->>Caller: TurnUsage
Loading
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
sequenceDiagram
    participant Caller as Caller (ACP handler / Activity)
    participant CT as CodexTurn
    participant CS as convert_codex_to_agentex_events
    participant CSP as _CodexStreamProcessor
    participant UE as UnifiedEmitter

    Caller->>CT: "CodexTurn(events=stdout_lines, model=)"
    Caller->>UE: yield_turn(turn) / auto_send_turn(turn)
    UE->>CT: async for msg in turn.events
    CT->>CS: create generator (once, cached)
    loop per raw event line
        CS->>CS: JSON parse / pass-through dict
        CS->>CSP: process(evt)
        CSP-->>CS: "list[StreamTaskMessage*]"
        CS-->>UE: "yield StreamTaskMessage*"
        UE-->>Caller: forward / push to Redis
    end
    Note over CS,CSP: turn.completed - _pending_usage captured
    CS->>CT: "on_result({usage, session_id, counts})"
    CT->>CT: "self._result = result"
    Caller->>CT: turn.usage()
    CT->>CT: codex_usage_to_turn_usage(_result)
    CT-->>Caller: TurnUsage
Loading

Reviews (12): Last reviewed commit: "test(harness): num_llm_calls is None (no..." | Re-trigger Greptile

@declan-scale declan-scale force-pushed the declan-scale/agx1-373-conformance-equivalence branch from 2e820c7 to 37421b6 Compare June 22, 2026 13:48
@declan-scale declan-scale force-pushed the declan-scale/pr8-codex branch from 772c593 to fb58d07 Compare June 22, 2026 13:48
Comment thread src/agentex/lib/adk/_modules/_codex_turn.py Outdated
Comment thread src/agentex/lib/adk/_modules/_codex_sync.py Outdated
Comment thread src/agentex/lib/adk/_modules/_codex_sync.py
@declan-scale declan-scale force-pushed the declan-scale/agx1-373-conformance-equivalence branch from 37421b6 to df3461c Compare June 22, 2026 14:13
@declan-scale declan-scale force-pushed the declan-scale/pr8-codex branch 2 times, most recently from e12a09f to af856a0 Compare June 22, 2026 14:37
@declan-scale declan-scale force-pushed the declan-scale/agx1-373-conformance-equivalence branch from ccbd5cf to e3fa1cc Compare June 22, 2026 15:14
@declan-scale declan-scale force-pushed the declan-scale/pr8-codex branch 2 times, most recently from fbbfc68 to f3f1cd6 Compare June 22, 2026 15:59
) as turn_span:
start_ms = int(time.monotonic() * 1000)

process = await _spawn_codex(MODEL, thread_id=state.codex_thread_id)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this a new integration?

@declan-scale declan-scale force-pushed the declan-scale/agx1-373-conformance-equivalence branch from c8c63d1 to 05120f3 Compare June 22, 2026 18:47
@declan-scale declan-scale force-pushed the declan-scale/pr8-codex branch from 7e296bd to fe65135 Compare June 22, 2026 18:47
@declan-scale declan-scale force-pushed the declan-scale/agx1-373-conformance-equivalence branch from 05120f3 to c9a907c Compare June 22, 2026 19:54
@declan-scale declan-scale force-pushed the declan-scale/pr8-codex branch from fe65135 to 408c7d5 Compare June 22, 2026 19:54
@declan-scale declan-scale force-pushed the declan-scale/agx1-373-conformance-equivalence branch from c9a907c to a04bf5e Compare June 22, 2026 20:01
Comment thread src/agentex/lib/adk/_modules/_codex_sync.py
Base automatically changed from declan-scale/agx1-373-conformance-equivalence to next June 22, 2026 20:09
@declan-scale declan-scale force-pushed the declan-scale/pr8-codex branch from 408c7d5 to 8057584 Compare June 22, 2026 20:11
declan-scale and others added 9 commits June 22, 2026 18:28
…(convert_codex_to_agentex_events + CodexTurn)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…surface via local CLI subprocess

Adds three tutorial agents that demonstrate the full
convert_codex_to_agentex_events tap + CodexTurn + UnifiedEmitter
pipeline using a plain local asyncio subprocess (no Scale sandbox):

  examples/tutorials/00_sync/harness_codex/       — sync HTTP-yield
  examples/tutorials/10_async/00_base/harness_codex/  — async Redis-streaming
  examples/tutorials/10_async/10_temporal/harness_codex/ — Temporal-durable

Each agent:
- Spawns `codex exec --json` via asyncio.create_subprocess_exec with an
  injectable `_spawn` seam for offline testing.
- Wraps the stdout line iterator in CodexTurn.
- Delivers via the unified surface:
    sync      → emitter.yield_turn(turn)
    async     → emitter.auto_send_turn(turn)
    temporal  → emitter.auto_send_turn(turn, created_at=workflow.now())

Each agent ships:
- project/acp.py (+ workflow.py + run_worker.py for temporal)
- manifest.yaml / pyproject.toml / Dockerfile / README.md
- tests/test_agent.py with offline unit tests (fake async-iterator, no
  real CLI) + live-gated integration tests (CODEX_LIVE_TESTS=1)
- conftest.py that wires sys.path + minimal env vars for offline runs

The three manifest.yaml files are auto-discovered by the CI
agentex-tutorials-test.yml `find . -name "manifest.yaml"` sweep.
Live codex runs need `codex` CLI + OPENAI_API_KEY on the test runner.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Three correctness fixes flagged by greptile:

- CodexTurn.events corrupted usage on a second access: _raw_events is a
  single-consumption AsyncIterator, so re-wrapping it produced an already-
  exhausted stream that fired on_result with zeros and clobbered _result. Cache
  the generator (created at most once); on_result now fires exactly once.

- _codex_sync fallback tool_call_id diverged between request and response when
  item_id was empty: it was recomputed from tool_call_count, which advances
  between item.started and item.completed (codex_tool_1 vs codex_tool_2),
  breaking request/response pairing. Remember the id per item and reuse it;
  free the mapping on completion so a later empty-id item gets a fresh id.

- _tool_args_for dropped collab_tool_call arguments (fell through to {}). Extract
  an arguments dict when the payload carries one (mirrors mcp_tool_call),
  otherwise {} (no regression for payloads without arguments).

Adds regression tests for the events double-access and the empty-item_id pairing.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The sync and async-base harness_codex tutorials computed duration_ms before the
stream was consumed, so TurnUsage.duration_ms captured only subprocess spawn
overhead, not inference time — monitoring on that field would show wrong data.
Make CodexTurn.duration_ms / cost_usd public mutable attributes (the true
wall-clock is only known after the stream ends), and set turn.duration_ms after
process.wait() in both tutorials so it measures the full turn.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…eptile]

- Add CodexTurn.session_id property so multi-turn callers resume sessions
  without reaching into the private _result dict; update the async-base and
  temporal tutorials to use it.
- Fix the CI pyright failure in test_codex_sync.py: pull tool_call_id inside the
  isinstance comprehension so the content-union narrowing holds (a later
  attribute access on req[0].content did not narrow, tripping reportAttribute).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
conftest set AGENT_NAME to "<name>-test" while the manifest registers "<name>",
so the live test queried a non-existent agent and got 404. Drop the override
for the sync and async-base tutorials (the test's own default already matches
the manifest). For the temporal tutorial set it to the manifest name instead
of removing it, since project/workflow.py reads AGENT_NAME at import time.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…rrors

- The async-base and temporal live tests used the sync send_message pattern,
  which returns an empty result for async (event-driven) agents (assert 0 >= 1).
  Rewrite to create_task + send_event + poll task messages, matching how these
  agents actually handle events (on_task_event_send / RECEIVE_EVENT signal).
- _tool_output_for: route collab_tool_call through the same error/result
  handling as mcp_tool_call so a failed collab call is reported with
  is_error=True instead of a generic success dump [greptile].

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The temporal tutorial spawned codex exec directly in the workflow signal
handler. Temporal runs signal-handler bodies on its deterministic sandbox event
loop, which does not implement asyncio.create_subprocess_exec — so the worker
crashed with NotImplementedError. (Replay was never the issue, contrary to the
old docstring; the sandboxed loop is.)

Move the subprocess + CodexTurn + UnifiedEmitter.auto_send_turn into a new
run_codex_turn activity (project/activities.py), register it on the worker, and
have the signal handler delegate via workflow.execute_activity. The activity
returns final_text + session_id so codex-thread resume still works. Offline
tests now cover both the workflow's state update and the activity directly.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
… [greptile]

Greptile flagged a deadlock: all three codex tutorials spawned `codex exec`
with stderr=PIPE but never read it, so once codex filled the OS pipe buffer
(~64 KB) the subprocess stalled and the agent hung. Redirect stderr to DEVNULL
(codex --json emits events on stdout; stderr is progress noise).

Also fix UTF-8 corruption in _process_stdout: decoding each 4 KB read
independently splits multibyte characters at chunk boundaries. Use an
incremental UTF-8 decoder so boundary-split characters decode correctly.

Applied to 00_sync, 10_async/00_base (acp.py) and 10_async/10_temporal
(activities.py).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@declan-scale declan-scale force-pushed the declan-scale/pr8-codex branch from 42aa037 to 7d2c41f Compare June 22, 2026 22:29
TurnUsage.num_llm_calls is Optional on next (None = "not reported"). The
openai / pydantic-ai turn tests still asserted == 0 for the default (no-usage /
pre-exhaustion) path, which now yields None. Update those three assertions to
`is None`; real-zero cases are unchanged.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@declan-scale declan-scale merged commit 9b2b031 into next Jun 22, 2026
62 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants