Skip to content

fix: serialize Dial/Listen queue handoff to prevent router token loss#573

Merged
raballew merged 25 commits into
jumpstarter-dev:mainfrom
raballew:fix-listen-queue-race
May 29, 2026
Merged

fix: serialize Dial/Listen queue handoff to prevent router token loss#573
raballew merged 25 commits into
jumpstarter-dev:mainfrom
raballew:fix-listen-queue-race

Conversation

@raballew

Copy link
Copy Markdown
Member

Summary

  • Fixes the TOCTOU race in listenQueues where a reconnecting Listen() goroutine and a dying one compete for the same channel, causing ~50% token loss rate
  • Wraps the raw channel in a listenQueue struct with a done channel and sync.Once for clean shutdown signaling
  • Introduces a per-lease mutex (leaseLock) that serializes queue swaps in Listen() and token sends in Dial(), eliminating the race window entirely
  • Uses non-blocking sends to prevent mutex starvation when the channel buffer is full
  • Adds ref-counted lease lock cleanup to prevent memory leaks in the leaseLocks sync.Map
  • Enables the race detector in the controller test target

Test plan

  • go test -race -v ./internal/service/ -run TestListenQueue passes
  • make e2e-run passes with label filter core
  • Stress test workflow confirms reduced flake rate

Fixes #572

Generated with Claude Code

@coderabbitai

coderabbitai Bot commented Apr 16, 2026

Copy link
Copy Markdown
Contributor

Review Change Stack

Warning

Review limit reached

@mangelajo, we couldn't start this review because you've reached your PR review rate limit.

More reviews will be available in 50 minutes and 42 seconds. Learn how PR review limits work.

Your organization has run out of usage credits. Purchase more in the billing tab.

⌛ How to resolve this issue?

After more reviews become available, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans include higher PR review limits than trial, open-source, and free plans. In all cases, reviews become available again over time. During sustained high-volume PR review activity, CodeRabbit may temporarily slow when the next review becomes available.

Please see our Fair Usage Limits Policy for further information.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 530b010a-5253-4561-8feb-a191eb947741

📥 Commits

Reviewing files that changed from the base of the PR and between adc99dd and 4554642.

📒 Files selected for processing (1)
  • controller/internal/service/controller_service.go
📝 Walkthrough

Walkthrough

The changes fix a race in Dial/Listen handoff by adding per-lease listenQueue wrappers, refcounted lease locks, atomic queue swapping via swapListenQueue, and serialized non-blocking sends via sendToListener. Listen and Dial are updated to use these primitives; tests exercise supersession, draining, backpressure, and lease-lock concurrency. The Makefile enables Go's race detector for non-e2e tests.

Changes

Controller listen/dial coordination

Layer / File(s) Summary
Per-lease primitives and helpers
controller/internal/service/controller_service.go
Adds sync/atomic import. Implements listenQueue wrapper, refcounted leaseLock with acquireLeaseLock/releaseLeaseLock, swapListenQueue, and sendToListener which returns Unavailable or ResourceExhausted when appropriate.
Listen lifecycle and swap semantics
controller/internal/service/controller_service.go
Refactors Listen to create a session-specific listenQueue, install it via swapListenQueue, close previous wrapper done on supersession, drain buffered messages after done closes, CompareAndDelete on cleanup, and release the lease lock.
Dial: use serialized sendToListener
controller/internal/service/controller_service.go
Updates Dial to call sendToListener for token delivery instead of directly using the shared channel and select.
Tests and helpers
controller/internal/service/controller_service_test.go
Updates imports; adds testRouterToken and drainChannel; replaces custom substring helper; adds extensive concurrency and lifecycle tests (supersession, draining, backpressure, races, and lease-lock refcount tests).
Build Configuration
controller/Makefile
Adds -race flag to the test Make target to enable the Go race detector for non-e2e tests.

Sequence Diagram

sequenceDiagram
    participant Client as Dial (Client)
    participant Controller as ControllerService
    participant Exporter as Listen (Exporter)
    participant LeaseMap as listenQueues

    Note over Client,LeaseMap: OLD FLOW (Race Condition)
    Client->>LeaseMap: LoadOrStore queue (shared chan)
    Exporter->>LeaseMap: LoadOrStore queue (same chan)
    Client->>LeaseMap: Send token -> channel
    Exporter->>Exporter: old goroutine may read token
    Exporter->>Exporter: token consumed by stale reader -> lost

    Note over Client,LeaseMap: NEW FLOW (Fixed)
    Exporter->>Controller: Listen starts, acquire leaseLock
    Controller->>LeaseMap: swapListenQueue (install wrapper with done)
    Controller->>Controller: close previous wrapper.done
    Client->>Controller: Dial -> sendToListener (under leaseLock)
    Controller->>LeaseMap: check active wrapper, non-blocking send to wrapper.ch
    alt send succeeds
      Controller->>Exporter: Exporter receives token from active wrapper
    else buffer full
      Controller-->>Client: return ResourceExhausted
    end
    Exporter->>Exporter: after done closes, drain buffered messages then exit
    Controller->>LeaseMap: CompareAndDelete on graceful exit
    Controller->>Controller: release leaseLock
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related issues

  • #572: This PR addresses the flaky E2E failure caused by the listen queue race in Dial/Listen handoff by introducing per-lease coordination and atomic queue swaps.

Poem

🐰 A race was caught in the listen queue,
Where tokens vanished, old streams grew new.
Now swaps are atomic, locks hold true—
Per-lease coordination sees it through,
No more lost hops, the handoff flew! 🎯

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title directly and specifically describes the primary fix: serializing Dial/Listen queue handoff to prevent router token loss, which is the main objective of the PR.
Description check ✅ Passed The description comprehensively covers the pull request changes, including the race condition fix, the listenQueue wrapper, per-lease mutex, non-blocking sends, and ref-counting, all of which are related to the changeset.
Linked Issues check ✅ Passed The PR fully addresses issue #572 by implementing per-lease mutex serialization (leaseLock), wrapping channels in listenQueue with done signaling, using non-blocking sends, and adding comprehensive tests validating the race fix.
Out of Scope Changes check ✅ Passed All changes are directly scoped to fixing the identified race condition: Makefile race detector flag, listenQueue implementation, per-lease locking, and comprehensive test coverage for the fix.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In @.github/workflows/e2e-stress.yaml:
- Around line 84-88: The workflow's stress matrix uses strategy.fail-fast: true
which cancels remaining shards when one fails; change strategy.fail-fast to
false so the matrix (strategy.matrix.run) completes all 100 runs and reports
every failing shard instead of stopping on the first failure. Locate the
strategy block containing fail-fast and the matrix.run array and update
fail-fast to false to ensure full-run flake hunting.

In `@controller/internal/service/controller_service_test.go`:
- Around line 1032-1059: Update the two tests to assert the gRPC error code is
codes.Unavailable by invoking svc.sendToListener and inspecting the returned
status: in TestListenQueueDialReturnsUnavailableWhenNoListener call
svc.sendToListener with leaseName "nonexistent-lease" and assert the error is
non-nil and that status.Code(err) == codes.Unavailable; in
TestListenQueueDialReturnsUnavailableWhenDoneClosed keep the existing setup
(swapListenQueue + closeDone) but replace the nil check with an assertion that
status.Code(err) == codes.Unavailable (use google.golang.org/grpc/status and
codes for the check).

In `@controller/internal/service/controller_service.go`:
- Around line 541-546: When wrapper.done is closed the current select can return
without processing already-queued tokens on wrapper.ch; modify the exit path
(the select handling in the loop around wrapper.ch) to drain wrapper.ch before
returning on wrapper.done or ctx.Done: when detecting wrapper.done (or ctx.Done)
is closed, enter a non-blocking loop that repeatedly reads from wrapper.ch and
calls sendToListener for each queued token until wrapper.ch is empty, then
return. Update the logic around the select that references wrapper.done and
wrapper.ch (the loop that invokes sendToListener and the Dial handoff code) to
ensure all pending messages are processed before exiting.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: ab09502c-3654-4ed4-ad56-3491d3ddf4c7

📥 Commits

Reviewing files that changed from the base of the PR and between 74794de and 160c6c1.

📒 Files selected for processing (4)
  • .github/workflows/e2e-stress.yaml
  • controller/Makefile
  • controller/internal/service/controller_service.go
  • controller/internal/service/controller_service_test.go

Comment thread .github/workflows/e2e-stress.yaml Outdated
Comment thread controller/internal/service/controller_service_test.go
Comment thread controller/internal/service/controller_service.go
raballew added a commit to raballew/jumpstarter that referenced this pull request Apr 16, 2026
…tus codes

Address CodeRabbit review feedback on PR jumpstarter-dev#573:

1. When wrapper.done fires in the Listen loop, drain wrapper.ch via a
   non-blocking loop calling stream.Send() for each queued token before
   returning. This prevents token loss when sendToListener enqueues a
   token just before swapListenQueue closes the done channel.

2. Update TestListenQueueDialReturnsUnavailableWhenNoListener to call
   sendToListener and assert status.Code(err) == codes.Unavailable
   instead of just checking the sync.Map directly.

3. Update TestListenQueueDialReturnsUnavailableWhenDoneClosed to assert
   status.Code(err) == codes.Unavailable instead of just err != nil.

4. Update TestListenQueueListenLoopDeliversTokensAndExitsOnDone to
   include drain logic matching the production code pattern.

5. Add TestListenQueueDrainsBufferedTokensOnSupersession and
   TestListenQueueListenLoopDrainsOnSupersession to verify the drain
   behavior.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

♻️ Duplicate comments (1)
controller/internal/service/controller_service.go (1)

541-553: ⚠️ Potential issue | 🟠 Major

Drain buffered tokens when ctx.Done() races with supersession.

If ctx.Done() and wrapper.done are both ready here, the select can take Line 542 and return before the buffered handoff tokens are drained. That reopens the token-loss path this PR is trying to close: sendToListener can succeed, but the old listener still drops queued tokens during reconnect.

Possible fix
+	drainBuffered := func() error {
+		for {
+			select {
+			case msg := <-wrapper.ch:
+				if err := stream.Send(msg); err != nil {
+					return err
+				}
+			default:
+				return nil
+			}
+		}
+	}
+
 	for {
 		select {
 		case <-ctx.Done():
-			return nil
+			select {
+			case <-wrapper.done:
+				return drainBuffered()
+			default:
+				return nil
+			}
 		case <-wrapper.done:
-			for {
-				select {
-				case msg := <-wrapper.ch:
-					if err := stream.Send(msg); err != nil {
-						return err
-					}
-				default:
-					return nil
-				}
-			}
+			return drainBuffered()
 		case msg := <-wrapper.ch:
 			if err := stream.Send(msg); err != nil {
 				return err
 			}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@controller/internal/service/controller_service.go` around lines 541 - 553,
The select between ctx.Done() and wrapper.done can pick ctx.Done() while
wrapper.done is also ready, causing buffered tokens in wrapper.ch to be lost;
change the logic so that when ctx.Done() is selected you first check
(non-blocking) whether wrapper.done is closed and, if so, drain wrapper.ch by
repeatedly receiving and calling stream.Send on each message (same behavior as
the wrapper.done branch) before returning; locate the code around the select
that references ctx.Done(), wrapper.done, wrapper.ch and stream.Send and
implement the non-blocking check-and-drain on wrapper.ch when ctx.Done() fires.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@controller/internal/service/controller_service_test.go`:
- Around line 824-903: The test
TestDialSendToListenerConcurrentWithSwapNeverLandsOnSuperseded currently treats
a non-nil error from svc.sendToListener as an acceptable "rejected" outcome;
change the test so any non-nil sendErr is treated as a test failure instead
(i.e., call t.Fatalf or t.Fatalf with context) to ensure sendToListener under
the per-lease mutex never returns an error in this race; update the branches
around sendResult <- svc.sendToListener(...) and the subsequent handling of
sendErr to assert sendErr == nil, referencing
TestDialSendToListenerConcurrentWithSwapNeverLandsOnSuperseded,
svc.sendToListener, and svc.swapListenQueue.
- Around line 327-348: The first subtest deletes the original listen queue
(wrapper) so the second subtest is running against a missing key; restore the
initial state before the second subtest by creating a fresh wrapper instance and
inserting it back into svc.listenQueues under leaseName (or re-store the
original wrapper) so that CompareAndDelete in the "queue survives when a
reconnecting Listen replaced it" case actually exercises stale-cleanup logic;
reference svc.listenQueues, wrapper, leaseName, swapListenQueue and
CompareAndDelete when making the change.

---

Duplicate comments:
In `@controller/internal/service/controller_service.go`:
- Around line 541-553: The select between ctx.Done() and wrapper.done can pick
ctx.Done() while wrapper.done is also ready, causing buffered tokens in
wrapper.ch to be lost; change the logic so that when ctx.Done() is selected you
first check (non-blocking) whether wrapper.done is closed and, if so, drain
wrapper.ch by repeatedly receiving and calling stream.Send on each message (same
behavior as the wrapper.done branch) before returning; locate the code around
the select that references ctx.Done(), wrapper.done, wrapper.ch and stream.Send
and implement the non-blocking check-and-drain on wrapper.ch when ctx.Done()
fires.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 7d8baf64-1f2a-4b0d-97a9-b3e19823f9db

📥 Commits

Reviewing files that changed from the base of the PR and between 160c6c1 and d9715ed.

📒 Files selected for processing (2)
  • controller/internal/service/controller_service.go
  • controller/internal/service/controller_service_test.go

Comment thread controller/internal/service/controller_service_test.go
Comment thread controller/internal/service/controller_service_test.go
raballew added a commit to raballew/jumpstarter that referenced this pull request Apr 17, 2026
…tus codes

Address CodeRabbit review feedback on PR jumpstarter-dev#573:

1. When wrapper.done fires in the Listen loop, drain wrapper.ch via a
   non-blocking loop calling stream.Send() for each queued token before
   returning. This prevents token loss when sendToListener enqueues a
   token just before swapListenQueue closes the done channel.

2. Update TestListenQueueDialReturnsUnavailableWhenNoListener to call
   sendToListener and assert status.Code(err) == codes.Unavailable
   instead of just checking the sync.Map directly.

3. Update TestListenQueueDialReturnsUnavailableWhenDoneClosed to assert
   status.Code(err) == codes.Unavailable instead of just err != nil.

4. Update TestListenQueueListenLoopDeliversTokensAndExitsOnDone to
   include drain logic matching the production code pattern.

5. Add TestListenQueueDrainsBufferedTokensOnSupersession and
   TestListenQueueListenLoopDrainsOnSupersession to verify the drain
   behavior.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@raballew raballew force-pushed the fix-listen-queue-race branch from d9715ed to 94d3027 Compare April 17, 2026 10:46
@raballew raballew enabled auto-merge (squash) April 17, 2026 18:51
raballew and others added 21 commits April 28, 2026 12:10
Replace the missing queue cleanup in Listen() with a single
defer s.listenQueues.CompareAndDelete(leaseName, queue) call.

This fixes issue jumpstarter-dev#414 where a race between Listen() cleanup and Dial()
token delivery causes intermittent "Connection to exporter lost" errors
in E2E tests. CompareAndDelete only removes the queue if it is still the
same channel instance that this invocation created, so a reconnecting
exporter's new queue is never accidentally deleted by an old invocation's
deferred cleanup.

Compared to the timer-based approach in PR jumpstarter-dev#417, this solution:
- Eliminates the known race at timer expiry
- Requires no additional struct fields (listenTimers) or goroutines
- Has no timing-dependent test behavior

Generated-By: Forge/20260415_224144_3227186_20142bee
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
When a reconnecting Listen() inherits the same channel via LoadOrStore,
the old Listen()'s deferred CompareAndDelete would succeed because both
hold the same channel reference, incorrectly deleting the map entry that
the reconnected Listen() depends on.

By wrapping the channel in a unique listenQueue struct per Listen() call
and using CompareAndSwap on reconnect, the old Listen()'s
CompareAndDelete becomes a no-op since the pointer identity no longer
matches.

Generated-By: Forge/20260415_224144_3227186_20142bee
When two Listen() goroutines execute concurrently for the same lease,
both attempt CompareAndSwap with the same stale reference. The loser's
CAS fails, leaving its wrapper unstored. If the winner exits first,
its CompareAndDelete removes the entry while the loser still reads
from it. Add a retry path that re-loads the current map value and
attempts CAS again, ensuring the surviving goroutine always owns the
map entry.

Generated-By: Forge/20260415_224144_3227186_20142bee
…ader race

Replace the shared-channel approach (LoadOrStore + CompareAndSwap) with
per-invocation channels and done-signaling. Each Listen() call creates a
fresh listenQueue with its own ch and done channels, atomically swaps it
into the sync.Map, and closes the previous entry's done channel to signal
the old goroutine to stop reading. Dial() now uses Load (not LoadOrStore)
to send tokens to the current listener only.

This eliminates the logical race where a stale goroutine with a broken
gRPC stream could consume and discard a dial token meant for the
reconnected goroutine.

Generated-By: Forge/20260415_235731_3329604_06ed4455
…check

Add a deterministic pre-check of q.done before the send select in Dial,
preventing tokens from being silently lost when sent to a queue whose
listener has been superseded. Also add a fallback case <-q.done in the
main select for races that occur between the pre-check and the send.

Generated-By: Forge/20260415_235731_3329604_06ed4455

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The done channel was only closed when a listener was superseded by a new
Listen call. On normal exit (context cancellation or stream error), done
was left open, making it an unreliable signal for Dial's done pre-check.
Use sync.Once to close done in a deferred call, ensuring it is always
closed exactly once regardless of exit path.

Generated-By: Forge/20260415_235731_3329604_06ed4455

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The defer statements in Listen() were ordered such that
CompareAndDelete ran before closeDone (LIFO). This created a
TOCTOU window where a concurrent Dial could load a queue reference,
pass the done check (done still open), and send to a dead queue.

Swapping the defer order ensures closeDone() runs first, so any
concurrent Dial that loaded the queue reference will see the closed
done channel and reject the send before the map entry is removed.

Generated-By: Forge/20260415_235731_3329604_06ed4455
Address review findings F001, F002, F005, F006, F007, F008:
- Add tests exercising listenQueue integration with actual struct behavior
- Rename misleading TestListenQueueConcurrentReadersAreNonDeterministic
- Add test for Dial returning Unavailable with no listener
- Add concurrent Dial-during-reconnection test
- Add context cancellation test
- Run tests with -race flag

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The done-channel approach has a TOCTOU race: if Dial loads a queue
reference before Listen reconnects, then both <-q.done and q.ch <-
response are ready in the select (buffered channel), and Go may
non-deterministically pick the send, delivering the token to a
superseded queue.

Add a per-lease mutex (leaseLocks sync.Map) that serializes
swapListenQueue (Swap + closeDone) with sendToListener (Load + check
+ send). This guarantees that the queue loaded in Dial cannot be
superseded during the send.

Also replace custom contains/searchSubstring helpers with
strings.Contains and add tests covering the stale-Dial scenario
with pre-swap queue references.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…der backpressure

When the listenQueue channel buffer is full, sendToListener blocks on
the channel send while holding the per-lease mutex. This prevents a
reconnecting Listen from acquiring the mutex to swap the queue, creating
a deadlock chain. Adding the Dial caller's context to the select allows
the blocked send to be cancelled when the Dial client disconnects,
releasing the mutex for the reconnecting Listen to proceed.

Generated-By: Forge/20260416_070332_3699740_7b2bda71

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Extract channel draining logic from TestListenQueueConcurrentDialDuringReconnection
into a reusable drainChannel function, replacing the goto drained/g2drained pattern
with idiomatic Go.

Generated-By: Forge/20260416_073038_3739633_70e0127f
The leaseLocks sync.Map grew unboundedly because per-lease mutex entries
were never deleted. Now when Listen exits and CompareAndDelete successfully
removes the queue (meaning no new listener took over), the corresponding
leaseLocks entry is also deleted.

Generated-By: Forge/20260416_073038_3739633_70e0127f
Exercises the full deadlock-avoidance chain: buffer full, sendToListener
blocks holding the per-lease mutex, swapListenQueue blocks on the mutex,
context cancellation unblocks sendToListener, and swapListenQueue proceeds.

Generated-By: Forge/20260416_073038_3739633_70e0127f
Add -race flag to the go test invocation in the Makefile test target,
which is used by the controller-tests CI workflow. This ensures data
races are detected in CI.

Generated-By: Forge/20260416_073038_3739633_70e0127f
Extract repeated string literal "tok" into testRouterToken constant
and remove trailing blank line to satisfy golangci-lint checks.

Generated-By: Forge/20260416_075157_11117_6c67a5ce
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace the manual Load+select TOCTOU pattern in
TestListenQueueConcurrentDialDuringReconnection with calls to
svc.sendToListener() and svc.swapListenQueue(), exercising the
actual production code path that serializes Dial with reconnecting
Listen via the per-lease mutex.

Generated-By: Forge/20260416_075157_11117_6c67a5ce
…p ops

Refactor tests to use swapListenQueue() and sendToListener() instead
of directly calling listenQueues.Store(), listenQueues.Swap(), and
close(done). This ensures tests exercise the actual production code
paths including per-lease mutex serialization, rather than testing a
different (pre-fix) code path that bypasses the TOCTOU protection.

Generated-By: Forge/20260416_075157_11117_6c67a5ce
…ks entries

The Listen cleanup path called closeDone() outside the per-lease mutex,
which allowed an in-flight sendToListener to see a partially-torn-down
queue. Worse, leaseLocks.Delete could remove the mutex while a concurrent
Listen or Dial still references it, breaking serialization guarantees.

Fix by acquiring the lease mutex before calling closeDone() in the
cleanup defer, and by never deleting leaseLocks entries (they are tiny
and bounded by the number of distinct lease names seen).

Also fix the stale-reader detection test to check done and ch
deterministically instead of relying on random select ordering.

Generated-By: Forge/20260416_105202_199878_b08a2035
Replace the blocking channel send in sendToListener with a non-blocking
send that returns ResourceExhausted when the listener buffer is full.
Previously, sendToListener held the per-lease mutex while blocking on a
full channel, which prevented swapListenQueue (reconnecting listeners)
and other Dial attempts from proceeding until the RPC context timed out.

Generated-By: Forge/20260416_105202_199878_b08a2035
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…e cleanup

The previous getLeaseLock returned raw *sync.Mutex pointers from a sync.Map
that was never cleaned up, leaking memory. Replace with acquireLeaseLock /
releaseLeaseLock using atomic ref counting so the map entry is removed when
the last listener releases. Ensure closeDone in the Listen defer runs under
the lease mutex for proper serialization.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…ener test

Remove the racy counter increment from TestLeaseLockRefCountConcurrentAcquireRelease
since goroutines that acquire-release quickly may get different mutex instances.
Add TestLeaseLockRefCountConcurrentOverlappingListeners that uses a barrier to
ensure all goroutines hold a reference before using the mutex, matching the
real Listen lifecycle pattern.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
raballew and others added 3 commits April 28, 2026 12:10
Runs the E2E suite 100 times in parallel (max 20 concurrent) with
fail-fast enabled. Builds images once, then each matrix entry sets up
its own Kind cluster and runs the full test. Triggered manually via
workflow_dispatch only.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…tus codes

Address CodeRabbit review feedback on PR jumpstarter-dev#573:

1. When wrapper.done fires in the Listen loop, drain wrapper.ch via a
   non-blocking loop calling stream.Send() for each queued token before
   returning. This prevents token loss when sendToListener enqueues a
   token just before swapListenQueue closes the done channel.

2. Update TestListenQueueDialReturnsUnavailableWhenNoListener to call
   sendToListener and assert status.Code(err) == codes.Unavailable
   instead of just checking the sync.Map directly.

3. Update TestListenQueueDialReturnsUnavailableWhenDoneClosed to assert
   status.Code(err) == codes.Unavailable instead of just err != nil.

4. Update TestListenQueueListenLoopDeliversTokensAndExitsOnDone to
   include drain logic matching the production code pattern.

5. Add TestListenQueueDrainsBufferedTokensOnSupersession and
   TestListenQueueListenLoopDrainsOnSupersession to verify the drain
   behavior.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@raballew raballew force-pushed the fix-listen-queue-race branch from 94d3027 to 6c185dd Compare April 28, 2026 10:10
@raballew

Copy link
Copy Markdown
Member Author

@mangelajo ptal

@raballew

Copy link
Copy Markdown
Member Author

@mangelajo bumping this one. needs review.

@mangelajo mangelajo left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

now an abridged version :-P :)

Document the concurrency reasoning in the listenQueue/leaseLock
mechanism so future readers understand why each construct exists.

Co-authored-by: Cursor <cursoragent@cursor.com>
@mangelajo mangelajo force-pushed the fix-listen-queue-race branch from adc99dd to 4554642 Compare May 29, 2026 17:05

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@controller/internal/service/controller_service.go`:
- Around line 104-138: acquireLeaseLock can hand out a mutex that another
goroutine removes from the map in releaseLeaseLock, causing different callers to
get different mutexes for the same leaseName; fix by serializing map mutation
and refcount changes with a single mutex: add a controller-level lock (e.g.,
s.leaseLocksMu) and hold it around the LoadOrStore/refcount increment/backout
path in acquireLeaseLock and around the refcount decrement + CompareAndDelete
path in releaseLeaseLock so creation, increment, decrement and deletion are
atomic with respect to each other; update acquireLeaseLock and releaseLeaseLock
to use that mutex and remove the ad-hoc atomic window (and then drop the atomic
refs usage/import if no longer needed).
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 75d12426-b485-4eb6-abaf-ea7981dff198

📥 Commits

Reviewing files that changed from the base of the PR and between d9715ed and adc99dd.

📒 Files selected for processing (3)
  • controller/Makefile
  • controller/internal/service/controller_service.go
  • controller/internal/service/controller_service_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • controller/internal/service/controller_service_test.go

Comment thread controller/internal/service/controller_service.go
@raballew raballew merged commit e914a82 into jumpstarter-dev:main May 29, 2026
33 of 46 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Flaky E2E: listen queue race causes 'Connection to exporter lost' in Dial/Listen handoff

2 participants