fix(streaming): CoalescingBuffer hogs CPU — idle/orphaned ticker spins at 1/FLUSH_INTERVAL forever#418
Open
eberki-scale wants to merge 8 commits into
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
The _closed-after-drain and close() exit comments are unchanged from next and should not have been shortened. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
The Done path published the terminal update before reaping the buffer, so a delta still in the coalescing window arrived after Done, and close() emitted a second Done — consumers saw a stale trailing delta and a duplicate terminal. Drain/stop the buffer before the terminal for Done as well as Full, and let close() be the sole Done emitter so it publishes exactly once. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
… close
Two issues flagged in review:
- The Done dedupe routed the streamed Done through close(), which synthesized a fresh StreamTaskMessageDone and dropped caller fields like `index`. close() now publishes the caller's Done as-is when provided, synthesizing only for implicit (__aexit__) closes.
- CoalescingBuffer.close() awaited the ticker bare; a cancelled close() propagates the cancel into the ticker mid-flush, _run swallows it, and an already-drained batch is lost. Shield the await so the ticker finishes its in-flight flush and exits on _closed while the cancel still propagates.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Summary
`CoalescingBuffer` (`agentex/lib/core/services/adk/streaming.py`) runs a per-instance background ticker that wakes every `FLUSH_INTERVAL_S` (50 ms) whether or not there is buffered data. Because one buffer and ticker are created per streaming task-message context, any buffer that outlives its stream without a clean `close()` becomes an orphaned task that polls at 20 Hz forever. In a long-lived worker that handles many streaming tasks, these accumulate and ratchet CPU up until a core is saturated, with no memory growth and no log output, clearing only on process restart.

This PR fixes two related issues:
- The ticker polled every `FLUSH_INTERVAL_S` even with nothing buffered, so an idle or orphaned buffer never stopped consuming CPU.
- `StreamTaskMessageFull` marked the context done without closing the buffer, so `close()` later short-circuited on `_is_closed` and never stopped the ticker.

Symptoms
Root cause
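A minimal sketch of the polling pattern at issue, assuming a ticker that waits on `_flush_signal` with a `FLUSH_INTERVAL_S` timeout. `PollingBuffer`, `wakeups`, and `count_idle_wakeups` are hypothetical names used only for illustration; this is not the code in `streaming.py`.

```python
import asyncio

FLUSH_INTERVAL_S = 0.05  # 50 ms, as stated in the summary


class PollingBuffer:
    """Hypothetical stand-in for the pre-fix ticker, not the real class."""

    def __init__(self) -> None:
        self._items = []
        self._flush_signal = asyncio.Event()
        self._closed = False
        self.wakeups = 0  # instrumentation for this sketch only

    async def _run(self) -> None:
        while True:
            try:
                # Times out every FLUSH_INTERVAL_S even when nothing is buffered.
                await asyncio.wait_for(self._flush_signal.wait(), FLUSH_INTERVAL_S)
            except asyncio.TimeoutError:
                pass
            self._flush_signal.clear()
            self.wakeups += 1
            self._items.clear()  # "drain": does nothing on an empty buffer
            if self._closed:
                return


async def count_idle_wakeups(duration_s: float = 0.3) -> int:
    """Run a ticker with no add() and no close(), then report its wakeups."""
    buf = PollingBuffer()
    task = asyncio.create_task(buf._run())
    await asyncio.sleep(duration_s)  # the buffer is idle the whole time
    task.cancel()  # only so the sketch terminates
    try:
        await task
    except asyncio.CancelledError:
        pass
    return buf.wakeups
```

Calling `asyncio.run(count_idle_wakeups())` returns a non-zero count for a buffer that never received data, roughly one wakeup per `FLUSH_INTERVAL_S`.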
When the buffer is empty and `_closed` is `False`, the `wait_for` times out every 50 ms, drains nothing, and loops: a permanent 20 Hz busy-loop. The loop only exits on `_closed`, which is set by `close()`. If `close()` never runs, or is interrupted by cancellation while awaiting the ticker, the ticker is orphaned and spins indefinitely. With N orphaned/idle buffers, the event loop spends most of its time arming/cancelling `TimerHandle`s.

Separately, `StreamingTaskMessageContext.stream_update` handled a `StreamTaskMessageFull` by setting `_is_closed = True` without closing the buffer. The subsequent `__aexit__` → `close()` then hit its `if self._is_closed: return` guard and skipped the buffer teardown, leaving a live ticker behind on an otherwise clean stream end.

Evidence (py-spy on an affected worker)
CPU was ~linearly proportional to the number of streamed tasks since the last restart; terminating the in-flight tasks/workflows did not release it (the leak is the worker-process asyncio tasks, independent of task/workflow state).
Fix
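A minimal sketch of the park-on-idle design described in item 1 below, assuming two `asyncio.Event`s named `_wake` and `_flush_now` as in the description. `ParkingBuffer`, `cycles`, and `demo` are hypothetical names; the sketch leaves out the lock, the `MAX_BUFFERED_CHARS` threshold, and cancellation handling, so it should be read as an illustration rather than the PR's implementation.

```python
import asyncio
from typing import Awaitable, Callable, List

FLUSH_INTERVAL_S = 0.05


class ParkingBuffer:
    """Hypothetical dual-event ticker; must be constructed inside a running loop."""

    def __init__(self, on_flush: Callable[[str], Awaitable[None]]) -> None:
        self._on_flush = on_flush
        self._buf: List[str] = []
        self._wake = asyncio.Event()       # set on empty -> non-empty, or on close
        self._flush_now = asyncio.Event()  # set to skip the coalescing delay
        self._closed = False
        self._first = True
        self.cycles = 0  # instrumentation for this sketch only
        self._task = asyncio.create_task(self._run())

    def add(self, delta: str) -> None:
        was_empty = not self._buf
        self._buf.append(delta)
        if self._first:  # first delta flushes immediately
            self._first = False
            self._flush_now.set()
        if was_empty:
            self._wake.set()

    async def _run(self) -> None:
        while True:
            await self._wake.wait()  # parked: no timer is armed while idle
            self._wake.clear()
            self.cycles += 1
            if not self._flush_now.is_set() and not self._closed:
                try:  # coalesce for up to FLUSH_INTERVAL_S
                    await asyncio.wait_for(self._flush_now.wait(), FLUSH_INTERVAL_S)
                except asyncio.TimeoutError:
                    pass
            self._flush_now.clear()
            batch, self._buf = "".join(self._buf), []
            if batch:
                await self._on_flush(batch)
            if self._closed:
                return

    async def close(self) -> None:
        self._closed = True
        self._flush_now.set()
        self._wake.set()
        await self._task


async def demo() -> tuple:
    flushed: List[str] = []

    async def on_flush(batch: str) -> None:
        flushed.append(batch)

    buf = ParkingBuffer(on_flush)
    await asyncio.sleep(4 * FLUSH_INTERVAL_S)  # idle: the ticker stays parked
    idle_cycles = buf.cycles
    buf.add("a")               # first delta, flushed without delay
    await asyncio.sleep(0.01)  # give the ticker a chance to run
    buf.add("b")
    buf.add("c")
    await buf.close()          # final drain
    return idle_cycles, flushed
```

In this sketch the ticker only runs a cycle when `add()` or `close()` sets `_wake`, so an idle or never-closed buffer costs nothing beyond the parked task itself.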
1. Park-on-idle ticker. Replace the single `_flush_signal` + fixed-interval wait with two events so the ticker parks at zero CPU when idle:
- `_wake`: set by `add()` only when the buffer goes empty → non-empty. `_run` blocks on `await self._wake.wait()` when idle (no polling).
- `_flush_now`: set on first delta / size threshold / close → immediate flush, bypassing the coalescing delay.

`_run` parks on `_wake`; on wake it flushes immediately if `_flush_now` is set, otherwise coalesces for up to `FLUSH_INTERVAL_S`, drains, and exits on `_closed`. `close()` additionally force-cancels the ticker if `close()` itself is cancelled, so it can never be orphaned on the cancellation path.

2. `StreamTaskMessageFull` closes the buffer. The `Full` branch now drains and closes the buffer (stopping the ticker) before publishing the `Full`, and `close()` reaps the buffer before its `_is_closed` short-circuit. Two effects:
- The ticker stops when `Full` ends the stream: no orphaned task.
- Buffered deltas publish before the `Full` (deltas → `Full`), never after it. Publishing them after would look like a stale duplicate tail to a consumer treating `Full` as the final message.

Behaviour preserved: first-delta-immediate flush (latency-critical), the `MAX_BUFFERED_CHARS` early flush, the `FLUSH_INTERVAL_S` coalescing window for trailing partials, and the exactly-once final drain on `close()`. The only difference: an idle or orphaned buffer parks instead of polling, and under light streaming (buffer empties between deltas) a delta may flush slightly sooner; coalescing under sustained streaming is unchanged.

Tests
`TestCoalescingBufferIdleParks`:
- `test_idle_buffer_does_not_spin`: no data added → 0 drain cycles over ~8 windows (was ~8).
- `test_orphaned_buffer_parks_after_flush`: buffer flushed then never closed → 0 drain cycles afterward.

`TestFullMessageClosesBuffer`:
- `test_full_message_stops_ticker`: a `Full` stops the ticker (task is `done()`); buffer is reaped.
- `test_full_is_terminal_publish_no_trailing_deltas`: buffered deltas publish before the `Full`; the `Full` is the terminal publish.
- `test_close_reaps_buffer_even_if_already_marked_closed`: `close()` stops the ticker even when `_is_closed` is already set.

… (`tests/lib/core/services/adk/test_streaming.py`).

Risk
Low. Hot-path semantics (first-flush, size/time coalescing, exactly-once close drain) are covered by the existing tests and unchanged. The change is local to `CoalescingBuffer` and `StreamingTaskMessageContext`.

Greptile Summary
This PR fixes a CPU leak in `CoalescingBuffer` where the per-instance background ticker polled at 20 Hz even when idle, and a related code path in `StreamingTaskMessageContext` that could orphan the ticker on a clean stream end. The fix replaces the fixed-interval `_flush_signal` with a `_wake` (park-on-idle) / `_flush_now` (bypass coalescing) dual-event design, and uses `asyncio.shield` so a cancelled `close()` can no longer force-cancel the ticker mid-flush.

- `_run` now blocks at `await self._wake.wait()` and only runs the coalescing window when `add()` signals data; orphaned/idle buffers consume zero CPU.
- `StreamTaskMessageFull` closes the buffer: deltas are drained and the ticker stopped before the terminal `Full` is published, fixing both the stale-tail ordering problem and the ticker-orphan-on-clean-exit bug.
- `asyncio.shield` for close cancellation: a cancelled `close()` no longer propagates cancellation into the ticker mid-flush; the ticker finishes its current batch and exits naturally on `_closed`.

Confidence Score: 5/5
Safe to merge — the fix is local to CoalescingBuffer and StreamingTaskMessageContext, hot-path semantics (first-flush immediacy, coalescing window, exactly-once drain) are unchanged and covered by the existing suite, and the new tests directly exercise all four previously-broken scenarios.
The dual-event design correctly parks the ticker at zero CPU when idle. The asyncio.shield usage is appropriate: it protects the ticker's in-flight batch from a cancelled close() while still propagating CancelledError to the caller. The final drain after shield is safe because _closed=True prevents new additions before the ticker exits, so the buffer is empty when the ticker returns. Terminal-message ordering (deltas before Full/Done, Done published exactly once with its original metadata) is verified end-to-end by the new test suite. No data-loss, ordering, or resource-leak paths were found in the changed code.
No files require special attention — both changed files are well-scoped and thoroughly tested.
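The `asyncio.shield` point in the review above can be illustrated with plain asyncio. This is a minimal sketch, not the PR's code: `ticker`, `close`, and `cancel_close` are hypothetical names, and the ticker is reduced to a single sleep standing in for an in-flight flush.

```python
import asyncio


async def ticker(log: list) -> None:
    """Stand-in for a ticker that is in the middle of a flush."""
    try:
        await asyncio.sleep(0.05)
        log.append("flushed")
    except asyncio.CancelledError:
        log.append("flush cancelled")
        raise


async def close(task: asyncio.Task, shielded: bool) -> None:
    # A bare await forwards the caller's cancellation into `task`;
    # asyncio.shield cancels only the outer wait and leaves `task` running.
    await (asyncio.shield(task) if shielded else task)


async def cancel_close(shielded: bool) -> list:
    log: list = []
    task = asyncio.create_task(ticker(log))
    closer = asyncio.create_task(close(task, shielded))
    await asyncio.sleep(0.01)  # let both tasks start
    closer.cancel()            # the caller of close() is cancelled
    try:
        await closer
    except asyncio.CancelledError:
        log.append("close cancelled")
    # Wait for the ticker either way so the log is complete.
    await asyncio.gather(task, return_exceptions=True)
    return log
```

With `shielded=False` the log records the flush being cancelled; with `shielded=True` the flush completes even though `close()` still sees the cancellation.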
Important Files Changed
Sequence Diagram
%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
    participant P as Producer (stream_update)
    participant B as CoalescingBuffer
    participant T as Ticker (_run)
    participant S as StreamingService
    Note over T: parked at _wake.wait() (zero CPU)
    P->>B: add(delta₁) [first delta]
    B->>B: _flush_now.set() + _wake.set()
    B-->>T: wakes
    T->>T: _wake.clear()
    T->>T: _flush_now.is_set() → skip coalesce window
    T->>B: _drain_locked() [acquires lock]
    T->>S: _on_flush(delta₁)
    P->>B: add(delta₂)
    Note over B: was_empty=True → _wake.set()
    T->>B: _drain_locked() [next loop]
    T->>S: _on_flush(delta₂)
    T->>T: _closed? No → back to _wake.wait() (parks)
    Note over T: idle, zero CPU, no polling
    P->>P: stream_update(StreamTaskMessageFull)
    P->>B: buffer.close()
    B->>B: "_closed=True, _wake.set(), _flush_now.set()"
    B-->>T: wakes (final pass)
    T->>B: _drain_locked() [empty]
    T->>T: "_closed=True → return"
    B->>B: asyncio.shield(task) resolves
    B->>B: final drain (empty)
    P->>S: stream_update(Full) ← after all deltas
    Note over P,S: Done path: close(done_event=update) drains buffer first, then publishes Done exactly once

Comments Outside Diff (2)
src/agentex/lib/core/services/adk/streaming.py, line 514-515

This `Done` branch still publishes the terminal update before reaping the coalesced buffer. When a provider emits `StreamTaskMessageDone` while a delta is still inside the 50 ms coalescing window, callers receive `Done`, then the buffered delta from `close()`, then the `Done` created by `close()`. A consumer that treats `Done` as terminal can therefore see stale trailing content and duplicate terminal events. The `Done` path needs the same pre-terminal buffer drain ordering as the `Full` path.

Artifacts
- Repro: focused pytest harness for Done before coalesced buffer drain
- Repro: verbose pytest output showing publish order and duplicate Done

src/agentex/lib/core/services/adk/streaming.py, line 518-519

`StreamTaskMessageDone` still publishes the terminal event before `close()` drains the coalescing buffer. When a coalesced stream has already used the first immediate flush, then receives a small buffered delta followed by an explicit Done before the 50 ms window expires, consumers can see `Done -> delta -> Done` or at least a delta after terminal Done. Drain and close the buffer before publishing the incoming Done, and avoid sending a second Done from `close()`.

Artifacts
- Repro: focused async pytest exercising explicit Done before coalescing window drains
- Repro: verbose pytest failure output with captured published order
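The ordering problem both comments describe can be reproduced with a toy model. This is a synchronous sketch under stated assumptions: `Stream`, `drain_first`, and `run` are hypothetical names, and a plain list stands in for deltas still inside the coalescing window. It is not the repro harness referenced under Artifacts.

```python
from typing import List


class Stream:
    """Hypothetical toy context; `pending` stands in for the coalescing buffer."""

    def __init__(self, drain_first: bool) -> None:
        self.drain_first = drain_first
        self.pending: List[str] = []    # deltas not yet flushed
        self.published: List[str] = []  # what a consumer would observe
        self.closed = False

    def delta(self, text: str) -> None:
        self.pending.append(text)

    def done(self) -> None:
        if self.drain_first:
            # Fixed ordering: close() drains, then is the sole Done emitter.
            self.close(done_event="Done")
        else:
            # Flagged ordering: the terminal goes out before the drain.
            self.published.append("Done")

    def close(self, done_event: str = "Done") -> None:
        if self.closed:
            return
        self.closed = True
        self.published.extend(self.pending)  # drain buffered deltas
        self.pending.clear()
        self.published.append(done_event)


def run(drain_first: bool) -> List[str]:
    s = Stream(drain_first)
    s.delta("x")  # still buffered when Done arrives
    s.done()
    s.close()     # the implicit close on context exit
    return s.published


print(run(drain_first=False))  # ['Done', 'x', 'Done']
print(run(drain_first=True))   # ['x', 'Done']
```

The first result matches the `Done -> delta -> Done` sequence from the review; draining before the terminal and letting `close()` publish the only `Done` yields the second.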
Reviews (4): Last reviewed commit: "fix(streaming): preserve Done metadata a..."