From e2dfe69c90e06195f92852651f8af415489f6106 Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Wed, 25 Feb 2026 13:47:26 +0100 Subject: [PATCH 1/5] fix: auto-upgrade HTTP to WebSocket for header subscriptions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit go-jsonrpc requires WebSocket transport for channel-based subscriptions. When the node URL uses HTTP, SubscribeHeaders now creates a separate WS client (http→ws, https→wss) for the subscription. Falls back to polling GetNetworkHead at 1s intervals if WS also fails. Fixes crash loop when celestia_node_url is configured with http://. Co-Authored-By: Claude Opus 4.6 --- pkg/fetch/celestia_node.go | 116 +++++++++++++++++++++++++++++++++++-- 1 file changed, 112 insertions(+), 4 deletions(-) diff --git a/pkg/fetch/celestia_node.go b/pkg/fetch/celestia_node.go index 8275f67..3115b50 100644 --- a/pkg/fetch/celestia_node.go +++ b/pkg/fetch/celestia_node.go @@ -41,9 +41,12 @@ type CelestiaNodeFetcher struct { blob blobAPI headerCloser jsonrpc.ClientCloser blobCloser jsonrpc.ClientCloser + addr string // original address for creating WS subscription clients + authHeader http.Header log zerolog.Logger mu sync.Mutex closed bool + subCloser jsonrpc.ClientCloser // WS client for subscriptions, if any } const ( @@ -52,14 +55,20 @@ const ( defaultRPCRetryDelay = 100 * time.Millisecond ) -// NewCelestiaNodeFetcher connects to a Celestia node at the given WebSocket address. +// NewCelestiaNodeFetcher connects to a Celestia node at the given address. +// Regular RPC calls use the provided URL scheme (typically HTTP). +// Subscriptions automatically upgrade to WebSocket when needed. func NewCelestiaNodeFetcher(ctx context.Context, addr, token string, log zerolog.Logger) (*CelestiaNodeFetcher, error) { headers := http.Header{} if token != "" { headers.Set("Authorization", "Bearer "+token) } - f := &CelestiaNodeFetcher{log: log} + f := &CelestiaNodeFetcher{ + addr: addr, + authHeader: headers, + log: log, + } var err error f.headerCloser, err = jsonrpc.NewClient(ctx, addr, "header", &f.header, headers) @@ -76,6 +85,19 @@ func NewCelestiaNodeFetcher(ctx context.Context, addr, token string, log zerolog return f, nil } +// httpToWS converts http:// to ws:// and https:// to wss://. +// Returns the address unchanged if it already uses a WS scheme. +func httpToWS(addr string) string { + switch { + case strings.HasPrefix(addr, "http://"): + return "ws://" + strings.TrimPrefix(addr, "http://") + case strings.HasPrefix(addr, "https://"): + return "wss://" + strings.TrimPrefix(addr, "https://") + default: + return addr + } +} + func (f *CelestiaNodeFetcher) GetHeader(ctx context.Context, height uint64) (*types.Header, error) { raw, err := f.callRawWithRetry(ctx, "header.GetByHeight", func(callCtx context.Context) (json.RawMessage, error) { return f.header.GetByHeight(callCtx, height) @@ -147,11 +169,55 @@ func (f *CelestiaNodeFetcher) callRawWithRetry(ctx context.Context, op string, f } func (f *CelestiaNodeFetcher) SubscribeHeaders(ctx context.Context) (<-chan *types.Header, error) { + // Try subscription on the existing client first (works if already on WS). rawCh, err := f.header.Subscribe(ctx) if err != nil { - return nil, fmt.Errorf("header.Subscribe: %w", err) + // The client is likely HTTP — upgrade to WS for subscriptions. + rawCh, err = f.subscribeViaWS(ctx) + } + if err != nil { + // Neither worked — fall back to polling. + f.log.Warn().Err(err).Msg("header.Subscribe not available, falling back to polling") + return f.pollHeaders(ctx), nil + } + + return f.forwardHeaders(ctx, rawCh), nil +} + +// subscribeViaWS creates a separate WebSocket client for header subscriptions. +// This handles the case where the main client uses HTTP (no channel support). +func (f *CelestiaNodeFetcher) subscribeViaWS(ctx context.Context) (<-chan json.RawMessage, error) { + wsAddr := httpToWS(f.addr) + if wsAddr == f.addr { + return nil, fmt.Errorf("address %q is not HTTP; cannot upgrade to WebSocket", f.addr) + } + + f.log.Info().Str("ws_addr", wsAddr).Msg("upgrading to WebSocket for header subscription") + + var subAPI headerAPI + closer, err := jsonrpc.NewClient(ctx, wsAddr, "header", &subAPI, f.authHeader) + if err != nil { + return nil, fmt.Errorf("connect WS header client: %w", err) + } + + f.mu.Lock() + f.subCloser = closer + f.mu.Unlock() + + rawCh, err := subAPI.Subscribe(ctx) + if err != nil { + closer() + f.mu.Lock() + f.subCloser = nil + f.mu.Unlock() + return nil, fmt.Errorf("header.Subscribe via WS: %w", err) } + return rawCh, nil +} + +// forwardHeaders maps raw JSON headers from a subscription channel to typed headers. +func (f *CelestiaNodeFetcher) forwardHeaders(ctx context.Context, rawCh <-chan json.RawMessage) <-chan *types.Header { out := make(chan *types.Header, 64) go func() { defer close(out) @@ -176,8 +242,47 @@ func (f *CelestiaNodeFetcher) SubscribeHeaders(ctx context.Context) (<-chan *typ } } }() + return out +} + +// pollHeaders polls GetNetworkHead at 1s intervals, emitting new headers when +// the height advances. Used as a fallback when header.Subscribe is unavailable. +func (f *CelestiaNodeFetcher) pollHeaders(ctx context.Context) <-chan *types.Header { + out := make(chan *types.Header, 64) + go func() { + defer close(out) - return out, nil + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + var lastHeight uint64 + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + hdr, err := f.GetNetworkHead(ctx) + if err != nil { + if ctx.Err() != nil { + return + } + f.log.Warn().Err(err).Msg("poll network head failed") + continue + } + if hdr.Height <= lastHeight { + continue + } + lastHeight = hdr.Height + select { + case out <- hdr: + case <-ctx.Done(): + return + } + } + } + }() + return out } // GetProof forwards a blob proof request to the upstream Celestia node. @@ -210,6 +315,9 @@ func (f *CelestiaNodeFetcher) Close() error { return nil } f.closed = true + if f.subCloser != nil { + f.subCloser() + } f.headerCloser() f.blobCloser() return nil From cfa89fa90ae3b59ca49254b0c8ac36e6dc273f7e Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Wed, 25 Feb 2026 13:57:06 +0100 Subject: [PATCH 2/5] fix: close previous WS client on re-subscription to prevent leak Co-Authored-By: Claude Opus 4.6 --- pkg/fetch/celestia_node.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/fetch/celestia_node.go b/pkg/fetch/celestia_node.go index 3115b50..1defdfb 100644 --- a/pkg/fetch/celestia_node.go +++ b/pkg/fetch/celestia_node.go @@ -201,8 +201,12 @@ func (f *CelestiaNodeFetcher) subscribeViaWS(ctx context.Context) (<-chan json.R } f.mu.Lock() + old := f.subCloser f.subCloser = closer f.mu.Unlock() + if old != nil { + old() + } rawCh, err := subAPI.Subscribe(ctx) if err != nil { From 81403e61fe5750379a2bf719b0491764a3da2551 Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Wed, 25 Feb 2026 14:46:56 +0100 Subject: [PATCH 3/5] fix: use goroutine+timeout for WS dial to avoid context leak The context passed to go-jsonrpc NewClient controls the WebSocket connection lifetime, not just the dial. Using a timeout context caused the subscription to close immediately when defer cancel() fired. Run the WS dial+subscribe in a goroutine with a select timeout so the parent context flows through to the WS connection, while still bounding the wait if the node doesn't accept WebSocket connections. Co-Authored-By: Claude Opus 4.6 --- pkg/fetch/celestia_node.go | 64 ++++++++++++++++++++++++++------------ 1 file changed, 44 insertions(+), 20 deletions(-) diff --git a/pkg/fetch/celestia_node.go b/pkg/fetch/celestia_node.go index 1defdfb..6494a83 100644 --- a/pkg/fetch/celestia_node.go +++ b/pkg/fetch/celestia_node.go @@ -53,6 +53,7 @@ const ( defaultRPCTimeout = 8 * time.Second defaultRPCMaxRetries = 2 defaultRPCRetryDelay = 100 * time.Millisecond + wsDialTimeout = 10 * time.Second ) // NewCelestiaNodeFetcher connects to a Celestia node at the given address. @@ -184,8 +185,17 @@ func (f *CelestiaNodeFetcher) SubscribeHeaders(ctx context.Context) (<-chan *typ return f.forwardHeaders(ctx, rawCh), nil } +// wsSubscribeResult holds the outcome of a WS subscribe attempt. +type wsSubscribeResult struct { + ch <-chan json.RawMessage + closer jsonrpc.ClientCloser + err error +} + // subscribeViaWS creates a separate WebSocket client for header subscriptions. // This handles the case where the main client uses HTTP (no channel support). +// The connection attempt is bounded by wsDialTimeout; if the node doesn't +// support WebSocket the goroutine is abandoned (cleaned up when ctx ends). func (f *CelestiaNodeFetcher) subscribeViaWS(ctx context.Context) (<-chan json.RawMessage, error) { wsAddr := httpToWS(f.addr) if wsAddr == f.addr { @@ -194,30 +204,44 @@ func (f *CelestiaNodeFetcher) subscribeViaWS(ctx context.Context) (<-chan json.R f.log.Info().Str("ws_addr", wsAddr).Msg("upgrading to WebSocket for header subscription") - var subAPI headerAPI - closer, err := jsonrpc.NewClient(ctx, wsAddr, "header", &subAPI, f.authHeader) - if err != nil { - return nil, fmt.Errorf("connect WS header client: %w", err) - } - - f.mu.Lock() - old := f.subCloser - f.subCloser = closer - f.mu.Unlock() - if old != nil { - old() - } + // Run the WS dial + subscribe in a goroutine so we can timeout if the + // node doesn't accept WebSocket connections. The parent ctx is passed to + // NewClient because it controls the WS connection lifetime (not just dial). + done := make(chan wsSubscribeResult, 1) + go func() { + var subAPI headerAPI + closer, err := jsonrpc.NewClient(ctx, wsAddr, "header", &subAPI, f.authHeader) + if err != nil { + done <- wsSubscribeResult{err: fmt.Errorf("connect WS header client: %w", err)} + return + } + ch, err := subAPI.Subscribe(ctx) + if err != nil { + closer() + done <- wsSubscribeResult{err: fmt.Errorf("header.Subscribe via WS: %w", err)} + return + } + done <- wsSubscribeResult{ch: ch, closer: closer} + }() - rawCh, err := subAPI.Subscribe(ctx) - if err != nil { - closer() + select { + case r := <-done: + if r.err != nil { + return nil, r.err + } f.mu.Lock() - f.subCloser = nil + old := f.subCloser + f.subCloser = r.closer f.mu.Unlock() - return nil, fmt.Errorf("header.Subscribe via WS: %w", err) + if old != nil { + old() + } + return r.ch, nil + case <-time.After(wsDialTimeout): + return nil, fmt.Errorf("WS connection to %s timed out after %s", wsAddr, wsDialTimeout) + case <-ctx.Done(): + return nil, ctx.Err() } - - return rawCh, nil } // forwardHeaders maps raw JSON headers from a subscription channel to typed headers. From 660006e40e0abbfea13d7cd8fcde6ccfc9ead12c Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Wed, 25 Feb 2026 14:55:07 +0100 Subject: [PATCH 4/5] feat: add periodic streaming progress log and fix WS dial context Add info-level log every 30s during streaming with current height and blocks processed in the interval. Previously streaming only logged at debug level, making it invisible at default log config. Also fixes WS subscription dial: uses goroutine+select timeout instead of context.WithTimeout, since go-jsonrpc uses the context for the WS connection lifetime (not just dial). The previous approach cancelled the subscription immediately after returning. Co-Authored-By: Claude Opus 4.6 --- pkg/sync/subscription.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/pkg/sync/subscription.go b/pkg/sync/subscription.go index 301e01d..ecb48ed 100644 --- a/pkg/sync/subscription.go +++ b/pkg/sync/subscription.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "time" "github.com/rs/zerolog" @@ -12,6 +13,8 @@ import ( "github.com/evstack/apex/pkg/types" ) +const streamingLogInterval = 30 * time.Second + // SubscriptionManager processes new headers from a live subscription. type SubscriptionManager struct { store store.Store @@ -45,10 +48,20 @@ func (sm *SubscriptionManager) Run(ctx context.Context) error { networkHeight = ss.NetworkHeight } + ticker := time.NewTicker(streamingLogInterval) + defer ticker.Stop() + var processed uint64 + for { select { case <-ctx.Done(): return nil + case <-ticker.C: + sm.log.Info(). + Uint64("height", lastHeight). + Uint64("blocks", processed). + Msg("streaming progress") + processed = 0 case hdr, ok := <-ch: if !ok { // Channel closed (disconnect or ctx cancelled). @@ -72,6 +85,7 @@ func (sm *SubscriptionManager) Run(ctx context.Context) error { } lastHeight = hdr.Height + processed++ } } } From 430e9aef5d0545caeca3bc4e4378af961152cb3e Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Wed, 25 Feb 2026 14:55:57 +0100 Subject: [PATCH 5/5] docs: document poll fallback gap-skipping behavior Co-Authored-By: Claude Opus 4.6 --- pkg/fetch/celestia_node.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/fetch/celestia_node.go b/pkg/fetch/celestia_node.go index 6494a83..f7035bd 100644 --- a/pkg/fetch/celestia_node.go +++ b/pkg/fetch/celestia_node.go @@ -275,6 +275,10 @@ func (f *CelestiaNodeFetcher) forwardHeaders(ctx context.Context, rawCh <-chan j // pollHeaders polls GetNetworkHead at 1s intervals, emitting new headers when // the height advances. Used as a fallback when header.Subscribe is unavailable. +// NOTE: only the current chain tip is emitted; intermediate heights produced +// between ticks are skipped. The sync coordinator handles this via gap detection +// and re-backfill, so no data is lost — but this path is higher latency than +// a true subscription. func (f *CelestiaNodeFetcher) pollHeaders(ctx context.Context) <-chan *types.Header { out := make(chan *types.Header, 64) go func() {