From f40463433ed57e1135f09180ffe5a1d2abd65d24 Mon Sep 17 00:00:00 2001
From: matthew-pilot
Date: Sat, 30 May 2026 07:23:02 +0000
Subject: [PATCH] fix: per-subscriber write goroutines with 5s timeout to prevent slow subscribers from stalling publisher (PILOT-278)

Each subscriber write now runs in its own goroutine and is given up on
after publishWriteTimeout, so publishWith waits roughly that long at
most instead of waiting for the slowest subscriber. A timeout counts as
a publish failure; a subscriber is removed once it reaches
maxConsecutivePublishFailures consecutive failures.

Known limitation: write is called without a context or deadline, so a
write that timed out keeps running in the background until it returns.
---
 service.go | 90 ++++++++++++++++++++++++++++++++++++++++--------------
 1 file changed, 67 insertions(+), 23 deletions(-)

diff --git a/service.go b/service.go
index a346de8..2441cee 100644
--- a/service.go
+++ b/service.go
@@ -35,6 +35,7 @@ const (
 const (
 	publishRetryBackoff           = 20 * time.Millisecond
 	maxConsecutivePublishFailures = 3
+	publishWriteTimeout           = 5 * time.Second
 )
 
 // Service is the L11 plugin adapter. cmd/daemon/main.go (L12) and
@@ -317,31 +318,74 @@ func (b *broker) publishWith(evt *Event, sender *subscriber, write eventWriter)
 	}
 	b.mu.RUnlock()
 
-	var dead []*subscriber
+	var (
+		wg     sync.WaitGroup
+		deadMu sync.Mutex
+		dead   []*subscriber
+	)
 	for _, s := range targets {
-		err := write(s, evt)
-		if err != nil {
-			time.Sleep(publishRetryBackoff)
-			err = write(s, evt)
-		}
-		if err == nil {
-			s.publishFailures.Store(0)
-			continue
-		}
-		failureCount := s.publishFailures.Add(1)
-		if failureCount >= maxConsecutivePublishFailures {
-			slog.Debug("eventstream subscriber removed after consecutive failures",
-				"remote", s.remote(),
-				"consecutive_failures", failureCount,
-				"last_error", err)
-			dead = append(dead, s)
-		} else {
-			slog.Debug("eventstream write failed, retaining subscriber",
-				"remote", s.remote(),
-				"consecutive_failures", failureCount,
-				"error", err)
-		}
+		wg.Add(1)
+		go func(s *subscriber) {
+			defer wg.Done()
+			// Buffered so the writer goroutine can always deliver its
+			// result and exit, even after the select below timed out.
+			done := make(chan error, 1)
+			go func() {
+				// write cannot be cancelled from here: after a timeout
+				// this goroutine lives on until write itself returns.
+				err := write(s, evt)
+				if err != nil {
+					time.Sleep(publishRetryBackoff)
+					err = write(s, evt)
+				}
+				done <- err
+			}()
+			// Stop the timer on the fast path so a pending timer is not
+			// left behind per subscriber per event.
+			timer := time.NewTimer(publishWriteTimeout)
+			defer timer.Stop()
+			select {
+			case err := <-done:
+				if err == nil {
+					s.publishFailures.Store(0)
+					return
+				}
+				failureCount := s.publishFailures.Add(1)
+				if failureCount >= maxConsecutivePublishFailures {
+					slog.Debug("eventstream subscriber removed after consecutive failures",
+						"remote", s.remote(),
+						"consecutive_failures", failureCount,
+						"last_error", err)
+					deadMu.Lock()
+					dead = append(dead, s)
+					deadMu.Unlock()
+				} else {
+					slog.Debug("eventstream write failed, retaining subscriber",
+						"remote", s.remote(),
+						"consecutive_failures", failureCount,
+						"error", err)
+				}
+			case <-timer.C:
+				failureCount := s.publishFailures.Add(1)
+				if failureCount >= maxConsecutivePublishFailures {
+					slog.Debug("eventstream subscriber removed after write timeout",
+						"remote", s.remote(),
+						"consecutive_failures", failureCount,
+						"timeout", publishWriteTimeout)
+					deadMu.Lock()
+					dead = append(dead, s)
+					deadMu.Unlock()
+				} else {
+					slog.Debug("eventstream write timed out, retaining subscriber",
+						"remote", s.remote(),
+						"consecutive_failures", failureCount,
+						"timeout", publishWriteTimeout)
+				}
+			}
+		}(s)
 	}
+	// Waits at most about publishWriteTimeout, however slow a subscriber is.
+	wg.Wait()
 	for _, s := range dead {
 		b.removeSub(s)
 	}