
fix: auto-upgrade HTTP to WebSocket for header subscriptions #36

Merged

tac0turtle merged 5 commits into main from fix/ws-subscription-upgrade on Feb 25, 2026
Conversation

tac0turtle (Contributor) commented Feb 25, 2026

go-jsonrpc requires WebSocket transport for channel-based subscriptions. When the node URL uses HTTP, SubscribeHeaders now creates a separate WS client (http→ws, https→wss) for the subscription. Falls back to polling GetNetworkHead at 1s intervals if WS also fails.

Fixes crash loop when celestia_node_url is configured with http://.
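The http→ws scheme rewrite described above can be sketched with net/url. This is a hedged reconstruction — `httpToWS` is the helper name from this PR, but the body below is an assumption, not the merged code:

```go
package main

import (
	"fmt"
	"net/url"
)

// httpToWS rewrites an HTTP(S) URL to its WebSocket equivalent
// (http -> ws, https -> wss). Other schemes are returned unchanged,
// which also covers addresses that are already ws:// or wss://.
func httpToWS(addr string) string {
	u, err := url.Parse(addr)
	if err != nil {
		return addr
	}
	switch u.Scheme {
	case "http":
		u.Scheme = "ws"
	case "https":
		u.Scheme = "wss"
	default:
		return addr
	}
	return u.String()
}

func main() {
	fmt.Println(httpToWS("http://localhost:26658"))   // ws://localhost:26658
	fmt.Println(httpToWS("https://node.example.com")) // wss://node.example.com
	fmt.Println(httpToWS("ws://localhost:26658"))     // unchanged
}
```

Returning the input unchanged for non-HTTP schemes is what lets the caller detect "nothing to upgrade" by comparing input and output, as the later review comments discuss.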

Overview

Summary by CodeRabbit

  • New Features

    • Added WebSocket-based header subscription with automatic upgrade from HTTP and graceful fallback to periodic polling when subscriptions fail.
  • Improvements

    • More reliable header synchronization with authentication-aware connections and improved cleanup on close to avoid stale subscriptions.


coderabbitai bot commented Feb 25, 2026

Warning

Rate limit exceeded

@tac0turtle has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 23 minutes and 4 seconds before requesting another review.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

📥 Commits

Reviewing files that changed from the base of the PR and between 81403e6 and 430e9ae.

📒 Files selected for processing (2)
  • pkg/fetch/celestia_node.go
  • pkg/sync/subscription.go
📝 Walkthrough

Adds WebSocket-based header subscriptions to the Celestia node fetcher with an HTTP-to-WS upgrade path and polling fallback; extends fetcher state to store original address, auth headers, and a subscription client closer; updates subscription, forwarding, polling, and cleanup logic.

Changes

Cohort / File(s): Header subscription & WS fallback — pkg/fetch/celestia_node.go
Summary: Adds a WS subscription path and http→ws conversion via new helpers subscribeViaWS, forwardHeaders, pollHeaders, and httpToWS. Extends CelestiaNodeFetcher with addr, authHeader, and subCloser. SubscribeHeaders now tries the RPC subscription, upgrades to WS, then falls back to polling. Close cleans up the WS client.

Sequence Diagram

sequenceDiagram
    participant Caller
    participant Fetcher as CelestiaNodeFetcher
    participant HTTP as Celestia Node<br/>(HTTP)
    participant WS as Celestia Node<br/>(WebSocket)

    Caller->>Fetcher: SubscribeHeaders()
    activate Fetcher

    Fetcher->>HTTP: Attempt RPC subscription
    HTTP-->>Fetcher: Fail / not supported

    Fetcher->>Fetcher: httpToWS(addr)
    Fetcher->>WS: Establish WS JSON‑RPC (subscribeViaWS)
    WS-->>Fetcher: WS client ready

    Fetcher->>WS: Subscribe to headers
    WS-->>Fetcher: Raw JSON messages

    loop Header stream
      Fetcher->>Fetcher: forwardHeaders(map -> *types.Header)
      Fetcher-->>Caller: Typed header
    end

    alt WS unavailable/fails
      Fetcher->>HTTP: pollHeaders (GetNetworkHead)
      loop Polling
        HTTP-->>Fetcher: Current head
        Fetcher-->>Caller: Header on height advance
      end
    end

    deactivate Fetcher

Estimated Code Review Effort

🎯 3 (Moderate) | ⏱️ ~22 minutes

Poem

I’m a rabbit on the wire, ears alert for news,
I hop from HTTP to WS with nimble, quiet shoes,
If sockets nap, I poll the trail,
Till headers hop and never fail,
Tiny paws keep data in good use. 🐰✨

🚥 Pre-merge checks | ✅ 3 passed
  • Description Check — ✅ Passed. Check skipped: CodeRabbit's high-level summary is enabled.
  • Title Check — ✅ Passed. The title directly and clearly describes the main change: auto-upgrading HTTP to WebSocket for header subscriptions, the core functionality added in this PR.
  • Docstring Coverage — ✅ Passed. Docstring coverage is 100.00%, which meets the required threshold of 80.00%.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.


Comment @coderabbitai help to get the list of available commands and usage tips.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
pkg/fetch/celestia_node.go (1)

311-324: ⚠️ Potential issue | 🟡 Minor

Minor race between subscribeViaWS and Close.

If Close() runs while subscribeViaWS is between the jsonrpc.NewClient call (line 198) and storing the closer (line 204), the new WS client won't be cleaned up — f.closed is already true, and subCloser is written after Close released the lock.

Consider checking f.closed inside subscribeViaWS under the same lock that stores subCloser:

🛡️ Proposed fix
 	f.mu.Lock()
+	if f.closed {
+		f.mu.Unlock()
+		closer()
+		return nil, fmt.Errorf("fetcher is closed")
+	}
 	f.subCloser = closer
 	f.mu.Unlock()

(Applied in subscribeViaWS at lines 203–205.)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/fetch/celestia_node.go` around lines 311 - 324, The Close method can race
with subscribeViaWS: ensure subscribeViaWS checks f.closed and sets f.subCloser
under the same mutex to avoid leaving a newly-created WS client unclosed;
specifically, after creating the client (jsonrpc.NewClient) but before assigning
f.subCloser, acquire f.mu, if f.closed then immediately close the new client and
return, otherwise set f.subCloser and release the lock—use the existing f.mu,
f.closed, and f.subCloser fields in CelestiaNodeFetcher to coordinate with
Close().
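The guard the review proposes can be shown on a minimal stand-in for CelestiaNodeFetcher (only the fields involved in the race; `storeCloser` is a hypothetical name for the critical section inside subscribeViaWS):

```go
package main

import (
	"fmt"
	"sync"
)

type fetcher struct {
	mu        sync.Mutex
	closed    bool
	subCloser func()
}

// storeCloser registers a newly created WS closer, but refuses — and
// immediately closes the client — if Close already ran, so a client
// created mid-shutdown is never leaked.
func (f *fetcher) storeCloser(closer func()) error {
	f.mu.Lock()
	if f.closed {
		f.mu.Unlock()
		closer()
		return fmt.Errorf("fetcher is closed")
	}
	f.subCloser = closer
	f.mu.Unlock()
	return nil
}

func (f *fetcher) Close() {
	f.mu.Lock()
	f.closed = true
	c := f.subCloser
	f.subCloser = nil
	f.mu.Unlock()
	if c != nil {
		c()
	}
}

func main() {
	f := &fetcher{}
	f.Close()
	err := f.storeCloser(func() { fmt.Println("stray WS client closed") })
	fmt.Println(err) // fetcher is closed
}
```

Because `closed` and `subCloser` are only touched under the same mutex, the window between client creation and closer registration can no longer lose a client to a concurrent Close.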
🧹 Nitpick comments (1)
pkg/fetch/celestia_node.go (1)

171-185: First subscription error is silently discarded, making debugging harder.

When f.header.Subscribe fails (line 173), that error is overwritten by the subscribeViaWS result. If both fail, only the WS error is logged at line 180. Consider logging the initial error at debug/warn level, or wrapping both errors into the fallback log message so operators can see the full picture.

 	rawCh, err := f.header.Subscribe(ctx)
 	if err != nil {
-		// The client is likely HTTP — upgrade to WS for subscriptions.
-		rawCh, err = f.subscribeViaWS(ctx)
+		// The client is likely HTTP — upgrade to WS for subscriptions.
+		firstErr := err
+		rawCh, err = f.subscribeViaWS(ctx)
+		if err != nil {
+			f.log.Warn().Err(err).AnErr("initial_err", firstErr).Msg("header.Subscribe not available, falling back to polling")
+			return f.pollHeaders(ctx), nil
+		}
 	}
-	if err != nil {
-		// Neither worked — fall back to polling.
-		f.log.Warn().Err(err).Msg("header.Subscribe not available, falling back to polling")
-		return f.pollHeaders(ctx), nil
-	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/fetch/celestia_node.go` around lines 171 - 185, The first error from
f.header.Subscribe in CelestiaNodeFetcher.SubscribeHeaders is being overwritten
by the call to f.subscribeViaWS, so preserve and include both failures in the
fallback log: capture the initial err (from f.header.Subscribe) into a separate
variable (e.g., firstErr), then if subscribeViaWS also fails, log or wrap both
firstErr and the wsErr when calling f.log.Warn().Err(...).Msg so the fallback
path (and functions pollHeaders/forwardHeaders) records both error contexts for
debugging.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@pkg/fetch/celestia_node.go`:
- Around line 187-217: subscribeViaWS currently overwrites f.subCloser without
closing any existing closer, leaking WS clients; fix by atomically swapping in
the new closer and closing the previous one: after creating closer, acquire
f.mu, save old := f.subCloser, set f.subCloser = closer, release f.mu, then if
old != nil call old() to close the prior client (call the close outside the lock
to avoid blocking while holding f.mu). Ensure you reference subscribeViaWS and
the f.subCloser field in the change.


ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 9b611f8 and e2dfe69.

📒 Files selected for processing (1)
  • pkg/fetch/celestia_node.go

tac0turtle and others added 2 commits February 25, 2026 13:57
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The context passed to go-jsonrpc NewClient controls the WebSocket
connection lifetime, not just the dial. Using a timeout context caused
the subscription to close immediately when defer cancel() fired.

Run the WS dial+subscribe in a goroutine with a select timeout so
the parent context flows through to the WS connection, while still
bounding the wait if the node doesn't accept WebSocket connections.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

♻️ Duplicate comments (1)
pkg/fetch/celestia_node.go (1)

232-238: Correct swap-then-close pattern for subCloser.

The lock-swap-unlock-then-call-old pattern is the right approach: the old closer is called outside the lock to avoid blocking while holding mu.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/fetch/celestia_node.go` around lines 232 - 238, The code should
atomically replace f.subCloser with r.closer while holding f.mu and then invoke
the previous closer outside the lock to avoid blocking: inside the critical
section lock f.mu, save the current closer to a temporary (old), assign
f.subCloser = r.closer, unlock f.mu, and then if old != nil call old(); ensure
you use the same variable names (f.mu, f.subCloser, r.closer, old) so the
swap-and-close behavior is explicit and the old closer is invoked only after
releasing the mutex.
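The lock-swap-unlock-then-call-old pattern endorsed above can be isolated to a few lines; `conn` and `setCloser` are illustrative names, not the PR's:

```go
package main

import (
	"fmt"
	"sync"
)

type conn struct {
	mu     sync.Mutex
	closer func()
}

// setCloser atomically swaps in the new closer and closes the previous one
// outside the critical section, so a slow close never blocks other
// goroutines waiting on mu.
func (c *conn) setCloser(closer func()) {
	c.mu.Lock()
	old := c.closer
	c.closer = closer
	c.mu.Unlock()
	if old != nil {
		old() // invoked only after the mutex is released
	}
}

func main() {
	c := &conn{}
	c.setCloser(func() { fmt.Println("closing previous WS client") })
	c.setCloser(func() {}) // swapping in a new closer closes the old one
}
```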
🧹 Nitpick comments (1)
pkg/fetch/celestia_node.go (1)

199-203: Nit: error message is slightly misleading for already-WS addresses.

When addr is ws:// or wss://, httpToWS returns it unchanged, and the check fires with "address %q is not HTTP; cannot upgrade to WebSocket". For a WS address the real situation is that the main client already is WS and f.header.Subscribe should have succeeded in the first place — the message may confuse diagnostics.

📝 Suggested message clarification
-		return nil, fmt.Errorf("address %q is not HTTP; cannot upgrade to WebSocket", f.addr)
+		return nil, fmt.Errorf("address %q is already WebSocket or uses an unsupported scheme; cannot upgrade", f.addr)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/fetch/celestia_node.go` around lines 199 - 203, The error message in
subscribeViaWS misleads when f.addr is already a ws:// or wss:// address because
httpToWS returns it unchanged; update the check that compares wsAddr == f.addr
inside subscribeViaWS to return a clearer error (reference subscribeViaWS and
httpToWS and f.addr) such as indicating the address is not an HTTP URL and
cannot be upgraded to WebSocket (or that the address appears already to be a
WebSocket URL), so diagnostics reflect whether the address is non-HTTP or
already WS rather than always saying "not HTTP; cannot upgrade".
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@pkg/fetch/celestia_node.go`:
- Around line 276-314: Update the doc-comment for pollHeaders to explicitly
state it polls the network tip via GetNetworkHead at ~1s intervals and can miss
intermediate heights produced within a single tick (i.e., it is not gap‑free),
so it is a weaker fallback than SubscribeHeaders; advise callers requiring every
block to use the subscription path or a dedicated gap‑free replay mechanism (or
reduce the polling interval if acceptable).
- Around line 210-244: The goroutine started to dial/subscribe can succeed after
the select times out and will try to send a wsSubscribeResult into the buffered
done channel, leaking the WS client and internal goroutines; fix by spawning a
background goroutine in the timeout branch that drains done and invokes the
returned closer if any (and otherwise discards errors) so resources are cleaned
up. Specifically, in the timeout case where the select returns the time.After
branch, start a goroutine like: go func() { r := <-done; if r.closer != nil {
r.closer() } }() so that results from jsonrpc.NewClient / subAPI.Subscribe are
consumed and closer is called; keep existing logic that sets f.subCloser and
calls old() in the successful select case unchanged.

---

Duplicate comments:
In `@pkg/fetch/celestia_node.go`:
- Around line 232-238: The code should atomically replace f.subCloser with
r.closer while holding f.mu and then invoke the previous closer outside the lock
to avoid blocking: inside the critical section lock f.mu, save the current
closer to a temporary (old), assign f.subCloser = r.closer, unlock f.mu, and
then if old != nil call old(); ensure you use the same variable names (f.mu,
f.subCloser, r.closer, old) so the swap-and-close behavior is explicit and the
old closer is invoked only after releasing the mutex.

---

Nitpick comments:
In `@pkg/fetch/celestia_node.go`:
- Around line 199-203: The error message in subscribeViaWS misleads when f.addr
is already a ws:// or wss:// address because httpToWS returns it unchanged;
update the check that compares wsAddr == f.addr inside subscribeViaWS to return
a clearer error (reference subscribeViaWS and httpToWS and f.addr) such as
indicating the address is not an HTTP URL and cannot be upgraded to WebSocket
(or that the address appears already to be a WebSocket URL), so diagnostics
reflect whether the address is non-HTTP or already WS rather than always saying
"not HTTP; cannot upgrade".

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e2dfe69 and 81403e6.

📒 Files selected for processing (1)
  • pkg/fetch/celestia_node.go

Comment on lines +210 to 244
done := make(chan wsSubscribeResult, 1)
go func() {
	var subAPI headerAPI
	closer, err := jsonrpc.NewClient(ctx, wsAddr, "header", &subAPI, f.authHeader)
	if err != nil {
		done <- wsSubscribeResult{err: fmt.Errorf("connect WS header client: %w", err)}
		return
	}
	ch, err := subAPI.Subscribe(ctx)
	if err != nil {
		closer()
		done <- wsSubscribeResult{err: fmt.Errorf("header.Subscribe via WS: %w", err)}
		return
	}
	done <- wsSubscribeResult{ch: ch, closer: closer}
}()

select {
case r := <-done:
	if r.err != nil {
		return nil, r.err
	}
	f.mu.Lock()
	old := f.subCloser
	f.subCloser = r.closer
	f.mu.Unlock()
	if old != nil {
		old()
	}
	return r.ch, nil
case <-time.After(wsDialTimeout):
	return nil, fmt.Errorf("WS connection to %s timed out after %s", wsAddr, wsDialTimeout)
case <-ctx.Done():
	return nil, ctx.Err()
}


⚠️ Potential issue | 🟠 Major

WS connection leaks when the dial-timeout fires but the goroutine subsequently succeeds.

done is a chan wsSubscribeResult of capacity 1. When time.After(wsDialTimeout) fires and subscribeViaWS returns an error (falling back to polling), the goroutine is still alive with the parent ctx. If it later manages to dial and subscribe it writes {ch: ch, closer: closer} into the now-unread buffer. Nobody drains done after the timeout branch exits, so:

  1. closer is never called — the WS client stays open until ctx ends.
  2. The go-jsonrpc internal goroutine that feeds ch runs indefinitely with no consumer.
🛡️ Proposed fix: drain and close in a background goroutine on timeout
 	select {
 	case r := <-done:
 		if r.err != nil {
 			return nil, r.err
 		}
 		f.mu.Lock()
 		old := f.subCloser
 		f.subCloser = r.closer
 		f.mu.Unlock()
 		if old != nil {
 			old()
 		}
 		return r.ch, nil
 	case <-time.After(wsDialTimeout):
+		// Drain the channel in the background and clean up if the goroutine
+		// eventually succeeds, to avoid leaking the WS connection/subscription.
+		go func() {
+			if r := <-done; r.closer != nil {
+				r.closer()
+			}
+		}()
 		return nil, fmt.Errorf("WS connection to %s timed out after %s", wsAddr, wsDialTimeout)
 	case <-ctx.Done():
 		return nil, ctx.Err()
 	}

tac0turtle and others added 2 commits February 25, 2026 14:55
Add info-level log every 30s during streaming with current height and
blocks processed in the interval. Previously streaming only logged at
debug level, making it invisible at default log config.

Also fixes WS subscription dial: uses goroutine+select timeout instead
of context.WithTimeout, since go-jsonrpc uses the context for the WS
connection lifetime (not just dial). The previous approach cancelled
the subscription immediately after returning.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@tac0turtle tac0turtle merged commit 87217cd into main Feb 25, 2026
4 checks passed
@tac0turtle tac0turtle deleted the fix/ws-subscription-upgrade branch February 25, 2026 13:58