Skip to content

moq-net: linger upstream subscriptions across consumer churn (moq-lite)#1514

Open
kixelated wants to merge 7 commits into
devfrom
claude/beautiful-raman-582da6
Open

moq-net: linger upstream subscriptions across consumer churn (moq-lite)#1514
kixelated wants to merge 7 commits into
devfrom
claude/beautiful-raman-582da6

Conversation

@kixelated
Copy link
Copy Markdown
Collaborator

Summary

When the last viewer of a track drops, the upstream subscription used to tear down immediately. A returning viewer milliseconds later triggered a fresh Subscribe, and the publisher re-served the latest cached group. Under churn (page reloads, reconnects, toggles) the same group got re-fetched over and over.

This PR keeps the upstream TrackProducer alive for up to 5s after the last consumer leaves. On track.unused() the lite subscriber sends SubscribeUpdate(priority = 0) + FIN upstream and waits for one of:

  • Complete — the upstream FINs back (subscription is done)
  • Cancelled — the 5s linger timeout expires (no one came back)
  • Reused — a returning consumer arrives during the wait; reissue with start_group = latest + 1 so the publisher doesn't re-send already-cached groups

Late duplicates from the prior subscription round get silently dropped in recv_group (the data is already cached locally, so STOP_SENDING is the right move).

Lite01/02 fall back to the old immediate-cancel since they lack SubscribeUpdate. IETF transport is unchanged for now.

Why the broadcast cleanup change

The auto-cleanup spawn in BroadcastConsumer::subscribe_track previously fired on weak.unused(), which raced the linger logic and yanked the lookup entry the instant consumer count hit 0 — defeating the point of linger. Re-anchored it to weak.closed() instead: the producer owner is now responsible for closing the producer (via abort/finish/drop) once truly done, and that close fires the cleanup eagerly. No in-memory leak of dead broadcasts.

The requested_unused test was rewritten to match the new contract: while the producer is alive, re-subscribing to the same name returns the existing producer's consumer (no new request); a fresh request only appears after the producer itself is closed.

Files

  • rs/conducer/src/weak.rs — add Weak::closed() (mirror of Producer::closed)
  • rs/moq-net/src/model/track.rsTrackProducer::latest(), TrackWeak::closed()
  • rs/moq-net/src/model/broadcast.rs — cleanup spawn waits on weak.closed(); test rewrite
  • rs/moq-net/src/lite/subscriber.rs — restructure run_subscribe into a linger loop driving a new run_subscribe_session; silently drop Error::Duplicate in recv_group
  • rs/moq-native/tests/broadcast.rslinger_resubscribe_keeps_flowing_moq_lite_03 smoke test

Test plan

  • just check
  • cargo test --workspace --exclude moq-gst --exclude moq-ffi --exclude libmoq — all green
  • Existing model::broadcast::test::requested_unused updated and passes against new contract
  • New linger_resubscribe_keeps_flowing_moq_lite_03 end-to-end test passes

Known caveats

Distinguishing the Reused vs Complete linger branches from a black-box client test over localhost turned out to be too timing-dependent (the publisher's FIN-back roundtrip races the test thread's resubscribe call). The included test verifies the end-to-end smoke — drop + resub keeps working through the linger window. A more targeted branch-specific test would need either a slow-publisher harness or wire-level interception. (Written by Claude)

🤖 Generated with Claude Code

When the last viewer of a track drops, the upstream subscription used to
tear down immediately. A returning viewer milliseconds later triggered a
fresh Subscribe, and the publisher re-served the latest cached group.
Under churn the same group got re-fetched over and over.

Keep the upstream TrackProducer alive for up to 5s after the last
consumer leaves. On unused() we send SubscribeUpdate(priority=0) + FIN
upstream and wait for one of: the upstream FIN-back (Complete), the
timeout (Cancelled), or a returning consumer (Reused). Reused reissues a
fresh subscribe with start_group = latest + 1 so the publisher doesn't
re-send already-cached groups. Late duplicates from the prior round get
silently dropped in recv_group.

Required wiring the broadcast-side track cleanup to producer close
(weak.closed()) instead of consumer count (weak.unused()), so the lookup
entry survives transient churn. The producer owner closes it explicitly
on subscription exit, which fires the cleanup eagerly. Lite01/02 fall
back to the old immediate-cancel since they lack SubscribeUpdate.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented May 27, 2026

Review Change Stack

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review

Walkthrough

This PR implements relay-style subscription linger: consumers can reconnect during a grace window without losing upstream state. It adds Weak::closed(), exposes TrackProducer::latest(), replaces unused()-based cleanup with closed()-based cleanup, refactors subscriber handling into linger-aware sessions with a LINGER_TIMEOUT and SessionOutcome, threads an end_group cap through the publisher to buffer/drain groups, and adds an integration test validating resubscribe behavior.

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly and specifically describes the main change: implementing linger behavior for upstream subscriptions in moq-lite to reduce re-fetching during consumer churn.
Description check ✅ Passed The description is comprehensive and directly related to the changeset, explaining the motivation, implementation, files changed, and testing performed.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
✨ Simplify code
  • Create PR with simplified code
  • Commit simplified code in branch claude/beautiful-raman-582da6

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@rs/moq-net/src/lite/subscriber.rs`:
- Around line 405-411: The current SessionOutcome::Reused branch removes the old
subscription id and hands out a fresh id, which causes late-arriving group
messages for the old id to hit the unknown-subscription path in recv_group and
can skip a group; instead, do not remove the old mapping — either keep the old
id mapped until that upstream session is fully drained or alias the new id to
the same TrackEntry so both ids resolve to the same subscription state.
Concretely: in the SessionOutcome::Reused block, stop calling
self.subscribes.lock().remove(&id); generate the new id with
self.next_id.fetch_add(...), set start_group = track.latest().and_then(...), and
insert into self.subscribes a mapping for the new id that points to the same
TrackEntry (or otherwise mark the old id as pending-drain) so recv_group still
finds the existing TrackEntry for messages from the prior session while the
overlap finishes.

In `@rs/moq-net/src/model/broadcast.rs`:
- Around line 582-619: The test requested_unused currently uses a real
tokio::time::sleep which makes it wall-clock dependent; modify the test to call
tokio::time::pause() at the start of requested_unused and replace
tokio::time::sleep(std::time::Duration::from_millis(1)).await with
tokio::time::advance(std::time::Duration::from_millis(1)).await so the test uses
paused Tokio time; update any imports or futures usage if necessary but keep all
logic around Broadcast, producer1, consumer*, and producer1.abort(Error::Cancel)
unchanged.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: bb891e4a-8808-42b0-a3ca-a78d09951a5b

📥 Commits

Reviewing files that changed from the base of the PR and between b6e7f4c and c35bf08.

📒 Files selected for processing (5)
  • rs/conducer/src/weak.rs
  • rs/moq-native/tests/broadcast.rs
  • rs/moq-net/src/lite/subscriber.rs
  • rs/moq-net/src/model/broadcast.rs
  • rs/moq-net/src/model/track.rs

Comment thread rs/moq-net/src/lite/subscriber.rs Outdated
Comment thread rs/moq-net/src/model/broadcast.rs Outdated
kixelated and others added 2 commits May 26, 2026 21:21
CodeRabbit caught that removing the lingering subscribe id when Reused
fires sends late group streams from the previous upstream session into
recv_group's unknown-subscription error path instead of the intended
duplicate-drop path. Track every id this run_subscribe allocates and
remove them all only when the task exits, so in-flight uni streams
from a prior subscription instance still resolve to the same
TrackEntry.

Also pause Tokio time in the requested_unused test so the 1ms cleanup
yield is wall-clock independent.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The previous design FIN'd the upstream subscribe stream on linger entry and
opened a fresh one on resume (with start_group = latest + 1). It worked but
had three rough edges:

- Stream churn on every consumer-churn cycle.
- A race between the publisher FIN'ing back (Complete) and a returning
  consumer arriving (Reused) — the test surface was timing-dependent.
- Late uni streams from the previous subscription needed special
  duplicate-drop handling.

Redesign: the subscriber stays on one subscribe stream and one id for the
whole lifecycle. On linger entry, it sends `SubscribeUpdate(priority=0,
end_group=Some(latest))`. On resume, `SubscribeUpdate(end_group=None,
priority=original)`. On linger timeout, FIN. The publisher treats
`end_group` as a serving cap, not a terminator: groups beyond the cap are
held in a sorted Vec<GroupConsumer> (the GroupConsumer keeps the data
reachable in the producer's cache) and drained in sequence order once the
cap rises or is unset.

Publisher changes:
- `run_subscribe_updates` now forwards end_group through a watch channel
  alongside priority.
- `run_track` consults the cap before spawning serve tasks and re-evaluates
  held groups when end_group.changed() fires. Track-finished + held-empty +
  tasks-drained is the only exit; only a FIN ends the subscription.

Subscriber changes:
- One subscribe id per `run_subscribe`; no more multi-id active set.
- run_subscribe_session is now a single phase-1 / phase-2 loop driving the
  SubscribeUpdate cap/uncap dance on the same stream.
- recv_group's special-case Duplicate drop is gone — duplicates can't
  happen on the same subscription with the cap design.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@rs/moq-net/src/lite/publisher.rs`:
- Around line 430-509: The loop currently keeps calling track.recv_group() and
accumulating unlimited GroupConsumer entries into held when end_group imposes a
cap, allowing a peer to force unbounded memory growth; to fix this, stop polling
track.recv_group() while held has reached a safe limit (e.g. const MAX_HELD) or
implement an eviction policy: when held.len() >= MAX_HELD, either await
end_group.changed() (pausing consumption) or evict oldest/least-important groups
before calling track.recv_group(); update the loop where track.recv_group() is
awaited (and the held.push logic) so it only polls when cap permits more entries
(or after eviction), and ensure any eviction logs/metrics via track_stats and
preserves existing spawn_serve/track_done behavior.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 9f354290-e7fc-402c-9a07-fe1422bd471c

📥 Commits

Reviewing files that changed from the base of the PR and between a09e1e8 and adfd03e.

📒 Files selected for processing (2)
  • rs/moq-net/src/lite/publisher.rs
  • rs/moq-net/src/lite/subscriber.rs

Comment thread rs/moq-net/src/lite/publisher.rs Outdated
kixelated and others added 2 commits May 27, 2026 13:53
Both run_track and serve_group were ferrying the same 5-7 ambient values
(session, subscribe id, track name, track stats, priority queue, track
priority receiver, version) through long argument lists. Per the project's
"4+ args is a struct waiting to happen" guideline, group them into a
Subscription<S> handle. Clone is cheap — every field is either small or
Arc-backed.

run_track and serve_group become methods; spawn_serve takes &mut self.
run_subscribe builds the struct once after subscribe_track succeeds and
hands it to run_track. Drops the #[allow(clippy::too_many_arguments)]
escape hatch added in the previous commit.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
CodeRabbit flagged that the Vec<GroupConsumer> held buffer in run_track had
no bound — a peer that issues SubscribeUpdate(end_group=Some(low)) and
never raises it could force unlimited publisher-side memory growth.

Replace the Vec with Option<GroupConsumer> and gate track.recv_group() on
pending.is_none(), so we pull at most one group beyond the cap. Anything
the publisher produces while pending is set sits in the track's own cache
(which is already bounded by its 5-second eviction policy), and lands on
the wire normally once the cap rises and we resume consuming.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Comment thread rs/moq-net/src/lite/publisher.rs Outdated
Comment thread rs/moq-net/src/lite/subscriber.rs Outdated
kixelated and others added 2 commits May 27, 2026 14:52
Per the user review: rather than have publisher.rs manually buffer one
out-of-range group and gate track.recv_group(), push the range-awareness
down into the consumer itself.

TrackConsumer gains:
- end_sequence: Option<u64> field
- end_at(Option<u64>) setter — raisable, lowerable, removable at any time

poll_next_group is reworked to call a new sequence-based scan
(State::poll_next_in_range) that returns the smallest sequence in
[next_sequence, end_sequence] that's currently cached. When the cap rises
or unsets, previously-blocked cached groups become eligible automatically.
When it falls below the cursor, next_group parks instead of returning
Ok(None) — only track finalization (with floor past final_sequence) ends
the stream.

next_group is now sequence-ordered (smallest first) rather than
arrival-ordered with a monotonic drop filter. The one existing test that
codified the old arrival-order edge case is rewritten to verify the new
contract; the new sequence-order semantics are strictly better at not
losing data on out-of-order arrivals. recv_group is unchanged.

Publisher's run_track loses the manual pending-group buffer and the gated
recv_group call — it just track.end_at()'s the current cap and loops on
track.next_group() + end_group.changed().

Four unit tests cover the new behavior: capping, raising the cap to drain
cached groups, dropping the cap below the cursor (consumer parks), and
toggling around out-of-order arrivals.

Also a minor subscriber.rs style cleanup (`if let Err = res { return ... }`
in the linger select arms, per the same review pass).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The end_group_tx/rx watch channel was working around a perceived
cancel-safety issue with decode_maybe in a hot select arm. Verified the
underlying transports we ship are actually cancel-safe:

- quinn::RecvStream::read documents "This operation is cancel-safe."
- qmux::RecvStream::read_chunk is a tokio::mpsc::Receiver::recv (cancel-safe).
- web_transport_trait::RecvStream::read_buf default impl only awaits read,
  with sync advance_mut after — no point at which a cancellation can lose
  bytes if read is cancel-safe.
- Reader::decode_maybe accumulates partial bytes into self.buffer and
  re-enters from the same buffer position, so cancel mid-message just
  pauses parsing and resumes later.

With that established, fold the SubscribeUpdate-reading branch directly
into run_track's select. Drops one channel (end_group) and one helper
function (run_subscribe_updates); the priority channel stays because
serve_group tasks still need fan-out.

Preserves the exit asymmetry: track-finished drains in-flight tasks (the
publisher is done, deliver everything cleanly), peer-FIN drops them
(the subscriber gave up, don't bother).

Documents cancel-safety on Reader::decode_maybe so future transport impls
know they have to preserve it.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@kixelated kixelated changed the base branch from main to dev May 28, 2026 16:35
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant