Skip to content

fix(streaming): CoalescingBuffer hogs CPU — idle/orphaned ticker spins at 1/FLUSH_INTERVAL forever#418

Open
eberki-scale wants to merge 8 commits into
nextfrom
endre/improve-buffer
Open

fix(streaming): CoalescingBuffer hogs CPU — idle/orphaned ticker spins at 1/FLUSH_INTERVAL forever#418
eberki-scale wants to merge 8 commits into
nextfrom
endre/improve-buffer

Conversation

@eberki-scale

@eberki-scale eberki-scale commented Jun 19, 2026

Copy link
Copy Markdown
Contributor

Summary

CoalescingBuffer (agentex/lib/core/services/adk/streaming.py) runs a per-instance background ticker that wakes every FLUSH_INTERVAL_S (50 ms) whether or not there is buffered data. Because one buffer + ticker is created per streaming task-message context, any buffer that outlives its stream without a clean close() becomes an orphaned task that polls at 20 Hz forever. In a long-lived worker that handles many streaming tasks, these accumulate and ratchet CPU up until a core is saturated, with no memory growth and no log output, clearing only on process restart.

This PR fixes two related issues:

  1. The idle/orphaned ticker spinning at 20 Hz (the CPU hog).
  2. A path that orphans the ticker even on a clean exit: a StreamTaskMessageFull marked the context done without closing the buffer, so close() later short-circuited on _is_closed and never stopped the ticker.

Symptoms

  • Worker CPU climbs steadily with cumulative streamed tasks and never recovers between idle periods (e.g. ~0.04 cores fresh → ~0.55 after a load run → pinned at the CPU limit after sustained traffic). Latency degrades in lockstep as the dead tickers steal the event loop / GIL from live work.
  • Memory stays flat; nothing is logged.
  • A process restart resets it.

Root cause

# CoalescingBuffer._run (before)
while True:
    try:
        await asyncio.wait_for(self._flush_signal.wait(), timeout=self.FLUSH_INTERVAL_S)
    except asyncio.TimeoutError:
        pass
    async with self._lock:
        self._flush_signal.clear()
        drained = self._drain_locked()   # empty when idle
    ...
    if self._closed:
        return

When the buffer is empty and _closed is False, the wait_for times out every 50 ms, drains nothing, and loops — a permanent 20 Hz busy-loop. The loop only exits on _closed, which is set by close(). If close() never runs, or is interrupted by cancellation while awaiting the ticker, the ticker is orphaned and spins indefinitely. With N orphaned/idle buffers, the event loop spends most of its time arming/cancelling TimerHandles.

Separately, StreamingTaskMessageContext.stream_update handled a StreamTaskMessageFull by setting _is_closed = True without closing the buffer. The subsequent __aexit__ → close() then hit its if self._is_closed: return guard and skipped the buffer teardown — leaving a live ticker behind on an otherwise clean stream end.

Evidence (py-spy on an affected worker)

py-spy top:  GIL 69%, Active 73%
 41%  _run (asyncio/events.py)            # event loop running timer callbacks
 22%  _run (.../adk/streaming.py)         # CoalescingBuffer._run
 17%  wait_for (asyncio/tasks.py)
 11%  reschedule/__aenter__ (asyncio/timeouts.py)
  9%  __init__ (asyncio/events.py)        # TimerHandle churn

py-spy dump (event loop thread):
    wait_for (asyncio/tasks.py:506)
    _run (.../adk/streaming.py:190)       # the 50 ms wait_for

CPU was ~linearly proportional to the number of streamed tasks since the last restart; terminating the in-flight tasks/workflows did not release it (the leak is the worker-process asyncio tasks, independent of task/workflow state).

Fix

1. Park-on-idle ticker. Replace the single _flush_signal + fixed-interval wait with two events so the ticker parks at zero CPU when idle:

  • _wake — set by add() only when the buffer goes empty → non-empty. _run blocks on await self._wake.wait() when idle (no polling).
  • _flush_now — set on first delta / size threshold / close → immediate flush, bypassing the coalescing delay.

_run parks on _wake; on wake it flushes immediately if _flush_now is set, otherwise coalesces for up to FLUSH_INTERVAL_S, drains, and exits on _closed. close() additionally force-cancels the ticker if close() itself is cancelled, so it can never be orphaned on the cancellation path.

2. StreamTaskMessageFull closes the buffer. The Full branch now drains and closes the buffer (stopping the ticker) before publishing the Full, and close() reaps the buffer before its _is_closed short-circuit. Two effects:

  • The ticker is always stopped, even when a Full ends the stream — no orphaned task.
  • Leftover buffered deltas publish before the terminal Full (deltas → Full), never after it. Publishing them after would look like a stale duplicate tail to a consumer treating Full as the final message.

Behaviour preserved: first-delta-immediate flush (latency-critical), the MAX_BUFFERED_CHARS early flush, the FLUSH_INTERVAL_S coalescing window for trailing partials, and the exactly-once final drain on close(). The only difference: an idle or orphaned buffer parks instead of polling, and under light streaming (buffer empties between deltas) a delta may flush slightly sooner — coalescing under sustained streaming is unchanged.

Tests

  • New TestCoalescingBufferIdleParks:
    • test_idle_buffer_does_not_spin — no data added → 0 drain cycles over ~8 windows (was ~8).
    • test_orphaned_buffer_parks_after_flush — buffer flushed then never closed0 drain cycles afterward.
  • New TestFullMessageClosesBuffer:
    • test_full_message_stops_ticker — a Full stops the ticker (task is done()); buffer is reaped.
    • test_full_is_terminal_publish_no_trailing_deltas — buffered deltas publish before the Full; the Full is the terminal publish.
    • test_close_reaps_buffer_even_if_already_marked_closedclose() stops the ticker even when _is_closed is already set.
  • Full streaming suite: 36 passed (tests/lib/core/services/adk/test_streaming.py).

Risk

Low. Hot-path semantics (first-flush, size/time coalescing, exactly-once close drain) are covered by the existing tests and unchanged. The change is local to CoalescingBuffer and StreamingTaskMessageContext.

Greptile Summary

This PR fixes a CPU-leak in CoalescingBuffer where the per-instance background ticker polled at 20 Hz even when idle, and a related code path in StreamingTaskMessageContext that could orphan the ticker on a clean stream end. The fix replaces the fixed-interval _flush_signal with a _wake (park-on-idle) / _flush_now (bypass coalescing) dual-event design, and uses asyncio.shield so a cancelled close() can no longer force-cancel the ticker mid-flush.

  • Park-on-idle ticker: _run now blocks at await self._wake.wait() and only runs the coalescing window when add() signals data; orphaned/idle buffers consume zero CPU.
  • StreamTaskMessageFull closes the buffer: deltas are drained and the ticker stopped before the terminal Full is published, fixing both the stale-tail ordering problem and the ticker-orphan-on-clean-exit bug.
  • asyncio.shield for close cancellation: a cancelled close() no longer propagates cancellation into the ticker mid-flush; the ticker finishes its current batch and exits naturally on _closed.

Confidence Score: 5/5

Safe to merge — the fix is local to CoalescingBuffer and StreamingTaskMessageContext, hot-path semantics (first-flush immediacy, coalescing window, exactly-once drain) are unchanged and covered by the existing suite, and the new tests directly exercise all four previously-broken scenarios.

The dual-event design correctly parks the ticker at zero CPU when idle. The asyncio.shield usage is appropriate: it protects the ticker's in-flight batch from a cancelled close() while still propagating CancelledError to the caller. The final drain after shield is safe because _closed=True prevents new additions before the ticker exits, so the buffer is empty when the ticker returns. Terminal-message ordering (deltas before Full/Done, Done published exactly once with its original metadata) is verified end-to-end by the new test suite. No data-loss, ordering, or resource-leak paths were found in the changed code.

No files require special attention — both changed files are well-scoped and thoroughly tested.

Important Files Changed

Filename Overview
src/agentex/lib/core/services/adk/streaming.py Core fix: replaces polling ticker with park-on-idle dual-event design (_wake/_flush_now), closes buffer on Full/Done, uses asyncio.shield to prevent ticker cancellation mid-flush. Logic is correct — idle/orphaned buffers park at zero CPU, deltas always precede terminal messages, and the Done metadata (index/parent) is preserved via the done_event parameter.
tests/lib/core/services/adk/test_streaming.py New test classes cover all four fixed scenarios: idle-park, orphaned-park, cancelled-close data safety, Full-stops-ticker, Full-ordering (deltas before Full), Done-ordering (deltas before Done, exactly-once, index preserved), and close-reaps-buffer-when-already-marked-closed. Tests are behaviorally correct and complete.

Sequence Diagram

%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
    participant P as Producer (stream_update)
    participant B as CoalescingBuffer
    participant T as Ticker (_run)
    participant S as StreamingService

    Note over T: parked at _wake.wait() (zero CPU)

    P->>B: add(delta₁) [first delta]
    B->>B: _flush_now.set() + _wake.set()
    B-->>T: wakes
    T->>T: _wake.clear()
    T->>T: _flush_now.is_set() → skip coalesce window
    T->>B: _drain_locked() [acquires lock]
    T->>S: _on_flush(delta₁)

    P->>B: add(delta₂)
    Note over B: was_empty=True → _wake.set()
    T->>B: _drain_locked() [next loop]
    T->>S: _on_flush(delta₂)
    T->>T: _closed? No → back to _wake.wait() (parks)

    Note over T: idle — zero CPU, no polling

    P->>P: stream_update(StreamTaskMessageFull)
    P->>B: buffer.close()
    B->>B: "_closed=True, _wake.set(), _flush_now.set()"
    B-->>T: wakes (final pass)
    T->>B: _drain_locked() [empty]
    T->>T: "_closed=True → return"
    B->>B: asyncio.shield(task) resolves
    B->>B: final drain (empty)
    P->>S: stream_update(Full) ← after all deltas

    Note over P,S: Done path: close(done_event=update) drains buffer first, then publishes Done exactly once
Loading
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
sequenceDiagram
    participant P as Producer (stream_update)
    participant B as CoalescingBuffer
    participant T as Ticker (_run)
    participant S as StreamingService

    Note over T: parked at _wake.wait() (zero CPU)

    P->>B: add(delta₁) [first delta]
    B->>B: _flush_now.set() + _wake.set()
    B-->>T: wakes
    T->>T: _wake.clear()
    T->>T: _flush_now.is_set() → skip coalesce window
    T->>B: _drain_locked() [acquires lock]
    T->>S: _on_flush(delta₁)

    P->>B: add(delta₂)
    Note over B: was_empty=True → _wake.set()
    T->>B: _drain_locked() [next loop]
    T->>S: _on_flush(delta₂)
    T->>T: _closed? No → back to _wake.wait() (parks)

    Note over T: idle — zero CPU, no polling

    P->>P: stream_update(StreamTaskMessageFull)
    P->>B: buffer.close()
    B->>B: "_closed=True, _wake.set(), _flush_now.set()"
    B-->>T: wakes (final pass)
    T->>B: _drain_locked() [empty]
    T->>T: "_closed=True → return"
    B->>B: asyncio.shield(task) resolves
    B->>B: final drain (empty)
    P->>S: stream_update(Full) ← after all deltas

    Note over P,S: Done path: close(done_event=update) drains buffer first, then publishes Done exactly once
Loading

Comments Outside Diff (2)

  1. src/agentex/lib/core/services/adk/streaming.py, line 514-515 (link)

    P1 Drain Before Done

    This Done branch still publishes the terminal update before reaping the coalesced buffer. When a provider emits StreamTaskMessageDone while a delta is still inside the 50ms coalescing window, callers receive Done, then the buffered delta from close(), then the Done created by close(). A consumer that treats Done as terminal can therefore see stale trailing content and duplicate terminal events. The Done path needs the same pre-terminal buffer drain ordering as the Full path.

    Artifacts

    Repro: focused pytest harness for Done before coalesced buffer drain

    • Contains supporting evidence from the run (text/x-python; charset=utf-8).

    Repro: verbose pytest output showing publish order and duplicate Done

    • Keeps the command output available without making the summary code-heavy.

    View artifacts

    T-Rex Ran code and verified through T-Rex

    Prompt To Fix With AI
    This is a comment left during a code review.
    Path: src/agentex/lib/core/services/adk/streaming.py
    Line: 514-515
    
    Comment:
    **Drain Before Done**
    
    This `Done` branch still publishes the terminal update before reaping the coalesced buffer. When a provider emits `StreamTaskMessageDone` while a delta is still inside the 50ms coalescing window, callers receive `Done`, then the buffered delta from `close()`, then the `Done` created by `close()`. A consumer that treats `Done` as terminal can therefore see stale trailing content and duplicate terminal events. The `Done` path needs the same pre-terminal buffer drain ordering as the `Full` path.
    
    How can I resolve this? If you propose a fix, please make it concise.

    Fix in Cursor Fix in Claude Code Fix in Codex

  2. src/agentex/lib/core/services/adk/streaming.py, line 518-519 (link)

    P1 Done can precede deltas

    StreamTaskMessageDone still publishes the terminal event before close() drains the coalescing buffer. When a coalesced stream has already used the first immediate flush, then receives a small buffered delta followed by an explicit Done before the 50ms window expires, consumers can see Done -> delta -> Done or at least a delta after terminal Done. Drain and close the buffer before publishing the incoming Done, and avoid sending a second Done from close().

    Artifacts

    Repro: focused async pytest exercising explicit Done before coalescing window drains

    • Contains supporting evidence from the run (text/x-python; charset=utf-8).

    Repro: verbose pytest failure output with captured published order

    • Keeps the command output available without making the summary code-heavy.

    View artifacts

    T-Rex Ran code and verified through T-Rex

    Prompt To Fix With AI
    This is a comment left during a code review.
    Path: src/agentex/lib/core/services/adk/streaming.py
    Line: 518-519
    
    Comment:
    **Done can precede deltas**
    
    `StreamTaskMessageDone` still publishes the terminal event before `close()` drains the coalescing buffer. When a coalesced stream has already used the first immediate flush, then receives a small buffered delta followed by an explicit Done before the 50ms window expires, consumers can see `Done -> delta -> Done` or at least a delta after terminal Done. Drain and close the buffer before publishing the incoming Done, and avoid sending a second Done from `close()`.
    
    How can I resolve this? If you propose a fix, please make it concise.

    Fix in Cursor Fix in Claude Code Fix in Codex

Reviews (4): Last reviewed commit: "fix(streaming): preserve Done metadata a..." | Re-trigger Greptile

@eberki-scale eberki-scale changed the base branch from main to next June 22, 2026 08:50
@eberki-scale eberki-scale marked this pull request as ready for review June 22, 2026 09:08
eberki-scale and others added 2 commits June 22, 2026 11:15
The _closed-after-drain and close() exit comments are unchanged from next
and should not have been shortened.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
The Done path published the terminal update before reaping the buffer, so a
delta still in the coalescing window arrived after Done, and close() emitted a
second Done — consumers saw a stale trailing delta and a duplicate terminal.
Drain/stop the buffer before the terminal for Done as well as Full, and let
close() be the sole Done emitter so it publishes exactly once.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Comment thread src/agentex/lib/core/services/adk/streaming.py
Comment thread src/agentex/lib/core/services/adk/streaming.py Outdated
… close

Two issues flagged in review:

- The Done dedupe routed the streamed Done through close(), which synthesized a
  fresh StreamTaskMessageDone and dropped caller fields like `index`. close()
  now publishes the caller's Done as-is when provided, synthesizing only for
  implicit (__aexit__) closes.
- CoalescingBuffer.close() awaited the ticker bare; a cancelled close()
  propagates the cancel into the ticker mid-flush, _run swallows it, and an
  already-drained batch is lost. Shield the await so the ticker finishes its
  in-flight flush and exits on _closed while the cancel still propagates.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@eberki-scale eberki-scale changed the title fix(streaming): CoalescingBuffer leaks CPU — idle/orphaned ticker spins at 1/FLUSH_INTERVAL forever fix(streaming): CoalescingBuffer hogs CPU — idle/orphaned ticker spins at 1/FLUSH_INTERVAL forever Jun 22, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant