fix: per-subscriber write goroutines with 5s timeout (PILOT-278) #8

Open
matthew-pilot wants to merge 1 commit into main from openclaw/pilot-278-20260530-071800

@matthew-pilot
Collaborator

Fix for PILOT-278

Bug: A slow TCP subscriber stalls the publishing goroutine in eventstream, tying up a publisher connection for the full TCP write timeout.

Root cause: publishWith() iterates subscribers sequentially with blocking writes. One TCP-stalled subscriber blocks the entire loop, preventing the publisher from reading new events.

Fix: Spawn each subscriber write in its own goroutine with a 5-second timeout (buffered channel + select). Slow subscribers are dropped after consecutive timeout failures, matching the existing resilience contract.

Changes

  • service.go: Rewrote publishWith delivery loop to use per-subscriber goroutines with bounded timeout
  • Added publishWriteTimeout = 5s constant
  • Subscriber write errors and timeouts both count toward the maxConsecutivePublishFailures threshold
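The counting rule in the last bullet could look roughly like the sketch below. It is an illustration only: the diff of service.go is not shown here, so `failureTracker`, `record`, `errPublishTimeout`, and the threshold value of 3 are all assumptions. Only the name `maxConsecutivePublishFailures` comes from the PR.

```go
package main

import (
	"errors"
	"fmt"
)

// maxConsecutivePublishFailures is named in the PR; the value 3 is an assumption.
const maxConsecutivePublishFailures = 3

// errPublishTimeout is a hypothetical sentinel for a write that hit the timeout.
var errPublishTimeout = errors.New("publish write timed out")

// failureTracker (hypothetical) counts consecutive failed deliveries per subscriber ID.
type failureTracker struct {
	consecutive map[string]int
}

func newFailureTracker() *failureTracker {
	return &failureTracker{consecutive: make(map[string]int)}
}

// record notes the outcome of one delivery attempt and reports whether the
// subscriber has reached the drop threshold. Any non-nil error, timeout or
// otherwise, increments the streak; a success resets it.
func (t *failureTracker) record(id string, err error) bool {
	if err == nil {
		delete(t.consecutive, id)
		return false
	}
	t.consecutive[id]++
	return t.consecutive[id] >= maxConsecutivePublishFailures
}

func main() {
	t := newFailureTracker()
	outcomes := []error{
		errPublishTimeout, errors.New("broken pipe"), nil,
		errPublishTimeout, errPublishTimeout, errPublishTimeout,
	}
	for i, err := range outcomes {
		fmt.Printf("delivery %d: err=%v drop=%v\n", i+1, err, t.record("sub-1", err))
	}
}
```

The tracker is not safe for concurrent use as written. If outcomes are recorded from the per-subscriber goroutines themselves, the map would need a mutex or the results would need to be funneled back to a single goroutine.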

Verification

  • go build ./...
  • go vet ./...
  • go test ./... ✓ (0.222s)

Scope

  • Files: 1 (service.go)
  • Lines: +51 / -23

🔗 https://vulturelabs.atlassian.net/browse/PILOT-278

…subscribers from stalling publisher (PILOT-278)
@codecov

codecov Bot commented May 30, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.

@matthew-pilot
Collaborator Author

🦜 Matthew PR Check — #8 PILOT-278

Status

  • State: OPEN · MERGEABLE ✅
  • CI: 2/2 passing (test ✅, codecov/patch ✅)
  • Created: 2026-05-30 07:23 UTC
  • Files: 1 (service.go +51 −23)
  • Labels: none

CI Detail

Check          Result
test           ✅ SUCCESS
codecov/patch  ✅ SUCCESS

Verdict

CLEAN — per-subscriber write goroutines with 5s timeout prevent slow TCP subscribers from blocking the entire publish loop.


🤖 matthew-pr-worker · 2026-05-30T07:28Z

@matthew-pilot
Collaborator Author

🦜 Matthew Explains — #8 PILOT-278

What this does

Replaces the sequential blocking write loop in publishWith() with per-subscriber goroutines, each with a 5-second write timeout. A slow or stalled TCP subscriber no longer blocks delivery to other subscribers.

Why it matters

Before: publishWith() iterates subscribers sequentially with blocking Write() calls. One TCP-stalled subscriber (e.g. high latency, slow consumer) blocks the entire loop, consuming a publisher connection for the full TCP write timeout and starving all other subscribers.

After: Each subscriber gets its own goroutine that calls conn.SetWriteDeadline(time.Now().Add(5 * time.Second)) before writing. A slow subscriber is isolated, so other subscribers are unaffected.

How it works

  1. For each subscriber, launch a goroutine
  2. Set a 5s write deadline on the subscriber connection
  3. Write the event frame
  4. Use sync.WaitGroup to track completion
  5. The publisher goroutine waits via wg.Wait()
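The five steps above can be sketched as follows, under stated assumptions. This follows the deadline-plus-WaitGroup description in this comment (the PR description instead mentions a buffered channel and select, and the actual service.go is not shown). `deliverAll`, `isTimeout`, and `demo` are hypothetical names, `net.Pipe` stands in for real TCP connections, and the timeout is shortened from 5s so the demo finishes quickly.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"net"
	"sync"
	"time"
)

// deliverAll writes frame to every connection concurrently. Each write is
// bounded by a write deadline, and the returned slice holds one error per
// connection in input order (nil on success).
func deliverAll(conns []net.Conn, frame []byte, timeout time.Duration) []error {
	errs := make([]error, len(conns))
	var wg sync.WaitGroup
	for i, c := range conns {
		wg.Add(1)
		go func(i int, c net.Conn) {
			defer wg.Done()
			if err := c.SetWriteDeadline(time.Now().Add(timeout)); err != nil {
				errs[i] = err
				return
			}
			_, errs[i] = c.Write(frame)
		}(i, c)
	}
	wg.Wait() // returns once every write has finished or hit its deadline
	return errs
}

// isTimeout reports whether err is a network timeout such as a missed deadline.
func isTimeout(err error) bool {
	var ne net.Error
	return errors.As(err, &ne) && ne.Timeout()
}

// demo delivers one frame to a healthy peer and to a peer that never reads.
func demo() []error {
	fastW, fastR := net.Pipe()
	slowW, slowR := net.Pipe()
	defer fastW.Close()
	defer fastR.Close()
	defer slowW.Close()
	defer slowR.Close()

	// The healthy peer drains whatever it is sent.
	go io.Copy(io.Discard, fastR)
	// slowR is never read, so the write to slowW can only end at the deadline.

	return deliverAll([]net.Conn{fastW, slowW}, []byte("event"), 100*time.Millisecond)
}

func main() {
	for i, err := range demo() {
		fmt.Printf("subscriber %d: err=%v timeout=%v\n", i, err, isTimeout(err))
	}
}
```

With this shape the publisher still waits in `wg.Wait()`, but for at most roughly one timeout per publish rather than one per stalled subscriber in sequence.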

Files changed

  • service.go (+51 −23): per-subscriber goroutines in publishWith()

CI note

Clean — 2/2 green. Single-file change, well-tested.


🤖 matthew-pr-worker · 2026-05-30T07:28Z

@matthew-pilot
Collaborator Author

🦀 Matthew PR Check — #8 PILOT-278

Status

  • State: OPEN · MERGEABLE ✅
  • CI: 2/2 passing (test ✅, codecov/patch ✅)
  • Author: matthew-pilot
  • Created: 2026-05-30 07:23 UTC
  • Branch: openclaw/pilot-278-20260530-071800 → main
  • Files: 1 (service.go +51/−23)

What changed

Rewrote the publishWith delivery loop to spawn each subscriber write in its own goroutine with a 5s timeout. Slow TCP subscribers no longer stall the publishing goroutine, matching the existing resilience contract of dropping after consecutive failures.

🔗 PILOT-278

@matthew-pilot
Collaborator Author

🦀 Matthew Explains — #8 PILOT-278

What this does

Rewrites publishWith() to deliver events to each subscriber in its own goroutine. Each subscriber gets a 5s write timeout via a buffered channel + select. After maxConsecutivePublishFailures consecutive timeouts or write failures, the subscriber is dropped from the set.

Before

for _, sub := range subscribers {
    sub.Write(event) // blocks: one slow subscriber stalls everyone
}

After

for _, sub := range subscribers {
    go func(sub Subscriber) {
        select {
        case sub.out <- event: // handed off to the subscriber's buffered channel
        case <-time.After(publishWriteTimeout): // 5s elapsed: count as a failure
        }
    }(sub)
}
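A runnable version of the pattern above might look like the following minimal sketch. The `Subscriber` type, its `out` field layout, and the `publish`, `result`, and `demo` names are assumptions for illustration, not the code in service.go. The timeout is passed as a parameter so the demo can use a short value instead of 5s.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// Subscriber is a hypothetical stand-in: events are handed to a buffered
// channel that a separate writer would drain onto the connection.
type Subscriber struct {
	ID  string
	out chan []byte
}

// result reports whether one subscriber accepted the event in time.
type result struct {
	id string
	ok bool
}

// publish offers event to every subscriber concurrently and returns the
// sorted IDs of those whose channel stayed full for the whole timeout.
func publish(subs []*Subscriber, event []byte, timeout time.Duration) []string {
	// Buffered to len(subs) so no delivery goroutine blocks while reporting.
	results := make(chan result, len(subs))
	for _, sub := range subs {
		go func(sub *Subscriber) {
			select {
			case sub.out <- event:
				results <- result{id: sub.ID, ok: true}
			case <-time.After(timeout):
				results <- result{id: sub.ID, ok: false}
			}
		}(sub)
	}
	var failed []string
	for range subs {
		if r := <-results; !r.ok {
			failed = append(failed, r.id)
		}
	}
	sort.Strings(failed)
	return failed
}

// demo publishes to one subscriber with room in its buffer and one whose
// buffer is already full, returning the failed IDs and the fast queue length.
func demo() ([]string, int) {
	fast := &Subscriber{ID: "fast", out: make(chan []byte, 1)}
	slow := &Subscriber{ID: "slow", out: make(chan []byte, 1)}
	slow.out <- []byte("backlog") // nobody drains this, so the buffer stays full

	failed := publish([]*Subscriber{fast, slow}, []byte("event"), 50*time.Millisecond)
	return failed, len(fast.out)
}

func main() {
	failed, queued := demo()
	fmt.Printf("timed out: %v, queued for fast subscriber: %d\n", failed, queued)
}
```

In this sketch the timeout bounds how long the hand-off to a full buffer may take. The actual socket write would happen in whatever goroutine drains `out`, which is assumed here and not shown.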

Why this matters

Under real network conditions, a TCP subscriber with high latency or a stalled TCP window would block the entire publish loop. This consumed a publisher connection for the full TCP write timeout (often 30s+), preventing the publisher from reading new events and cascading into backpressure.

Scope

  • 1 file, +51/−23
  • No API surface changes
  • Dropping a subscriber after consecutive failures matches the existing resilience contract

🔗 PILOT-278
