moq-net: linger upstream subscriptions across consumer churn (moq-lite)#1514
kixelated wants to merge 7 commits into
When the last viewer of a track drops, the upstream subscription used to tear down immediately. A returning viewer milliseconds later triggered a fresh Subscribe, and the publisher re-served the latest cached group. Under churn the same group got re-fetched over and over.
Keep the upstream TrackProducer alive for up to 5s after the last consumer leaves. On unused() we send SubscribeUpdate(priority=0) + FIN upstream and wait for one of: the upstream FIN-back (Complete), the timeout (Cancelled), or a returning consumer (Reused). Reused reissues a fresh subscribe with start_group = latest + 1 so the publisher doesn't re-send already-cached groups. Late duplicates from the prior round get silently dropped in recv_group.
This required wiring the broadcast-side track cleanup to producer close (weak.closed()) instead of consumer count (weak.unused()), so the lookup entry survives transient churn. The producer owner closes it explicitly on subscription exit, which fires the cleanup eagerly.
Lite01/02 fall back to the old immediate-cancel since they lack SubscribeUpdate.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
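The three-way wait described above can be pictured with a minimal, std-only sketch. It is not the PR's code: the real subscriber is async, while this version substitutes a blocking `std::sync::mpsc` channel, and `LingerEvent`, `linger`, and `resume_start_group` are hypothetical names. Only the `SessionOutcome` name and its three outcomes come from the PR text.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Events a lingering subscription can observe (hypothetical names).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LingerEvent {
    /// The publisher finished its side of the subscribe stream.
    UpstreamFin,
    /// A local consumer subscribed to the track again.
    ConsumerReturned,
}

/// How a linger window ended, mirroring the three outcomes in the description.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SessionOutcome {
    Complete,
    Cancelled,
    Reused,
}

/// Wait for the first event or the timeout, whichever comes first.
pub fn linger(events: &Receiver<LingerEvent>, timeout: Duration) -> SessionOutcome {
    match events.recv_timeout(timeout) {
        Ok(LingerEvent::UpstreamFin) => SessionOutcome::Complete,
        Ok(LingerEvent::ConsumerReturned) => SessionOutcome::Reused,
        // No event in time, or nobody left to send one: give up.
        Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => {
            SessionOutcome::Cancelled
        }
    }
}

/// On reuse, resume just past the newest cached group.
/// With an empty cache there is nothing to skip, so the start stays unset.
pub fn resume_start_group(latest: Option<u64>) -> Option<u64> {
    latest.and_then(|seq| seq.checked_add(1))
}
```

With a 5s timeout, `linger` returns `Reused` if a consumer comes back inside the window, and `resume_start_group(Some(7))` yields `Some(8)`, which is the `start_group = latest + 1` rule.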
Walkthrough
This PR implements relay-style subscription linger: consumers can reconnect during a grace window without losing upstream state. It adds Weak::closed(), exposes TrackProducer::latest(), replaces unused()-based cleanup with closed()-based cleanup, refactors subscriber handling into linger-aware sessions with a LINGER_TIMEOUT and SessionOutcome, threads an end_group cap through the publisher to buffer/drain groups, and adds an integration test validating resubscribe behavior.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@rs/moq-net/src/lite/subscriber.rs`:
- Around line 405-411: The current SessionOutcome::Reused branch removes the old
subscription id and hands out a fresh id, which causes late-arriving group
messages for the old id to hit the unknown-subscription path in recv_group and
can skip a group; instead, do not remove the old mapping — either keep the old
id mapped until that upstream session is fully drained or alias the new id to
the same TrackEntry so both ids resolve to the same subscription state.
Concretely: in the SessionOutcome::Reused block, stop calling
self.subscribes.lock().remove(&id); generate the new id with
self.next_id.fetch_add(...), set start_group = track.latest().and_then(...), and
insert into self.subscribes a mapping for the new id that points to the same
TrackEntry (or otherwise mark the old id as pending-drain) so recv_group still
finds the existing TrackEntry for messages from the prior session while the
overlap finishes.
In `@rs/moq-net/src/model/broadcast.rs`:
- Around line 582-619: The test requested_unused currently uses a real
tokio::time::sleep which makes it wall-clock dependent; modify the test to call
tokio::time::pause() at the start of requested_unused and replace
tokio::time::sleep(std::time::Duration::from_millis(1)).await with
tokio::time::advance(std::time::Duration::from_millis(1)).await so the test uses
paused Tokio time; update any imports or futures usage if necessary but keep all
logic around Broadcast, producer1, consumer*, and producer1.abort(Error::Cancel)
unchanged.
📒 Files selected for processing (5)
- rs/conducer/src/weak.rs
- rs/moq-native/tests/broadcast.rs
- rs/moq-net/src/lite/subscriber.rs
- rs/moq-net/src/model/broadcast.rs
- rs/moq-net/src/model/track.rs
CodeRabbit caught that removing the lingering subscribe id when Reused fires sends late group streams from the previous upstream session into recv_group's unknown-subscription error path instead of the intended duplicate-drop path.
Track every id this run_subscribe allocates and remove them all only when the task exits, so in-flight uni streams from a prior subscription instance still resolve to the same TrackEntry.
Also pause Tokio time in the requested_unused test so the 1ms cleanup yield is wall-clock independent.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
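As an illustration of the "keep every allocated id mapped until the task exits" fix, here is a small sketch under stated assumptions. `TrackEntry` is reduced to a placeholder struct, and `Subscribes`, `allocate`, `lookup`, and `release_all` are hypothetical names rather than the crate's API.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Stand-in for the per-track subscription state (hypothetical shape).
#[derive(Debug)]
pub struct TrackEntry {
    pub name: String,
}

/// Maps every subscribe id allocated for a track to the same shared entry.
#[derive(Default)]
pub struct Subscribes {
    next_id: u64,
    by_id: HashMap<u64, Arc<TrackEntry>>,
}

impl Subscribes {
    /// Allocate a fresh id that resolves to `entry`; older ids stay mapped.
    pub fn allocate(&mut self, entry: &Arc<TrackEntry>) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        self.by_id.insert(id, Arc::clone(entry));
        id
    }

    /// Resolve an incoming group stream's subscribe id to its entry.
    pub fn lookup(&self, id: u64) -> Option<Arc<TrackEntry>> {
        self.by_id.get(&id).cloned()
    }

    /// Called once when the owning task exits: drop every id it allocated.
    pub fn release_all(&mut self, ids: &[u64]) {
        for id in ids {
            self.by_id.remove(id);
        }
    }
}
```

Because the old and new ids both resolve to the same `Arc<TrackEntry>`, a late stream carrying the old id still finds the existing state instead of falling into an unknown-subscription path.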
The previous design FIN'd the upstream subscribe stream on linger entry and opened a fresh one on resume (with start_group = latest + 1). It worked but had three rough edges:
- Stream churn on every consumer-churn cycle.
- A race between the publisher FIN'ing back (Complete) and a returning consumer arriving (Reused): the test surface was timing-dependent.
- Late uni streams from the previous subscription needed special duplicate-drop handling.
Redesign: the subscriber stays on one subscribe stream and one id for the whole lifecycle. On linger entry, it sends `SubscribeUpdate(priority=0, end_group=Some(latest))`. On resume, `SubscribeUpdate(end_group=None, priority=original)`. On linger timeout, FIN. The publisher treats `end_group` as a serving cap, not a terminator: groups beyond the cap are held in a sorted Vec<GroupConsumer> (the GroupConsumer keeps the data reachable in the producer's cache) and drained in sequence order once the cap rises or is unset.
Publisher changes:
- `run_subscribe_updates` now forwards end_group through a watch channel alongside priority.
- `run_track` consults the cap before spawning serve tasks and re-evaluates held groups when end_group.changed() fires. Track-finished + held-empty + tasks-drained is the only exit; only a FIN ends the subscription.
Subscriber changes:
- One subscribe id per `run_subscribe`; no more multi-id active set.
- run_subscribe_session is now a single phase-1 / phase-2 loop driving the SubscribeUpdate cap/uncap dance on the same stream.
- recv_group's special-case Duplicate drop is gone: duplicates can't happen on the same subscription with the cap design.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
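The "serving cap, not a terminator" idea can be sketched as a small state machine. This is a simplified model, not the publisher code: plain `u64` sequence numbers stand in for `GroupConsumer`, and `CappedServer`, `on_group`, and `on_end_group` are hypothetical names.

```rust
/// Publisher-side serving cap (hypothetical model; sequences stand in for groups).
#[derive(Debug, Default)]
pub struct CappedServer {
    end_group: Option<u64>,
    /// Groups beyond the cap, kept sorted ascending.
    held: Vec<u64>,
}

impl CappedServer {
    fn allowed(&self, seq: u64) -> bool {
        self.end_group.map_or(true, |cap| seq <= cap)
    }

    /// A new group arrived. Returns it if servable now, otherwise holds it.
    pub fn on_group(&mut self, seq: u64) -> Option<u64> {
        if self.allowed(seq) {
            return Some(seq);
        }
        // Insert at the sorted position so draining stays in sequence order.
        let pos = self.held.partition_point(|&held| held < seq);
        self.held.insert(pos, seq);
        None
    }

    /// The cap changed. Returns held groups that became servable, in order.
    pub fn on_end_group(&mut self, end_group: Option<u64>) -> Vec<u64> {
        self.end_group = end_group;
        let split = match end_group {
            Some(cap) => self.held.partition_point(|&seq| seq <= cap),
            None => self.held.len(),
        };
        self.held.drain(..split).collect()
    }
}
```

Lowering the cap never ends the subscription in this model; it only makes `on_group` hold newer groups until a later `on_end_group` call raises or clears the cap. Note that `held` has no bound here, which is the concern a later review comment raises.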
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@rs/moq-net/src/lite/publisher.rs`:
- Around line 430-509: The loop currently keeps calling track.recv_group() and
accumulating unlimited GroupConsumer entries into held when end_group imposes a
cap, allowing a peer to force unbounded memory growth; to fix this, stop polling
track.recv_group() while held has reached a safe limit (e.g. const MAX_HELD) or
implement an eviction policy: when held.len() >= MAX_HELD, either await
end_group.changed() (pausing consumption) or evict oldest/least-important groups
before calling track.recv_group(); update the loop where track.recv_group() is
awaited (and the held.push logic) so it only polls when cap permits more entries
(or after eviction), and ensure any eviction logs/metrics via track_stats and
preserves existing spawn_serve/track_done behavior.
📒 Files selected for processing (2)
- rs/moq-net/src/lite/publisher.rs
- rs/moq-net/src/lite/subscriber.rs
Both run_track and serve_group were ferrying the same 5-7 ambient values (session, subscribe id, track name, track stats, priority queue, track priority receiver, version) through long argument lists. Per the project's "4+ args is a struct waiting to happen" guideline, group them into a Subscription<S> handle. Clone is cheap: every field is either small or Arc-backed.
run_track and serve_group become methods; spawn_serve takes &mut self. run_subscribe builds the struct once after subscribe_track succeeds and hands it to run_track.
Drops the #[allow(clippy::too_many_arguments)] escape hatch added in the previous commit.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
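A rough sketch of the "bundle the ambient values into one handle" refactor follows. It is illustrative only: the real handle is generic (`Subscription<S>`) and carries more fields, while `Session`, `TrackStats`, and `describe_group` here are hypothetical placeholders.

```rust
use std::sync::Arc;

/// Hypothetical stand-ins for shared, Arc-backed state.
#[derive(Debug, Default)]
pub struct Session;

#[derive(Debug, Default)]
pub struct TrackStats;

/// Ambient values bundled into one handle instead of a long argument list.
/// Cloning only bumps reference counts and copies small integers.
#[derive(Debug, Clone)]
pub struct Subscription {
    pub session: Arc<Session>,
    pub id: u64,
    pub track: Arc<str>,
    pub stats: Arc<TrackStats>,
    pub version: u8,
}

impl Subscription {
    /// What used to be a free function with one argument per field
    /// becomes a method that reads them from `self`.
    pub fn describe_group(&self, sequence: u64) -> String {
        format!("subscribe {} track {} group {}", self.id, self.track, sequence)
    }
}
```

Each spawned serve task can take its own `clone()` of the handle, so no per-call argument list has to be threaded through.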
CodeRabbit flagged that the Vec<GroupConsumer> held buffer in run_track had no bound: a peer that issues SubscribeUpdate(end_group=Some(low)) and never raises it could force unlimited publisher-side memory growth.
Replace the Vec with Option<GroupConsumer> and gate track.recv_group() on pending.is_none(), so we pull at most one group beyond the cap. Anything the publisher produces while pending is set sits in the track's own cache (which is already bounded by its 5-second eviction policy), and lands on the wire normally once the cap rises and we resume consuming.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
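The bounded variant can be modelled as follows. This is a sketch under assumptions, not the publisher's loop: a `VecDeque<u64>` stands in for the track's own cache, sequence numbers stand in for `GroupConsumer`, and `BoundedServer`, `set_end_group`, and `poll` are hypothetical names.

```rust
use std::collections::VecDeque;

/// Buffers at most one group beyond the cap (hypothetical model).
#[derive(Debug, Default)]
pub struct BoundedServer {
    end_group: Option<u64>,
    pending: Option<u64>,
}

impl BoundedServer {
    pub fn set_end_group(&mut self, end_group: Option<u64>) {
        self.end_group = end_group;
    }

    fn allowed(&self, seq: u64) -> bool {
        self.end_group.map_or(true, |cap| seq <= cap)
    }

    /// Serve whatever the cap allows, reading `cache` only while nothing is pending.
    pub fn poll(&mut self, cache: &mut VecDeque<u64>) -> Vec<u64> {
        let mut served = Vec::new();
        loop {
            // Take the pending group first, otherwise pull the next one.
            let next = match self.pending.take().or_else(|| cache.pop_front()) {
                Some(seq) => seq,
                None => break,
            };
            if self.allowed(next) {
                served.push(next);
            } else {
                // Over the cap: park it and stop pulling until the cap moves.
                self.pending = Some(next);
                break;
            }
        }
        served
    }
}
```

However long the cap stays low, this handle owns at most one parked group; everything else remains in the cache, which has its own eviction policy.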
Per the user review: rather than have publisher.rs manually buffer one
out-of-range group and gate track.recv_group(), push the range-awareness
down into the consumer itself.
TrackConsumer gains:
- end_sequence: Option<u64> field
- end_at(Option<u64>) setter — raisable, lowerable, removable at any time
poll_next_group is reworked to call a new sequence-based scan
(State::poll_next_in_range) that returns the smallest sequence in
[next_sequence, end_sequence] that's currently cached. When the cap rises
or unsets, previously-blocked cached groups become eligible automatically.
When it falls below the cursor, next_group parks instead of returning
Ok(None) — only track finalization (with floor past final_sequence) ends
the stream.
next_group is now sequence-ordered (smallest first) rather than
arrival-ordered with a monotonic drop filter. The one existing test that
codified the old arrival-order edge case is rewritten to verify the new
contract; the new sequence-order semantics are strictly better at not
losing data on out-of-order arrivals. recv_group is unchanged.
Publisher's run_track loses the manual pending-group buffer and the gated
recv_group call — it just track.end_at()'s the current cap and loops on
track.next_group() + end_group.changed().
Four unit tests cover the new behavior: capping, raising the cap to drain
cached groups, dropping the cap below the cursor (consumer parks), and
toggling around out-of-order arrivals.
Also a minor subscriber.rs style cleanup (`if let Err = res { return ... }`
in the linger select arms, per the same review pass).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
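The sequence-based scan described in this commit can be approximated with a `BTreeMap` range query. This is a minimal sketch, not `State::poll_next_in_range` itself: `RangeCursor` and `next_in_range` are hypothetical names, the cache is a plain map, and returning `None` stands in for parking the consumer.

```rust
use std::collections::BTreeMap;

/// Cursor over cached groups (hypothetical model of the consumer's range state).
#[derive(Debug, Default)]
pub struct RangeCursor {
    next_sequence: u64,
    end_sequence: Option<u64>,
}

impl RangeCursor {
    /// Raise, lower, or remove the inclusive upper bound at any time.
    pub fn end_at(&mut self, end: Option<u64>) {
        self.end_sequence = end;
    }

    /// Return the smallest cached sequence in [next_sequence, end_sequence]
    /// and advance past it. `None` means nothing is eligible yet.
    /// (This sketch ignores overflow at u64::MAX.)
    pub fn next_in_range<T>(&mut self, cache: &BTreeMap<u64, T>) -> Option<u64> {
        let end = self.end_sequence.unwrap_or(u64::MAX);
        if end < self.next_sequence {
            // Cap fell below the cursor: the range is empty, so wait.
            return None;
        }
        let (&seq, _) = cache.range(self.next_sequence..=end).next()?;
        self.next_sequence = seq + 1;
        Some(seq)
    }
}
```

Because the scan always picks the smallest eligible sequence, a group that arrives out of order is still returned as long as the cursor has not yet passed it, and raising the cap makes already-cached groups eligible without any extra bookkeeping.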
The end_group_tx/rx watch channel was working around a perceived cancel-safety issue with decode_maybe in a hot select arm. Verified the underlying transports we ship are actually cancel-safe:
- quinn::RecvStream::read documents "This operation is cancel-safe."
- qmux::RecvStream::read_chunk is a tokio::mpsc::Receiver::recv (cancel-safe).
- web_transport_trait::RecvStream::read_buf default impl only awaits read, with sync advance_mut after, so there is no point at which a cancellation can lose bytes if read is cancel-safe.
- Reader::decode_maybe accumulates partial bytes into self.buffer and re-enters from the same buffer position, so cancel mid-message just pauses parsing and resumes later.
With that established, fold the SubscribeUpdate-reading branch directly into run_track's select. Drops one channel (end_group) and one helper function (run_subscribe_updates); the priority channel stays because serve_group tasks still need fan-out.
Preserves the exit asymmetry: track-finished drains in-flight tasks (the publisher is done, deliver everything cleanly), peer-FIN drops them (the subscriber gave up, don't bother).
Documents cancel-safety on Reader::decode_maybe so future transport impls know they have to preserve it.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
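The buffer-based argument for cancel-safety can be illustrated with a synchronous sketch. It borrows the `Reader` and `decode_maybe` names for readability but is not the crate's implementation: the framing (one length byte followed by the payload) and the `feed` method are assumptions made for the example.

```rust
/// Length-prefixed message reader that keeps partial input between calls
/// (hypothetical framing: one length byte, then that many payload bytes).
#[derive(Debug, Default)]
pub struct Reader {
    buffer: Vec<u8>,
}

impl Reader {
    /// Append bytes that a transport read produced.
    pub fn feed(&mut self, chunk: &[u8]) {
        self.buffer.extend_from_slice(chunk);
    }

    /// Decode one message if it is fully buffered.
    /// On an incomplete message nothing is consumed, so a later call
    /// resumes from the same buffer position.
    pub fn decode_maybe(&mut self) -> Option<Vec<u8>> {
        let len = *self.buffer.first()? as usize;
        if self.buffer.len() < 1 + len {
            return None;
        }
        let message = self.buffer[1..1 + len].to_vec();
        self.buffer.drain(..1 + len);
        Some(message)
    }
}
```

Dropping a decode attempt between two `feed` calls loses nothing, because the partial bytes live in the reader rather than in the abandoned call. That is the property a select arm relies on when another branch wins the race.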
Summary
When the last viewer of a track drops, the upstream subscription used to tear down immediately. A returning viewer milliseconds later triggered a fresh `Subscribe`, and the publisher re-served the latest cached group. Under churn (page reloads, reconnects, toggles) the same group got re-fetched over and over.
This PR keeps the upstream `TrackProducer` alive for up to 5s after the last consumer leaves. On `track.unused()` the lite subscriber sends `SubscribeUpdate(priority = 0)` + FIN upstream and waits for one of:
- the upstream FIN-back (Complete)
- the timeout (Cancelled)
- a returning consumer (Reused), which reissues a fresh subscribe with `start_group = latest + 1` so the publisher doesn't re-send already-cached groups
Late duplicates from the prior subscription round get silently dropped in `recv_group` (the data is already cached locally, so STOP_SENDING is the right move).
Lite01/02 fall back to the old immediate-cancel since they lack `SubscribeUpdate`. IETF transport is unchanged for now.
Why the broadcast cleanup change
The auto-cleanup spawn in `BroadcastConsumer::subscribe_track` previously fired on `weak.unused()`, which raced the linger logic and yanked the lookup entry the instant consumer count hit 0, defeating the point of linger. Re-anchored it to `weak.closed()` instead: the producer owner is now responsible for closing the producer (via `abort`/`finish`/`drop`) once truly done, and that close fires the cleanup eagerly. No in-memory leak of dead broadcasts.
The `requested_unused` test was rewritten to match the new contract: while the producer is alive, re-subscribing to the same name returns the existing producer's consumer (no new request); a fresh request only appears after the producer itself is closed.
Files
- `rs/conducer/src/weak.rs`: add `Weak::closed()` (mirror of `Producer::closed`)
- `rs/moq-net/src/model/track.rs`: `TrackProducer::latest()`, `TrackWeak::closed()`
- `rs/moq-net/src/model/broadcast.rs`: cleanup spawn waits on `weak.closed()`; test rewrite
- `rs/moq-net/src/lite/subscriber.rs`: restructure `run_subscribe` into a linger loop driving a new `run_subscribe_session`; silently drop `Error::Duplicate` in `recv_group`
- `rs/moq-native/tests/broadcast.rs`: `linger_resubscribe_keeps_flowing_moq_lite_03` smoke test
Test plan
- `just check`
- `cargo test --workspace --exclude moq-gst --exclude moq-ffi --exclude libmoq`: all green
- `model::broadcast::test::requested_unused` updated and passes against new contract
- `linger_resubscribe_keeps_flowing_moq_lite_03` end-to-end test passes
Known caveats
Distinguishing the Reused vs Complete linger branches from a black-box client test over localhost turned out to be too timing-dependent (the publisher's FIN-back roundtrip races the test thread's resubscribe call). The included test verifies the end-to-end smoke — drop + resub keeps working through the linger window. A more targeted branch-specific test would need either a slow-publisher harness or wire-level interception. (Written by Claude)
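For the broadcast cleanup change described earlier in this summary, the contract that the rewritten `requested_unused` test checks can be modelled in a few lines. This is a toy model with hypothetical names (`Lookup`, `TrackState`, `subscribe`, `unsubscribe`, `close`), not the `BroadcastConsumer` implementation.

```rust
use std::collections::HashMap;

/// Minimal per-track state: just a live consumer count (hypothetical model).
#[derive(Debug, Default)]
pub struct TrackState {
    pub consumers: usize,
}

/// Lookup table from track name to state, as a broadcast might keep it.
#[derive(Debug, Default)]
pub struct Lookup {
    tracks: HashMap<String, TrackState>,
}

impl Lookup {
    /// Returns true when this call had to create the entry,
    /// which corresponds to issuing a fresh upstream request.
    pub fn subscribe(&mut self, name: &str) -> bool {
        let fresh = !self.tracks.contains_key(name);
        self.tracks.entry(name.to_string()).or_default().consumers += 1;
        fresh
    }

    /// A consumer dropped. The entry stays, even at zero consumers.
    pub fn unsubscribe(&mut self, name: &str) {
        if let Some(track) = self.tracks.get_mut(name) {
            track.consumers = track.consumers.saturating_sub(1);
        }
    }

    /// The producer owner closed the track: only now is the entry removed.
    pub fn close(&mut self, name: &str) {
        self.tracks.remove(name);
    }
}
```

In this model a drop followed by a re-subscribe reuses the existing entry, and a fresh request appears only after `close`, which matches the "cleanup on closed, not on unused" rule.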
🤖 Generated with Claude Code