diff --git a/apps/evm/server/force_inclusion_test.go b/apps/evm/server/force_inclusion_test.go index a1ad3059e..77aa43e38 100644 --- a/apps/evm/server/force_inclusion_test.go +++ b/apps/evm/server/force_inclusion_test.go @@ -10,6 +10,7 @@ import ( "time" "github.com/evstack/ev-node/pkg/config" + blobrpc "github.com/evstack/ev-node/pkg/da/jsonrpc" da "github.com/evstack/ev-node/pkg/da/types" "github.com/evstack/ev-node/pkg/genesis" "github.com/rs/zerolog" @@ -73,6 +74,14 @@ func (m *mockDA) HasForcedInclusionNamespace() bool { return true } +func (m *mockDA) Subscribe(ctx context.Context, namespace []byte) (<-chan *blobrpc.SubscriptionResponse, error) { + return nil, nil +} + +func (m *mockDA) LocalHead(ctx context.Context) (uint64, error) { + return 0, nil +} + func TestForceInclusionServer_handleSendRawTransaction_Success(t *testing.T) { testHeight := uint64(100) diff --git a/block/internal/common/metrics.go b/block/internal/common/metrics.go index 13a041943..cf806bd89 100644 --- a/block/internal/common/metrics.go +++ b/block/internal/common/metrics.go @@ -69,6 +69,11 @@ type Metrics struct { // Forced inclusion metrics ForcedInclusionTxsInGracePeriod metrics.Gauge // Number of forced inclusion txs currently in grace period ForcedInclusionTxsMalicious metrics.Counter // Total number of forced inclusion txs marked as malicious + + // Sync mode metrics + SyncMode metrics.Gauge // Current sync mode: 0=catchup, 1=follow + SubscribeErrors metrics.Counter // Number of subscription failures + ModeSwitches metrics.Counter // Number of catchup<->follow mode transitions } // PrometheusMetrics returns Metrics built using Prometheus client library @@ -201,6 +206,28 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { Help: "Total number of forced inclusion transactions marked as malicious (past grace boundary)", }, labels).With(labelsAndValues...) + // Sync mode metrics + m.SyncMode = prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "sync_mode", + Help: "Current sync mode: 0=catchup (polling), 1=follow (subscription)", + }, labels).With(labelsAndValues...) + + m.SubscribeErrors = prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "subscribe_errors_total", + Help: "Total number of DA subscription failures", + }, labels).With(labelsAndValues...) + + m.ModeSwitches = prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "mode_switches_total", + Help: "Total number of sync mode transitions between catchup and follow", + }, labels).With(labelsAndValues...) 
+ // DA Submitter metrics m.DASubmitterPendingBlobs = prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ Namespace: namespace, @@ -269,6 +296,11 @@ func NopMetrics() *Metrics { // Forced inclusion metrics ForcedInclusionTxsInGracePeriod: discard.NewGauge(), ForcedInclusionTxsMalicious: discard.NewCounter(), + + // Sync mode metrics + SyncMode: discard.NewGauge(), + SubscribeErrors: discard.NewCounter(), + ModeSwitches: discard.NewCounter(), } // Initialize maps with no-op metrics diff --git a/block/internal/common/subscription.go b/block/internal/common/subscription.go new file mode 100644 index 000000000..4353ecb12 --- /dev/null +++ b/block/internal/common/subscription.go @@ -0,0 +1,24 @@ +package common + +import blobrpc "github.com/evstack/ev-node/pkg/da/jsonrpc" + +// BlobsFromSubscription returns non-empty blob data from a subscription response. +func BlobsFromSubscription(resp *blobrpc.SubscriptionResponse) [][]byte { + if resp == nil || len(resp.Blobs) == 0 { + return nil + } + + blobs := make([][]byte, 0, len(resp.Blobs)) + for _, blob := range resp.Blobs { + if blob == nil { + continue + } + data := blob.Data() + if len(data) == 0 { + continue + } + blobs = append(blobs, data) + } + + return blobs +} diff --git a/block/internal/da/async_block_retriever.go b/block/internal/da/async_block_retriever.go index 5c147de30..05d7f27c8 100644 --- a/block/internal/da/async_block_retriever.go +++ b/block/internal/da/async_block_retriever.go @@ -25,6 +25,7 @@ type AsyncBlockRetriever interface { Stop() GetCachedBlock(ctx context.Context, daHeight uint64) (*BlockData, error) UpdateCurrentHeight(height uint64) + StoreBlock(ctx context.Context, height uint64, blobs [][]byte, timestamp time.Time) } // BlockData contains data retrieved from a single DA height @@ -125,6 +126,68 @@ func (f *asyncBlockRetriever) UpdateCurrentHeight(height uint64) { } } +// StoreBlock caches a block's blobs, favoring existing data to avoid churn. +func (f *asyncBlockRetriever) StoreBlock(ctx context.Context, height uint64, blobs [][]byte, timestamp time.Time) { + if len(f.namespace) == 0 { + return + } + if height < f.daStartHeight { + return + } + if len(blobs) == 0 { + return + } + + filtered := make([][]byte, 0, len(blobs)) + for _, blob := range blobs { + if len(blob) > 0 { + filtered = append(filtered, blob) + } + } + if len(filtered) == 0 { + return + } + + if timestamp.IsZero() { + timestamp = time.Now().UTC() + } + + key := newBlockDataKey(height) + if existing, err := f.cache.Get(ctx, key); err == nil { + var pbBlock pb.BlockData + if err := proto.Unmarshal(existing, &pbBlock); err == nil && len(pbBlock.Blobs) > 0 { + return + } + } + + pbBlock := &pb.BlockData{ + Height: height, + Timestamp: timestamp.Unix(), + Blobs: filtered, + } + data, err := proto.Marshal(pbBlock) + if err != nil { + f.logger.Error(). + Err(err). + Uint64("height", height). + Msg("failed to marshal block for caching") + return + } + + if err := f.cache.Put(ctx, key, data); err != nil { + f.logger.Error(). + Err(err). + Uint64("height", height). + Msg("failed to cache block") + return + } + + f.logger.Debug(). + Uint64("height", height). + Int("blob_count", len(filtered)). 
+ Msg("cached block from subscription") +} + func newBlockDataKey(height uint64) ds.Key { return ds.NewKey(fmt.Sprintf("/block/%d", height)) } diff --git a/block/internal/da/client.go b/block/internal/da/client.go index d2e1d626e..146ee7ccf 100644 --- a/block/internal/da/client.go +++ b/block/internal/da/client.go @@ -442,3 +442,27 @@ func (c *client) Validate(ctx context.Context, ids []datypes.ID, proofs []datype return results, nil } + +// Subscribe subscribes to blobs in the specified namespace. +// Returns a channel that receives subscription responses as new blobs are included. +func (c *client) Subscribe(ctx context.Context, namespace []byte) (<-chan *blobrpc.SubscriptionResponse, error) { + ns, err := share.NewNamespaceFromBytes(namespace) + if err != nil { + return nil, fmt.Errorf("invalid namespace: %w", err) + } + + return c.blobAPI.Subscribe(ctx, ns) +} + +// LocalHead returns the height of the locally synced DA head. +func (c *client) LocalHead(ctx context.Context) (uint64, error) { + headCtx, cancel := context.WithTimeout(ctx, c.defaultTimeout) + defer cancel() + + header, err := c.headerAPI.LocalHead(headCtx) + if err != nil { + return 0, fmt.Errorf("failed to get local head: %w", err) + } + + return header.Height, nil +} diff --git a/block/internal/da/forced_inclusion_retriever.go b/block/internal/da/forced_inclusion_retriever.go index 7b07b7d5d..39794d1d5 100644 --- a/block/internal/da/forced_inclusion_retriever.go +++ b/block/internal/da/forced_inclusion_retriever.go @@ -4,11 +4,14 @@ import ( "context" "errors" "fmt" + "sync" "time" "github.com/rs/zerolog" + "github.com/evstack/ev-node/block/internal/common" "github.com/evstack/ev-node/pkg/config" + blobrpc "github.com/evstack/ev-node/pkg/da/jsonrpc" datypes "github.com/evstack/ev-node/pkg/da/types" "github.com/evstack/ev-node/types" ) @@ -23,6 +26,10 @@ type ForcedInclusionRetriever struct { daEpochSize uint64 daStartHeight uint64 asyncFetcher AsyncBlockRetriever + + mu sync.Mutex + lastProcessedEpochEnd uint64 + hasProcessedEpoch bool } // ForcedInclusionEvent contains forced inclusion transactions retrieved from DA. @@ -33,6 +40,14 @@ type ForcedInclusionEvent struct { Txs [][]byte } +func emptyForcedInclusionEvent(daHeight uint64) *ForcedInclusionEvent { + return &ForcedInclusionEvent{ + StartDaHeight: daHeight, + EndDaHeight: daHeight, + Txs: [][]byte{}, + } +} + // NewForcedInclusionRetriever creates a new forced inclusion retriever. // It internally creates and manages an AsyncBlockRetriever for background prefetching. func NewForcedInclusionRetriever( @@ -68,11 +83,29 @@ func (r *ForcedInclusionRetriever) Stop() { r.asyncFetcher.Stop() } +// HandleSubscriptionResponse caches forced inclusion blobs from subscription updates. +func (r *ForcedInclusionRetriever) HandleSubscriptionResponse(resp *blobrpc.SubscriptionResponse) { + if resp == nil { + return + } + if !r.client.HasForcedInclusionNamespace() { + return + } + + r.asyncFetcher.UpdateCurrentHeight(resp.Height) + + blobs := common.BlobsFromSubscription(resp) + if len(blobs) == 0 { + return + } + + r.asyncFetcher.StoreBlock(context.Background(), resp.Height, blobs, time.Now().UTC()) +} + // RetrieveForcedIncludedTxs retrieves forced inclusion transactions at the given DA height. // It respects epoch boundaries and only fetches at epoch end. // It tries to get blocks from the async fetcher cache first, then falls back to sync fetching. 
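+// After the first epoch has been processed, each subsequent retrieval covers exactly the next
+// epoch past the last processed epoch end, so epochs are never skipped even when daHeight
+// has advanced several epochs ahead of the next boundary.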
func (r *ForcedInclusionRetriever) RetrieveForcedIncludedTxs(ctx context.Context, daHeight uint64) (*ForcedInclusionEvent, error) { - // when daStartHeight is not set or no namespace is configured, we retrieve nothing. if !r.client.HasForcedInclusionNamespace() { return nil, ErrForceInclusionNotConfigured } @@ -82,21 +115,28 @@ func (r *ForcedInclusionRetriever) RetrieveForcedIncludedTxs(ctx context.Context } epochStart, epochEnd, currentEpochNumber := types.CalculateEpochBoundaries(daHeight, r.daStartHeight, r.daEpochSize) - - // Update the async fetcher's current height so it knows what to prefetch r.asyncFetcher.UpdateCurrentHeight(daHeight) - if daHeight != epochEnd { + r.mu.Lock() + lastProcessed := r.lastProcessedEpochEnd + hasProcessed := r.hasProcessedEpoch + r.mu.Unlock() + + if hasProcessed { + if r.daEpochSize == 0 { + return emptyForcedInclusionEvent(daHeight), nil + } + epochStart = lastProcessed + 1 + epochEnd = epochStart + r.daEpochSize - 1 + currentEpochNumber = types.CalculateEpochNumber(epochEnd, r.daStartHeight, r.daEpochSize) + } + + if daHeight < epochEnd { r.logger.Debug(). Uint64("da_height", daHeight). Uint64("epoch_end", epochEnd). Msg("not at epoch end - returning empty transactions") - - return &ForcedInclusionEvent{ - StartDaHeight: daHeight, - EndDaHeight: daHeight, - Txs: [][]byte{}, - }, nil + return emptyForcedInclusionEvent(daHeight), nil } r.logger.Debug(). @@ -106,123 +146,100 @@ func (r *ForcedInclusionRetriever) RetrieveForcedIncludedTxs(ctx context.Context Uint64("epoch_num", currentEpochNumber). Msg("retrieving forced included transactions from DA epoch") - event := &ForcedInclusionEvent{ - StartDaHeight: epochStart, - EndDaHeight: epochEnd, - Txs: [][]byte{}, + event, err := r.retrieveEpoch(ctx, epochStart, epochEnd) + if err != nil { + return nil, err } - // Collect all heights in this epoch - var heights []uint64 - for h := epochStart; h <= epochEnd; h++ { - heights = append(heights, h) - } + r.mu.Lock() + r.lastProcessedEpochEnd = epochEnd + r.hasProcessedEpoch = true + r.mu.Unlock() + + return event, nil +} - // Try to get blocks from cache first - cachedBlocks := make(map[uint64]*BlockData) +func (r *ForcedInclusionRetriever) retrieveEpoch(ctx context.Context, epochStart, epochEnd uint64) (*ForcedInclusionEvent, error) { + epochSize := epochEnd - epochStart + 1 + blocks := make(map[uint64]*BlockData, epochSize) var missingHeights []uint64 - for _, h := range heights { + // Check cache for each height in the epoch + for h := epochStart; h <= epochEnd; h++ { block, err := r.asyncFetcher.GetCachedBlock(ctx, h) if err != nil { - r.logger.Debug(). - Err(err). - Uint64("height", h). - Msg("error getting cached block, will fetch synchronously") + r.logger.Debug().Err(err).Uint64("height", h).Msg("cache error, will fetch synchronously") missingHeights = append(missingHeights, h) continue } - if block == nil { // Cache miss + if block == nil { missingHeights = append(missingHeights, h) - } else { // Cache hit - cachedBlocks[h] = block + } else { + blocks[h] = block } } - // Fetch missing heights synchronously and store in map - syncFetchedBlocks := make(map[uint64]*BlockData) + // Fetch missing heights synchronously var processErrs error + namespace := r.client.GetForcedInclusionNamespace() for _, h := range missingHeights { - result := r.client.Retrieve(ctx, h, r.client.GetForcedInclusionNamespace()) - if result.Code == datypes.StatusHeightFromFuture { - r.logger.Debug(). - Uint64("height", h). 
- Msg("height not yet available on DA - backoff required") + result := r.client.Retrieve(ctx, h, namespace) + + switch result.Code { + case datypes.StatusHeightFromFuture: + r.logger.Debug().Uint64("height", h).Msg("height not yet available on DA") return nil, fmt.Errorf("%w: height %d not yet available", datypes.ErrHeightFromFuture, h) - } - if result.Code == datypes.StatusNotFound { + case datypes.StatusNotFound: r.logger.Debug().Uint64("height", h).Msg("no forced inclusion blobs at height") - continue - } - - if result.Code != datypes.StatusSuccess { - err := fmt.Errorf("failed to retrieve forced inclusion blobs at height %d: %s", h, result.Message) - processErrs = errors.Join(processErrs, err) - continue - } - - // Store the sync-fetched block data - syncFetchedBlocks[h] = &BlockData{ - Blobs: result.Data, - Timestamp: result.Timestamp, - } - } - // Process all blocks in height order - for _, h := range heights { - var block *BlockData - var source string - - // Check cached blocks first, then sync-fetched - if cachedBlock, ok := cachedBlocks[h]; ok { - block = cachedBlock - source = "cache" - } else if syncBlock, ok := syncFetchedBlocks[h]; ok { - block = syncBlock - source = "sync" - } - - if block != nil { - // Add blobs from block - for _, blob := range block.Blobs { - if len(blob) > 0 { - event.Txs = append(event.Txs, blob) - } - } - - // Update timestamp if newer - if block.Timestamp.After(event.Timestamp) { - event.Timestamp = block.Timestamp + case datypes.StatusSuccess: + blocks[h] = &BlockData{ + Blobs: result.Data, + Timestamp: result.Timestamp, } - r.logger.Debug(). - Uint64("height", h). - Int("blob_count", len(block.Blobs)). - Str("source", source). - Msg("added blobs from block") + default: + processErrs = errors.Join(processErrs, fmt.Errorf("failed to retrieve at height %d: %s", h, result.Message)) } - - // Clean up maps to prevent unbounded memory growth - delete(cachedBlocks, h) - delete(syncFetchedBlocks, h) } - // any error during process, need to retry at next call if processErrs != nil { r.logger.Warn(). - Uint64("da_height", daHeight). Uint64("epoch_start", epochStart). Uint64("epoch_end", epochEnd). - Uint64("epoch_num", currentEpochNumber). Err(processErrs). - Msg("Failed to retrieve DA epoch.. retrying next iteration") + Msg("failed to retrieve DA epoch") + return nil, processErrs + } - return &ForcedInclusionEvent{ - StartDaHeight: daHeight, - EndDaHeight: daHeight, - Txs: [][]byte{}, - }, nil + // Aggregate blobs in height order + event := &ForcedInclusionEvent{ + StartDaHeight: epochStart, + EndDaHeight: epochEnd, + Txs: [][]byte{}, + } + + for h := epochStart; h <= epochEnd; h++ { + block, ok := blocks[h] + if !ok { + continue + } + + for _, blob := range block.Blobs { + if len(blob) > 0 { + event.Txs = append(event.Txs, blob) + } + } + + if block.Timestamp.After(event.Timestamp) { + event.Timestamp = block.Timestamp + } + + r.logger.Debug(). + Uint64("height", h). + Int("blob_count", len(block.Blobs)). 
+ Msg("added blobs from block") } return event, nil diff --git a/block/internal/da/forced_inclusion_retriever_test.go b/block/internal/da/forced_inclusion_retriever_test.go index 446b655be..9b3628b5b 100644 --- a/block/internal/da/forced_inclusion_retriever_test.go +++ b/block/internal/da/forced_inclusion_retriever_test.go @@ -224,11 +224,9 @@ func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_ErrorHandling(t *tes defer retriever.Stop() ctx := context.Background() - // Should return empty event with no error (errors are logged and retried later) - event, err := retriever.RetrieveForcedIncludedTxs(ctx, 100) - assert.NilError(t, err) - assert.Assert(t, event != nil) - assert.Equal(t, len(event.Txs), 0) + // Should return error so caller can retry without skipping the epoch + _, err := retriever.RetrieveForcedIncludedTxs(ctx, 100) + assert.Assert(t, err != nil) } func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_EmptyBlobsSkipped(t *testing.T) { diff --git a/block/internal/da/interface.go b/block/internal/da/interface.go index 69c2d18f7..1d56009d4 100644 --- a/block/internal/da/interface.go +++ b/block/internal/da/interface.go @@ -3,6 +3,7 @@ package da import ( "context" + blobrpc "github.com/evstack/ev-node/pkg/da/jsonrpc" datypes "github.com/evstack/ev-node/pkg/da/types" ) @@ -22,6 +23,15 @@ type Client interface { GetDataNamespace() []byte GetForcedInclusionNamespace() []byte HasForcedInclusionNamespace() bool + + // Subscribe subscribes to blobs in the specified namespace. + // Returns a channel that receives subscription responses as new blobs are included. + // Used for follow mode to receive real-time blob notifications. + Subscribe(ctx context.Context, namespace []byte) (<-chan *blobrpc.SubscriptionResponse, error) + + // LocalHead returns the height of the locally synced DA head. + // Used to determine if the node is caught up with the DA layer. + LocalHead(ctx context.Context) (uint64, error) } // Verifier defines the interface for DA proof verification operations. 
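Taken together, the two methods added to Client above drive the new follow mode: LocalHead reports how far the locally synced DA head has advanced, and Subscribe streams blobs once the node is close enough to stop polling. The sketch below is illustrative only and is not part of this patch; the handleBlobs callback, the example package, and the fixed two-block lag check are assumptions made for the example, while the real mode selection lives in the syncer changes later in this diff.

package daexample // illustrative sketch only; not part of the patch

import (
	"context"
	"errors"
	"fmt"

	"github.com/evstack/ev-node/block/internal/common"
	"github.com/evstack/ev-node/block/internal/da"
)

// followNamespace blocks until the context ends or the subscription closes,
// invoking handleBlobs (hypothetical) for every non-empty batch of blobs.
func followNamespace(ctx context.Context, c da.Client, ns []byte, nextDAHeight uint64,
	handleBlobs func(daHeight uint64, blobs [][]byte)) error {
	head, err := c.LocalHead(ctx)
	if err != nil {
		return fmt.Errorf("local head: %w", err)
	}
	if nextDAHeight+2 < head {
		// Still behind the locally synced head: a caller would keep polling Retrieve instead.
		return fmt.Errorf("lagging DA head by %d blocks", head-nextDAHeight)
	}
	ch, err := c.Subscribe(ctx, ns)
	if err != nil {
		return fmt.Errorf("subscribe: %w", err)
	}
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case resp, ok := <-ch:
			if !ok {
				return errors.New("subscription closed")
			}
			// BlobsFromSubscription (added in subscription.go above) drops nil and empty blobs.
			if blobs := common.BlobsFromSubscription(resp); len(blobs) > 0 {
				handleBlobs(resp.Height, blobs)
			}
		}
	}
}

A closed channel is surfaced as an error so the caller can re-subscribe or fall back to polling, which mirrors how subscribeAndFollow treats a closed subscription later in this diff.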
diff --git a/block/internal/da/tracing.go b/block/internal/da/tracing.go index 45fae2e86..c7a44f8e3 100644 --- a/block/internal/da/tracing.go +++ b/block/internal/da/tracing.go @@ -9,6 +9,7 @@ import ( "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" + blobrpc "github.com/evstack/ev-node/pkg/da/jsonrpc" datypes "github.com/evstack/ev-node/pkg/da/types" ) @@ -132,6 +133,14 @@ func (t *tracedClient) HasForcedInclusionNamespace() bool { return t.inner.HasForcedInclusionNamespace() } +func (t *tracedClient) Subscribe(ctx context.Context, namespace []byte) (<-chan *blobrpc.SubscriptionResponse, error) { + return t.inner.Subscribe(ctx, namespace) +} + +func (t *tracedClient) LocalHead(ctx context.Context) (uint64, error) { + return t.inner.LocalHead(ctx) +} + type submitError struct{ msg string } func (e *submitError) Error() string { return e.msg } diff --git a/block/internal/da/tracing_test.go b/block/internal/da/tracing_test.go index ca288770c..470a3f042 100644 --- a/block/internal/da/tracing_test.go +++ b/block/internal/da/tracing_test.go @@ -12,6 +12,7 @@ import ( sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/sdk/trace/tracetest" + blobrpc "github.com/evstack/ev-node/pkg/da/jsonrpc" datypes "github.com/evstack/ev-node/pkg/da/types" ) @@ -58,6 +59,10 @@ func (m *mockFullClient) GetHeaderNamespace() []byte { return []byte{0x func (m *mockFullClient) GetDataNamespace() []byte { return []byte{0x02} } func (m *mockFullClient) GetForcedInclusionNamespace() []byte { return []byte{0x03} } func (m *mockFullClient) HasForcedInclusionNamespace() bool { return true } +func (m *mockFullClient) Subscribe(ctx context.Context, namespace []byte) (<-chan *blobrpc.SubscriptionResponse, error) { + return nil, nil +} +func (m *mockFullClient) LocalHead(ctx context.Context) (uint64, error) { return 0, nil } // setup a tracer provider + span recorder func setupDATrace(t *testing.T, inner FullClient) (FullClient, *tracetest.SpanRecorder) { diff --git a/block/internal/syncing/da_retriever.go b/block/internal/syncing/da_retriever.go index 1307b3968..576862495 100644 --- a/block/internal/syncing/da_retriever.go +++ b/block/internal/syncing/da_retriever.go @@ -21,6 +21,9 @@ import ( // DARetriever defines the interface for retrieving events from the DA layer type DARetriever interface { RetrieveFromDA(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error) + // ProcessBlobs processes raw blobs from subscription and returns height events. + // Used by follow mode to process real-time blob notifications. + ProcessBlobs(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent } // daRetriever handles DA retrieval operations for syncing @@ -72,7 +75,7 @@ func (r *daRetriever) RetrieveFromDA(ctx context.Context, daHeight uint64) ([]co } r.logger.Debug().Int("blobs", len(blobsResp.Data)).Uint64("da_height", daHeight).Msg("retrieved blob data") - return r.processBlobs(ctx, blobsResp.Data, daHeight), nil + return r.ProcessBlobs(ctx, blobsResp.Data, daHeight), nil } // fetchBlobs retrieves blobs from both header and data namespaces @@ -148,8 +151,9 @@ func (r *daRetriever) validateBlobResponse(res datypes.ResultRetrieve, daHeight } } -// processBlobs processes retrieved blobs to extract headers and data and returns height events -func (r *daRetriever) processBlobs(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent { +// ProcessBlobs processes retrieved blobs to extract headers and data and returns height events. 
+// This method implements the DARetriever interface and is used by both polling and subscription modes. +func (r *daRetriever) ProcessBlobs(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent { // Decode all blobs for _, bz := range blobs { if len(bz) == 0 { diff --git a/block/internal/syncing/da_retriever_mock.go b/block/internal/syncing/da_retriever_mock.go index d94dff4d6..10e08bbd9 100644 --- a/block/internal/syncing/da_retriever_mock.go +++ b/block/internal/syncing/da_retriever_mock.go @@ -38,6 +38,71 @@ func (_m *MockDARetriever) EXPECT() *MockDARetriever_Expecter { return &MockDARetriever_Expecter{mock: &_m.Mock} } +// ProcessBlobs provides a mock function for the type MockDARetriever +func (_mock *MockDARetriever) ProcessBlobs(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent { + ret := _mock.Called(ctx, blobs, daHeight) + + if len(ret) == 0 { + panic("no return value specified for ProcessBlobs") + } + + var r0 []common.DAHeightEvent + if returnFunc, ok := ret.Get(0).(func(context.Context, [][]byte, uint64) []common.DAHeightEvent); ok { + r0 = returnFunc(ctx, blobs, daHeight) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]common.DAHeightEvent) + } + } + return r0 +} + +// MockDARetriever_ProcessBlobs_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ProcessBlobs' +type MockDARetriever_ProcessBlobs_Call struct { + *mock.Call +} + +// ProcessBlobs is a helper method to define mock.On call +// - ctx context.Context +// - blobs [][]byte +// - daHeight uint64 +func (_e *MockDARetriever_Expecter) ProcessBlobs(ctx interface{}, blobs interface{}, daHeight interface{}) *MockDARetriever_ProcessBlobs_Call { + return &MockDARetriever_ProcessBlobs_Call{Call: _e.mock.On("ProcessBlobs", ctx, blobs, daHeight)} +} + +func (_c *MockDARetriever_ProcessBlobs_Call) Run(run func(ctx context.Context, blobs [][]byte, daHeight uint64)) *MockDARetriever_ProcessBlobs_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + var arg1 [][]byte + if args[1] != nil { + arg1 = args[1].([][]byte) + } + var arg2 uint64 + if args[2] != nil { + arg2 = args[2].(uint64) + } + run( + arg0, + arg1, + arg2, + ) + }) + return _c +} + +func (_c *MockDARetriever_ProcessBlobs_Call) Return(dAHeightEvents []common.DAHeightEvent) *MockDARetriever_ProcessBlobs_Call { + _c.Call.Return(dAHeightEvents) + return _c +} + +func (_c *MockDARetriever_ProcessBlobs_Call) RunAndReturn(run func(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent) *MockDARetriever_ProcessBlobs_Call { + _c.Call.Return(run) + return _c +} + // RetrieveFromDA provides a mock function for the type MockDARetriever func (_mock *MockDARetriever) RetrieveFromDA(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error) { ret := _mock.Called(ctx, daHeight) diff --git a/block/internal/syncing/da_retriever_test.go b/block/internal/syncing/da_retriever_test.go index 8b27513a8..76cc666dd 100644 --- a/block/internal/syncing/da_retriever_test.go +++ b/block/internal/syncing/da_retriever_test.go @@ -148,7 +148,7 @@ func TestDARetriever_ProcessBlobs_HeaderAndData_Success(t *testing.T) { dataBin, data := makeSignedDataBytes(t, gen.ChainID, 2, addr, pub, signer, 2) hdrBin, _ := makeSignedHeaderBytes(t, gen.ChainID, 2, addr, pub, signer, nil, &data.Data, nil) - events := r.processBlobs(context.Background(), [][]byte{hdrBin, dataBin}, 77) + events := 
r.ProcessBlobs(context.Background(), [][]byte{hdrBin, dataBin}, 77) require.Len(t, events, 1) assert.Equal(t, uint64(2), events[0].Header.Height()) assert.Equal(t, uint64(2), events[0].Data.Height()) @@ -172,7 +172,7 @@ func TestDARetriever_ProcessBlobs_HeaderOnly_EmptyDataExpected(t *testing.T) { // Header with no data hash present should trigger empty data creation (per current logic) hb, _ := makeSignedHeaderBytes(t, gen.ChainID, 3, addr, pub, signer, nil, nil, nil) - events := r.processBlobs(context.Background(), [][]byte{hb}, 88) + events := r.ProcessBlobs(context.Background(), [][]byte{hb}, 88) require.Len(t, events, 1) assert.Equal(t, uint64(3), events[0].Header.Height()) assert.NotNil(t, events[0].Data) @@ -282,14 +282,14 @@ func TestDARetriever_ProcessBlobs_CrossDAHeightMatching(t *testing.T) { hdrBin, _ := makeSignedHeaderBytes(t, gen.ChainID, 5, addr, pub, signer, nil, &data.Data, nil) // Process header from DA height 100 first - events1 := r.processBlobs(context.Background(), [][]byte{hdrBin}, 100) + events1 := r.ProcessBlobs(context.Background(), [][]byte{hdrBin}, 100) require.Len(t, events1, 0, "should not create event yet - data is missing") // Verify header is stored in pending headers require.Contains(t, r.pendingHeaders, uint64(5), "header should be stored as pending") // Process data from DA height 102 - events2 := r.processBlobs(context.Background(), [][]byte{dataBin}, 102) + events2 := r.ProcessBlobs(context.Background(), [][]byte{dataBin}, 102) require.Len(t, events2, 1, "should create event when matching data arrives") event := events2[0] @@ -319,7 +319,7 @@ func TestDARetriever_ProcessBlobs_MultipleHeadersCrossDAHeightMatching(t *testin hdr5Bin, _ := makeSignedHeaderBytes(t, gen.ChainID, 5, addr, pub, signer, nil, &data5.Data, nil) // Process multiple headers from DA height 200 - should be stored as pending - events1 := r.processBlobs(context.Background(), [][]byte{hdr3Bin, hdr4Bin, hdr5Bin}, 200) + events1 := r.ProcessBlobs(context.Background(), [][]byte{hdr3Bin, hdr4Bin, hdr5Bin}, 200) require.Len(t, events1, 0, "should not create events yet - all data is missing") // Verify all headers are stored in pending @@ -328,7 +328,7 @@ func TestDARetriever_ProcessBlobs_MultipleHeadersCrossDAHeightMatching(t *testin require.Contains(t, r.pendingHeaders, uint64(5), "header 5 should be pending") // Process some data from DA height 203 - should create partial events - events2 := r.processBlobs(context.Background(), [][]byte{data3Bin, data5Bin}, 203) + events2 := r.ProcessBlobs(context.Background(), [][]byte{data3Bin, data5Bin}, 203) require.Len(t, events2, 2, "should create events for heights 3 and 5") // Sort events by height for consistent testing @@ -352,7 +352,7 @@ func TestDARetriever_ProcessBlobs_MultipleHeadersCrossDAHeightMatching(t *testin require.NotContains(t, r.pendingHeaders, uint64(5), "header 5 should be removed from pending") // Process remaining data from DA height 205 - events3 := r.processBlobs(context.Background(), [][]byte{data4Bin}, 205) + events3 := r.ProcessBlobs(context.Background(), [][]byte{data4Bin}, 205) require.Len(t, events3, 1, "should create event for height 4") // Verify final event for height 4 diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 0d6d56f0d..1deeb9909 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -22,12 +22,45 @@ import ( "github.com/evstack/ev-node/block/internal/common" "github.com/evstack/ev-node/block/internal/da" 
"github.com/evstack/ev-node/pkg/config" + blobrpc "github.com/evstack/ev-node/pkg/da/jsonrpc" datypes "github.com/evstack/ev-node/pkg/da/types" "github.com/evstack/ev-node/pkg/genesis" "github.com/evstack/ev-node/pkg/store" "github.com/evstack/ev-node/types" ) +// SyncMode represents the current synchronization mode for the DA worker. +type SyncMode int + +const ( + // SyncModeCatchup indicates the node is behind the DA chain head and polling aggressively. + SyncModeCatchup SyncMode = iota + // SyncModeFollow indicates the node is caught up and using subscription for real-time updates. + SyncModeFollow +) + +// String returns a human-readable representation of the sync mode. +func (m SyncMode) String() string { + switch m { + case SyncModeCatchup: + return "catchup" + case SyncModeFollow: + return "follow" + default: + return "unknown" + } +} + +const ( + // catchupThreshold is the number of DA blocks behind local head + // before switching from follow to catchup mode. + catchupThreshold = 2 + + // followWatchdogMultiplier is the multiplier for BlockTime + // used as subscription watchdog timeout. + followWatchdogMultiplier = 3 +) + // forcedInclusionGracePeriodConfig contains internal configuration for forced inclusion grace periods. type forcedInclusionGracePeriodConfig struct { // basePeriod is the base number of additional epochs allowed for including forced inclusion transactions @@ -118,6 +151,9 @@ type Syncer struct { // P2P wait coordination p2pWaitState atomic.Value // stores p2pWaitState + + // Sync mode tracking + currentSyncMode atomic.Int32 // stores SyncMode as int32 } // pendingForcedInclusionTx represents a forced inclusion transaction that hasn't been included yet @@ -218,6 +254,9 @@ func (s *Syncer) Stop() error { if s.cancel != nil { s.cancel() } + if s.fiRetriever != nil { + s.fiRetriever.Stop() + } s.cancelP2PWait(0) s.wg.Wait() s.logger.Info().Msg("syncer stopped") @@ -318,26 +357,210 @@ func (s *Syncer) daWorkerLoop() { defer s.logger.Info().Msg("DA worker stopped") for { - err := s.fetchDAUntilCaughtUp() + select { + case <-s.ctx.Done(): + return + default: + } - var backoff time.Duration - if err == nil { - // No error, means we are caught up. - backoff = s.config.DA.BlockTime.Duration - } else { - // Error, back off for a shorter duration. - backoff = s.config.DA.BlockTime.Duration - if backoff <= 0 { - backoff = 2 * time.Second - } + mode := s.determineSyncMode() + previousMode := SyncMode(s.currentSyncMode.Load()) + + // Track mode switches + if mode != previousMode { + s.currentSyncMode.Store(int32(mode)) + s.metrics.ModeSwitches.Add(1) + s.logger.Info(). + Str("from", previousMode.String()). + Str("to", mode.String()). + Msg("sync mode changed") + } + + switch mode { + case SyncModeCatchup: + s.runCatchupMode() + case SyncModeFollow: + s.runFollowMode() } + } +} + +// determineSyncMode checks the current DA sync status and returns the appropriate mode. 
+func (s *Syncer) determineSyncMode() SyncMode { + // If DA client is nil (e.g., in tests), default to catchup mode + if s.daClient == nil { + return SyncModeCatchup + } + + localHead, err := s.daClient.LocalHead(s.ctx) + if err != nil { + // Default to catchup on error - safer to poll than assume we're caught up + s.logger.Debug().Err(err).Msg("failed to get local DA head, defaulting to catchup mode") + return SyncModeCatchup + } + + currentDAHeight := s.daRetrieverHeight.Load() + + // Consider "caught up" if within catchupThreshold blocks of local head + if currentDAHeight+catchupThreshold >= localHead { + return SyncModeFollow + } + return SyncModeCatchup +} + +// runCatchupMode runs the catchup sync mode - aggressive polling until caught up. +func (s *Syncer) runCatchupMode() { + s.logger.Debug().Msg("running catchup mode") + s.metrics.SyncMode.Set(float64(SyncModeCatchup)) + + err := s.fetchDAUntilCaughtUp() + if errors.Is(err, context.Canceled) { + return + } + + // Back off before next iteration: + // - On error: wait before retrying to avoid hammering a failing DA layer + // - On success (caught up): wait for new DA blocks to appear + backoff := s.config.DA.BlockTime.Duration + if backoff <= 0 { + backoff = 2 * time.Second + } + + if err != nil { + s.logger.Debug().Err(err).Msg("catchup failed, backing off before retry") + } else { + s.logger.Debug().Msg("caught up with DA, backing off before next check") + } + + s.sleepOrDone(backoff) +} + +// runFollowMode runs the follow sync mode - subscription-based real-time updates. +func (s *Syncer) runFollowMode() { + s.logger.Debug().Msg("running follow mode") + s.metrics.SyncMode.Set(float64(SyncModeFollow)) + + err := s.subscribeAndFollow() + if err != nil && !errors.Is(err, context.Canceled) { + s.metrics.SubscribeErrors.Add(1) + s.logger.Warn().Err(err).Msg("subscribe failed, will retry via mode check") + // No explicit catchup call needed - daWorkerLoop will call determineSyncMode() + // which defaults to catchup on error or when behind + } +} +// subscribeAndFollow uses the DA subscription API to receive real-time blob notifications. +// It subscribes to header, data, and forced inclusion namespaces and processes incoming blobs. +// Returns when subscription fails, context is cancelled, or node falls behind. 
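+// If no header or data events arrive within followWatchdogMultiplier*BlockTime, a watchdog
+// re-checks the sync mode and returns, letting the worker loop drop back to catchup.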
+func (s *Syncer) subscribeAndFollow() error { + // Get namespaces + headerNS := s.daClient.GetHeaderNamespace() + dataNS := s.daClient.GetDataNamespace() + + // Create subscription context with cancellation + subCtx, cancel := context.WithCancel(s.ctx) + defer cancel() + + // Subscribe to header namespace + headerCh, err := s.daClient.Subscribe(subCtx, headerNS) + if err != nil { + return fmt.Errorf("failed to subscribe to header namespace: %w", err) + } + + // Subscribe to data namespace (only if different from header namespace) + var dataCh <-chan *blobrpc.SubscriptionResponse + if !bytes.Equal(headerNS, dataNS) { + dataCh, err = s.daClient.Subscribe(subCtx, dataNS) + if err != nil { + return fmt.Errorf("failed to subscribe to data namespace: %w", err) + } + } + + s.logger.Info().Msg("subscribed to DA namespaces for follow mode") + + // Calculate watchdog timeout + watchdogTimeout := s.config.DA.BlockTime.Duration * followWatchdogMultiplier + if watchdogTimeout <= 0 { + watchdogTimeout = 30 * time.Second + } + + // Process subscription events + // Note: Select on a nil channel blocks forever, so nil channels are effectively disabled + for { select { case <-s.ctx.Done(): - return - case <-time.After(backoff): + return s.ctx.Err() + + case resp, ok := <-headerCh: + if !ok { + return errors.New("header subscription closed") + } + if err := s.processSubscriptionResponse(resp); err != nil { + s.logger.Error().Err(err).Uint64("height", resp.Height).Msg("failed to process header subscription") + } + + case resp, ok := <-dataCh: + // Note: if dataCh is nil (same namespace as header), this case never fires + if !ok { + return errors.New("data subscription closed") + } + if err := s.processSubscriptionResponse(resp); err != nil { + s.logger.Error().Err(err).Uint64("height", resp.Height).Msg("failed to process data subscription") + } + + case <-time.After(watchdogTimeout): + // Watchdog: if no events for watchdogTimeout, recheck mode + // Might have fallen behind due to network issues + s.logger.Debug().Dur("timeout", watchdogTimeout).Msg("subscription watchdog triggered, checking sync mode") + if s.determineSyncMode() == SyncModeCatchup { + return errors.New("fell behind, switching to catchup") + } + } + } +} + +// processSubscriptionResponse processes a subscription response and sends events to the processing channel. +func (s *Syncer) processSubscriptionResponse(resp *blobrpc.SubscriptionResponse) error { + if resp == nil { + return nil + } + + s.logger.Debug(). + Uint64("da_height", resp.Height). + Int("blobs", len(resp.Blobs)). + Msg("processing subscription response") + + // Convert blobs to raw byte slices for processing + blobs := common.BlobsFromSubscription(resp) + if len(blobs) == 0 { + return nil + } + + // Process blobs using the DA retriever's ProcessBlobs method + events := s.daRetriever.ProcessBlobs(s.ctx, blobs, resp.Height) + + // Send events to the processing channel + for _, event := range events { + select { + case s.heightInCh <- event: + s.logger.Debug(). + Uint64("height", event.Header.Height()). + Uint64("da_height", event.DaHeight). + Msg("sent subscription event to processing") + default: + s.cache.SetPendingEvent(event.Header.Height(), &event) + s.logger.Debug(). + Uint64("height", event.Header.Height()). 
+ Msg("subscription event queued as pending") } } + + // Update retriever height + if resp.Height >= s.daRetrieverHeight.Load() { + s.daRetrieverHeight.Store(resp.Height + 1) + } + + return nil } func (s *Syncer) fetchDAUntilCaughtUp() error { @@ -590,7 +813,11 @@ func (s *Syncer) trySyncNextBlock(event *common.DAHeightEvent) error { // Verify forced inclusion transactions if configured if event.Source == common.SourceDA { - if err := s.verifyForcedInclusionTxs(currentState, data); err != nil { + verifyState := currentState + if event.DaHeight > verifyState.DAHeight { + verifyState.DAHeight = event.DaHeight + } + if err := s.verifyForcedInclusionTxs(verifyState, data); err != nil { s.logger.Error().Err(err).Uint64("height", nextHeight).Msg("forced inclusion verification failed") if errors.Is(err, errMaliciousProposer) { s.cache.RemoveHeaderDAIncluded(headerHash) @@ -732,25 +959,15 @@ func hashTx(tx []byte) string { } // calculateBlockFullness returns a value between 0.0 and 1.0 indicating how full the block is. -// It estimates fullness based on total data size. +// It estimates fullness based on total data size relative to max blob size. // This is a heuristic - actual limits may vary by execution layer. func (s *Syncer) calculateBlockFullness(data *types.Data) float64 { - const maxDataSize = common.DefaultMaxBlobSize - - var fullness float64 - count := 0 - - // Check data size fullness - dataSize := uint64(0) + var dataSize uint64 for _, tx := range data.Txs { dataSize += uint64(len(tx)) } - sizeFullness := float64(dataSize) / float64(maxDataSize) - fullness += min(sizeFullness, 1.0) - count++ - - // Return average fullness - return fullness / float64(count) + fullness := float64(dataSize) / float64(common.DefaultMaxBlobSize) + return min(fullness, 1.0) } // updateDynamicGracePeriod updates the grace period multiplier based on block fullness. 
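As a footnote to the simplified calculateBlockFullness above: the heuristic is now a single ratio capped at 1.0. A standalone restatement for reference, where maxBlobSize is a placeholder for common.DefaultMaxBlobSize (its concrete value is not shown in this diff):

// blockFullness restates the heuristic: total transaction bytes over the max blob size, capped at 1.0.
// With 512 KiB of transactions and a hypothetical 2 MiB limit this yields 0.25.
func blockFullness(txs [][]byte, maxBlobSize uint64) float64 {
	var size uint64
	for _, tx := range txs {
		size += uint64(len(tx))
	}
	return min(float64(size)/float64(maxBlobSize), 1.0)
}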
diff --git a/block/internal/syncing/syncer_forced_inclusion_test.go b/block/internal/syncing/syncer_forced_inclusion_test.go index 61c556e83..215707a19 100644 --- a/block/internal/syncing/syncer_forced_inclusion_test.go +++ b/block/internal/syncing/syncer_forced_inclusion_test.go @@ -851,18 +851,6 @@ func TestVerifyForcedInclusionTxs_DeferralWithinEpoch(t *testing.T) { return true }) - // Mock DA for second verification at same epoch (height 104 - epoch end) - for height := uint64(101); height <= 104; height++ { - client.On("Retrieve", mock.Anything, height, []byte("nsForcedInclusion")).Return(datypes.ResultRetrieve{ - BaseResult: datypes.BaseResult{Code: datypes.StatusNotFound, Timestamp: time.Now()}, - }).Once() - } - - client.On("Retrieve", mock.Anything, uint64(100), []byte("nsForcedInclusion")).Return(datypes.ResultRetrieve{ - BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, IDs: [][]byte{[]byte("fi1"), []byte("fi2")}, Timestamp: time.Now()}, - Data: [][]byte{dataBin1, dataBin2}, - }).Once() - // Second block includes BOTH the previously included dataBin1 AND the deferred dataBin2 // This simulates the block containing both forced inclusion txs data2 := makeData(gen.ChainID, 2, 2) diff --git a/block/internal/syncing/syncer_test.go b/block/internal/syncing/syncer_test.go index 21b012cf2..fa004d5ae 100644 --- a/block/internal/syncing/syncer_test.go +++ b/block/internal/syncing/syncer_test.go @@ -700,3 +700,203 @@ func TestSyncer_getHighestStoredDAHeight(t *testing.T) { highestDA = syncer.getHighestStoredDAHeight() assert.Equal(t, uint64(200), highestDA, "should return highest DA height from most recent included height") } + +func TestSyncMode_String(t *testing.T) { + tests := []struct { + mode SyncMode + expected string + }{ + {SyncModeCatchup, "catchup"}, + {SyncModeFollow, "follow"}, + {SyncMode(99), "unknown"}, + } + + for _, tt := range tests { + t.Run(tt.expected, func(t *testing.T) { + assert.Equal(t, tt.expected, tt.mode.String()) + }) + } +} + +func TestSyncer_determineSyncMode(t *testing.T) { + tests := []struct { + name string + localHead uint64 + localHeadErr error + currentHeight uint64 + expectedMode SyncMode + }{ + { + name: "caught up - at head", + localHead: 100, + localHeadErr: nil, + currentHeight: 100, + expectedMode: SyncModeFollow, + }, + { + name: "caught up - within threshold", + localHead: 100, + localHeadErr: nil, + currentHeight: 99, // within catchupThreshold (2) + expectedMode: SyncModeFollow, + }, + { + name: "caught up - at threshold boundary", + localHead: 100, + localHeadErr: nil, + currentHeight: 98, // exactly at threshold + expectedMode: SyncModeFollow, + }, + { + name: "behind - just past threshold", + localHead: 100, + localHeadErr: nil, + currentHeight: 97, // 3 behind, past threshold of 2 + expectedMode: SyncModeCatchup, + }, + { + name: "behind - significantly behind", + localHead: 100, + localHeadErr: nil, + currentHeight: 50, + expectedMode: SyncModeCatchup, + }, + { + name: "error getting local head - defaults to catchup", + localHead: 0, + localHeadErr: errors.New("connection failed"), + currentHeight: 100, + expectedMode: SyncModeCatchup, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mockDA := testmocks.NewMockClient(t) + if tt.localHeadErr != nil { + mockDA.EXPECT().LocalHead(mock.Anything).Return(uint64(0), tt.localHeadErr) + } else { + mockDA.EXPECT().LocalHead(mock.Anything).Return(tt.localHead, nil) + } + + syncer := &Syncer{ + daClient: mockDA, + daRetrieverHeight: &atomic.Uint64{}, + ctx: 
context.Background(), + logger: zerolog.Nop(), + } + syncer.daRetrieverHeight.Store(tt.currentHeight) + + mode := syncer.determineSyncMode() + assert.Equal(t, tt.expectedMode, mode) + }) + } +} + +func TestSyncer_runCatchupMode(t *testing.T) { + // Test that runCatchupMode correctly sets metrics and calls fetchDAUntilCaughtUp + mockDA := testmocks.NewMockClient(t) + // Use same namespace for header and data to simplify the test + namespace := []byte("namespace") + mockDA.EXPECT().GetHeaderNamespace().Return(namespace).Maybe() + mockDA.EXPECT().GetDataNamespace().Return(namespace).Maybe() + mockDA.EXPECT().Retrieve(mock.Anything, mock.Anything, namespace). + Return(datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{ + Code: datypes.StatusHeightFromFuture, + Message: datypes.ErrHeightFromFuture.Error(), + }, + }).Once() + + cfg := config.DefaultConfig() + cfg.DA.BlockTime.Duration = 10 * time.Millisecond + + metrics := common.NopMetrics() + + syncer := &Syncer{ + daClient: mockDA, + daRetrieverHeight: &atomic.Uint64{}, + ctx: context.Background(), + logger: zerolog.Nop(), + config: cfg, + metrics: metrics, + cache: &mockCacheManager{}, + } + syncer.daRetrieverHeight.Store(1) + syncer.daRetriever = NewDARetriever(mockDA, &mockCacheManager{}, genesis.Genesis{}, zerolog.Nop()) + + // Run catchup mode - should return when caught up (ErrHeightFromFuture) + syncer.runCatchupMode() + + mockDA.AssertExpectations(t) +} + +func TestSyncer_runFollowMode_SubscribeFailureReturnsForModeCheck(t *testing.T) { + mockDA := testmocks.NewMockClient(t) + namespace := []byte("namespace") + mockDA.EXPECT().GetHeaderNamespace().Return(namespace).Once() + mockDA.EXPECT().GetDataNamespace().Return(namespace).Once() + mockDA.EXPECT().Subscribe(mock.Anything, namespace). + Return(nil, errors.New("subscribe failed")).Once() + + syncer := &Syncer{ + daClient: mockDA, + daRetrieverHeight: &atomic.Uint64{}, + ctx: context.Background(), + logger: zerolog.Nop(), + metrics: common.NopMetrics(), + } + syncer.daRetrieverHeight.Store(100) + + // runFollowMode should return after subscribe failure, + // allowing daWorkerLoop to call determineSyncMode() next iteration + syncer.runFollowMode() + + mockDA.AssertExpectations(t) +} + +func TestSyncer_modeSwitching(t *testing.T) { + // Test that mode switches are tracked correctly + syncer := &Syncer{ + currentSyncMode: atomic.Int32{}, + } + + // Initial mode should be catchup (0) + assert.Equal(t, SyncModeCatchup, SyncMode(syncer.currentSyncMode.Load())) + + // Simulate switching to follow mode + syncer.currentSyncMode.Store(int32(SyncModeFollow)) + assert.Equal(t, SyncModeFollow, SyncMode(syncer.currentSyncMode.Load())) + + // Switch back to catchup + syncer.currentSyncMode.Store(int32(SyncModeCatchup)) + assert.Equal(t, SyncModeCatchup, SyncMode(syncer.currentSyncMode.Load())) +} + +// mockCacheManager is a minimal implementation for testing +type mockCacheManager struct{} + +func (m *mockCacheManager) DaHeight() uint64 { return 0 } +func (m *mockCacheManager) SetHeaderSeen(hash string, height uint64) { +} +func (m *mockCacheManager) IsHeaderSeen(hash string) bool { return false } +func (m *mockCacheManager) SetDataSeen(hash string, height uint64) { +} +func (m *mockCacheManager) IsDataSeen(hash string) bool { return false } +func (m *mockCacheManager) SetHeaderDAIncluded(hash string, daHeight, height uint64) { +} +func (m *mockCacheManager) GetHeaderDAIncluded(hash string) (uint64, bool) { return 0, false } +func (m *mockCacheManager) RemoveHeaderDAIncluded(hash string) {} 
+func (m *mockCacheManager) SetDataDAIncluded(hash string, daHeight, height uint64) { +} +func (m *mockCacheManager) GetDataDAIncluded(hash string) (uint64, bool) { return 0, false } +func (m *mockCacheManager) IsTxSeen(hash string) bool { return false } +func (m *mockCacheManager) SetTxSeen(hash string) {} +func (m *mockCacheManager) CleanupOldTxs(olderThan time.Duration) int { return 0 } +func (m *mockCacheManager) SetPendingEvent(height uint64, event *common.DAHeightEvent) { +} +func (m *mockCacheManager) GetNextPendingEvent(height uint64) *common.DAHeightEvent { return nil } +func (m *mockCacheManager) SaveToDisk() error { return nil } +func (m *mockCacheManager) LoadFromDisk() error { return nil } +func (m *mockCacheManager) ClearFromDisk() error { return nil } +func (m *mockCacheManager) DeleteHeight(blockHeight uint64) {} diff --git a/execution/evm/go.sum b/execution/evm/go.sum index a43585fa5..1d9f55dcc 100644 --- a/execution/evm/go.sum +++ b/execution/evm/go.sum @@ -16,8 +16,12 @@ github.com/bits-and-blooms/bitset v1.20.0 h1:2F+rfL86jE2d/bmw7OhqUg2Sj/1rURkBn3M github.com/bits-and-blooms/bitset v1.20.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= github.com/celestiaorg/go-header v0.7.4 h1:kQx3bVvKV+H2etxRi4IUuby5VQydBONx3giHFXDcZ/o= github.com/celestiaorg/go-header v0.7.4/go.mod h1:eX9iTSPthVEAlEDLux40ZT/olXPGhpxHd+mEzJeDhd0= +github.com/celestiaorg/go-square/merkle v0.0.0-20240627094109-7d01436067a3 h1:wP84mtwOCVNOTfS3zErICjxKLnh74Z1uf+tdrlSFjVM= +github.com/celestiaorg/go-square/merkle v0.0.0-20240627094109-7d01436067a3/go.mod h1:86qIYnEhmn/hfW+xvw98NOI3zGaDEB3x8JGjYo2FqLs= github.com/celestiaorg/go-square/v3 v3.0.2 h1:eSQOgNII8inK9IhiBZ+6GADQeWbRq4HYY72BOgcduA4= github.com/celestiaorg/go-square/v3 v3.0.2/go.mod h1:oFReMLsSDMRs82ICFEeFQFCqNvwdsbIM1BzCcb0f7dM= +github.com/celestiaorg/nmt v0.24.2 h1:LlpJSPOd6/Lw1Ig6HUhZuqiINHLka/ZSRTBzlNJpchg= +github.com/celestiaorg/nmt v0.24.2/go.mod h1:vgLBpWBi8F5KLxTdXSwb7AU4NhiIQ1AQRGa+PzdcLEA= github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM= github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= @@ -80,6 +84,8 @@ github.com/ferranbt/fastssz v0.1.4 h1:OCDB+dYDEQDvAgtAGnTSidK1Pe2tW3nFV40XyMkTeD github.com/ferranbt/fastssz v0.1.4/go.mod h1:Ea3+oeoRGGLGm5shYAeDgu6PGUlcvQhE2fILyD9+tGg= github.com/filecoin-project/go-clock v0.1.0 h1:SFbYIM75M8NnFm1yMHhN9Ahy3W5bEZV9gd6MPfXbKVU= github.com/filecoin-project/go-clock v0.1.0/go.mod h1:4uB/O4PvOjlx1VCMdZ9MyDZXRm//gkj1ELEbxfI1AZs= +github.com/filecoin-project/go-jsonrpc v0.10.0 h1:gZc1thGVD5Khg5Gp1UJibRWZrnNBEP1iFrGOTn0w5TE= +github.com/filecoin-project/go-jsonrpc v0.10.0/go.mod h1:OG7kVBVh/AbDFHIwx7Kw0l9ARmKOS6gGOr0LbdBpbLc= github.com/flynn/noise v1.1.0 h1:KjPQoQCEFdZDiP03phOvGi11+SVVhBG2wOWAorLsstg= github.com/flynn/noise v1.1.0/go.mod h1:xbMo+0i6+IGbYdJhF31t2eR1BIU0CYc12+BNAKwUTag= github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= @@ -518,6 +524,8 @@ golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= +golang.org/x/xerrors 
v0.0.0-20240903120638-7835f813f4da h1:noIWHXmPHxILtqtCOPIhSt0ABwskkZKjD3bXGnZGpNY= +golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90= gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= google.golang.org/genproto/googleapis/api v0.0.0-20251202230838-ff82c1b0f217 h1:fCvbg86sFXwdrl5LgVcTEvNC+2txB5mgROGmRL5mrls= diff --git a/execution/evm/test/go.sum b/execution/evm/test/go.sum index 9663ab8a4..d44209e91 100644 --- a/execution/evm/test/go.sum +++ b/execution/evm/test/go.sum @@ -68,8 +68,12 @@ github.com/btcsuite/btcd/btcutil v1.1.6 h1:zFL2+c3Lb9gEgqKNzowKUPQNb8jV7v5Oaodi/ github.com/btcsuite/btcd/btcutil v1.1.6/go.mod h1:9dFymx8HpuLqBnsPELrImQeTQfKBQqzqGbbV3jK55aE= github.com/celestiaorg/go-header v0.7.4 h1:kQx3bVvKV+H2etxRi4IUuby5VQydBONx3giHFXDcZ/o= github.com/celestiaorg/go-header v0.7.4/go.mod h1:eX9iTSPthVEAlEDLux40ZT/olXPGhpxHd+mEzJeDhd0= +github.com/celestiaorg/go-square/merkle v0.0.0-20240627094109-7d01436067a3 h1:wP84mtwOCVNOTfS3zErICjxKLnh74Z1uf+tdrlSFjVM= +github.com/celestiaorg/go-square/merkle v0.0.0-20240627094109-7d01436067a3/go.mod h1:86qIYnEhmn/hfW+xvw98NOI3zGaDEB3x8JGjYo2FqLs= github.com/celestiaorg/go-square/v3 v3.0.2 h1:eSQOgNII8inK9IhiBZ+6GADQeWbRq4HYY72BOgcduA4= github.com/celestiaorg/go-square/v3 v3.0.2/go.mod h1:oFReMLsSDMRs82ICFEeFQFCqNvwdsbIM1BzCcb0f7dM= +github.com/celestiaorg/nmt v0.24.2 h1:LlpJSPOd6/Lw1Ig6HUhZuqiINHLka/ZSRTBzlNJpchg= +github.com/celestiaorg/nmt v0.24.2/go.mod h1:vgLBpWBi8F5KLxTdXSwb7AU4NhiIQ1AQRGa+PzdcLEA= github.com/celestiaorg/tastora v0.8.0 h1:+FWAIsP2onwwqPTGzBLIBtx8B1h9sImdx4msv2N4DsI= github.com/celestiaorg/tastora v0.8.0/go.mod h1:9b5GsL/+pKEw3HZG/nd3qhnGadUnNNoTBygy9HeGIyw= github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4= @@ -204,6 +208,8 @@ github.com/ferranbt/fastssz v0.1.4 h1:OCDB+dYDEQDvAgtAGnTSidK1Pe2tW3nFV40XyMkTeD github.com/ferranbt/fastssz v0.1.4/go.mod h1:Ea3+oeoRGGLGm5shYAeDgu6PGUlcvQhE2fILyD9+tGg= github.com/filecoin-project/go-clock v0.1.0 h1:SFbYIM75M8NnFm1yMHhN9Ahy3W5bEZV9gd6MPfXbKVU= github.com/filecoin-project/go-clock v0.1.0/go.mod h1:4uB/O4PvOjlx1VCMdZ9MyDZXRm//gkj1ELEbxfI1AZs= +github.com/filecoin-project/go-jsonrpc v0.10.0 h1:gZc1thGVD5Khg5Gp1UJibRWZrnNBEP1iFrGOTn0w5TE= +github.com/filecoin-project/go-jsonrpc v0.10.0/go.mod h1:OG7kVBVh/AbDFHIwx7Kw0l9ARmKOS6gGOr0LbdBpbLc= github.com/flynn/noise v1.1.0 h1:KjPQoQCEFdZDiP03phOvGi11+SVVhBG2wOWAorLsstg= github.com/flynn/noise v1.1.0/go.mod h1:xbMo+0i6+IGbYdJhF31t2eR1BIU0CYc12+BNAKwUTag= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= @@ -869,6 +875,8 @@ golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= +golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da h1:noIWHXmPHxILtqtCOPIhSt0ABwskkZKjD3bXGnZGpNY= +golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90= gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= 
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= diff --git a/test/mocks/da.go b/test/mocks/da.go index 0b5c71a49..fd9bb8db6 100644 --- a/test/mocks/da.go +++ b/test/mocks/da.go @@ -7,6 +7,7 @@ package mocks import ( "context" + "github.com/evstack/ev-node/pkg/da/jsonrpc" "github.com/evstack/ev-node/pkg/da/types" mock "github.com/stretchr/testify/mock" ) @@ -294,6 +295,66 @@ func (_c *MockClient_HasForcedInclusionNamespace_Call) RunAndReturn(run func() b return _c } +// LocalHead provides a mock function for the type MockClient +func (_mock *MockClient) LocalHead(ctx context.Context) (uint64, error) { + ret := _mock.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for LocalHead") + } + + var r0 uint64 + var r1 error + if returnFunc, ok := ret.Get(0).(func(context.Context) (uint64, error)); ok { + return returnFunc(ctx) + } + if returnFunc, ok := ret.Get(0).(func(context.Context) uint64); ok { + r0 = returnFunc(ctx) + } else { + r0 = ret.Get(0).(uint64) + } + if returnFunc, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = returnFunc(ctx) + } else { + r1 = ret.Error(1) + } + return r0, r1 +} + +// MockClient_LocalHead_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'LocalHead' +type MockClient_LocalHead_Call struct { + *mock.Call +} + +// LocalHead is a helper method to define mock.On call +// - ctx context.Context +func (_e *MockClient_Expecter) LocalHead(ctx interface{}) *MockClient_LocalHead_Call { + return &MockClient_LocalHead_Call{Call: _e.mock.On("LocalHead", ctx)} +} + +func (_c *MockClient_LocalHead_Call) Run(run func(ctx context.Context)) *MockClient_LocalHead_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + run( + arg0, + ) + }) + return _c +} + +func (_c *MockClient_LocalHead_Call) Return(v uint64, err error) *MockClient_LocalHead_Call { + _c.Call.Return(v, err) + return _c +} + +func (_c *MockClient_LocalHead_Call) RunAndReturn(run func(ctx context.Context) (uint64, error)) *MockClient_LocalHead_Call { + _c.Call.Return(run) + return _c +} + // Retrieve provides a mock function for the type MockClient func (_mock *MockClient) Retrieve(ctx context.Context, height uint64, namespace []byte) da.ResultRetrieve { ret := _mock.Called(ctx, height, namespace) @@ -432,6 +493,74 @@ func (_c *MockClient_Submit_Call) RunAndReturn(run func(ctx context.Context, dat return _c } +// Subscribe provides a mock function for the type MockClient +func (_mock *MockClient) Subscribe(ctx context.Context, namespace []byte) (<-chan *jsonrpc.SubscriptionResponse, error) { + ret := _mock.Called(ctx, namespace) + + if len(ret) == 0 { + panic("no return value specified for Subscribe") + } + + var r0 <-chan *jsonrpc.SubscriptionResponse + var r1 error + if returnFunc, ok := ret.Get(0).(func(context.Context, []byte) (<-chan *jsonrpc.SubscriptionResponse, error)); ok { + return returnFunc(ctx, namespace) + } + if returnFunc, ok := ret.Get(0).(func(context.Context, []byte) <-chan *jsonrpc.SubscriptionResponse); ok { + r0 = returnFunc(ctx, namespace) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(<-chan *jsonrpc.SubscriptionResponse) + } + } + if returnFunc, ok := ret.Get(1).(func(context.Context, []byte) error); ok { + r1 = returnFunc(ctx, namespace) + } else { + r1 = ret.Error(1) + } + return r0, r1 +} + +// MockClient_Subscribe_Call is a *mock.Call that shadows Run/Return methods with type explicit 
version for method 'Subscribe' +type MockClient_Subscribe_Call struct { + *mock.Call +} + +// Subscribe is a helper method to define mock.On call +// - ctx context.Context +// - namespace []byte +func (_e *MockClient_Expecter) Subscribe(ctx interface{}, namespace interface{}) *MockClient_Subscribe_Call { + return &MockClient_Subscribe_Call{Call: _e.mock.On("Subscribe", ctx, namespace)} +} + +func (_c *MockClient_Subscribe_Call) Run(run func(ctx context.Context, namespace []byte)) *MockClient_Subscribe_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + var arg1 []byte + if args[1] != nil { + arg1 = args[1].([]byte) + } + run( + arg0, + arg1, + ) + }) + return _c +} + +func (_c *MockClient_Subscribe_Call) Return(subscriptionResponseCh <-chan *jsonrpc.SubscriptionResponse, err error) *MockClient_Subscribe_Call { + _c.Call.Return(subscriptionResponseCh, err) + return _c +} + +func (_c *MockClient_Subscribe_Call) RunAndReturn(run func(ctx context.Context, namespace []byte) (<-chan *jsonrpc.SubscriptionResponse, error)) *MockClient_Subscribe_Call { + _c.Call.Return(run) + return _c +} + // NewMockVerifier creates a new instance of MockVerifier. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewMockVerifier(t interface { diff --git a/test/testda/dummy.go b/test/testda/dummy.go index 633bf1cf9..876475f9c 100644 --- a/test/testda/dummy.go +++ b/test/testda/dummy.go @@ -7,6 +7,7 @@ import ( "sync/atomic" "time" + blobrpc "github.com/evstack/ev-node/pkg/da/jsonrpc" datypes "github.com/evstack/ev-node/pkg/da/types" ) @@ -290,3 +291,18 @@ func (d *DummyDA) GetHeaderByHeight(_ context.Context, height uint64) (*Header, } return header, nil } + +// Subscribe returns a channel that receives subscription responses. +// This is a stub implementation that returns an empty channel. +// In tests, callers should not rely on actual subscription behavior. +func (d *DummyDA) Subscribe(_ context.Context, _ []byte) (<-chan *blobrpc.SubscriptionResponse, error) { + // Return a channel that will never receive anything - tests should use mocks for subscription behavior + ch := make(chan *blobrpc.SubscriptionResponse) + return ch, nil +} + +// LocalHead returns the current DA height. +// This mirrors the HeaderAPI.LocalHead method and is used to determine sync mode. +func (d *DummyDA) LocalHead(_ context.Context) (uint64, error) { + return d.height.Load(), nil +} diff --git a/tools/local-da/rpc.go b/tools/local-da/rpc.go index 60dd51ac4..32c907bff 100644 --- a/tools/local-da/rpc.go +++ b/tools/local-da/rpc.go @@ -136,10 +136,53 @@ func (s *blobServer) Subscribe(_ context.Context, _ libshare.Namespace) (<-chan return ch, nil } -// startBlobServer starts an HTTP JSON-RPC server on addr serving the blob namespace. +// headerServer exposes a minimal header RPC surface backed by LocalDA. +type headerServer struct { + da *LocalDA + logger zerolog.Logger +} + +// LocalHead returns the header for the locally synced DA head. +func (s *headerServer) LocalHead(_ context.Context) (*jsonrpc.Header, error) { + s.da.mu.Lock() + defer s.da.mu.Unlock() + + return &jsonrpc.Header{ + Height: s.da.height, + BlockTime: s.da.timestamps[s.da.height], + }, nil +} + +// NetworkHead returns the header for the network DA head (same as local for LocalDA). 
+func (s *headerServer) NetworkHead(_ context.Context) (*jsonrpc.Header, error) { + return s.LocalHead(context.Background()) +} + +// GetByHeight returns the header for a specific height. +func (s *headerServer) GetByHeight(_ context.Context, height uint64) (*jsonrpc.Header, error) { + s.da.mu.Lock() + defer s.da.mu.Unlock() + + if height > s.da.height { + return nil, datypes.ErrHeightFromFuture + } + + ts, ok := s.da.timestamps[height] + if !ok { + ts = time.Time{} + } + + return &jsonrpc.Header{ + Height: height, + BlockTime: ts, + }, nil +} + +// startBlobServer starts an HTTP JSON-RPC server on addr serving the blob and header namespaces. func startBlobServer(logger zerolog.Logger, addr string, da *LocalDA) (*http.Server, error) { rpc := fjrpc.NewServer() rpc.Register("blob", &blobServer{da: da, logger: logger}) + rpc.Register("header", &headerServer{da: da, logger: logger}) srv := &http.Server{ Addr: addr,