fix: auto-upgrade HTTP to WebSocket for header subscriptions #36

tac0turtle merged 5 commits into main
Conversation
go-jsonrpc requires WebSocket transport for channel-based subscriptions. When the node URL uses HTTP, SubscribeHeaders now creates a separate WS client (http→ws, https→wss) for the subscription. Falls back to polling GetNetworkHead at 1s intervals if WS also fails. Fixes crash loop when celestia_node_url is configured with http://.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
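The scheme mapping described here (http→ws, https→wss) can be sketched as a small helper. This is an illustrative reconstruction — `httpToWS` matches the helper name referenced later in the review, but the body below is a sketch, not the repository's exact code:

```go
package main

import (
	"fmt"
	"strings"
)

// httpToWS maps an HTTP(S) node URL to its WebSocket equivalent
// (http→ws, https→wss); any other scheme is returned unchanged.
func httpToWS(addr string) string {
	switch {
	case strings.HasPrefix(addr, "https://"):
		return "wss://" + strings.TrimPrefix(addr, "https://")
	case strings.HasPrefix(addr, "http://"):
		return "ws://" + strings.TrimPrefix(addr, "http://")
	default:
		return addr
	}
}

func main() {
	fmt.Println(httpToWS("http://localhost:26658"))  // ws://localhost:26658
	fmt.Println(httpToWS("https://rpc.example.com")) // wss://rpc.example.com
	fmt.Println(httpToWS("wss://rpc.example.com"))   // unchanged
}
```

Returning non-HTTP addresses unchanged is what makes the "already WS" diagnostic discussed later in this review possible: the caller can detect that no upgrade happened by comparing input and output.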
📝 Walkthrough

Adds WebSocket-based header subscriptions to the Celestia node fetcher with an HTTP-to-WS upgrade path and polling fallback; extends fetcher state to store the original address, auth headers, and a subscription client closer; updates subscription, forwarding, polling, and cleanup logic.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Caller
    participant Fetcher as CelestiaNodeFetcher
    participant HTTP as Celestia Node<br/>(HTTP)
    participant WS as Celestia Node<br/>(WebSocket)
    Caller->>Fetcher: SubscribeHeaders()
    activate Fetcher
    Fetcher->>HTTP: Attempt RPC subscription
    HTTP-->>Fetcher: Fail / not supported
    Fetcher->>Fetcher: httpToWS(addr)
    Fetcher->>WS: Establish WS JSON-RPC (subscribeViaWS)
    WS-->>Fetcher: WS client ready
    Fetcher->>WS: Subscribe to headers
    WS-->>Fetcher: Raw JSON messages
    loop Header stream
        Fetcher->>Fetcher: forwardHeaders(map -> *types.Header)
        Fetcher-->>Caller: Typed header
    end
    alt WS unavailable/fails
        Fetcher->>HTTP: pollHeaders (GetNetworkHead)
        loop Polling
            HTTP-->>Fetcher: Current head
            Fetcher-->>Caller: Header on height advance
        end
    end
    deactivate Fetcher
```
Estimated Code Review Effort: 🎯 3 (Moderate) | ⏱️ ~22 minutes
🚥 Pre-merge checks: ✅ 3 passed
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
pkg/fetch/celestia_node.go (1)
311-324: ⚠️ Potential issue | 🟡 Minor

Minor race between `subscribeViaWS` and `Close`.

If `Close()` runs while `subscribeViaWS` is between the `jsonrpc.NewClient` call (line 198) and storing the closer (line 204), the new WS client won't be cleaned up — `f.closed` is already `true`, and `subCloser` is written after `Close` released the lock. Consider checking `f.closed` inside `subscribeViaWS` under the same lock that stores `subCloser`:

🛡️ Proposed fix

```diff
 f.mu.Lock()
+if f.closed {
+	f.mu.Unlock()
+	closer()
+	return nil, fmt.Errorf("fetcher is closed")
+}
 f.subCloser = closer
 f.mu.Unlock()
```

(Applied in `subscribeViaWS` at lines 203–205.)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/fetch/celestia_node.go` around lines 311 - 324, The Close method can race with subscribeViaWS: ensure subscribeViaWS checks f.closed and sets f.subCloser under the same mutex to avoid leaving a newly-created WS client unclosed; specifically, after creating the client (jsonrpc.NewClient) but before assigning f.subCloser, acquire f.mu, if f.closed then immediately close the new client and return, otherwise set f.subCloser and release the lock—use the existing f.mu, f.closed, and f.subCloser fields in CelestiaNodeFetcher to coordinate with Close().
🧹 Nitpick comments (1)
pkg/fetch/celestia_node.go (1)
171-185: First subscription error is silently discarded, making debugging harder.

When `f.header.Subscribe` fails (line 173), that error is overwritten by the `subscribeViaWS` result. If both fail, only the WS error is logged at line 180. Consider logging the initial error at debug/warn level, or wrapping both errors into the fallback log message so operators can see the full picture.

```diff
 rawCh, err := f.header.Subscribe(ctx)
 if err != nil {
-	// The client is likely HTTP — upgrade to WS for subscriptions.
-	rawCh, err = f.subscribeViaWS(ctx)
+	// The client is likely HTTP — upgrade to WS for subscriptions.
+	firstErr := err
+	rawCh, err = f.subscribeViaWS(ctx)
+	if err != nil {
+		f.log.Warn().Err(err).AnErr("initial_err", firstErr).Msg("header.Subscribe not available, falling back to polling")
+		return f.pollHeaders(ctx), nil
+	}
 }
-if err != nil {
-	// Neither worked — fall back to polling.
-	f.log.Warn().Err(err).Msg("header.Subscribe not available, falling back to polling")
-	return f.pollHeaders(ctx), nil
-}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/fetch/celestia_node.go` around lines 171 - 185, The first error from f.header.Subscribe in CelestiaNodeFetcher.SubscribeHeaders is being overwritten by the call to f.subscribeViaWS, so preserve and include both failures in the fallback log: capture the initial err (from f.header.Subscribe) into a separate variable (e.g., firstErr), then if subscribeViaWS also fails, log or wrap both firstErr and the wsErr when calling f.log.Warn().Err(...).Msg so the fallback path (and functions pollHeaders/forwardHeaders) records both error contexts for debugging.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@pkg/fetch/celestia_node.go`:
- Around line 187-217: subscribeViaWS currently overwrites f.subCloser without
closing any existing closer, leaking WS clients; fix by atomically swapping in
the new closer and closing the previous one: after creating closer, acquire
f.mu, save old := f.subCloser, set f.subCloser = closer, release f.mu, then if
old != nil call old() to close the prior client (call the close outside the lock
to avoid blocking while holding f.mu). Ensure you reference subscribeViaWS and
the f.subCloser field in the change.
---
Outside diff comments:
In `@pkg/fetch/celestia_node.go`:
- Around line 311-324: The Close method can race with subscribeViaWS: ensure
subscribeViaWS checks f.closed and sets f.subCloser under the same mutex to
avoid leaving a newly-created WS client unclosed; specifically, after creating
the client (jsonrpc.NewClient) but before assigning f.subCloser, acquire f.mu,
if f.closed then immediately close the new client and return, otherwise set
f.subCloser and release the lock—use the existing f.mu, f.closed, and
f.subCloser fields in CelestiaNodeFetcher to coordinate with Close().
---
Nitpick comments:
In `@pkg/fetch/celestia_node.go`:
- Around line 171-185: The first error from f.header.Subscribe in
CelestiaNodeFetcher.SubscribeHeaders is being overwritten by the call to
f.subscribeViaWS, so preserve and include both failures in the fallback log:
capture the initial err (from f.header.Subscribe) into a separate variable
(e.g., firstErr), then if subscribeViaWS also fails, log or wrap both firstErr
and the wsErr when calling f.log.Warn().Err(...).Msg so the fallback path (and
functions pollHeaders/forwardHeaders) records both error contexts for debugging.
The context passed to go-jsonrpc NewClient controls the WebSocket connection lifetime, not just the dial. Using a timeout context caused the subscription to close immediately when defer cancel() fired. Run the WS dial+subscribe in a goroutine with a select timeout so the parent context flows through to the WS connection, while still bounding the wait if the node doesn't accept WebSocket connections. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Actionable comments posted: 2
♻️ Duplicate comments (1)
pkg/fetch/celestia_node.go (1)
232-238: Correct swap-then-close pattern for `subCloser`.

The lock-swap-unlock-then-call-old pattern is the right approach: the old closer is called outside the lock to avoid blocking while holding `mu`.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/fetch/celestia_node.go` around lines 232 - 238, The code should atomically replace f.subCloser with r.closer while holding f.mu and then invoke the previous closer outside the lock to avoid blocking: inside the critical section lock f.mu, save the current closer to a temporary (old), assign f.subCloser = r.closer, unlock f.mu, and then if old != nil call old(); ensure you use the same variable names (f.mu, f.subCloser, r.closer, old) so the swap-and-close behavior is explicit and the old closer is invoked only after releasing the mutex.
🧹 Nitpick comments (1)
pkg/fetch/celestia_node.go (1)
199-203: Nit: error message is slightly misleading for already-WS addresses.

When `addr` is `ws://` or `wss://`, `httpToWS` returns it unchanged, and the check fires with `"address %q is not HTTP; cannot upgrade to WebSocket"`. For a WS address the real situation is that the main client already is WS and `f.header.Subscribe` should have succeeded in the first place — the message may confuse diagnostics.

📝 Suggested message clarification

```diff
-return nil, fmt.Errorf("address %q is not HTTP; cannot upgrade to WebSocket", f.addr)
+return nil, fmt.Errorf("address %q is already WebSocket or uses an unsupported scheme; cannot upgrade", f.addr)
```
Verify each finding against the current code and only fix it if needed. In `@pkg/fetch/celestia_node.go` around lines 199 - 203, The error message in subscribeViaWS misleads when f.addr is already a ws:// or wss:// address because httpToWS returns it unchanged; update the check that compares wsAddr == f.addr inside subscribeViaWS to return a clearer error (reference subscribeViaWS and httpToWS and f.addr) such as indicating the address is not an HTTP URL and cannot be upgraded to WebSocket (or that the address appears already to be a WebSocket URL), so diagnostics reflect whether the address is non-HTTP or already WS rather than always saying "not HTTP; cannot upgrade".
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@pkg/fetch/celestia_node.go`:
- Around line 276-314: Update the doc-comment for pollHeaders to explicitly
state it polls the network tip via GetNetworkHead at ~1s intervals and can miss
intermediate heights produced within a single tick (i.e., it is not gap‑free),
so it is a weaker fallback than SubscribeHeaders; advise callers requiring every
block to use the subscription path or a dedicated gap‑free replay mechanism (or
reduce the polling interval if acceptable).
- Around line 210-244: The goroutine started to dial/subscribe can succeed after
the select times out and will try to send a wsSubscribeResult into the buffered
done channel, leaking the WS client and internal goroutines; fix by spawning a
background goroutine in the timeout branch that drains done and invokes the
returned closer if any (and otherwise discards errors) so resources are cleaned
up. Specifically, in the timeout case where the select returns the time.After
branch, start a goroutine like: go func() { r := <-done; if r.closer != nil {
r.closer() } }() so that results from jsonrpc.NewClient / subAPI.Subscribe are
consumed and closer is called; keep existing logic that sets f.subCloser and
calls old() in the successful select case unchanged.
---
Duplicate comments:
In `@pkg/fetch/celestia_node.go`:
- Around line 232-238: The code should atomically replace f.subCloser with
r.closer while holding f.mu and then invoke the previous closer outside the lock
to avoid blocking: inside the critical section lock f.mu, save the current
closer to a temporary (old), assign f.subCloser = r.closer, unlock f.mu, and
then if old != nil call old(); ensure you use the same variable names (f.mu,
f.subCloser, r.closer, old) so the swap-and-close behavior is explicit and the
old closer is invoked only after releasing the mutex.
---
Nitpick comments:
In `@pkg/fetch/celestia_node.go`:
- Around line 199-203: The error message in subscribeViaWS misleads when f.addr
is already a ws:// or wss:// address because httpToWS returns it unchanged;
update the check that compares wsAddr == f.addr inside subscribeViaWS to return
a clearer error (reference subscribeViaWS and httpToWS and f.addr) such as
indicating the address is not an HTTP URL and cannot be upgraded to WebSocket
(or that the address appears already to be a WebSocket URL), so diagnostics
reflect whether the address is non-HTTP or already WS rather than always saying
"not HTTP; cannot upgrade".
```go
done := make(chan wsSubscribeResult, 1)
go func() {
	var subAPI headerAPI
	closer, err := jsonrpc.NewClient(ctx, wsAddr, "header", &subAPI, f.authHeader)
	if err != nil {
		done <- wsSubscribeResult{err: fmt.Errorf("connect WS header client: %w", err)}
		return
	}
	ch, err := subAPI.Subscribe(ctx)
	if err != nil {
		closer()
		done <- wsSubscribeResult{err: fmt.Errorf("header.Subscribe via WS: %w", err)}
		return
	}
	done <- wsSubscribeResult{ch: ch, closer: closer}
}()

select {
case r := <-done:
	if r.err != nil {
		return nil, r.err
	}
	f.mu.Lock()
	old := f.subCloser
	f.subCloser = r.closer
	f.mu.Unlock()
	if old != nil {
		old()
	}
	return r.ch, nil
case <-time.After(wsDialTimeout):
	return nil, fmt.Errorf("WS connection to %s timed out after %s", wsAddr, wsDialTimeout)
case <-ctx.Done():
	return nil, ctx.Err()
}
```
WS connection leaks when the dial-timeout fires but the goroutine subsequently succeeds.
`done` is a `chan wsSubscribeResult` of capacity 1. When `time.After(wsDialTimeout)` fires and `subscribeViaWS` returns an error (falling back to polling), the goroutine is still alive with the parent `ctx`. If it later manages to dial and subscribe, it writes `{ch: ch, closer: closer}` into the now-unread buffer. Nobody drains `done` after the timeout branch exits, so:

- `closer` is never called — the WS client stays open until `ctx` ends.
- The go-jsonrpc internal goroutine that feeds `ch` runs indefinitely with no consumer.
🛡️ Proposed fix: drain and close in a background goroutine on timeout

```diff
 select {
 case r := <-done:
 	if r.err != nil {
 		return nil, r.err
 	}
 	f.mu.Lock()
 	old := f.subCloser
 	f.subCloser = r.closer
 	f.mu.Unlock()
 	if old != nil {
 		old()
 	}
 	return r.ch, nil
 case <-time.After(wsDialTimeout):
+	// Drain the channel in the background and clean up if the goroutine
+	// eventually succeeds, to avoid leaking the WS connection/subscription.
+	go func() {
+		if r := <-done; r.closer != nil {
+			r.closer()
+		}
+	}()
 	return nil, fmt.Errorf("WS connection to %s timed out after %s", wsAddr, wsDialTimeout)
 case <-ctx.Done():
 	return nil, ctx.Err()
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/fetch/celestia_node.go` around lines 210 - 244, The goroutine started to
dial/subscribe can succeed after the select times out and will try to send a
wsSubscribeResult into the buffered done channel, leaking the WS client and
internal goroutines; fix by spawning a background goroutine in the timeout
branch that drains done and invokes the returned closer if any (and otherwise
discards errors) so resources are cleaned up. Specifically, in the timeout case
where the select returns the time.After branch, start a goroutine like: go
func() { r := <-done; if r.closer != nil { r.closer() } }() so that results from
jsonrpc.NewClient / subAPI.Subscribe are consumed and closer is called; keep
existing logic that sets f.subCloser and calls old() in the successful select
case unchanged.
Add info-level log every 30s during streaming with current height and blocks processed in the interval. Previously streaming only logged at debug level, making it invisible at default log config. Also fixes WS subscription dial: uses goroutine+select timeout instead of context.WithTimeout, since go-jsonrpc uses the context for the WS connection lifetime (not just dial). The previous approach cancelled the subscription immediately after returning. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
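A minimal sketch of the interval progress log this commit adds: a counter sampled by a ticker, with the per-interval count reset on each report. `progressLogger`, its fields, and the `report` callback are illustrative names, assuming the real code emits via the fetcher's zerolog logger:

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// progressLogger tracks the latest header height and how many headers were
// processed since the last report, so streaming stays visible at info level
// even when per-header logging is debug-only.
type progressLogger struct {
	count  atomic.Uint64
	height atomic.Uint64
}

// observe records one processed header at the given height.
func (p *progressLogger) observe(h uint64) {
	p.count.Add(1)
	p.height.Store(h)
}

// run calls report once per interval with the current height and the number
// of headers processed during that interval (the counter resets each tick).
func (p *progressLogger) run(ctx context.Context, interval time.Duration,
	report func(height, processed uint64)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			report(p.height.Load(), p.count.Swap(0))
		}
	}
}

func main() {
	var p progressLogger
	for h := uint64(100); h < 105; h++ {
		p.observe(h)
	}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	first := make(chan string, 1)
	go p.run(ctx, 10*time.Millisecond, func(height, processed uint64) {
		select {
		case first <- fmt.Sprintf("height=%d processed=%d", height, processed):
		default:
		}
	})
	fmt.Println(<-first) // height=104 processed=5
}
```

Using `Swap(0)` makes each report show the blocks processed in that interval rather than a lifetime total, matching the "blocks processed in the interval" wording above.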
Overview

go-jsonrpc requires WebSocket transport for channel-based subscriptions. When the node URL uses HTTP, SubscribeHeaders now creates a separate WS client (http→ws, https→wss) for the subscription. Falls back to polling GetNetworkHead at 1s intervals if WS also fails.

Fixes crash loop when celestia_node_url is configured with http://.