From b98e04887066c402687274974b71d431067880d8 Mon Sep 17 00:00:00 2001 From: Santiago Palladino Date: Wed, 10 Jun 2026 22:24:27 -0300 Subject: [PATCH 1/4] refactor(stdlib): thin chain-checkpointed event and collapsed L2BlockStream work() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the per-checkpoint fat `chain-checkpointed` event (carrying a full PublishedCheckpoint) with a thin once-per-pass tip event `{ block, checkpoint }`, symmetric with chain-proven/chain-finalized. Collapse L2BlockStream.work() to: reorg walk-back + chain-pruned; one getBlocks download loop; end-of-pass tier reconciliation (checkpointed -> proven -> finalized) from a single sourceTips snapshot. This removes the prefetch buffer, both startingBlock checkpoint fast-forward arms, nextCheckpointToEmit, Loop 1 and Loop 2, and the CHECKPOINT_PREFETCH_LIMIT option. The stream no longer fetches checkpoint payloads, so its source type narrows to Pick; PXE's node adapter and the telemetry wrapper drop getCheckpoints accordingly. Fixes the A-1061 stuck-checkpointed-cursor livelock structurally: the checkpointed reconciliation runs unconditionally against the source snapshot, so a startingBlock past the source checkpointed tip still advances the local checkpointed cursor on the first pass. The tier reconciliation re-reads local tips after a prune so it compares against the cursors the prune handler just clamped (kills the Loop 1 stale-snapshot duplicate-emit bug), and the hash-aware diff compares (number, hash) while skipping the hash when the local hash is undefined (world-state) — re-emitting same-number/different-hash tips without looping on world-state-shaped providers. PXE advances its checkpointed anchor by fetching the tip header by hash, which is safe against same-height reorgs and skips cleanly when the block was reorged out. The tips store reads the checkpoint id straight off the thin event. Co-Authored-By: Claude Fable 5 --- .../block_synchronizer/block_stream_source.ts | 46 +- .../block_synchronizer.test.ts | 52 +- .../block_synchronizer/block_synchronizer.ts | 14 +- .../schema_tests.ts | 5 +- .../src/block/l2_block_stream/interfaces.ts | 11 +- .../l2_block_stream/l2_block_stream.test.ts | 1691 +++-------------- .../block/l2_block_stream/l2_block_stream.ts | 211 +- .../l2_block_stream/l2_tips_store_base.ts | 6 +- .../block/test/l2_tips_store_test_suite.ts | 12 +- .../src/wrappers/l2_block_stream.ts | 2 +- 10 files changed, 391 insertions(+), 1659 deletions(-) diff --git a/yarn-project/pxe/src/block_synchronizer/block_stream_source.ts b/yarn-project/pxe/src/block_synchronizer/block_stream_source.ts index 2009d350b2a1..cbd6ed780cb2 100644 --- a/yarn-project/pxe/src/block_synchronizer/block_stream_source.ts +++ b/yarn-project/pxe/src/block_synchronizer/block_stream_source.ts @@ -1,24 +1,15 @@ -import { Fr } from '@aztec/foundation/curves/bn254'; -import { - type BlockData, - type BlockQuery, - type BlocksQuery, - type CheckpointsQuery, - L2Block, - type L2BlockSource, -} from '@aztec/stdlib/block'; -import { Checkpoint, L1PublishedData, PublishedCheckpoint } from '@aztec/stdlib/checkpoint'; +import { type BlockData, type BlockQuery, type BlocksQuery, L2Block, type L2BlockSource } from '@aztec/stdlib/block'; import type { AztecNode } from '@aztec/stdlib/interfaces/client'; /** * Lifts an {@link AztecNode} RPC client into the shape {@link L2BlockStream} expects. - * `getBlocks` requests transaction bodies so that real `L2Block` instances can be constructed; - * `getCheckpoints` requests blocks + L1 info + attestations so that `PublishedCheckpoint` - * instances are fully populated. + * `getBlocks` requests transaction bodies so that real `L2Block` instances can be constructed. The stream no + * longer fetches checkpoint payloads (the `chain-checkpointed` event is a thin tip event), so `getCheckpoints` + * is not part of the lifted shape. */ export function blockStreamSourceFromAztecNode( node: AztecNode, -): Pick { +): Pick { return { getL2Tips: async () => { const tips = await node.getChainTips(); @@ -50,32 +41,5 @@ export function blockStreamSourceFromAztecNode( const responses = await node.getBlocks(query.from, query.limit, { includeTransactions: true }); return responses.map(r => new L2Block(r.archive, r.header, r.body!, r.checkpointNumber, r.indexWithinCheckpoint)); }, - - async getCheckpoints(query: CheckpointsQuery): Promise { - if (!('from' in query)) { - throw new Error('getCheckpoints with epoch query not supported via AztecNode RPC'); - } - const { from, limit } = query; - const responses = await node.getCheckpoints(from, limit, { - includeBlocks: true, - includeTransactions: true, - includeL1PublishInfo: true, - includeAttestations: true, - }); - return responses.map(r => { - const checkpoint = new Checkpoint( - r.archive, - r.header, - r.blocks!.map(b => new L2Block(b.archive, b.header, b.body!, b.checkpointNumber, b.indexWithinCheckpoint)), - r.number, - r.feeAssetPriceModifier, - ); - const l1 = - r.l1?.published === true - ? new L1PublishedData(r.l1.blockNumber, r.l1.timestamp, r.l1.blockHash) - : new L1PublishedData(0n, 0n, Fr.ZERO.toString()); - return new PublishedCheckpoint(checkpoint, l1, r.attestations ?? []); - }); - }, }; } diff --git a/yarn-project/pxe/src/block_synchronizer/block_synchronizer.test.ts b/yarn-project/pxe/src/block_synchronizer/block_synchronizer.test.ts index bbd6d2ae9729..c15898ba2109 100644 --- a/yarn-project/pxe/src/block_synchronizer/block_synchronizer.test.ts +++ b/yarn-project/pxe/src/block_synchronizer/block_synchronizer.test.ts @@ -17,7 +17,6 @@ import { makeL2BlockId, makeL2CheckpointId, } from '@aztec/stdlib/block'; -import { Checkpoint, L1PublishedData, PublishedCheckpoint } from '@aztec/stdlib/checkpoint'; import type { AztecNode, BlockResponse } from '@aztec/stdlib/interfaces/client'; import { NoteDao, NoteStatus } from '@aztec/stdlib/note'; import { TxHash } from '@aztec/stdlib/tx'; @@ -376,24 +375,53 @@ describe('BlockSynchronizer', () => { const initialBlock = await L2Block.random(BlockNumber(0)); await anchorBlockStore.setHeader(initialBlock.header); - // Create a checkpoint with a block + // The checkpointed tip block, fetched by hash from the node when the thin event arrives. const checkpointBlock = await L2Block.random(BlockNumber(1)); - const checkpoint = await Checkpoint.random(CheckpointNumber(1), { numBlocks: 1 }); - // Replace the random block with our known block - checkpoint.blocks[0] = checkpointBlock; - - const publishedCheckpoint = new PublishedCheckpoint(checkpoint, L1PublishedData.random(), []); + const checkpointBlockHash = await checkpointBlock.hash(); + aztecNode.getBlockData.mockImplementation(query => + Promise.resolve( + query instanceof BlockHash && query.equals(checkpointBlockHash) + ? ({ + header: checkpointBlock.header, + archive: checkpointBlock.archive, + blockHash: checkpointBlockHash, + checkpointNumber: checkpointBlock.checkpointNumber, + indexWithinCheckpoint: checkpointBlock.indexWithinCheckpoint, + } as BlockData) + : undefined, + ), + ); await synchronizer.handleBlockStreamEvent({ type: 'chain-checkpointed', - checkpoint: publishedCheckpoint, - block: { number: BlockNumber(1), hash: '0x456' }, + block: { number: BlockNumber(1), hash: checkpointBlockHash.toString() }, + checkpoint: makeL2CheckpointId(CheckpointNumber(1), Fr.random().toString()), }); const obtainedHeader = await anchorBlockStore.getBlockHeader(); expect(obtainedHeader.equals(checkpointBlock.header)).toBe(true); }); + it('skips the anchor update on chain-checkpointed when the block was reorged out (missing by hash)', async () => { + synchronizer = createSynchronizer({ syncChainTip: 'checkpointed' }); + + const initialBlock = await L2Block.random(BlockNumber(0)); + await anchorBlockStore.setHeader(initialBlock.header); + + // The node no longer serves the checkpointed block at that hash (transient reorg). + aztecNode.getBlockData.mockResolvedValue(undefined); + + await synchronizer.handleBlockStreamEvent({ + type: 'chain-checkpointed', + block: { number: BlockNumber(1), hash: Fr.random().toString() }, + checkpoint: makeL2CheckpointId(CheckpointNumber(1), Fr.random().toString()), + }); + + // Anchor is left untouched; a later event corrects it. + const obtainedHeader = await anchorBlockStore.getBlockHeader(); + expect(obtainedHeader.equals(initialBlock.header)).toBe(true); + }); + it('does not update anchor on chain-checkpointed when syncChainTip is proposed', async () => { synchronizer = createSynchronizer({ syncChainTip: 'proposed' }); @@ -401,14 +429,10 @@ describe('BlockSynchronizer', () => { const initialBlock = await L2Block.random(BlockNumber(1)); await synchronizer.handleBlockStreamEvent({ type: 'blocks-added', blocks: [initialBlock] }); - // Create a different checkpoint - const checkpoint = await Checkpoint.random(CheckpointNumber(1), { numBlocks: 1 }); - const publishedCheckpoint = new PublishedCheckpoint(checkpoint, L1PublishedData.random(), []); - await synchronizer.handleBlockStreamEvent({ type: 'chain-checkpointed', - checkpoint: publishedCheckpoint, block: { number: BlockNumber(1), hash: '0x456' }, + checkpoint: makeL2CheckpointId(CheckpointNumber(1), Fr.random().toString()), }); // Anchor should still be the initial block, not the checkpoint block diff --git a/yarn-project/pxe/src/block_synchronizer/block_synchronizer.ts b/yarn-project/pxe/src/block_synchronizer/block_synchronizer.ts index f0e0735f9a0d..3bfdc6141def 100644 --- a/yarn-project/pxe/src/block_synchronizer/block_synchronizer.ts +++ b/yarn-project/pxe/src/block_synchronizer/block_synchronizer.ts @@ -74,9 +74,17 @@ export class BlockSynchronizer implements L2BlockStreamEventHandler { } case 'chain-checkpointed': { if (this.config.syncChainTip === 'checkpointed') { - // Get the last block header from the checkpoint - const lastBlock = event.checkpoint.checkpoint.blocks.at(-1)!; - await this.updateAnchorBlockHeader(lastBlock.header); + // Fetch the checkpointed tip header by hash. By-hash is safer than by-number against a + // same-height reorg; a missing result means the block was reorged out between the event + // and this fetch, so we skip the anchor update and let a later event correct it. + const block = await this.node.getBlockData(BlockHash.fromString(event.block.hash)); + if (block) { + await this.updateAnchorBlockHeader(block.header); + } else { + this.log.warn( + `Block header not found for checkpointed block ${event.block.number}, skipping anchor update`, + ); + } } break; } diff --git a/yarn-project/pxe/src/storage/backwards_compatibility_tests/schema_tests.ts b/yarn-project/pxe/src/storage/backwards_compatibility_tests/schema_tests.ts index e6e6fc8ac71d..9b9f037de3e8 100644 --- a/yarn-project/pxe/src/storage/backwards_compatibility_tests/schema_tests.ts +++ b/yarn-project/pxe/src/storage/backwards_compatibility_tests/schema_tests.ts @@ -261,7 +261,10 @@ export const SCHEMA_TESTS: readonly SchemaTest[] = [ await l2TipsStore.handleBlockStreamEvent({ type: 'chain-checkpointed', block: { number: BlockNumber(71), hash: new Fr(73n).toString() }, - checkpoint: publishedCheckpoint, + checkpoint: { + number: publishedCheckpoint.checkpoint.number, + hash: publishedCheckpoint.checkpoint.hash().toString(), + }, }); // `'chain-proven'` writes the 'proven' tag. `'finalized'` is omitted because its handler runs delete-before // logic that would depend on the order of preceding events. diff --git a/yarn-project/stdlib/src/block/l2_block_stream/interfaces.ts b/yarn-project/stdlib/src/block/l2_block_stream/interfaces.ts index e9e13deca97a..6ed09b019739 100644 --- a/yarn-project/stdlib/src/block/l2_block_stream/interfaces.ts +++ b/yarn-project/stdlib/src/block/l2_block_stream/interfaces.ts @@ -1,6 +1,5 @@ import type { BlockNumber } from '@aztec/foundation/branded-types'; -import type { PublishedCheckpoint } from '../../checkpoint/published_checkpoint.js'; import type { L2Block } from '../l2_block.js'; import type { CheckpointId, L2BlockId, L2TipId, LocalL2Tips } from '../l2_block_source.js'; @@ -21,7 +20,7 @@ export type LocalL2BlockId = { number: BlockNumber; hash?: string }; */ export type LocalChainTips = { proposed: LocalL2BlockId; - checkpointed?: { checkpoint: CheckpointId }; + checkpointed?: { block: LocalL2BlockId; checkpoint: CheckpointId }; proven: { block: LocalL2BlockId }; finalized: { block: LocalL2BlockId }; }; @@ -46,10 +45,14 @@ export type L2BlockStreamEvent = type: 'blocks-added'; blocks: L2Block[]; } - | /** Emits checkpoints published to L1. */ { + | /** + * Reports a new checkpointed tip. Emitted at most once per sync pass when the source's checkpointed tip + * leads the local one. Carries only the block + checkpoint ids; consumers that need the full checkpoint + * payload fetch it on demand from the block source. + */ { type: 'chain-checkpointed'; - checkpoint: PublishedCheckpoint; block: L2BlockId; + checkpoint: CheckpointId; } | /** * Reports last correct block (new tip of the proposed chain). Note that this is not necessarily the anchor block diff --git a/yarn-project/stdlib/src/block/l2_block_stream/l2_block_stream.test.ts b/yarn-project/stdlib/src/block/l2_block_stream/l2_block_stream.test.ts index 1b17a151c4f5..a123e9160682 100644 --- a/yarn-project/stdlib/src/block/l2_block_stream/l2_block_stream.test.ts +++ b/yarn-project/stdlib/src/block/l2_block_stream/l2_block_stream.test.ts @@ -3,22 +3,14 @@ import { compactArray } from '@aztec/foundation/collection'; import { Fr } from '@aztec/foundation/curves/bn254'; import type { Logger } from '@aztec/foundation/log'; -import { jest } from '@jest/globals'; import { type MockProxy, mock } from 'jest-mock-extended'; import times from 'lodash.times'; -import type { PublishedCheckpoint } from '../../checkpoint/published_checkpoint.js'; import type { BlockHeader } from '../../tx/block_header.js'; import type { BlockData } from '../block_data.js'; -import { BlockHash, GENESIS_BLOCK_HEADER_HASH } from '../block_hash.js'; +import { BlockHash } from '../block_hash.js'; import type { L2Block } from '../l2_block.js'; -import { - type BlocksQuery, - GENESIS_CHECKPOINT_HEADER_HASH, - type L2BlockId, - type L2BlockSource, - type LocalL2Tips, -} from '../l2_block_source.js'; +import type { BlocksQuery, L2BlockId, L2BlockSource, LocalL2Tips } from '../l2_block_source.js'; import type { L2BlockStreamEvent, L2BlockStreamEventHandler, @@ -32,7 +24,6 @@ describe('L2BlockStream', () => { let blockSource: MockProxy; let latest: number = 0; - let checkpointed: number = 0; const makeHash = (number: number) => new Fr(number).toString(); @@ -43,15 +34,6 @@ describe('L2BlockStream', () => { indexWithinCheckpoint: 0, }) as L2Block; - /** Makes a block with hash method (for use in mocks that need hash) */ - const makeBlockWithHash = (number: number) => - ({ - number: BlockNumber(number), - checkpointNumber: CheckpointNumber(number), - indexWithinCheckpoint: 0, - hash: () => Promise.resolve(new BlockHash(new Fr(number))), - }) as L2Block; - const makeBlockData = (number: number, checkpointNum: number): BlockData => ({ header: makeHeader(number), @@ -64,29 +46,6 @@ describe('L2BlockStream', () => { const makeBlockId = (number: number): L2BlockId => ({ number: BlockNumber(number), hash: makeHash(number) }); - /** Helper to match a blocks-added event with blocks that may have extra properties like hash */ - const expectBlocksAdded = (blockNumbers: number[]) => - expect.objectContaining({ - type: 'blocks-added', - blocks: blockNumbers.map(n => - expect.objectContaining({ - number: BlockNumber(n), - }), - ), - }); - - /** Helper to match a chain-checkpointed event */ - const expectCheckpointed = (checkpointNumber?: number) => - checkpointNumber !== undefined - ? expect.objectContaining({ - type: 'chain-checkpointed', - checkpoint: expect.objectContaining({ - checkpoint: expect.objectContaining({ number: checkpointNumber }), - }), - block: expect.objectContaining({ number: expect.any(Number) }), - }) - : expect.objectContaining({ type: 'chain-checkpointed' }); - const makeCheckpointId = (number: number) => ({ number: CheckpointNumber(number), hash: makeHash(number) }); const makeTipId = (number: number) => ({ @@ -94,6 +53,13 @@ describe('L2BlockStream', () => { checkpoint: { number: CheckpointNumber(number), hash: makeHash(number) }, }); + /** A thin chain-checkpointed event for the source's checkpointed tip at `number`. */ + const checkpointedEvent = (number: number): L2BlockStreamEvent => ({ + type: 'chain-checkpointed', + block: makeBlockId(number), + checkpoint: makeCheckpointId(number), + }); + /** Sets the remote tips. All tips default to 0 except latest. */ const setRemoteTips = ( latest_: number, @@ -107,7 +73,6 @@ describe('L2BlockStream', () => { proven = proven ?? 0; finalized = finalized ?? 0; latest = latest_; - checkpointed = checkpointed_; blockSource.getL2Tips.mockResolvedValue({ proposed: { number: BlockNumber(latest), hash: makeHash(latest) }, @@ -121,7 +86,7 @@ describe('L2BlockStream', () => { beforeEach(() => { blockSource = mock(); - // Returns blocks up until what was reported as the latest block (for uncheckpointed blocks) + // Returns blocks up until what was reported as the latest block. blockSource.getBlocks.mockImplementation((query: BlocksQuery) => 'from' in query ? Promise.resolve( @@ -137,31 +102,6 @@ describe('L2BlockStream', () => { } return Promise.resolve(query.number > latest ? undefined : makeBlockData(query.number, query.number)); }); - - // Returns published checkpoints - each checkpoint contains just the one block for simplicity - // Respects the limit parameter and returns up to `limit` checkpoints - blockSource.getCheckpoints.mockImplementation(query => { - if (!('from' in query)) { - return Promise.resolve([]); - } - const { from: checkpointNumber, limit } = query; - return Promise.resolve( - compactArray( - times(limit, i => { - const cpNum = checkpointNumber + i; - return cpNum > checkpointed - ? undefined - : ({ - checkpoint: { - number: cpNum, - hash: () => new Fr(cpNum), - blocks: [makeBlockWithHash(cpNum)], - }, - } as unknown as PublishedCheckpoint); - }), - ), - ); - }); }); describe('with mock local data provider', () => { @@ -172,10 +112,7 @@ describe('L2BlockStream', () => { beforeEach(() => { localData = new TestL2BlockStreamLocalDataProvider(); handler = new TestL2BlockStreamEventHandler(); - blockStream = new TestL2BlockStream(blockSource, localData, handler, undefined, { - batchSize: 10, - checkpointPrefetchLimit: 1, - }); + blockStream = new TestL2BlockStream(blockSource, localData, handler, undefined, { batchSize: 10 }); }); it('pulls new blocks from start', async () => { @@ -267,50 +204,41 @@ describe('L2BlockStream', () => { ] satisfies L2BlockStreamEvent[]); }); - it('fetches checkpointed blocks and emits chain-checkpointed events', async () => { - // All blocks are checkpointed (checkpointed=5, proposed=5) + it('emits a single chain-checkpointed event carrying the source checkpointed tip', async () => { + // All blocks are checkpointed (checkpointed=5, proposed=5). Download all 5 blocks in one batch, then a + // single thin checkpointed event for the source tip. setRemoteTips(5, 5); await blockStream.work(); - // Each checkpointed block triggers a blocks-added and chain-checkpointed event - // (since each checkpoint contains one block in our mock) expect(handler.events).toEqual([ - expectBlocksAdded([1]), - expectCheckpointed(), - expectBlocksAdded([2]), - expectCheckpointed(), - expectBlocksAdded([3]), - expectCheckpointed(), - expectBlocksAdded([4]), - expectCheckpointed(), - expectBlocksAdded([5]), - expectCheckpointed(), + { type: 'blocks-added', blocks: times(5, i => makeBlock(i + 1)) }, + checkpointedEvent(5), ]); - // 2 calls: one for block 0 in reorg detection (hash compare at genesis), one for block 1 in loop 2. - expect(blockSource.getBlockData).toHaveBeenCalledTimes(2); - expect(blockSource.getBlocks).not.toHaveBeenCalled(); + // No checkpoint payloads are fetched anymore. + expect(blockSource.getBlocks).toHaveBeenCalledWith({ from: BlockNumber(1), limit: 5 }); }); - it('fetches checkpointed blocks first, then uncheckpointed blocks', async () => { - // Blocks 1-3 are checkpointed, blocks 4-5 are uncheckpointed + it('emits checkpointed once even when the checkpointed tip trails the proposed tip', async () => { + // Blocks 1-3 checkpointed, blocks 4-5 uncheckpointed. setRemoteTips(5, 3); await blockStream.work(); - // First 3 blocks come via checkpoints, last 2 via getBlocks + // Download all 5 blocks, then a single checkpointed event for checkpoint 3. expect(handler.events).toEqual([ - expectBlocksAdded([1]), - expectCheckpointed(), - expectBlocksAdded([2]), - expectCheckpointed(), - expectBlocksAdded([3]), - expectCheckpointed(), - expectBlocksAdded([4, 5]), + { type: 'blocks-added', blocks: times(5, i => makeBlock(i + 1)) }, + checkpointedEvent(3), ]); - // 2 calls: one for block 0 in reorg detection (hash compare at genesis), one for block 1 in loop 2. - expect(blockSource.getBlockData).toHaveBeenCalledTimes(2); - expect(blockSource.getBlocks).toHaveBeenCalledWith({ from: BlockNumber(4), limit: 2 }); + }); + + it('does not re-emit the checkpointed event once the local tip matches the source', async () => { + setRemoteTips(5, 5); + localData.setProposed(5); + localData.setCheckpointed(5, 5); + + await blockStream.work(); + expect(handler.events.filter(e => e.type === 'chain-checkpointed')).toEqual([]); }); it('handles reorg with uncheckpointed reason when pruned to checkpointed tip', async () => { @@ -346,22 +274,60 @@ describe('L2BlockStream', () => { // The reorg-search loop must NOT walk past block 0; it should throw a clear error // pointing at the genesis-hash mismatch instead of cascading into "block hash not found - // for -1" further down. The error is caught and logged by `work` rather than rethrown, - // so we assert via the logged error and ensure no events were emitted. - const errorSpy = jest.spyOn( - (blockStream as unknown as { log: { error: (...args: any[]) => void } }).log, - 'error', - ); + // for -1" further down. The error is caught and logged by `work` rather than rethrown. + const log = mock(); + blockStream = new TestL2BlockStream(blockSource, localData, handler, log, { batchSize: 10 }); await blockStream.work(); expect(handler.events).toEqual([]); - expect(errorSpy).toHaveBeenCalledWith( - expect.stringContaining('Error processing block stream'), - expect.objectContaining({ - message: expect.stringContaining('Genesis block hash mismatch'), - }), - ); + expect(log.error).toHaveBeenCalledWith(expect.stringContaining('Genesis block hash mismatch'), expect.anything()); + }); + }); + + describe('A-1061 regression: startingBlock past the source checkpointed tip', () => { + let localData: TestL2BlockStreamLocalDataProvider; + let handler: TestL2BlockStreamEventHandler; + let blockStream: TestL2BlockStream; + + beforeEach(() => { + localData = new TestL2BlockStreamLocalDataProvider(); + handler = new TestL2BlockStreamEventHandler(); + }); + + it('emits the source checkpointed tip on the FIRST pass even when startingBlock is past it', async () => { + // Source: checkpointed=30, proven=25, proposed=35. The node restarts with startingBlock=33 (past the + // checkpointed tip of block 30). Pre-rewrite, the startingBlock fast-forward suppressed all checkpoint + // emission, leaving the local checkpointed cursor stuck at genesis while proven still advanced. + setRemoteTips(35, 30, 25, 10); + blockStream = new TestL2BlockStream(blockSource, localData, handler, undefined, { + batchSize: 10, + startingBlock: 33, + }); + + await blockStream.work(); + + const checkpointEvents = handler.events.filter(e => e.type === 'chain-checkpointed'); + expect(checkpointEvents).toEqual([checkpointedEvent(30)]); + + // proven is resolvable (block 25), and the checkpointed cursor is NOT stuck at genesis. + const provenEvents = handler.events.filter(e => e.type === 'chain-proven'); + expect(provenEvents).toEqual([ + { type: 'chain-proven', block: makeBlockId(25), checkpoint: makeCheckpointId(25) }, + ]); + }); + + it('downloads blocks from startingBlock, not from genesis', async () => { + setRemoteTips(35, 30, 25, 10); + blockStream = new TestL2BlockStream(blockSource, localData, handler, undefined, { + batchSize: 10, + startingBlock: 33, + }); + + await blockStream.work(); + + // First block download begins at startingBlock (33), skipping 1..32. + expect(blockSource.getBlocks).toHaveBeenCalledWith({ from: BlockNumber(33), limit: 3 }); }); }); @@ -384,13 +350,12 @@ describe('L2BlockStream', () => { startingBlock: 30, }); - // We first seed a few blocks into the blockstream - // Block 30 comes via checkpoint, blocks 31-35 via uncheckpointed + // We first seed a few blocks into the blockstream: blocks 30-35 (startingBlock=30), plus the + // checkpointed/proven/finalized reconciliation events. await blockStream.work(); expect(handler.events).toEqual([ - expectBlocksAdded([30]), - expectCheckpointed(30), - { type: 'blocks-added', blocks: times(5, i => makeBlock(i + 31)) }, + { type: 'blocks-added', blocks: times(6, i => makeBlock(i + 30)) }, + checkpointedEvent(30), { type: 'chain-proven', block: makeBlockId(25), checkpoint: makeCheckpointId(25) }, { type: 'chain-finalized', block: makeBlockId(10), checkpoint: makeCheckpointId(10) }, ]); @@ -404,19 +369,16 @@ describe('L2BlockStream', () => { ]); }); - // Regression test for the checkpoint-replay storm: pruning to an uncheckpointed block ahead of - // the checkpointed tip must not reset the checkpointed cursor, otherwise the next work() replays - // every checkpoint from 1 to the source tip. - it('does not replay checkpoints after pruning to an uncheckpointed block ahead of the checkpointed tip', async () => { - // Sync blocks 1-7: blocks 1-5 are checkpointed (checkpoints 1-5), blocks 6-7 uncheckpointed. + // Regression: pruning to an uncheckpointed block ahead of the checkpointed tip must not reset the + // checkpointed cursor, otherwise the next work() re-emits the checkpointed tip. + it('does not re-emit the checkpointed tip after pruning to a block ahead of it', async () => { + // Sync blocks 1-7: blocks 1-5 are checkpointed (checkpoint 5), blocks 6-7 uncheckpointed. setRemoteTips(7, 5); await blockStream.work(); - const checkpointEventsOnSync = handler.events.filter(e => e.type === 'chain-checkpointed'); - expect(checkpointEventsOnSync).toHaveLength(5); + expect(handler.events.filter(e => e.type === 'chain-checkpointed')).toEqual([checkpointedEvent(5)]); handler.clearEvents(); // Source drops its proposed tip to block 6 (uncheckpointed, still ahead of checkpointed=5). - // The stream prunes the local proposed tip from 7 back to 6. setRemoteTips(6, 5); await blockStream.work(); expect(handler.events).toEqual([ @@ -424,1231 +386,161 @@ describe('L2BlockStream', () => { ]); handler.clearEvents(); - // The next sync must NOT re-emit any chain-checkpointed events: the checkpointed cursor was - // left at block 5 / checkpoint 5, so there is nothing to replay. + // The next sync must NOT re-emit a chain-checkpointed event: the checkpointed cursor was left at + // block 5 / checkpoint 5. await blockStream.work(); expect(handler.events.filter(e => e.type === 'chain-checkpointed')).toEqual([]); }); - }); - - describe('multiple blocks per checkpoint', () => { - let localData: TestL2BlockStreamLocalDataProvider; - let handler: TestL2BlockStreamEventHandler; - let blockStream: TestL2BlockStream; - - // Configuration for checkpoint structure: each checkpoint contains 3 blocks - const blocksPerCheckpoint = 3; - - /** Gets the checkpoint number for a given block number */ - const getCheckpointForBlock = (blockNum: number) => Math.ceil(blockNum / blocksPerCheckpoint); - - /** Gets the first block number in a checkpoint */ - const getFirstBlockInCheckpoint = (checkpointNum: number) => (checkpointNum - 1) * blocksPerCheckpoint + 1; - - /** Gets the last block number in a checkpoint */ - const getLastBlockInCheckpoint = (checkpointNum: number) => checkpointNum * blocksPerCheckpoint; - - /** Makes a block with hash method (for use in mocks that need hash) */ - const makeBlockInCheckpointWithHash = (blockNum: number) => { - const checkpointNum = getCheckpointForBlock(blockNum); - const firstBlockInCheckpoint = getFirstBlockInCheckpoint(checkpointNum); - return { - number: BlockNumber(blockNum), - checkpointNumber: CheckpointNumber(checkpointNum), - indexWithinCheckpoint: blockNum - firstBlockInCheckpoint, - hash: () => Promise.resolve(new BlockHash(new Fr(blockNum))), - } as L2Block; - }; - - /** Makes block data for a checkpointed block */ - const makeBlockDataInCheckpoint = (blockNum: number): BlockData => - ({ - header: makeHeader(blockNum), - checkpointNumber: CheckpointNumber(getCheckpointForBlock(blockNum)), - indexWithinCheckpoint: blockNum - getFirstBlockInCheckpoint(getCheckpointForBlock(blockNum)), - }) as unknown as BlockData; - - /** Sets the remote tips with correct checkpoint numbers for multi-block checkpoints. */ - const setRemoteTipsMultiBlock = ( - latest_: number, - checkpointedBlock?: number, - proven?: number, - finalized?: number, - proposedCheckpointBlock?: number, - ) => { - checkpointedBlock = checkpointedBlock ?? 0; - proven = proven ?? 0; - finalized = finalized ?? 0; - proposedCheckpointBlock = proposedCheckpointBlock ?? 0; - latest = latest_; - checkpointed = checkpointedBlock; - - const checkpointedCheckpointNum = checkpointedBlock > 0 ? getCheckpointForBlock(checkpointedBlock) : 0; - const provenCheckpointNum = proven > 0 ? getCheckpointForBlock(proven) : 0; - const finalizedCheckpointNum = finalized > 0 ? getCheckpointForBlock(finalized) : 0; - const proposedCheckpointNum = proposedCheckpointBlock > 0 ? getCheckpointForBlock(proposedCheckpointBlock) : 0; - - blockSource.getL2Tips.mockResolvedValue({ - proposed: { number: BlockNumber(latest), hash: makeHash(latest) }, - checkpointed: { - block: { number: BlockNumber(checkpointedBlock), hash: makeHash(checkpointedBlock) }, - checkpoint: { - number: CheckpointNumber(checkpointedCheckpointNum), - hash: makeHash(checkpointedCheckpointNum), - }, - }, - proposedCheckpoint: { - block: { number: BlockNumber(proposedCheckpointBlock), hash: makeHash(proposedCheckpointBlock) }, - checkpoint: { - number: CheckpointNumber(proposedCheckpointNum), - hash: makeHash(proposedCheckpointNum), - }, - }, - proven: { - block: { number: BlockNumber(proven), hash: makeHash(proven) }, - checkpoint: { number: CheckpointNumber(provenCheckpointNum), hash: makeHash(provenCheckpointNum) }, - }, - finalized: { - block: { number: BlockNumber(finalized), hash: makeHash(finalized) }, - checkpoint: { number: CheckpointNumber(finalizedCheckpointNum), hash: makeHash(finalizedCheckpointNum) }, - }, - }); - }; - - beforeEach(() => { - localData = new TestL2BlockStreamLocalDataProvider(); - handler = new TestL2BlockStreamEventHandler(); - blockStream = new TestL2BlockStream(blockSource, localData, handler, undefined, { batchSize: 10 }); - - // Override the mock to support multiple blocks per checkpoint - blockSource.getBlockData.mockImplementation(query => { - if (!('number' in query)) { - return Promise.resolve(undefined); - } - return Promise.resolve(query.number > latest ? undefined : makeBlockDataInCheckpoint(query.number)); - }); - - // Returns published checkpoints with multiple blocks each, respecting the limit parameter - blockSource.getCheckpoints.mockImplementation(query => { - if (!('from' in query)) { - return Promise.resolve([]); - } - const { from: checkpointNumber, limit } = query; - const checkpoints: PublishedCheckpoint[] = []; - for (let i = 0; i < limit; i++) { - const cpNum = CheckpointNumber(checkpointNumber + i); - const firstBlock = getFirstBlockInCheckpoint(cpNum); - const lastBlock = getLastBlockInCheckpoint(cpNum); - // Only include checkpoints that are within the checkpointed range - if (lastBlock > checkpointed) { - break; - } - checkpoints.push({ - checkpoint: { - number: cpNum, - hash: () => new Fr(cpNum), - blocks: times(blocksPerCheckpoint, j => makeBlockInCheckpointWithHash(firstBlock + j)), - }, - } as unknown as PublishedCheckpoint); - } - return Promise.resolve(checkpoints); - }); - }); - - it('emits all blocks in a checkpoint before chain-checkpointed event', async () => { - // Set up: 6 blocks in 2 checkpoints (blocks 1-3 in checkpoint 1, blocks 4-6 in checkpoint 2) - setRemoteTipsMultiBlock(6, 6); + // prune + same-pass reconciliation: a prune walk-back and the catch-up tier events fire in one pass. + it('emits the prune event and the new tier events in the same pass', async () => { + // Sync up to a checkpointed/proven chain: proposed=9, checkpointed=9, proven=6, finalized=3. + setRemoteTips(9, 9, 6, 3); await blockStream.work(); + handler.clearEvents(); - // Should emit blocks 1-3, then checkpoint 1, then blocks 4-6, then checkpoint 2 - expect(handler.events).toEqual([ - expectBlocksAdded([1, 2, 3]), - expectCheckpointed(1), - expectBlocksAdded([4, 5, 6]), - expectCheckpointed(2), - ]); - }); - - it('handles partial checkpoint at the end (uncheckpointed blocks)', async () => { - // Set up: 5 blocks total, but only first 3 are checkpointed (checkpoint 1 complete) - // Blocks 4-5 are uncheckpointed - setRemoteTipsMultiBlock(5, 3); - + // Reorg: the source drops its proposed/checkpointed tip to block 6 (the memory store still holds 7-9, + // which the source no longer serves) and finalized advances to 6 within the same snapshot. + setRemoteTips(6, 6, 6, 6); await blockStream.work(); - // Should emit checkpoint 1 blocks, then checkpoint event, then uncheckpointed blocks 4-5 - expect(handler.events).toEqual([expectBlocksAdded([1, 2, 3]), expectCheckpointed(1), expectBlocksAdded([4, 5])]); + // First the prune to block 6, then the finalized reconciliation event for the advanced finalized tip. + expect(handler.events[0]).toEqual({ + type: 'chain-pruned', + block: makeBlockId(6), + checkpointed: makeTipId(6), + proven: makeTipId(6), + }); + const finalizedEvents = handler.events.filter(e => e.type === 'chain-finalized'); + expect(finalizedEvents).toEqual([ + { type: 'chain-finalized', block: makeBlockId(6), checkpoint: makeCheckpointId(6) }, + ]); }); + }); - it('handles starting from middle of a checkpoint', async () => { - // Set up: 9 blocks in 3 checkpoints, but we start from block 5 (middle of checkpoint 2) - // Local has blocks 1-4, local checkpointed = 0 - setRemoteTipsMultiBlock(9, 9); - localData.proposed.number = BlockNumber(4); + describe('hash-gated tier reconciliation', () => { + // World-state-shaped provider: reports `undefined` block hashes for its proven/finalized tips. The + // reconciliation must skip the hash comparison so it does not re-emit on every poll. + class WorldStateShapedProvider implements L2BlockStreamLocalDataProvider { + public proposedNumber = BlockNumber.ZERO; + public provenNumber = BlockNumber.ZERO; + public finalizedNumber = BlockNumber.ZERO; - await blockStream.work(); + public getL2BlockHash(number: number): Promise { + return Promise.resolve(number > this.proposedNumber ? undefined : new Fr(number).toString()); + } - // Should first emit checkpoint 1 (blocks 1-4 already local) - // Then continue from block 5, which is in checkpoint 2 - // Blocks 5-6 complete checkpoint 2, then blocks 7-9 complete checkpoint 3 - expect(handler.events).toEqual([ - expectCheckpointed(1), // checkpoint 1 for already-local blocks 1-4 - expectBlocksAdded([5, 6]), - expectCheckpointed(2), // checkpoint 2 - expectBlocksAdded([7, 8, 9]), - expectCheckpointed(3), // checkpoint 3 - ]); + public getL2Tips(): Promise { + return Promise.resolve({ + proposed: { number: this.proposedNumber, hash: new Fr(this.proposedNumber).toString() }, + // proven/finalized hashes are intentionally undefined, as world-state reports them. + proven: { block: { number: this.provenNumber } }, + finalized: { block: { number: this.finalizedNumber } }, + }); + } + } - // Verify checkpoint order - const checkpointEvents = handler.events.filter(e => e.type === 'chain-checkpointed'); - expect(checkpointEvents).toHaveLength(3); - expect((checkpointEvents[0] as any).checkpoint.checkpoint.number).toBe(CheckpointNumber(1)); - expect((checkpointEvents[1] as any).checkpoint.checkpoint.number).toBe(CheckpointNumber(2)); - expect((checkpointEvents[2] as any).checkpoint.checkpoint.number).toBe(CheckpointNumber(3)); - }); + it('does not re-emit proven/finalized when the local hash is undefined and numbers match', async () => { + const localData = new WorldStateShapedProvider(); + const handler = new TestL2BlockStreamEventHandler(); + const blockStream = new TestL2BlockStream(blockSource, localData, handler, undefined, { + batchSize: 10, + ignoreCheckpoints: true, + }); - it('correctly identifies checkpoint number in chain-checkpointed events', async () => { - // Set up: 6 blocks in 2 checkpoints - setRemoteTipsMultiBlock(6, 6); + // Source proven/finalized are at the same numbers the local provider already tracks. + setRemoteTips(9, 0, 6, 3); + localData.proposedNumber = BlockNumber(9); + localData.provenNumber = BlockNumber(6); + localData.finalizedNumber = BlockNumber(3); await blockStream.work(); - // Extract the chain-checkpointed events - const checkpointEvents = handler.events.filter(e => e.type === 'chain-checkpointed'); - expect(checkpointEvents).toHaveLength(2); - expect((checkpointEvents[0] as any).checkpoint.checkpoint.number).toBe(CheckpointNumber(1)); - expect((checkpointEvents[1] as any).checkpoint.checkpoint.number).toBe(CheckpointNumber(2)); + // Numbers match and local hashes are undefined ⇒ no re-emission. + expect(handler.events.filter(e => e.type === 'chain-proven')).toEqual([]); + expect(handler.events.filter(e => e.type === 'chain-finalized')).toEqual([]); }); - it('handles many checkpoints with batching', async () => { - // Set up: 12 blocks in 4 checkpoints (3 blocks each), with batch size of 5 - // Batch size doesn't align with checkpoint boundaries, so the stream must - // respect checkpoint boundaries and emit events correctly - blockStream = new TestL2BlockStream(blockSource, localData, handler, undefined, { batchSize: 5 }); - setRemoteTipsMultiBlock(12, 12); - - await blockStream.work(); - - // Even though batch size is 5, checkpoint boundaries (every 3 blocks) take precedence - // Expected sequence: - // - Blocks 1-3 (checkpoint 1), then checkpoint 1 event - // - Blocks 4-6 (checkpoint 2), then checkpoint 2 event - // - Blocks 7-9 (checkpoint 3), then checkpoint 3 event - // - Blocks 10-12 (checkpoint 4), then checkpoint 4 event - expect(handler.events).toEqual([ - expectBlocksAdded([1, 2, 3]), - expectCheckpointed(1), - expectBlocksAdded([4, 5, 6]), - expectCheckpointed(2), - expectBlocksAdded([7, 8, 9]), - expectCheckpointed(3), - expectBlocksAdded([10, 11, 12]), - expectCheckpointed(4), - ]); - }); + // Finding 3: a same-number, different-hash proven tip IS re-emitted. + it('re-emits the proven tip when numbers match but the known local hash differs', async () => { + const localData = new TestL2BlockStreamLocalDataProvider(); + const handler = new TestL2BlockStreamEventHandler(); + const blockStream = new TestL2BlockStream(blockSource, localData, handler, undefined, { + batchSize: 10, + ignoreCheckpoints: true, + }); - it('does not emit more than batchSize blocks in a single blocks-added event for checkpointed blocks', async () => { - // Set up: 12 blocks in 4 checkpoints (3 blocks each), but batch size is 2 - // Each checkpoint has 3 blocks, but we should never emit more than 2 at once - blockStream = new TestL2BlockStream(blockSource, localData, handler, undefined, { batchSize: 2 }); - setRemoteTipsMultiBlock(12, 12); + setRemoteTips(9, 0, 6, 3); + localData.proposed.number = BlockNumber(9); + // Local proven sits at the same block number but a stale hash (e.g. a same-height reorg). + localData.proven.block.number = BlockNumber(6); + localData.proven.block.hash = '0xstale6'; + localData.finalized.block.number = BlockNumber(3); + localData.finalized.block.hash = makeHash(3); await blockStream.work(); - // Verify no blocks-added event has more than 2 blocks - const blocksAddedEvents = handler.events.filter(e => e.type === 'blocks-added'); - for (const event of blocksAddedEvents) { - if (event.type === 'blocks-added') { - expect(event.blocks.length).toBeLessThanOrEqual(2); - } - } - - // Expected sequence with batchSize=2: - // - Blocks 1-2, then blocks 3, then checkpoint 1 - // - Blocks 4-5, then block 6, then checkpoint 2 - // etc. - expect(handler.events).toEqual([ - expectBlocksAdded([1, 2]), - expectBlocksAdded([3]), - expectCheckpointed(1), - expectBlocksAdded([4, 5]), - expectBlocksAdded([6]), - expectCheckpointed(2), - expectBlocksAdded([7, 8]), - expectBlocksAdded([9]), - expectCheckpointed(3), - expectBlocksAdded([10, 11]), - expectBlocksAdded([12]), - expectCheckpointed(4), + expect(handler.events.filter(e => e.type === 'chain-proven')).toEqual([ + { type: 'chain-proven', block: makeBlockId(6), checkpoint: makeCheckpointId(6) }, ]); + // Finalized matched on both number and hash ⇒ not re-emitted. + expect(handler.events.filter(e => e.type === 'chain-finalized')).toEqual([]); }); + }); - it('emits checkpoint event when blocks become checkpointed after being added as uncheckpointed', async () => { - // Phase 1: Start with 3 checkpointed blocks (checkpoint 1), then add blocks 4-6 as uncheckpointed - setRemoteTipsMultiBlock(6, 3); - - await blockStream.work(); - - // Expect: blocks 1-3 via checkpoint, then uncheckpointed blocks 4-6 - expect(handler.events).toEqual([ - expectBlocksAdded([1, 2, 3]), - expectCheckpointed(1), - expectBlocksAdded([4, 5, 6]), - ]); - - handler.clearEvents(); - - // Update local state to reflect what the handler would have stored - localData.proposed.number = BlockNumber(6); - localData.checkpointed.block.number = BlockNumber(3); - localData.checkpointed.checkpoint.number = CheckpointNumber(1); - - // Phase 2: Now checkpoint 2 completes (blocks 4-6 become checkpointed) - setRemoteTipsMultiBlock(6, 6); - - await blockStream.work(); + describe('ignoreCheckpoints', () => { + let localData: TestL2BlockStreamLocalDataProvider; + let handler: TestL2BlockStreamEventHandler; + let blockStream: TestL2BlockStream; - // Should emit a checkpoint event for checkpoint 2 (blocks 4-6), even though blocks were already added - const checkpointEvents = handler.events.filter(e => e.type === 'chain-checkpointed'); - expect(checkpointEvents).toHaveLength(1); - expect((checkpointEvents[0] as any).checkpoint.checkpoint.number).toBe(CheckpointNumber(2)); + beforeEach(() => { + localData = new TestL2BlockStreamLocalDataProvider(); + handler = new TestL2BlockStreamEventHandler(); + blockStream = new TestL2BlockStream(blockSource, localData, handler, undefined, { + batchSize: 10, + ignoreCheckpoints: true, + }); }); - it('emits checkpoint event BEFORE new uncheckpointed blocks when checkpoint completes', async () => { - // Phase 1: Start with 3 checkpointed blocks (checkpoint 1), then add blocks 4-6 as uncheckpointed - setRemoteTipsMultiBlock(6, 3); - - await blockStream.work(); - - // Expect: blocks 1-3 via checkpoint, then uncheckpointed blocks 4-6 - expect(handler.events).toEqual([ - expectBlocksAdded([1, 2, 3]), - expectCheckpointed(1), - expectBlocksAdded([4, 5, 6]), - ]); - - handler.clearEvents(); - - // Update local state to reflect what the handler would have stored - localData.proposed.number = BlockNumber(6); - localData.checkpointed.block.number = BlockNumber(3); - localData.checkpointed.checkpoint.number = CheckpointNumber(1); - - // Phase 2: Checkpoint 2 completes (blocks 4-6) AND a new block 7 arrives - setRemoteTipsMultiBlock(7, 6); + it('does not emit checkpoint events for new checkpointed blocks', async () => { + setRemoteTips(6, 6); await blockStream.work(); - // Should emit checkpoint 2 FIRST, then the new uncheckpointed block 7 - // NOT: block 7 first, then checkpoint 2 - expect(handler.events).toEqual([expectCheckpointed(2), expectBlocksAdded([7])]); + expect(handler.events).toEqual([{ type: 'blocks-added', blocks: times(6, i => makeBlock(i + 1)) }]); }); - it('emits checkpoint as soon as last block in checkpoint arrives', async () => { - // This tests the realistic scenario where checkpoints are published as blocks arrive. - // Uncheckpointed blocks are always just a partial checkpoint (the current incomplete one). - - // Sync 1: Source has checkpointed=6 (checkpoint 2), proposed=9 - // Client gets blocks 1-6 via checkpoints, blocks 7-9 as uncheckpointed - setRemoteTipsMultiBlock(9, 6); - + it('still emits prune events but no checkpoint events', async () => { + setRemoteTips(9, 9); await blockStream.work(); - - expect(handler.events).toEqual([ - expectBlocksAdded([1, 2, 3]), - expectCheckpointed(1), - expectBlocksAdded([4, 5, 6]), - expectCheckpointed(2), - expectBlocksAdded([7, 8, 9]), // uncheckpointed - ]); - handler.clearEvents(); - // Update local state localData.proposed.number = BlockNumber(9); - localData.checkpointed.block.number = BlockNumber(6); - localData.checkpointed.checkpoint.number = CheckpointNumber(2); - - // Sync 2: Checkpoint 3 is now published (blocks 7-9), new blocks 10-12 are uncheckpointed - setRemoteTipsMultiBlock(12, 9); - - await blockStream.work(); - - // Should emit checkpoint 3 for already-local blocks 7-9, then uncheckpointed blocks 10-12 - expect(handler.events).toEqual([ - expectCheckpointed(3), - expectBlocksAdded([10, 11, 12]), // uncheckpointed - ]); - - handler.clearEvents(); - - // Update local state - localData.proposed.number = BlockNumber(12); localData.checkpointed.block.number = BlockNumber(9); - localData.checkpointed.checkpoint.number = CheckpointNumber(3); - - // Sync 3: Checkpoint 4 is now published (blocks 10-12), no new blocks - setRemoteTipsMultiBlock(12, 12); - - await blockStream.work(); - - // Should emit checkpoint 4 for already-local blocks 10-12 - expect(handler.events).toEqual([expectCheckpointed(4)]); - }); - - it('emits all checkpoints when source jumps ahead with multiple new checkpoints', async () => { - // Phase 1: Start with checkpoint 1 complete (blocks 1-3), blocks 4-6 uncheckpointed - setRemoteTipsMultiBlock(6, 3); + localData.checkpointed.checkpoint.number = CheckpointNumber(9); + for (let i = 4; i <= 9; i++) { + localData.blockHashes[i] = `0xbad${i}`; + } + setRemoteTips(3, 3); await blockStream.work(); expect(handler.events).toEqual([ - expectBlocksAdded([1, 2, 3]), - expectCheckpointed(1), - expectBlocksAdded([4, 5, 6]), + { type: 'chain-pruned', block: makeBlockId(3), checkpointed: makeTipId(3), proven: makeTipId(0) }, ]); + }); - handler.clearEvents(); - - // Update local state - localData.proposed.number = BlockNumber(6); - localData.checkpointed.block.number = BlockNumber(3); - localData.checkpointed.checkpoint.number = CheckpointNumber(1); - - // Phase 2: Source jumps to block 12 with checkpoints at 6, 9, and 12 - // - Checkpoint 2 (blocks 4-6) - blocks already local, needs checkpoint event - // - Checkpoint 3 (blocks 7-9) - new blocks + checkpoint event - // - Checkpoint 4 (blocks 10-12) - new blocks + checkpoint event - setRemoteTipsMultiBlock(12, 12); + it('still emits proven and finalized events', async () => { + setRemoteTips(9, 9, 6, 3); await blockStream.work(); - // Should emit: - // 1. Checkpoint 2 event (blocks 4-6 were already local) - // 2. Blocks 7-9 + checkpoint 3 event - // 3. Blocks 10-12 + checkpoint 4 event expect(handler.events).toEqual([ - expectCheckpointed(2), - expectBlocksAdded([7, 8, 9]), - expectCheckpointed(3), - expectBlocksAdded([10, 11, 12]), - expectCheckpointed(4), + { type: 'blocks-added', blocks: times(9, i => makeBlock(i + 1)) }, + { type: 'chain-proven', block: makeBlockId(6), checkpoint: makeCheckpointId(6) }, + { type: 'chain-finalized', block: makeBlockId(3), checkpoint: makeCheckpointId(3) }, ]); }); - - describe('startingBlock with stale checkpoint state', () => { - // When a node restarts with startingBlock set and has local blocks but no checkpoint - // state (e.g. checkpoint tracking is new, or checkpoint state was reset), Loop 1 - // should not spam checkpoint events for all historical checkpoints. - - it('skips historical checkpoint events before startingBlock on restart with stale checkpoint state', async () => { - // node has blocks 1-15 locally (proposed=15) but no checkpoint state. - // Checkpoint 5 covers blocks 13-15 (the last checkpoint). - setRemoteTipsMultiBlock(15, 15); - localData.proposed.number = BlockNumber(15); - // localData.checkpointed starts at 0 - simulating stale/missing checkpoint state - - blockStream = new TestL2BlockStream(blockSource, localData, handler, undefined, { - batchSize: 10, - startingBlock: 13, // start from checkpoint 5 (blocks 13-15) - }); - - await blockStream.work(); - - // Should only emit checkpoint 5 (the one containing startingBlock=13), not all 5 checkpoints - expect(handler.events).toEqual([expectCheckpointed(5)]); - // Verify we don't spam checkpoints 1-4 - const checkpointEvents = handler.events.filter(e => e.type === 'chain-checkpointed'); - expect(checkpointEvents).toHaveLength(1); - }); - - it('without startingBlock emits all historical checkpoints for already-local blocks', async () => { - // Same scenario without startingBlock: should emit all 5 checkpoints (correct catch-up behavior) - setRemoteTipsMultiBlock(15, 15); - localData.proposed.number = BlockNumber(15); - // localData.checkpointed starts at 0 - - await blockStream.work(); - - // All 5 checkpoints should be emitted since they're all for already-local blocks - const checkpointEvents = handler.events.filter(e => e.type === 'chain-checkpointed'); - expect(checkpointEvents).toHaveLength(5); - }); - - it('skips Loop 1 entirely when startingBlock is past the checkpointed tip', async () => { - // proposed=15, checkpointed=9 (ckpt 3 covers blocks 7-9). startingBlock=12 is past the - // checkpointed tip, in the proposed range. Loop 1 must skip without an RPC for block 12. - setRemoteTipsMultiBlock(15, 9); - blockStream = new TestL2BlockStream(blockSource, localData, handler, undefined, { - batchSize: 10, - startingBlock: 12, - }); - - await blockStream.work(); - - // No chain-checkpointed events because startingBlock is past the checkpointed tip. - const checkpointEvents = handler.events.filter(e => e.type === 'chain-checkpointed'); - expect(checkpointEvents).toHaveLength(0); - // Loop 1 must not query block 12 — past-the-tip is decided from sourceTips alone. - const loop1Calls = blockSource.getBlockData.mock.calls.filter( - c => 'number' in c[0] && (c[0] as { number: number }).number === 12, - ); - expect(loop1Calls).toHaveLength(0); - }); - - it('calls getBlockData for block 0 only for reorg detection, not checkpoint lookup, when startingBlock is 0', async () => { - // With startingBlock=0, the stream skips the checkpoint-number lookup (line 121 path) - // so getBlockData is called for block 0 only once: for the genesis reorg detection. - setRemoteTipsMultiBlock(15, 15); - blockStream = new TestL2BlockStream(blockSource, localData, handler, undefined, { - batchSize: 10, - startingBlock: 0, - }); - - await blockStream.work(); - - const calls = blockSource.getBlockData.mock.calls; - const block0Calls = calls.filter(c => 'number' in c[0] && (c[0] as { number: number }).number === 0); - // Only the genesis reorg-detection call — not an additional checkpoint-lookup call. - expect(block0Calls).toHaveLength(1); - }); - }); - - describe('checkpoint prefetching', () => { - it('prefetches multiple checkpoints in a single RPC call', async () => { - // Set up: 9 blocks in 3 checkpoints - setRemoteTipsMultiBlock(9, 9); - - // Create a stream with prefetch limit of 10 (will fetch all 3 checkpoints in one call) - // This also tests that we handle getting fewer checkpoints (3) than requested (10) - const prefetchStream = new TestL2BlockStream(blockSource, localData, handler, undefined, { - batchSize: 10, - checkpointPrefetchLimit: 10, - }); - - await prefetchStream.work(); - - // Should have fetched all 3 checkpoints in a single call (Loop 2 makes 1 call with limit 10) - // Even though we requested 10, only 3 exist - verify we handle this correctly - const calls = blockSource.getCheckpoints.mock.calls; - const loop2Calls = calls.filter(([query]) => 'limit' in query && query.limit === 10); - expect(loop2Calls.length).toBe(1); - expect((loop2Calls[0][0] as { from: number }).from).toBe(1); // Starting from checkpoint 1 - - // All 3 checkpoints should be emitted correctly (not 10) - const checkpointEvents = handler.events.filter(e => e.type === 'chain-checkpointed'); - expect(checkpointEvents).toHaveLength(3); - - // Verify correct event order - expect(handler.events).toEqual([ - expectBlocksAdded([1, 2, 3]), - expectCheckpointed(1), - expectBlocksAdded([4, 5, 6]), - expectCheckpointed(2), - expectBlocksAdded([7, 8, 9]), - expectCheckpointed(3), - ]); - }); - - it('prefetches checkpoints correctly when starting from an offset', async () => { - // Set up: 15 blocks in 5 checkpoints, but we already have blocks 1-6 locally (checkpoints 1-2) - setRemoteTipsMultiBlock(15, 15); - localData.proposed.number = BlockNumber(6); - localData.checkpointed.checkpoint.number = CheckpointNumber(2); - - // Create a stream with prefetch limit of 10 - const prefetchStream = new TestL2BlockStream(blockSource, localData, handler, undefined, { - batchSize: 10, - checkpointPrefetchLimit: 10, - }); - - await prefetchStream.work(); - - // Loop 2 should start fetching from checkpoint 3 (block 7 is in checkpoint 3) - const calls = blockSource.getCheckpoints.mock.calls; - const loop2Calls = calls.filter(([query]) => 'limit' in query && query.limit === 10); - expect(loop2Calls.length).toBe(1); - expect((loop2Calls[0][0] as { from: number }).from).toBe(3); // Starting from checkpoint 3, not 1 - - // Should only emit blocks 7-15 and checkpoints 3-5 (not 1-2, those are already local) - expect(handler.events).toEqual([ - expectBlocksAdded([7, 8, 9]), - expectCheckpointed(3), - expectBlocksAdded([10, 11, 12]), - expectCheckpointed(4), - expectBlocksAdded([13, 14, 15]), - expectCheckpointed(5), - ]); - - // Verify only 3 new checkpoints emitted - const checkpointEvents = handler.events.filter(e => e.type === 'chain-checkpointed'); - expect(checkpointEvents).toHaveLength(3); - }); - - it('prefetches correctly when starting from middle of a checkpoint', async () => { - // Local has blocks 1-7: checkpoints 1-2 complete, block 7 is first block of checkpoint 3 - setRemoteTipsMultiBlock(15, 15); - localData.proposed.number = BlockNumber(7); - localData.checkpointed.checkpoint.number = CheckpointNumber(2); - - const prefetchStream = new TestL2BlockStream(blockSource, localData, handler, undefined, { - batchSize: 10, - checkpointPrefetchLimit: 10, - }); - - await prefetchStream.work(); - - // Should start prefetching from checkpoint 3 - const calls = blockSource.getCheckpoints.mock.calls; - const loop2Calls = calls.filter(([query]) => 'limit' in query && query.limit === 10); - expect(loop2Calls.length).toBe(1); - expect((loop2Calls[0][0] as { from: number }).from).toBe(3); // Starting from checkpoint 3 - - // Should emit only blocks 8-9 from checkpoint 3 (block 7 is already local) - expect(handler.events).toEqual([ - expectBlocksAdded([8, 9]), // Rest of checkpoint 3 - expectCheckpointed(3), - expectBlocksAdded([10, 11, 12]), - expectCheckpointed(4), - expectBlocksAdded([13, 14, 15]), - expectCheckpointed(5), - ]); - - // Verify only 3 checkpoints emitted - const checkpointEvents = handler.events.filter(e => e.type === 'chain-checkpointed'); - expect(checkpointEvents).toHaveLength(3); - }); - - it('refills prefetch buffer when exhausted', async () => { - // Set up: 15 blocks in 5 checkpoints - setRemoteTipsMultiBlock(15, 15); - - // Create a stream with prefetch limit of 2 (will need 3 calls to fetch 5 checkpoints) - const prefetchStream = new TestL2BlockStream(blockSource, localData, handler, undefined, { - batchSize: 10, - checkpointPrefetchLimit: 2, - }); - - await prefetchStream.work(); - - // Should have made 3 calls with limit 2 to fetch 5 checkpoints - const calls = blockSource.getCheckpoints.mock.calls; - const loop2Calls = calls.filter(([query]) => 'limit' in query && query.limit === 2); - expect(loop2Calls.length).toBe(3); // ceil(5/2) = 3 - expect((loop2Calls[0][0] as { from: number }).from).toBe(1); // First batch: checkpoints 1-2 - expect((loop2Calls[1][0] as { from: number }).from).toBe(3); // Second batch: checkpoints 3-4 - expect((loop2Calls[2][0] as { from: number }).from).toBe(5); // Third batch: checkpoint 5 - - // All 5 checkpoints should be emitted - const checkpointEvents = handler.events.filter(e => e.type === 'chain-checkpointed'); - expect(checkpointEvents).toHaveLength(5); - }); - - it('uses default prefetch limit when not specified', async () => { - // Set up: 9 blocks in 3 checkpoints - setRemoteTipsMultiBlock(9, 9); - - // Create a stream without specifying checkpointPrefetchLimit (should use default of 50) - const defaultPrefetchStream = new TestL2BlockStream(blockSource, localData, handler, undefined, { - batchSize: 10, - }); - - await defaultPrefetchStream.work(); - - // Should have used default limit of 50 for Loop 2 calls - const calls = blockSource.getCheckpoints.mock.calls; - const loop2Calls = calls.filter(([query]) => 'limit' in query && query.limit === 50); - expect(loop2Calls.length).toBeGreaterThanOrEqual(1); - }); - }); - - describe('prune scenarios', () => { - it('prunes proposed chain back to checkpointed tip, then continues', async () => { - // Phase 1: Sync blocks 1-9 with checkpoints 1-2, blocks 7-9 uncheckpointed - setRemoteTipsMultiBlock(9, 6); - - await blockStream.work(); - - expect(handler.events).toEqual([ - expectBlocksAdded([1, 2, 3]), - expectCheckpointed(1), - expectBlocksAdded([4, 5, 6]), - expectCheckpointed(2), - expectBlocksAdded([7, 8, 9]), // uncheckpointed - ]); - - handler.clearEvents(); - - // Update local state to reflect what the handler stored - localData.proposed.number = BlockNumber(9); - localData.checkpointed.block.number = BlockNumber(6); - localData.checkpointed.checkpoint.number = CheckpointNumber(2); - - // Phase 2: Prune - proposed chain pruned back to checkpointed tip (block 6) - // This happens when uncheckpointed blocks (7-9) are invalid - // Mess up hashes for blocks 7-9 to simulate reorg - localData.blockHashes[7] = '0xbad7'; - localData.blockHashes[8] = '0xbad8'; - localData.blockHashes[9] = '0xbad9'; - - // Source now has proposed=6, checkpointed=6 - setRemoteTipsMultiBlock(6, 6); - - await blockStream.work(); - - // Should emit chain-pruned back to block 6 - expect(handler.events).toEqual([ - { - type: 'chain-pruned', - block: makeBlockId(6), - checkpointed: expect.objectContaining({ - checkpoint: expect.objectContaining({ number: CheckpointNumber(2) }), - }), - proven: expect.objectContaining({ - block: expect.objectContaining({ number: BlockNumber(0) }), - }), - }, - ]); - - handler.clearEvents(); - - // Update local state after prune - localData.proposed.number = BlockNumber(6); - delete localData.blockHashes[7]; - delete localData.blockHashes[8]; - delete localData.blockHashes[9]; - - // Phase 3: Chain continues - new blocks 7-12 arrive with checkpoints 3-4 - setRemoteTipsMultiBlock(12, 12); - - await blockStream.work(); - - // Should continue normally: blocks 7-9 + checkpoint 3, blocks 10-12 + checkpoint 4 - expect(handler.events).toEqual([ - expectBlocksAdded([7, 8, 9]), - expectCheckpointed(3), - expectBlocksAdded([10, 11, 12]), - expectCheckpointed(4), - ]); - }); - - it('prunes proposed and checkpointed chains back to proven tip, then continues', async () => { - // Phase 1: Sync blocks 1-12 with checkpoints 1-3, proven at checkpoint 2 (block 6) - setRemoteTipsMultiBlock(12, 9, 6); - - await blockStream.work(); - - expect(handler.events).toEqual([ - expectBlocksAdded([1, 2, 3]), - expectCheckpointed(1), - expectBlocksAdded([4, 5, 6]), - expectCheckpointed(2), - expectBlocksAdded([7, 8, 9]), - expectCheckpointed(3), - expectBlocksAdded([10, 11, 12]), - { type: 'chain-proven', block: makeBlockId(6), checkpoint: makeCheckpointId(2) }, - ]); - - handler.clearEvents(); - - // Update local state - localData.proposed.number = BlockNumber(12); - localData.checkpointed.block.number = BlockNumber(9); - localData.checkpointed.checkpoint.number = CheckpointNumber(3); - localData.proven.block.number = BlockNumber(6); - localData.proven.checkpoint.number = CheckpointNumber(2); - - // Phase 2: Prune - checkpoint 3 failed to prove, prune back to proven tip (block 6) - // Mess up hashes for blocks 7-12 to simulate reorg - for (let i = 7; i <= 12; i++) { - localData.blockHashes[i] = `0xbad${i}`; - } - - // Source now has proposed=6, checkpointed=6, proven=6 - setRemoteTipsMultiBlock(6, 6, 6); - - await blockStream.work(); - - // Should emit chain-pruned back to block 6, carrying the source proven tip (block 6 / ckpt 2) - expect(handler.events).toEqual([ - { - type: 'chain-pruned', - block: makeBlockId(6), - checkpointed: expect.objectContaining({ - checkpoint: expect.objectContaining({ number: CheckpointNumber(2) }), - }), - proven: expect.objectContaining({ - block: expect.objectContaining({ number: BlockNumber(6) }), - checkpoint: expect.objectContaining({ number: CheckpointNumber(2) }), - }), - }, - ]); - - handler.clearEvents(); - - // Update local state after prune - localData.proposed.number = BlockNumber(6); - localData.checkpointed.block.number = BlockNumber(6); - localData.checkpointed.checkpoint.number = CheckpointNumber(2); - for (let i = 7; i <= 12; i++) { - delete localData.blockHashes[i]; - } - - // Phase 3: Chain continues with new blocks and checkpoints - // New blocks 7-15 arrive with checkpoints 3-5, proven advances to checkpoint 3 - setRemoteTipsMultiBlock(15, 15, 9); - - await blockStream.work(); - - // Should continue normally with new blocks and checkpoints - expect(handler.events).toEqual([ - expectBlocksAdded([7, 8, 9]), - expectCheckpointed(3), - expectBlocksAdded([10, 11, 12]), - expectCheckpointed(4), - expectBlocksAdded([13, 14, 15]), - expectCheckpointed(5), - { type: 'chain-proven', block: makeBlockId(9), checkpoint: makeCheckpointId(3) }, - ]); - }); - - it('prunes uncheckpointed blocks and immediately receives new ones', async () => { - // Phase 1: Sync blocks 1-6 with checkpoint 1, blocks 4-6 uncheckpointed - setRemoteTipsMultiBlock(6, 3); - - await blockStream.work(); - - expect(handler.events).toEqual([ - expectBlocksAdded([1, 2, 3]), - expectCheckpointed(1), - expectBlocksAdded([4, 5, 6]), - ]); - - handler.clearEvents(); - - // Update local state - localData.proposed.number = BlockNumber(6); - localData.checkpointed.block.number = BlockNumber(3); - localData.checkpointed.checkpoint.number = CheckpointNumber(1); - - // Phase 2: Prune blocks 4-6 due to bad hashes - localData.blockHashes[4] = '0xbad4'; - localData.blockHashes[5] = '0xbad5'; - localData.blockHashes[6] = '0xbad6'; - - // Source still at checkpointed=3 (no new checkpoints yet) - setRemoteTipsMultiBlock(3, 3); - - await blockStream.work(); - - // Should emit prune back to block 3 - expect(handler.events).toEqual([ - { - type: 'chain-pruned', - block: makeBlockId(3), - checkpointed: expect.objectContaining({ - checkpoint: expect.objectContaining({ number: CheckpointNumber(1) }), - }), - proven: expect.objectContaining({ - block: expect.objectContaining({ number: BlockNumber(0) }), - }), - }, - ]); - - handler.clearEvents(); - - // Update local state after prune - localData.proposed.number = BlockNumber(3); - delete localData.blockHashes[4]; - delete localData.blockHashes[5]; - delete localData.blockHashes[6]; - - // Phase 3: New blocks 4-9 arrive with checkpoints 2-3 - setRemoteTipsMultiBlock(9, 9); - - await blockStream.work(); - - // Should continue normally with new blocks and checkpoints - expect(handler.events).toEqual([ - expectBlocksAdded([4, 5, 6]), - expectCheckpointed(2), - expectBlocksAdded([7, 8, 9]), - expectCheckpointed(3), - ]); - }); - - it('prunes proposed chain back to genesis when no checkpoints exist', async () => { - // Phase 1: Sync blocks 1-6, no checkpoints (checkpointed=0, proven=0, finalized=0) - setRemoteTipsMultiBlock(6, 0); - - await blockStream.work(); - - // All blocks come as uncheckpointed - expect(handler.events).toEqual([expectBlocksAdded([1, 2, 3, 4, 5, 6])]); - - handler.clearEvents(); - - // Update local state - localData.proposed.number = BlockNumber(6); - - // Phase 2: All blocks are invalid, prune back to genesis (block 0) - for (let i = 1; i <= 6; i++) { - localData.blockHashes[i] = `0xbad${i}`; - } - - // Source now has proposed=0, checkpointed=0 - setRemoteTipsMultiBlock(0, 0); - - await blockStream.work(); - - // Should emit chain-pruned back to block 0 - expect(handler.events).toEqual([ - { - type: 'chain-pruned', - block: makeBlockId(0), - checkpointed: expect.objectContaining({ - checkpoint: expect.objectContaining({ number: CheckpointNumber(0) }), - }), - proven: expect.objectContaining({ - block: expect.objectContaining({ number: BlockNumber(0) }), - }), - }, - ]); - - handler.clearEvents(); - - // Update local state after prune - localData.proposed.number = BlockNumber(0); - for (let i = 1; i <= 6; i++) { - delete localData.blockHashes[i]; - } - - // Phase 3: New blocks 1-6 arrive with checkpoints 1-2 - setRemoteTipsMultiBlock(6, 6); - - await blockStream.work(); - - // Should continue normally from genesis - expect(handler.events).toEqual([ - expectBlocksAdded([1, 2, 3]), - expectCheckpointed(1), - expectBlocksAdded([4, 5, 6]), - expectCheckpointed(2), - ]); - }); - - it('prunes both proposed and checkpointed chains back to genesis', async () => { - // Phase 1: Sync blocks 1-6 with checkpoint 1 (blocks 1-3), blocks 4-6 uncheckpointed - setRemoteTipsMultiBlock(6, 3); - - await blockStream.work(); - - expect(handler.events).toEqual([ - expectBlocksAdded([1, 2, 3]), - expectCheckpointed(1), - expectBlocksAdded([4, 5, 6]), - ]); - - handler.clearEvents(); - - // Update local state - localData.proposed.number = BlockNumber(6); - localData.checkpointed.block.number = BlockNumber(3); - localData.checkpointed.checkpoint.number = CheckpointNumber(1); - - // Phase 2: All blocks are invalid (even checkpointed ones), prune back to genesis - for (let i = 1; i <= 6; i++) { - localData.blockHashes[i] = `0xbad${i}`; - } - - // Source now has proposed=0, checkpointed=0 (full chain reset) - setRemoteTipsMultiBlock(0, 0); - - await blockStream.work(); - - // Should emit chain-pruned back to block 0 - expect(handler.events).toEqual([ - { - type: 'chain-pruned', - block: makeBlockId(0), - checkpointed: expect.objectContaining({ - checkpoint: expect.objectContaining({ number: CheckpointNumber(0) }), - }), - proven: expect.objectContaining({ - block: expect.objectContaining({ number: BlockNumber(0) }), - }), - }, - ]); - - handler.clearEvents(); - - // Update local state after prune - localData.proposed.number = BlockNumber(0); - localData.checkpointed.block.number = BlockNumber(0); - localData.checkpointed.checkpoint.number = CheckpointNumber(0); - for (let i = 1; i <= 6; i++) { - delete localData.blockHashes[i]; - } - - // Phase 3: New chain starts fresh with blocks 1-9 and checkpoints 1-3 - setRemoteTipsMultiBlock(9, 9); - - await blockStream.work(); - - // Should continue normally from genesis - expect(handler.events).toEqual([ - expectBlocksAdded([1, 2, 3]), - expectCheckpointed(1), - expectBlocksAdded([4, 5, 6]), - expectCheckpointed(2), - expectBlocksAdded([7, 8, 9]), - expectCheckpointed(3), - ]); - }); - }); - - describe('ignoreCheckpoints', () => { - beforeEach(() => { - blockStream = new TestL2BlockStream(blockSource, localData, handler, undefined, { - batchSize: 10, - ignoreCheckpoints: true, - }); - }); - - it('does not emit checkpoint events for new checkpointed blocks', async () => { - // 6 blocks in 2 checkpoints (blocks 1-3 in checkpoint 1, blocks 4-6 in checkpoint 2) - setRemoteTipsMultiBlock(6, 6); - - await blockStream.work(); - - // Should emit blocks-added events but NO chain-checkpointed events - expect(handler.events).toEqual([expectBlocksAdded([1, 2, 3]), expectBlocksAdded([4, 5, 6])]); - }); - - it('does not emit checkpoint events when blocks become checkpointed after being added', async () => { - // Phase 1: Blocks 1-3 checkpointed (checkpoint 1), blocks 4-6 uncheckpointed - setRemoteTipsMultiBlock(6, 3); - - await blockStream.work(); - - // Blocks 1-3 via checkpoint, blocks 4-6 as uncheckpointed, no checkpoint events - expect(handler.events).toEqual([expectBlocksAdded([1, 2, 3]), expectBlocksAdded([4, 5, 6])]); - - handler.clearEvents(); - - // Update local state - localData.proposed.number = BlockNumber(6); - localData.checkpointed.block.number = BlockNumber(3); - localData.checkpointed.checkpoint.number = CheckpointNumber(1); - - // Phase 2: Now blocks 4-6 become checkpointed too (checkpoint 2) - setRemoteTipsMultiBlock(6, 6); - - await blockStream.work(); - - // Checkpoint 2 event would normally be emitted for blocks 4-6, but not with ignoreCheckpoints - expect(handler.events).toEqual([]); - }); - - it('does not emit checkpoint events but still emits prune events for uncheckpointed blocks', async () => { - // Phase 1: Sync blocks 1-9, blocks 1-6 checkpointed (checkpoints 1-2), blocks 7-9 uncheckpointed - setRemoteTipsMultiBlock(9, 6); - - await blockStream.work(); - - expect(handler.events).toEqual([ - expectBlocksAdded([1, 2, 3]), - expectBlocksAdded([4, 5, 6]), - expectBlocksAdded([7, 8, 9]), - ]); - - handler.clearEvents(); - - // Update local state - localData.proposed.number = BlockNumber(9); - localData.checkpointed.block.number = BlockNumber(6); - localData.checkpointed.checkpoint.number = CheckpointNumber(2); - - // Phase 2: Prune uncheckpointed blocks 7-9 (bad hashes) - localData.blockHashes[7] = '0xbad7'; - localData.blockHashes[8] = '0xbad8'; - localData.blockHashes[9] = '0xbad9'; - - setRemoteTipsMultiBlock(6, 6); - - await blockStream.work(); - - // Should emit chain-pruned event (prune events are always emitted), no checkpoint events - expect(handler.events).toEqual([ - { - type: 'chain-pruned', - block: makeBlockId(6), - checkpointed: expect.objectContaining({ - checkpoint: expect.objectContaining({ number: CheckpointNumber(2) }), - }), - proven: expect.objectContaining({ - block: expect.objectContaining({ number: BlockNumber(0) }), - }), - }, - ]); - }); - - it('does not emit checkpoint events but still emits prune events for checkpointed blocks', async () => { - // Phase 1: Sync blocks 1-9, all checkpointed (checkpoints 1-3) - setRemoteTipsMultiBlock(9, 9); - - await blockStream.work(); - - expect(handler.events).toEqual([ - expectBlocksAdded([1, 2, 3]), - expectBlocksAdded([4, 5, 6]), - expectBlocksAdded([7, 8, 9]), - ]); - - handler.clearEvents(); - - // Update local state - localData.proposed.number = BlockNumber(9); - localData.checkpointed.block.number = BlockNumber(9); - localData.checkpointed.checkpoint.number = CheckpointNumber(3); - - // Phase 2: Prune checkpointed blocks (reorg of checkpointed chain back to checkpoint 1) - localData.blockHashes[4] = '0xbad4'; - localData.blockHashes[5] = '0xbad5'; - localData.blockHashes[6] = '0xbad6'; - localData.blockHashes[7] = '0xbad7'; - localData.blockHashes[8] = '0xbad8'; - localData.blockHashes[9] = '0xbad9'; - - setRemoteTipsMultiBlock(3, 3); - - await blockStream.work(); - - // Should emit chain-pruned event - expect(handler.events).toEqual([ - { - type: 'chain-pruned', - block: makeBlockId(3), - checkpointed: expect.objectContaining({ - checkpoint: expect.objectContaining({ number: CheckpointNumber(1) }), - }), - proven: expect.objectContaining({ - block: expect.objectContaining({ number: BlockNumber(0) }), - }), - }, - ]); - }); - - it('does not emit checkpoint events but still emits proven and finalized events', async () => { - // 9 blocks in 3 checkpoints, proven at block 6 (checkpoint 2), finalized at block 3 (checkpoint 1) - setRemoteTipsMultiBlock(9, 9, 6, 3); - - await blockStream.work(); - - // Should have blocks-added, chain-proven, chain-finalized, but NO checkpoint events - expect(handler.events).toEqual([ - expectBlocksAdded([1, 2, 3]), - expectBlocksAdded([4, 5, 6]), - expectBlocksAdded([7, 8, 9]), - { type: 'chain-proven', block: makeBlockId(6), checkpoint: makeCheckpointId(2) }, - { type: 'chain-finalized', block: makeBlockId(3), checkpoint: makeCheckpointId(1) }, - ]); - }); - - it('does not emit checkpoint events but still emits proven and finalized events with skipFinalized', async () => { - // Use skipFinalized in addition to ignoreCheckpoints - blockStream = new TestL2BlockStream(blockSource, localData, handler, undefined, { - batchSize: 10, - ignoreCheckpoints: true, - skipFinalized: true, - }); - - // 12 blocks in 4 checkpoints, proven at block 9 (checkpoint 3), finalized at block 6 (checkpoint 2) - setRemoteTipsMultiBlock(12, 12, 9, 6); - - // Local is behind - at block 3, finalized at block 3 - localData.proposed.number = BlockNumber(3); - localData.checkpointed.block.number = BlockNumber(3); - localData.checkpointed.checkpoint.number = CheckpointNumber(1); - localData.proven.block.number = BlockNumber(3); - localData.proven.checkpoint.number = CheckpointNumber(1); - localData.finalized.block.number = BlockNumber(3); - localData.finalized.checkpoint.number = CheckpointNumber(1); - - await blockStream.work(); - - // With skipFinalized, we skip to the finalized tip (block 6), then sync from there - // Should emit blocks 6-12, proven, finalized, but NO checkpoint events - expect(handler.events).toEqual([ - expectBlocksAdded([6]), - expectBlocksAdded([7, 8, 9]), - expectBlocksAdded([10, 11, 12]), - { type: 'chain-proven', block: makeBlockId(9), checkpoint: makeCheckpointId(3) }, - { type: 'chain-finalized', block: makeBlockId(6), checkpoint: makeCheckpointId(2) }, - ]); - }); - - it('does not emit checkpoint events after prune and re-sync with new blocks', async () => { - // Phase 1: Sync blocks 1-9, all checkpointed (checkpoints 1-3) - setRemoteTipsMultiBlock(9, 9); - - await blockStream.work(); - - expect(handler.events).toEqual([ - expectBlocksAdded([1, 2, 3]), - expectBlocksAdded([4, 5, 6]), - expectBlocksAdded([7, 8, 9]), - ]); - - handler.clearEvents(); - - // Update local state - localData.proposed.number = BlockNumber(9); - localData.checkpointed.block.number = BlockNumber(9); - localData.checkpointed.checkpoint.number = CheckpointNumber(3); - - // Phase 2: Prune back to block 6 (checkpoint 2) - localData.blockHashes[7] = '0xbad7'; - localData.blockHashes[8] = '0xbad8'; - localData.blockHashes[9] = '0xbad9'; - - setRemoteTipsMultiBlock(6, 6); - - await blockStream.work(); - - expect(handler.events).toEqual([ - { - type: 'chain-pruned', - block: makeBlockId(6), - checkpointed: expect.objectContaining({ - checkpoint: expect.objectContaining({ number: CheckpointNumber(2) }), - }), - proven: expect.objectContaining({ - block: expect.objectContaining({ number: BlockNumber(0) }), - }), - }, - ]); - - handler.clearEvents(); - - // Update local state after prune - localData.proposed.number = BlockNumber(6); - localData.checkpointed.block.number = BlockNumber(6); - localData.checkpointed.checkpoint.number = CheckpointNumber(2); - delete localData.blockHashes[7]; - delete localData.blockHashes[8]; - delete localData.blockHashes[9]; - - // Phase 3: New blocks 7-12 arrive, all checkpointed (checkpoints 3-4) - setRemoteTipsMultiBlock(12, 12); - - await blockStream.work(); - - // Should have new blocks-added events but still no checkpoint events - expect(handler.events).toEqual([expectBlocksAdded([7, 8, 9]), expectBlocksAdded([10, 11, 12])]); - }); - }); }); describe('skipFinalized', () => { @@ -1668,9 +560,9 @@ describe('L2BlockStream', () => { it('skips ahead to the latest finalized block', async () => { setRemoteTips(40, 0, 38, 35); - localData.proposed.number = BlockNumber(5); - localData.proven.block.number = BlockNumber(2); - localData.finalized.block.number = BlockNumber(2); + localData.setProposed(5); + localData.setProven(2); + localData.setFinalized(2); await blockStream.work(); @@ -1685,133 +577,17 @@ describe('L2BlockStream', () => { it('does not skip if already ahead of finalized', async () => { setRemoteTips(40, 0, 38, 35); - localData.proposed.number = BlockNumber(38); - localData.proven.block.number = BlockNumber(38); - localData.finalized.block.number = BlockNumber(35); + localData.setProposed(38); + localData.setProven(38); + localData.setFinalized(35); await blockStream.work(); + // proven and finalized tips already match the source on (number, hash), so only new blocks are emitted. expect(handler.events).toEqual([ { type: 'blocks-added', blocks: times(2, i => makeBlock(i + 39)) }, ] satisfies L2BlockStreamEvent[]); }); - - it('emits the finalized checkpoint when fetching the finalized block (inconsistency)', async () => { - // This test demonstrates an inconsistency: Loop 1 skips finalized checkpoints via - // nextCheckpointToEmit adjustment, but Loop 2 still emits the finalized checkpoint - // when we fetch the finalized block. - // - // Scenario: Fresh start with skipFinalized=true - // - Checkpoint 6 (block 6) is finalized - // - Checkpoints 7-9 are checkpointed - // - Blocks 10-12 are uncheckpointed - // - // Expected (if consistent): Skip checkpoint 6, only emit checkpoints 7+ - // Actual: Checkpoint 6 IS emitted because Loop 2 fetches block 6 and emits its checkpoint - - // 12 blocks total, checkpointed up to 9, proven at 9, finalized at 6 - setRemoteTips(12, 9, 9, 6); - - // Fresh start - no local blocks - localData.proposed.number = BlockNumber(0); - localData.checkpointed.block.number = BlockNumber(0); - localData.checkpointed.checkpoint.number = CheckpointNumber(0); - localData.proven.block.number = BlockNumber(0); - localData.proven.checkpoint.number = CheckpointNumber(0); - localData.finalized.block.number = BlockNumber(0); - localData.finalized.checkpoint.number = CheckpointNumber(0); - - await blockStream.work(); - - // With skipFinalized, nextCheckpointToEmit starts at checkpoint 6 (finalized checkpoint) - // Loop 1 skips immediately (no local blocks) - // Loop 2 starts at block 6 (finalized block), finds checkpoint 6, emits block 6, then emits checkpoint 6 - // This IS the inconsistency: we emit checkpoint 6 even though it's finalized - expect(handler.events).toEqual([ - expectBlocksAdded([6]), - expectCheckpointed(6), // <-- This is the finalized checkpoint being emitted! - expectBlocksAdded([7]), - expectCheckpointed(7), - expectBlocksAdded([8]), - expectCheckpointed(8), - expectBlocksAdded([9]), - expectCheckpointed(9), - { type: 'blocks-added', blocks: times(3, i => makeBlock(i + 10)) }, - { type: 'chain-proven', block: makeBlockId(9), checkpoint: makeCheckpointId(9) }, - { type: 'chain-finalized', block: makeBlockId(6), checkpoint: makeCheckpointId(6) }, - ]); - }); - - it('does not emit finalized checkpointed blocks when skipFinalized is true', async () => { - // Source: finalized=35, proven=38, checkpointed=38, proposed=40 - // All blocks up to 38 are checkpointed (each block is its own checkpoint), blocks 39-40 are uncheckpointed - setRemoteTips(40, 38, 38, 35); - - // Local is at block 5, finalized at 2 - localData.proposed.number = BlockNumber(5); - localData.checkpointed.block.number = BlockNumber(2); - localData.checkpointed.checkpoint.number = CheckpointNumber(2); - localData.proven.block.number = BlockNumber(2); - localData.proven.checkpoint.number = CheckpointNumber(2); - localData.finalized.block.number = BlockNumber(2); - localData.finalized.checkpoint.number = CheckpointNumber(2); - - await blockStream.work(); - - // With skipFinalized=true, we should skip to block 35 (finalized tip) - // We should NOT emit blocks 6-34 since they are finalized - // We should only emit blocks from 35 onwards, and checkpoint events from checkpoint 36 onwards - // (checkpoint 35 is the finalized checkpoint, so we skip it too) - expect(handler.events).toEqual([ - expect.objectContaining({ - type: 'blocks-added', - blocks: [expect.objectContaining({ number: BlockNumber(35) })], - }), - expect.objectContaining({ - type: 'chain-checkpointed', - block: expect.objectContaining({ number: BlockNumber(35) }), - checkpoint: expect.objectContaining({ - checkpoint: expect.objectContaining({ number: CheckpointNumber(35) }), - }), - }), - expect.objectContaining({ - type: 'blocks-added', - blocks: [expect.objectContaining({ number: BlockNumber(36) })], - }), - expect.objectContaining({ - type: 'chain-checkpointed', - block: expect.objectContaining({ number: BlockNumber(36) }), - checkpoint: expect.objectContaining({ - checkpoint: expect.objectContaining({ number: CheckpointNumber(36) }), - }), - }), - expect.objectContaining({ - type: 'blocks-added', - blocks: [expect.objectContaining({ number: BlockNumber(37) })], - }), - expect.objectContaining({ - type: 'chain-checkpointed', - block: expect.objectContaining({ number: BlockNumber(37) }), - checkpoint: expect.objectContaining({ - checkpoint: expect.objectContaining({ number: CheckpointNumber(37) }), - }), - }), - expect.objectContaining({ - type: 'blocks-added', - blocks: [expect.objectContaining({ number: BlockNumber(38) })], - }), - expect.objectContaining({ - type: 'chain-checkpointed', - block: expect.objectContaining({ number: BlockNumber(38) }), - checkpoint: expect.objectContaining({ - checkpoint: expect.objectContaining({ number: CheckpointNumber(38) }), - }), - }), - { type: 'blocks-added', blocks: times(2, i => makeBlock(i + 39)) }, - { type: 'chain-proven', block: makeBlockId(38), checkpoint: makeCheckpointId(38) }, - { type: 'chain-finalized', block: makeBlockId(35), checkpoint: makeCheckpointId(35) }, - ]); - }); }); describe('local provider without checkpointed tip', () => { @@ -1832,14 +608,8 @@ describe('L2BlockStream', () => { await blockStream.work(); - // All 5 blocks are synced (one per checkpoint in the mock) and no checkpoint events are emitted. - expect(handler.events).toEqual([ - expectBlocksAdded([1]), - expectBlocksAdded([2]), - expectBlocksAdded([3]), - expectBlocksAdded([4]), - expectBlocksAdded([5]), - ]); + // All 5 blocks are synced and no checkpoint events are emitted. + expect(handler.events).toEqual([{ type: 'blocks-added', blocks: times(5, i => makeBlock(i + 1)) }]); expect(handler.events.every(e => e.type === 'blocks-added')).toBe(true); }); @@ -1859,6 +629,11 @@ describe('L2BlockStream', () => { }); }); +/** Builds a checkpoint id from a plain number, isolated so the branded-type lint rule sees no BlockNumber flow. */ +function makeTipCheckpointId(checkpointNumber: number) { + return { number: CheckpointNumber(checkpointNumber), hash: new Fr(checkpointNumber).toString() }; +} + class TestL2BlockStreamEventHandler implements L2BlockStreamEventHandler { public readonly events: L2BlockStreamEvent[] = []; public throwing: boolean = false; @@ -1885,20 +660,48 @@ class TestL2BlockStreamEventHandler implements L2BlockStreamEventHandler { class TestL2BlockStreamLocalDataProvider implements L2BlockStreamLocalDataProvider { public readonly blockHashes: Record = {}; - public proposed = { number: BlockNumber.ZERO, hash: GENESIS_BLOCK_HEADER_HASH.toString() }; + // Genesis tip hashes match `getL2BlockHash(0)` (`new Fr(0)`) and the mock source's genesis tips + // (`makeHash(0)`), so the tier reconciliation finds no spurious difference at genesis. + public proposed = { number: BlockNumber.ZERO, hash: new Fr(0).toString() }; public checkpointed = { - block: { number: BlockNumber.ZERO, hash: GENESIS_BLOCK_HEADER_HASH.toString() }, - checkpoint: { number: CheckpointNumber.ZERO, hash: GENESIS_CHECKPOINT_HEADER_HASH.toString() }, + block: { number: BlockNumber.ZERO, hash: new Fr(0).toString() }, + checkpoint: { number: CheckpointNumber.ZERO, hash: new Fr(0).toString() }, }; public proven = { - block: { number: BlockNumber.ZERO, hash: GENESIS_BLOCK_HEADER_HASH.toString() }, - checkpoint: { number: CheckpointNumber.ZERO, hash: GENESIS_CHECKPOINT_HEADER_HASH.toString() }, + block: { number: BlockNumber.ZERO, hash: new Fr(0).toString() }, + checkpoint: { number: CheckpointNumber.ZERO, hash: new Fr(0).toString() }, }; public finalized = { - block: { number: BlockNumber.ZERO, hash: GENESIS_BLOCK_HEADER_HASH.toString() }, - checkpoint: { number: CheckpointNumber.ZERO, hash: GENESIS_CHECKPOINT_HEADER_HASH.toString() }, + block: { number: BlockNumber.ZERO, hash: new Fr(0).toString() }, + checkpoint: { number: CheckpointNumber.ZERO, hash: new Fr(0).toString() }, }; + /** Sets a tip's number and a matching hash, so the tier reconciliation sees consistent (number, hash) pairs. */ + public setProposed(number: number) { + this.proposed = { number: BlockNumber(number), hash: new Fr(number).toString() }; + } + + public setCheckpointed(blockNumber: number, checkpointNumber: number) { + this.checkpointed = { + block: { number: BlockNumber(blockNumber), hash: new Fr(blockNumber).toString() }, + checkpoint: makeTipCheckpointId(checkpointNumber), + }; + } + + public setProven(blockNumber: number) { + this.proven = { + block: { number: BlockNumber(blockNumber), hash: new Fr(blockNumber).toString() }, + checkpoint: makeTipCheckpointId(blockNumber), + }; + } + + public setFinalized(blockNumber: number) { + this.finalized = { + block: { number: BlockNumber(blockNumber), hash: new Fr(blockNumber).toString() }, + checkpoint: makeTipCheckpointId(blockNumber), + }; + } + public getL2BlockHash(number: number): Promise { return Promise.resolve( number > this.proposed.number ? undefined : (this.blockHashes[number] ?? new Fr(number).toString()), @@ -1919,9 +722,9 @@ class TestL2BlockStreamLocalDataProvider implements L2BlockStreamLocalDataProvid class TestLocalChainTipsProvider implements L2BlockStreamLocalDataProvider { public readonly blockHashes: Record = {}; - public proposed = { number: BlockNumber.ZERO, hash: GENESIS_BLOCK_HEADER_HASH.toString() }; - public proven = { block: { number: BlockNumber.ZERO, hash: GENESIS_BLOCK_HEADER_HASH.toString() } }; - public finalized = { block: { number: BlockNumber.ZERO, hash: GENESIS_BLOCK_HEADER_HASH.toString() } }; + public proposed = { number: BlockNumber.ZERO, hash: new Fr(0).toString() }; + public proven = { block: { number: BlockNumber.ZERO, hash: new Fr(0).toString() } }; + public finalized = { block: { number: BlockNumber.ZERO, hash: new Fr(0).toString() } }; public getL2BlockHash(number: number): Promise { return Promise.resolve( diff --git a/yarn-project/stdlib/src/block/l2_block_stream/l2_block_stream.ts b/yarn-project/stdlib/src/block/l2_block_stream/l2_block_stream.ts index 3d5280e1684d..cd8fd2bef027 100644 --- a/yarn-project/stdlib/src/block/l2_block_stream/l2_block_stream.ts +++ b/yarn-project/stdlib/src/block/l2_block_stream/l2_block_stream.ts @@ -1,14 +1,18 @@ -import { BlockNumber, CheckpointNumber } from '@aztec/foundation/branded-types'; +import { BlockNumber } from '@aztec/foundation/branded-types'; import { AbortError } from '@aztec/foundation/error'; import { createLogger } from '@aztec/foundation/log'; import { RunningPromise } from '@aztec/foundation/running-promise'; -import type { PublishedCheckpoint } from '../../checkpoint/published_checkpoint.js'; -import { type L2BlockId, type L2BlockSource, makeL2BlockId } from '../l2_block_source.js'; -import type { L2BlockStreamEvent, L2BlockStreamEventHandler, L2BlockStreamLocalDataProvider } from './interfaces.js'; +import { type L2BlockId, type L2BlockSource, type L2TipId, makeL2BlockId } from '../l2_block_source.js'; +import type { + L2BlockStreamEvent, + L2BlockStreamEventHandler, + L2BlockStreamLocalDataProvider, + LocalL2BlockId, +} from './interfaces.js'; -/** Maximum number of checkpoints to prefetch at once during sync. Matches MAX_RPC_CHECKPOINTS_LEN. */ -export const CHECKPOINT_PREFETCH_LIMIT = 50; +/** Subset of the block source the stream depends on. Checkpoint payloads are no longer fetched here. */ +type L2BlockStreamSource = Pick; /** Creates a stream of events for new blocks, chain tips updates, and reorgs, out of polling an archiver or a node. */ export class L2BlockStream { @@ -17,7 +21,7 @@ export class L2BlockStream { private hasStarted = false; constructor( - private l2BlockSource: Pick, + private l2BlockSource: L2BlockStreamSource, private localData: L2BlockStreamLocalDataProvider, private handler: L2BlockStreamEventHandler, private readonly log = createLogger('types:block_stream'), @@ -27,10 +31,8 @@ export class L2BlockStream { startingBlock?: number; /** Instead of downloading all blocks, only fetch the smallest subset that results in reliable reorg detection. */ skipFinalized?: boolean; - /** When true, checkpoint events will not be emitted. Blocks are still fetched via checkpoints but only blocks-added events are emitted. */ + /** When true, checkpoint events will not be emitted. Blocks are still fetched but only blocks-added events are emitted. */ ignoreCheckpoints?: boolean; - /** Maximum number of checkpoints to prefetch at once during sync. Defaults to CHECKPOINT_PREFETCH_LIMIT (50). */ - checkpointPrefetchLimit?: number; } = {}, ) { // Note that RunningPromise is in stopped state by default. This promise won't run until someone invokes `start`, @@ -100,6 +102,7 @@ export class L2BlockStream { latestBlockNumber--; } + let pruned = false; if (latestBlockNumber < localTips.proposed.number) { latestBlockNumber = BlockNumber(Math.min(latestBlockNumber, sourceTips.proposed.number)); // see #13471 const hash = sourceCache.get(latestBlockNumber) ?? (await this.getBlockHashFromSource(latestBlockNumber)); @@ -115,149 +118,34 @@ export class L2BlockStream { checkpointed: sourceTips.checkpointed, proven: sourceTips.proven, }); + pruned = true; } - // If we are just starting, use the starting block number from the options. - const startingBlock = this.opts.startingBlock !== undefined ? BlockNumber(this.opts.startingBlock) : undefined; - if (latestBlockNumber === 0 && startingBlock !== undefined) { - latestBlockNumber = BlockNumber(Math.max(startingBlock - 1, 0)); - } - - // Only log this entry once (for sanity) - if (!this.hasStarted) { - this.log.verbose(`Starting sync from block number ${latestBlockNumber}`); - this.hasStarted = true; - } - + // The post-prune cursor: the highest block number both sides agree on. Block downloads resume from here. let nextBlockNumber = latestBlockNumber + 1; - // When checkpoints are ignored the local provider may omit `checkpointed`; in that case the fallback to - // CheckpointNumber.ZERO is harmless because `nextCheckpointToEmit` is never consumed for emission (Loop 1 and - // the startingBlock/skipFinalized adjustments below only feed checkpoint emission, which is gated off). - let nextCheckpointToEmit = CheckpointNumber( - (localTips.checkpointed?.checkpoint.number ?? CheckpointNumber.ZERO) + 1, - ); - // When startingBlock is set, also skip ahead for checkpoints. - if ( - startingBlock !== undefined && - startingBlock >= 1 && - nextCheckpointToEmit <= sourceTips.checkpointed.checkpoint.number - ) { - if (startingBlock > sourceTips.checkpointed.block.number) { - // startingBlock is past all checkpointed blocks; skip Loop 1 entirely. - nextCheckpointToEmit = CheckpointNumber(sourceTips.checkpointed.checkpoint.number + 1); - } else { - const startingBlockData = await this.l2BlockSource.getBlockData({ number: startingBlock }); - if (startingBlockData) { - nextCheckpointToEmit = CheckpointNumber(Math.max(nextCheckpointToEmit, startingBlockData.checkpointNumber)); - } - } + // If we are just starting from a fresh local store, fast-forward the download cursor to the configured + // starting block so we skip the history the consumer doesn't care about. + const startingBlock = this.opts.startingBlock !== undefined ? BlockNumber(this.opts.startingBlock) : undefined; + if (latestBlockNumber === 0 && startingBlock !== undefined) { + nextBlockNumber = Math.max(startingBlock, 1); } if (this.opts.skipFinalized) { // When skipping finalized blocks we need to provide reliable reorg detection while fetching as few blocks as // possible. Finalized blocks cannot be reorged by definition, so we can skip most of them. We do need the very // last finalized block however in order to guarantee that we will eventually find a block in which our local - // store matches the source. - // If the last finalized block is behind our local tip, there is nothing to skip. + // store matches the source. If the last finalized block is behind our local tip, there is nothing to skip. nextBlockNumber = Math.max(sourceTips.finalized.block.number, nextBlockNumber); - // If the next checkpoint to emit is behind the finalized tip then skip forward - nextCheckpointToEmit = CheckpointNumber(Math.max(nextCheckpointToEmit, sourceTips.finalized.checkpoint.number)); - } - - // Loop 1: Emit checkpoint events for checkpoints whose blocks are already in local storage. - // This handles the case where blocks were synced as uncheckpointed and later became checkpointed. - // The guard `lastBlockInCheckpoint.number > localTips.proposed.number` ensures we don't emit - // checkpoints for blocks we don't have (e.g., when startingBlock skips earlier blocks). - // Since only one checkpoint can ever be uncheckpointed, this loop should iterate at most once. - if (!this.opts.ignoreCheckpoints) { - let loop1Iterations = 0; - while (nextCheckpointToEmit <= sourceTips.checkpointed.checkpoint.number) { - const checkpoints = await this.l2BlockSource.getCheckpoints({ from: nextCheckpointToEmit, limit: 1 }); - if (checkpoints.length === 0) { - break; - } - const lastBlockInCheckpoint = checkpoints[0].checkpoint.blocks.at(-1)!; - // If this checkpoint has blocks we haven't seen yet, stop - they need to be fetched first - if (lastBlockInCheckpoint.number > localTips.proposed.number) { - break; - } - loop1Iterations++; - if (loop1Iterations > 1) { - this.log.warn( - `Emitting multiple checkpoints (${loop1Iterations}) for already-local blocks. ` + - `Next checkpoint: ${nextCheckpointToEmit}, source checkpointed: ${sourceTips.checkpointed.checkpoint.number}`, - ); - } - const lastBlockHash = await lastBlockInCheckpoint.hash(); - await this.emitEvent({ - type: 'chain-checkpointed', - checkpoint: checkpoints[0], - block: makeL2BlockId(lastBlockInCheckpoint.number, lastBlockHash.toString()), - }); - nextCheckpointToEmit = CheckpointNumber(nextCheckpointToEmit + 1); - } } - // Loop 2: Fetch new checkpointed blocks. For each checkpoint, emit all blocks - // from that checkpoint that we need, then emit the checkpoint event. - // We prefetch multiple checkpoints, then process them one by one. - let prefetchedCheckpoints: PublishedCheckpoint[] = []; - let prefetchIdx = 0; - let nextCheckpointNumber: CheckpointNumber | undefined; - - // Find the starting checkpoint number - if (nextBlockNumber <= sourceTips.checkpointed.block.number) { - const blockData = await this.l2BlockSource.getBlockData({ number: BlockNumber(nextBlockNumber) }); - if (blockData) { - nextCheckpointNumber = blockData.checkpointNumber; - } - } - - while (nextBlockNumber <= sourceTips.checkpointed.block.number && nextCheckpointNumber !== undefined) { - // Refill the prefetch buffer when exhausted - if (prefetchIdx >= prefetchedCheckpoints.length) { - const prefetchLimit = this.opts.checkpointPrefetchLimit ?? CHECKPOINT_PREFETCH_LIMIT; - prefetchedCheckpoints = await this.l2BlockSource.getCheckpoints({ - from: nextCheckpointNumber, - limit: prefetchLimit, - }); - prefetchIdx = 0; - if (prefetchedCheckpoints.length === 0) { - break; - } - } - - const checkpoint = prefetchedCheckpoints[prefetchIdx]!; - - // Get all blocks from this checkpoint that we need, respecting batchSize - const limit = Math.min(this.opts.batchSize ?? 50, sourceTips.checkpointed.block.number - nextBlockNumber + 1); - const blocksForCheckpoint = checkpoint.checkpoint.blocks - .filter(b => b.number >= nextBlockNumber) - .slice(0, limit); - if (blocksForCheckpoint.length === 0) { - break; - } - await this.emitEvent({ type: 'blocks-added', blocks: blocksForCheckpoint }); - nextBlockNumber = blocksForCheckpoint.at(-1)!.number + 1; - - // If we've reached the end of this checkpoint, emit the checkpoint event and move to next - const lastBlockInCheckpoint = checkpoint.checkpoint.blocks.at(-1)!; - if (nextBlockNumber > lastBlockInCheckpoint.number) { - if (!this.opts.ignoreCheckpoints) { - const lastBlockHash = await lastBlockInCheckpoint.hash(); - await this.emitEvent({ - type: 'chain-checkpointed', - checkpoint, - block: makeL2BlockId(lastBlockInCheckpoint.number, lastBlockHash.toString()), - }); - } - prefetchIdx++; - nextCheckpointNumber = CheckpointNumber(nextCheckpointNumber + 1); - } + // Only log this entry once (for sanity) + if (!this.hasStarted) { + this.log.verbose(`Starting sync from block number ${nextBlockNumber - 1}`); + this.hasStarted = true; } - // Loop 3: Fetch any remaining uncheckpointed (proposed) blocks. + // Download every block up to the source's proposed tip, batched by `batchSize`. while (nextBlockNumber <= sourceTips.proposed.number) { const limit = Math.min(this.opts.batchSize ?? 50, sourceTips.proposed.number - nextBlockNumber + 1); this.log.trace(`Requesting blocks from ${nextBlockNumber} limit ${limit}`); @@ -269,15 +157,29 @@ export class L2BlockStream { nextBlockNumber = blocks.at(-1)!.number + 1; } - // Update the proven and finalized tips. - if (localTips.proven !== undefined && sourceTips.proven.block.number !== localTips.proven.block.number) { + // End-of-pass tier reconciliation. For each tier, emit a single event iff the source tip differs from the + // local one. All three source tips come from the SAME `sourceTips` snapshot, so no extra source fetches are + // needed. We re-read the local tips after a prune so the comparison reflects the cursors the prune handler + // just clamped (the initial `localTips` snapshot is stale post-prune — finding 2). + const reconcileTips = pruned ? await this.localData.getL2Tips() : localTips; + if (!this.opts.ignoreCheckpoints && this.tipDiffers(reconcileTips.checkpointed?.block, sourceTips.checkpointed)) { + await this.emitEvent({ + type: 'chain-checkpointed', + block: sourceTips.checkpointed.block, + checkpoint: sourceTips.checkpointed.checkpoint, + }); + } + if (reconcileTips.proven !== undefined && this.tipDiffers(reconcileTips.proven.block, sourceTips.proven)) { await this.emitEvent({ type: 'chain-proven', block: sourceTips.proven.block, checkpoint: sourceTips.proven.checkpoint, }); } - if (localTips.finalized !== undefined && sourceTips.finalized.block.number !== localTips.finalized.block.number) { + if ( + reconcileTips.finalized !== undefined && + this.tipDiffers(reconcileTips.finalized.block, sourceTips.finalized) + ) { await this.emitEvent({ type: 'chain-finalized', block: sourceTips.finalized.block, @@ -292,6 +194,25 @@ export class L2BlockStream { } } + /** + * Returns whether the source tip differs from the local one and therefore warrants a tier event. Compares block + * number and, when both hashes are known, block hash. The hash comparison is skipped when the local hash is + * undefined or missing: world-state legitimately reports `undefined` hashes for tips ahead of its synced range, + * and comparing against an undefined hash would re-emit the event on every poll. + */ + private tipDiffers(localBlock: LocalL2BlockId | undefined, sourceTip: L2TipId): boolean { + if (localBlock === undefined) { + return true; + } + if (sourceTip.block.number !== localBlock.number) { + return true; + } + if (localBlock.hash === undefined) { + return false; + } + return sourceTip.block.hash !== localBlock.hash; + } + /** * Returns whether the source and local agree on the block hash at a given height. * @param blockNumber - The block number to test. @@ -327,7 +248,13 @@ export class L2BlockStream { private async emitEvent(event: L2BlockStreamEvent) { this.log.debug( - `Emitting ${event.type} (${event.type === 'blocks-added' ? event.blocks.length : event.type === 'chain-checkpointed' ? event.checkpoint.checkpoint.number : event.block.number})`, + `Emitting ${event.type} (${ + event.type === 'blocks-added' + ? event.blocks.length + : event.type === 'chain-checkpointed' + ? event.checkpoint.number + : event.block.number + })`, ); await this.handler.handleBlockStreamEvent(event); if (!this.isRunning() && !this.isSyncing) { diff --git a/yarn-project/stdlib/src/block/l2_block_stream/l2_tips_store_base.ts b/yarn-project/stdlib/src/block/l2_block_stream/l2_tips_store_base.ts index 37a576424b29..4a2a317e18bb 100644 --- a/yarn-project/stdlib/src/block/l2_block_stream/l2_tips_store_base.ts +++ b/yarn-project/stdlib/src/block/l2_block_stream/l2_tips_store_base.ts @@ -153,12 +153,8 @@ export abstract class L2TipsStoreBase implements L2BlockStreamEventHandler, L2Bl return; } await this.runInTransaction(async () => { - const checkpointId: CheckpointId = { - number: event.checkpoint.checkpoint.number, - hash: event.checkpoint.checkpoint.hash().toString(), - }; await this.saveTag('checkpointed', event.block); - await this.setTipCheckpoint('checkpointed', checkpointId); + await this.setTipCheckpoint('checkpointed', event.checkpoint); }); } diff --git a/yarn-project/stdlib/src/block/test/l2_tips_store_test_suite.ts b/yarn-project/stdlib/src/block/test/l2_tips_store_test_suite.ts index 7335058c452f..e8a928e5fb3d 100644 --- a/yarn-project/stdlib/src/block/test/l2_tips_store_test_suite.ts +++ b/yarn-project/stdlib/src/block/test/l2_tips_store_test_suite.ts @@ -94,14 +94,18 @@ export function testL2TipsStore(makeTipsStore: () => Promise) { return new PublishedCheckpoint(checkpoint, L1PublishedData.random(), []); }; - /** Creates a chain-checkpointed event with the required block field */ - const makeCheckpointedEvent = async (checkpoint: PublishedCheckpoint) => { - const lastBlock = checkpoint.checkpoint.blocks.at(-1)!; + /** Creates a thin chain-checkpointed event carrying the block + checkpoint ids of the checkpoint's last block. */ + const makeCheckpointedEvent = async (published: PublishedCheckpoint) => { + const lastBlock = published.checkpoint.blocks.at(-1)!; const blockId: L2BlockId = { number: lastBlock.number, hash: (await lastBlock.hash()).toString(), }; - return { type: 'chain-checkpointed' as const, checkpoint, block: blockId }; + const checkpointId = { + number: published.checkpoint.number, + hash: published.checkpoint.hash().toString(), + }; + return { type: 'chain-checkpointed' as const, checkpoint: checkpointId, block: blockId }; }; it('returns zero if no tips are stored', async () => { diff --git a/yarn-project/telemetry-client/src/wrappers/l2_block_stream.ts b/yarn-project/telemetry-client/src/wrappers/l2_block_stream.ts index 90b58b6a5a62..c6f49966cdb5 100644 --- a/yarn-project/telemetry-client/src/wrappers/l2_block_stream.ts +++ b/yarn-project/telemetry-client/src/wrappers/l2_block_stream.ts @@ -11,7 +11,7 @@ import { type Traceable, type Tracer, trackSpan } from '@aztec/telemetry-client' /** Extends an L2BlockStream with a tracer to create a new trace per iteration. */ export class TraceableL2BlockStream extends L2BlockStream implements Traceable { constructor( - l2BlockSource: Pick, + l2BlockSource: Pick, localData: L2BlockStreamLocalDataProvider, handler: L2BlockStreamEventHandler, public readonly tracer: Tracer, From a4d419a4d7a2117232a72f0a50a925c54bcebe7c Mon Sep 17 00:00:00 2001 From: Santiago Palladino Date: Wed, 10 Jun 2026 22:24:51 -0300 Subject: [PATCH 2/4] refactor(prover-node): process checkpoint jumps from a cursor The L2BlockStream now delivers a single thin chain-checkpointed tip event per pass instead of one fat event per checkpoint, so the prover-node drives the catch-up itself: on a tip event it walks every checkpoint between its cursor and the reported tip, fetching light getCheckpointsData metadata first to decide relevance, then heavy getCheckpoint only for checkpoints in epochs that can still be proven. Whole unprovable epochs (fully proven / past the proof-submission window) are skipped; individual checkpoints inside a provable epoch are never skipped, honoring the SessionManager full-coverage contract. The cursor seeds at start() from the last checkpoint of the last fully-proven epoch (or 0), so a restart reprocesses the partially-proven epoch rather than trusting a checkpointed tip that may lead the last proven checkpoint. It advances one checkpoint at a time and only after both checkpointStore.addOrUpdate and sessionManager.onCheckpointAdded succeed, preserving the A-1041 at-least-once semantics (a mid-jump failure leaves the cursor behind to retry). chain-pruned clamps the cursor down to the post-prune checkpointed tip. checkEpochExpiry() now also runs from a periodic ticker, since the thin once-per-pass event no longer drives it per checkpoint and idle periods would otherwise stall expiry. Co-Authored-By: Claude Fable 5 --- .../prover-node/src/prover-node.test.ts | 100 +++++++++------- yarn-project/prover-node/src/prover-node.ts | 107 +++++++++++++++--- 2 files changed, 147 insertions(+), 60 deletions(-) diff --git a/yarn-project/prover-node/src/prover-node.test.ts b/yarn-project/prover-node/src/prover-node.test.ts index b9e23f2846c0..ba7cdd7f553c 100644 --- a/yarn-project/prover-node/src/prover-node.test.ts +++ b/yarn-project/prover-node/src/prover-node.test.ts @@ -77,6 +77,7 @@ describe('ProverNode', () => { // exercise. proverNode.setSessionManager(sessionManager); proverNode.setPublishingService(publishingService); + mined.clear(); }); // ---------------- event dispatch ---------------- @@ -87,14 +88,9 @@ describe('ProverNode', () => { checkpoint: { number: CheckpointNumber(n), hash: `0x0${n}` }, }); - it('dispatches chain-checkpointed to handleCheckpointEvent', async () => { + it('dispatches chain-checkpointed: catches up and registers the checkpoint', async () => { setupNotFullyProven(); - const checkpoint = makeCheckpoint(1, 1, 1); - const event: L2BlockStreamEvent = { - type: 'chain-checkpointed', - checkpoint: makePublishedCheckpoint(checkpoint), - block: { number: BlockNumber(1), hash: '0x01' }, - }; + const event = mineCheckpoint(makeCheckpoint(1, 1, 1)); await proverNode.handleBlockStreamEvent(event); @@ -114,11 +110,7 @@ describe('ProverNode', () => { // Register a checkpoint, then prune. setupNotFullyProven(); - await proverNode.handleBlockStreamEvent({ - type: 'chain-checkpointed', - checkpoint: makePublishedCheckpoint(makeCheckpoint(2, 2, 2)), - block: { number: BlockNumber(2), hash: '0x02' }, - }); + await proverNode.handleBlockStreamEvent(mineCheckpoint(makeCheckpoint(2, 2, 2))); await proverNode.handleBlockStreamEvent({ type: 'chain-pruned', @@ -207,18 +199,16 @@ describe('ProverNode', () => { // the tips stay put for the L2BlockStream to retry. worldState.syncImmediate.mockRejectedValue(new Error('boom')); - const event: L2BlockStreamEvent = { - type: 'chain-checkpointed', - checkpoint: makePublishedCheckpoint(makeCheckpoint(1, 1, 1)), - block: { number: BlockNumber(1), hash: '0x01' }, - }; + const event = mineCheckpoint(makeCheckpoint(1, 1, 1)); await expect(proverNode.handleBlockStreamEvent(event)).rejects.toThrow('boom'); - // Tips left unadvanced; nothing was registered and the session manager wasn't notified. + // Tips left unadvanced; nothing was registered, the session manager wasn't notified, and the catch-up + // cursor stays behind so the next pass retries this checkpoint. expect(await proverNode.getTipsStore().getL2BlockHash(1)).toBeUndefined(); expect(proverNode.getCheckpointStore().listAll()).toHaveLength(0); expect(sessionManager.onCheckpointAdded).not.toHaveBeenCalled(); + expect(proverNode.getLastProcessedCheckpoint()).toEqual(CheckpointNumber.ZERO); }); it('leaves the tips store unadvanced when a handler propagates an error (A-1041)', async () => { @@ -227,11 +217,7 @@ describe('ProverNode', () => { // tips-store update, so the error surfaces to the L2BlockStream and the tips stay put. l2BlockSource.getSyncedL2SlotNumber.mockRejectedValue(new Error('archiver down')); - const event: L2BlockStreamEvent = { - type: 'chain-checkpointed', - checkpoint: makePublishedCheckpoint(makeCheckpoint(1, 1, 1)), - block: { number: BlockNumber(1), hash: '0x01' }, - }; + const event = mineCheckpoint(makeCheckpoint(1, 1, 1)); await expect(proverNode.handleBlockStreamEvent(event)).rejects.toThrow('archiver down'); @@ -251,25 +237,19 @@ describe('ProverNode', () => { ); l2BlockSource.isEpochComplete.mockResolvedValue(true); - await proverNode.handleBlockStreamEvent({ - type: 'chain-checkpointed', - checkpoint: makePublishedCheckpoint(makeCheckpoint(1, 1, 1)), - block: { number: BlockNumber(1), hash: '0x01' }, - }); + await proverNode.handleBlockStreamEvent(mineCheckpoint(makeCheckpoint(1, 1, 1))); expect(proverNode.getCheckpointStore().listAll().length).toBe(0); expect(sessionManager.onCheckpointAdded).not.toHaveBeenCalled(); + // The whole-epoch skip still advances the cursor so we don't re-evaluate it next pass. + expect(proverNode.getLastProcessedCheckpoint()).toEqual(CheckpointNumber(1)); }); it('content-addresses the prover by the checkpoint archive root', async () => { setupNotFullyProven(); const archiveRoot = Fr.random(); - await proverNode.handleBlockStreamEvent({ - type: 'chain-checkpointed', - checkpoint: makePublishedCheckpoint(makeCheckpoint(1, 1, 2, archiveRoot)), - block: { number: BlockNumber(2), hash: '0x02' }, - }); + await proverNode.handleBlockStreamEvent(mineCheckpoint(makeCheckpoint(1, 1, 2, archiveRoot))); const prover = proverNode.getCheckpointStore().listAll()[0]; expect(prover.id).toContain(archiveRoot.toString()); @@ -370,16 +350,8 @@ describe('ProverNode', () => { setupRegistrationSuccess(); // Register two checkpoints at slots 6 and 7 (both in epoch 3). - await proverNode.handleBlockStreamEvent({ - type: 'chain-checkpointed', - checkpoint: makePublishedCheckpoint(makeCheckpoint(1, 6, 6)), - block: { number: BlockNumber(6), hash: '0x06' }, - }); - await proverNode.handleBlockStreamEvent({ - type: 'chain-checkpointed', - checkpoint: makePublishedCheckpoint(makeCheckpoint(2, 7, 7)), - block: { number: BlockNumber(7), hash: '0x07' }, - }); + await proverNode.handleBlockStreamEvent(mineCheckpoint(makeCheckpoint(1, 6, 6))); + await proverNode.handleBlockStreamEvent(mineCheckpoint(makeCheckpoint(2, 7, 7))); expect(proverNode.getCheckpointStore().listAll().length).toBe(2); // Pruning above checkpoint 0 marks both as pruned — onPrune must receive [EpochNumber(3)], @@ -617,6 +589,44 @@ describe('ProverNode', () => { function makePublishedCheckpoint(checkpoint: Checkpoint): PublishedCheckpoint { return { checkpoint, attestations: [] } as unknown as PublishedCheckpoint; } + + /** Registry of mined checkpoints. */ + const mined = new Map(); + + /** + * Registers `checkpoint` with the block source mocks: its light metadata is returned by `getCheckpointsData` + * range queries, and its full payload by `getCheckpoint({ number })`. Returns the thin `chain-checkpointed` + * tip event that points at it — the block stream now delivers only the tip, and the prover-node fetches + * everything between its cursor and the tip itself. + */ + function mineCheckpoint(checkpoint: Checkpoint): L2BlockStreamEvent { + mined.set(Number(checkpoint.number), checkpoint); + l2BlockSource.getCheckpoint.mockImplementation((query: any) => + Promise.resolve('number' in query ? makeMaybePublished(mined.get(Number(query.number))) : undefined), + ); + l2BlockSource.getCheckpointsData.mockImplementation((query: any) => { + if (!('from' in query)) { + return Promise.resolve([]); + } + const data = []; + for (let n = Number(query.from); n < Number(query.from) + query.limit; n++) { + const cp = mined.get(n); + if (cp) { + data.push({ checkpointNumber: cp.number, header: cp.header } as any); + } + } + return Promise.resolve(data); + }); + return { + type: 'chain-checkpointed', + block: { number: BlockNumber(checkpoint.blocks[0].number), hash: '0x01' }, + checkpoint: { number: checkpoint.number, hash: checkpoint.hash().toString() }, + }; + } + + function makeMaybePublished(checkpoint: Checkpoint | undefined): PublishedCheckpoint | undefined { + return checkpoint ? makePublishedCheckpoint(checkpoint) : undefined; + } }); /** ProverNode subclass that exposes hooks for injecting a mocked SessionManager + reads. */ @@ -660,4 +670,8 @@ class TestProverNode extends ProverNode { public getLastExpiredEpoch(): EpochNumber | undefined { return this.lastExpiredEpoch; } + + public getLastProcessedCheckpoint(): CheckpointNumber { + return this.lastProcessedCheckpoint; + } } diff --git a/yarn-project/prover-node/src/prover-node.ts b/yarn-project/prover-node/src/prover-node.ts index 9e5b7fbd7508..400388286974 100644 --- a/yarn-project/prover-node/src/prover-node.ts +++ b/yarn-project/prover-node/src/prover-node.ts @@ -5,6 +5,7 @@ import { BlockNumber, CheckpointNumber, EpochNumber } from '@aztec/foundation/br import { assertRequired, compact, pick } from '@aztec/foundation/collection'; import { memoize } from '@aztec/foundation/decorators'; import { createLogger } from '@aztec/foundation/log'; +import { RunningPromise } from '@aztec/foundation/running-promise'; import { DateProvider, executeTimeout } from '@aztec/foundation/timer'; import type { EpochProverFactory } from '@aztec/prover-client'; import { getLastSiblingPath } from '@aztec/prover-client/helpers'; @@ -95,6 +96,17 @@ export class ProverNode implements L2BlockStreamEventHandler, ProverNodeApi, Tra */ protected lastExpiredEpoch: EpochNumber | undefined; + /** + * Highest checkpoint number whose proving-side handling has completed (or that was legitimately skipped). + * The catch-up loop walks from here to each `chain-checkpointed` tip event. Seeded at start() from the last + * checkpoint of the last fully-proven epoch (or 0), so a restart reprocesses the partially-proven epoch rather + * than trusting a checkpointed tip that may sit ahead of unproven checkpoints. Clamped down on a prune. + */ + protected lastProcessedCheckpoint: CheckpointNumber = CheckpointNumber.ZERO; + + /** Periodic tick that runs the epoch-expiry sweep during idle periods when no block-stream events arrive. */ + private expiryTicker: RunningPromise | undefined; + public readonly tracer: Tracer; protected publishingService: ProofPublishingService | undefined; @@ -219,7 +231,7 @@ export class ProverNode implements L2BlockStreamEventHandler, ProverNodeApi, Tra public async handleBlockStreamEvent(event: L2BlockStreamEvent): Promise { switch (event.type) { case 'chain-checkpointed': - await this.handleCheckpointEvent(event.checkpoint); + await this.processCheckpointJump(event.checkpoint.number); break; case 'chain-pruned': await this.handlePruneEvent(event.checkpointed.checkpoint); @@ -239,32 +251,64 @@ export class ProverNode implements L2BlockStreamEventHandler, ProverNodeApi, Tra await this.tipsStore.handleBlockStreamEvent(event); } - /** Register a new checkpoint with the store and notify the session manager. */ - private async handleCheckpointEvent(publishedCheckpoint: PublishedCheckpoint) { - const checkpoint = publishedCheckpoint.checkpoint; - const slotNumber = checkpoint.header.slotNumber; - const l1Constants = await this.getL1Constants(); - const epochNumber = getEpochAtSlot(slotNumber, l1Constants); - - if (await this.isEpochFullyProven(epochNumber, l1Constants)) { - this.log.debug(`Skipping checkpoint ${checkpoint.number} for already-proven epoch ${epochNumber}`); + /** + * Walks every checkpoint between the local cursor and the newly-reported checkpointed tip, registering + * each one that belongs to an epoch that can still be proven. The block stream now delivers a single thin + * `chain-checkpointed` tip event per pass rather than one fat event per checkpoint, so this drives the + * catch-up itself: light metadata first (`getCheckpointsData`) to decide relevance per epoch, then a heavy + * `getCheckpoints` fetch only for checkpoints in provable epochs. + * + * The cursor advances one checkpoint at a time and only after that checkpoint's proving-side handling has + * fully succeeded, preserving the A-1041 at-least-once semantics: a mid-jump failure leaves the cursor + * behind so the next pass retries from the first checkpoint that did not complete. + */ + private async processCheckpointJump(targetCheckpoint: CheckpointNumber): Promise { + if (targetCheckpoint <= this.lastProcessedCheckpoint) { return; } + const from = CheckpointNumber(this.lastProcessedCheckpoint + 1); + const limit = Number(targetCheckpoint - from) + 1; + const metadatas = await this.l2BlockSource.getCheckpointsData({ from, limit }); + const l1Constants = await this.getL1Constants(); - if (await this.isEpochPastProofSubmissionWindow(epochNumber, l1Constants)) { - this.log.debug( - `Skipping checkpoint ${checkpoint.number} for epoch ${epochNumber} past its proof-submission window`, - ); - return; + // Per-epoch relevance is cached so a multi-checkpoint epoch resolves it once. Skipping is whole-epoch + // only: the SessionManager requires an epoch's checkpoints fully covered before it opens a session, so we + // never drop an individual checkpoint inside an epoch we will prove. + const epochSkippable = new Map(); + for (const metadata of metadatas) { + const epochNumber = getEpochAtSlot(metadata.header.slotNumber, l1Constants); + let skippable = epochSkippable.get(epochNumber); + if (skippable === undefined) { + skippable = + (await this.isEpochFullyProven(epochNumber, l1Constants)) || + (await this.isEpochPastProofSubmissionWindow(epochNumber, l1Constants)); + epochSkippable.set(epochNumber, skippable); + } + if (skippable) { + this.log.debug(`Skipping checkpoint ${metadata.checkpointNumber} for unprovable epoch ${epochNumber}`); + } else { + await this.registerCheckpoint(metadata.checkpointNumber, epochNumber); + } + // Advance only after the checkpoint's handling succeeded (or it was legitimately skipped). registerCheckpoint + // throws on failure, which leaves the cursor here for the next pass to retry (A-1041). + this.lastProcessedCheckpoint = metadata.checkpointNumber; } + } + /** Heavy-fetch a single checkpoint, register it with the store, and notify the session manager. */ + private async registerCheckpoint(checkpointNumber: CheckpointNumber, epochNumber: EpochNumber): Promise { + const published = await this.l2BlockSource.getCheckpoint({ number: checkpointNumber }); + if (!published) { + throw new Error(`Checkpoint ${checkpointNumber} not found in block source during catch-up`); + } + const checkpoint = published.checkpoint; this.log.info(`New checkpoint ${checkpoint.number} for epoch ${epochNumber}`, { checkpointNumber: checkpoint.number, epochNumber, - slotNumber, + slotNumber: checkpoint.header.slotNumber, }); - const registerData = await this.collectRegisterData(checkpoint, publishedCheckpoint.attestations); + const registerData = await this.collectRegisterData(checkpoint, published.attestations); await this.checkpointStore.addOrUpdate(checkpoint, registerData); await this.sessionManager?.onCheckpointAdded(epochNumber); } @@ -298,6 +342,11 @@ export class ProverNode implements L2BlockStreamEventHandler, ProverNodeApi, Tra /** Mark every prover above the prune threshold as pruned and notify the session manager. */ private async handlePruneEvent(prunedCheckpoint: { number: CheckpointNumber; hash: string }) { this.log.warn(`Chain pruned to checkpoint ${prunedCheckpoint.number}`, { prunedCheckpoint }); + // Clamp the catch-up cursor down to the (post-prune) checkpointed tip so reprocessing resumes from the + // first checkpoint above the prune target rather than from a stale, now-orphaned cursor. + if (this.lastProcessedCheckpoint > prunedCheckpoint.number) { + this.lastProcessedCheckpoint = prunedCheckpoint.number; + } const affected = this.checkpointStore.markPrunedAfter(prunedCheckpoint.number); if (affected.length === 0) { return; @@ -411,12 +460,22 @@ export class ProverNode implements L2BlockStreamEventHandler, ProverNodeApi, Tra const { startingBlock, lastFullyProvenEpoch } = await this.computeStartupState(); this.lastExpiredEpoch = lastFullyProvenEpoch; + this.lastProcessedCheckpoint = await this.computeStartingCheckpoint(lastFullyProvenEpoch); this.blockStream = new L2BlockStream(this.l2BlockSource, this.tipsStore, this, this.log, { pollIntervalMS: this.config.proverNodePollingIntervalMs, startingBlock, }); this.blockStream.start(); + // With thin once-per-pass tip events, the expiry sweep no longer fires once per checkpoint; drive it + // from a periodic tick so epochs still expire during idle/no-event periods. + this.expiryTicker = new RunningPromise( + () => this.checkEpochExpiry(), + this.log, + this.config.proverNodePollingIntervalMs, + ); + this.expiryTicker.start(); + await this.rewardsMetrics.start(); this.l1Metrics.start(); this.log.info(`Started Prover Node with prover id ${this.prover.getProverId().toString()}`, this.config); @@ -426,6 +485,7 @@ export class ProverNode implements L2BlockStreamEventHandler, ProverNodeApi, Tra this.log.info('Stopping ProverNode'); this.jobMetrics.stopObservingState(); await this.blockStream?.stop(); + await this.expiryTicker?.stop(); if (this.sessionManager) { await this.sessionManager.stop(); } @@ -586,6 +646,19 @@ export class ProverNode implements L2BlockStreamEventHandler, ProverNodeApi, Tra return { startingBlock: firstBlockOfEpoch, lastFullyProvenEpoch }; } + /** + * Resolves the catch-up cursor seed: the last checkpoint of the last fully-proven epoch, or 0 if none. Seeding + * from a checkpoint (rather than a checkpointed tip) guarantees a restart reprocesses every checkpoint of the + * partially-proven epoch, since the checkpointed tip can sit ahead of the last fully-proven checkpoint. + */ + protected async computeStartingCheckpoint(lastFullyProvenEpoch: EpochNumber | undefined): Promise { + if (lastFullyProvenEpoch === undefined) { + return CheckpointNumber.ZERO; + } + const checkpoints = await this.l2BlockSource.getCheckpointsData({ epoch: lastFullyProvenEpoch }); + return checkpoints.at(-1)?.checkpointNumber ?? CheckpointNumber.ZERO; + } + private async gatherPreviousBlockHeader(previousBlockNumber: number) { const data = await this.l2BlockSource.getBlockData({ number: BlockNumber(previousBlockNumber) }); if (!data?.header) { From 2507b5837ff2a05c8e6241eaad7f1f706889cded Mon Sep 17 00:00:00 2001 From: Santiago Palladino Date: Wed, 10 Jun 2026 22:25:11 -0300 Subject: [PATCH 3/4] refactor(sentinel): drop the block stream, fetch checkpoints on demand The sentinel's only use of its L2BlockStream was to feed a private slotNumberToCheckpoint map read solely by getSlotActivity, plus a local tips mirror for the p2p-synced gate. Delete the stream, its L2TipsMemoryStore, the map and its pruning, and the manual sync() in work(). getSlotActivity now fetches archiver.getCheckpoint({ slot }) on demand (computing checkpointNumber/archive/proposalPayloadHash/attestors as handleCheckpoint did), and the p2p-synced gate reads archiver.getL2Tips() directly. On-demand reads are always against the canonical chain, which also fixes a latent reorg bug: the map had no chain-pruned handling, so a reorged-out checkpoint's attestation entry lingered and could credit attestors on a non-canonical checkpoint. Co-Authored-By: Claude Fable 5 --- .../aztec-node/src/sentinel/README.md | 6 +- .../aztec-node/src/sentinel/sentinel.test.ts | 43 +++---- .../aztec-node/src/sentinel/sentinel.ts | 106 ++++++------------ 3 files changed, 54 insertions(+), 101 deletions(-) diff --git a/yarn-project/aztec-node/src/sentinel/README.md b/yarn-project/aztec-node/src/sentinel/README.md index 5ab68d8d8cff..9f114ea65c0f 100644 --- a/yarn-project/aztec-node/src/sentinel/README.md +++ b/yarn-project/aztec-node/src/sentinel/README.md @@ -17,10 +17,10 @@ The sentinel is one of several watchers registered with the slasher; it does not | Source | What it provides | |---|---| | `EpochCache` | Slot/epoch helpers, committee + proposer for a slot, escape-hatch state | -| `L2BlockSource` (archiver) | Synced slot, `chain-checkpointed` events, block headers | +| `L2BlockSource` (archiver) | Synced slot, `getCheckpoint({ slot })`, `getL2Tips()`, block headers | | `P2PClient` | `getCheckpointAttestationsForSlot(slot, payloadHash)`, `hasBlockProposalsForSlot(slot)` | | `CheckpointReexecutionTracker` | Local re-execution outcome for the proposal at each slot (`valid` / `invalid` / `unvalidated`) — populated by the validator client's `ProposalHandler` | -| L1-checkpointed events | `chain-checkpointed` populates `slotNumberToCheckpoint` with the canonical attestor set | +| L1-confirmed checkpoints | Fetched on demand per slot via `archiver.getCheckpoint({ slot })`, yielding the canonical attestor set | ## Two cadences @@ -49,7 +49,7 @@ For each slot, the proposer is assigned one of six statuses, ranked highest-conf | # | Status | Trigger | Inactive party | |---|---|---|---| -| 6 | `checkpoint-mined` | `slotNumberToCheckpoint.has(slot)` (a checkpoint covering this slot has landed on L1) | Attestors who didn't attest | +| 6 | `checkpoint-mined` | `archiver.getCheckpoint({ slot })` returns a checkpoint (one covering this slot has landed on L1) | Attestors who didn't attest | | 5 | `checkpoint-valid` | `tracker.getOutcomeForSlot(slot) === 'valid'` | Attestors who didn't attest | | 4 | `checkpoint-invalid` | `tracker.getOutcomeForSlot(slot) === 'invalid'` (re-executed and rejected) | Proposer | | 3 | `checkpoint-unvalidated` | `tracker.getOutcomeForSlot(slot) === 'unvalidated'` (validation aborted: missing data, timeout, etc.) | Proposer | diff --git a/yarn-project/aztec-node/src/sentinel/sentinel.test.ts b/yarn-project/aztec-node/src/sentinel/sentinel.test.ts index e2c489e37254..ee95f5af5187 100644 --- a/yarn-project/aztec-node/src/sentinel/sentinel.test.ts +++ b/yarn-project/aztec-node/src/sentinel/sentinel.test.ts @@ -11,7 +11,6 @@ import { GENESIS_BLOCK_HEADER_HASH, L2Block, type L2BlockSource, - type L2BlockStream, getAttestationInfoFromPublishedCheckpoint, } from '@aztec/stdlib/block'; import { @@ -44,7 +43,6 @@ describe('sentinel', () => { let epochCache: MockProxy; let archiver: MockProxy; let p2p: MockProxy; - let blockStream: MockProxy; let reexecutionTracker: CheckpointReexecutionTracker; let kvStore: AztecLMDBStoreV2; @@ -70,7 +68,6 @@ describe('sentinel', () => { archiver = mock(); archiver.getGenesisBlockHash.mockReturnValue(GENESIS_BLOCK_HEADER_HASH); p2p = mock(); - blockStream = mock(); reexecutionTracker = new CheckpointReexecutionTracker(); kvStore = await openTmpStore('sentinel-test'); @@ -100,7 +97,7 @@ describe('sentinel', () => { epochCache.getEpochNow.mockReturnValue(epoch); epochCache.getL1Constants.mockReturnValue(l1Constants); - sentinel = new TestSentinel(epochCache, archiver, p2p, store, reexecutionTracker, config, blockStream); + sentinel = new TestSentinel(epochCache, archiver, p2p, store, reexecutionTracker, config); }); afterEach(async () => { @@ -142,16 +139,22 @@ describe('sentinel', () => { let proposer: EthAddress; let committee: EthAddress[]; - /** Helper to create and emit a chain-checkpointed event */ - const emitCheckpointEvent = async (checkpoint: Checkpoint, checkpointAttestations: CommitteeAttestation[] = []) => { + /** + * Stubs the archiver so the slot's on-demand `getCheckpoint({ slot })` lookup returns this checkpoint, mirroring + * a checkpoint that has landed on L1 covering the slot. + */ + const mineCheckpointForSlot = (checkpoint: Checkpoint, checkpointAttestations: CommitteeAttestation[] = []) => { const published = new PublishedCheckpoint(checkpoint, L1PublishedData.random(), checkpointAttestations); - const lastBlock = checkpoint.blocks.at(-1)!; - const block = { number: lastBlock.number, hash: (await lastBlock.hash()).toString() }; - await sentinel.handleBlockStreamEvent({ type: 'chain-checkpointed', checkpoint: published, block }); + const checkpointSlot = checkpoint.header.slotNumber; + archiver.getCheckpoint.mockImplementation(query => + Promise.resolve('slot' in query && query.slot === checkpointSlot ? published : undefined), + ); return published; }; beforeEach(async () => { + // No L1 checkpoint by default; individual tests mine one via mineCheckpointForSlot. + archiver.getCheckpoint.mockResolvedValue(undefined); signers = times(4, Secp256k1Signer.random); validators = signers.map(signer => signer.address); block = await L2Block.random(BlockNumber(1), { slotNumber: slot }); @@ -165,7 +168,7 @@ describe('sentinel', () => { it('flags checkpoint as mined when L1 has it (case 6)', async () => { // Create a checkpoint with a block at the target slot and emit chain-checkpointed event const checkpoint = await Checkpoint.random(CheckpointNumber(1), { numBlocks: 1, slotNumber: slot }); - await emitCheckpointEvent(checkpoint); + mineCheckpointForSlot(checkpoint); const activity = await sentinel.getSlotActivity(slot, epoch, proposer, committee); expect(activity[proposer.toString()]).toEqual('checkpoint-mined'); @@ -208,7 +211,7 @@ describe('sentinel', () => { it('prefers L1-mined over tracker outcome', async () => { reexecutionTracker.recordOutcome(slot, block.archive.root, 'invalid', CheckpointNumber(1)); const checkpoint = await Checkpoint.random(CheckpointNumber(1), { numBlocks: 1, slotNumber: slot }); - await emitCheckpointEvent(checkpoint); + mineCheckpointForSlot(checkpoint); const activity = await sentinel.getSlotActivity(slot, epoch, proposer, committee); expect(activity[proposer.toString()]).toEqual('checkpoint-mined'); @@ -224,7 +227,7 @@ describe('sentinel', () => { }); // Emit the chain-checkpointed event with attestations from signers 0 and 1 - publishedCheckpoint = await emitCheckpointEvent(checkpoint, checkpointAttestations); + publishedCheckpoint = mineCheckpointForSlot(checkpoint, checkpointAttestations); const attestorsFromCheckpoint = compactArray( getAttestationInfoFromPublishedCheckpoint(publishedCheckpoint, TEST_COORDINATION_SIGNATURE_CONTEXT).map(info => @@ -267,7 +270,7 @@ describe('sentinel', () => { // Emit chain-checkpointed event with both signed and placeholder attestations // The Sentinel should only count the recovered-from-signature ones - publishedCheckpoint = await emitCheckpointEvent(checkpoint, allAttestations); + publishedCheckpoint = mineCheckpointForSlot(checkpoint, allAttestations); // Verify that getAttestationInfoFromPublishedCheckpoint returns 4 entries total: // - 2 with status 'recovered-from-signature' (actual attestations with valid signatures) @@ -299,7 +302,7 @@ describe('sentinel', () => { it('identifies missed attestors if block is mined', async () => { // Create checkpoint with a block at the target slot const checkpoint = await Checkpoint.random(CheckpointNumber(1), { numBlocks: 1, slotNumber: slot }); - await emitCheckpointEvent(checkpoint); + mineCheckpointForSlot(checkpoint); // P2P provides attestations from validators 0, 1, 2 (not validator 3) p2p.getCheckpointAttestationsForSlot.mockResolvedValue(attestations.slice(0, -1)); @@ -1030,18 +1033,6 @@ describe('sentinel', () => { }); class TestSentinel extends Sentinel { - constructor( - epochCache: EpochCache, - archiver: L2BlockSource, - p2p: P2PClient, - store: SentinelStore, - reexecutionTracker: CheckpointReexecutionTracker, - config: SentinelRuntimeConfig, - protected override blockStream: L2BlockStream, - ) { - super(epochCache, archiver, p2p, store, reexecutionTracker, config); - } - public override init() { this.initialSlot = this.epochCache.getEpochAndSlotNow().slot; return Promise.resolve(); diff --git a/yarn-project/aztec-node/src/sentinel/sentinel.ts b/yarn-project/aztec-node/src/sentinel/sentinel.ts index 78a50ff8c075..ef2fc3c69340 100644 --- a/yarn-project/aztec-node/src/sentinel/sentinel.ts +++ b/yarn-project/aztec-node/src/sentinel/sentinel.ts @@ -1,16 +1,9 @@ import type { EpochCache } from '@aztec/epoch-cache'; -import { - BlockNumber, - CheckpointNumber, - CheckpointProposalHash, - EpochNumber, - SlotNumber, -} from '@aztec/foundation/branded-types'; +import { CheckpointNumber, CheckpointProposalHash, EpochNumber, SlotNumber } from '@aztec/foundation/branded-types'; import { countWhile, filterAsync, fromEntries, getEntries, mapValues } from '@aztec/foundation/collection'; import { EthAddress } from '@aztec/foundation/eth-address'; import { createLogger } from '@aztec/foundation/log'; import { RunningPromise } from '@aztec/foundation/running-promise'; -import { L2TipsMemoryStore, type L2TipsStore } from '@aztec/kv-store/stores'; import type { P2PClient } from '@aztec/p2p'; import { OffenseType, @@ -21,13 +14,7 @@ import { getOffenseTypeName, } from '@aztec/slasher'; import type { SlasherConfig } from '@aztec/slasher/config'; -import { - type L2BlockSource, - L2BlockStream, - type L2BlockStreamEvent, - type L2BlockStreamEventHandler, - getAttestationInfoFromPublishedCheckpoint, -} from '@aztec/stdlib/block'; +import { type L2BlockSource, getAttestationInfoFromPublishedCheckpoint } from '@aztec/stdlib/block'; import type { CheckpointReexecutionTracker } from '@aztec/stdlib/checkpoint'; import type { ChainConfig } from '@aztec/stdlib/config'; import { getEpochAtSlot, getSlotRangeForEpoch, getTimestampForSlot } from '@aztec/stdlib/epoch-helpers'; @@ -97,7 +84,7 @@ function statusToCategory(status: ValidatorStatusInSlot): ValidatorStatusType { * first: * * - `checkpoint-mined` — a checkpoint covering this slot has landed on L1 - * (`slotNumberToCheckpoint` populated from `chain-checkpointed`). + * (fetched on demand via `archiver.getCheckpoint({ slot })`). * - `checkpoint-valid` — the local node re-executed a checkpoint proposal for this slot * successfully (consulted via `CheckpointReexecutionTracker`). * - `checkpoint-invalid` — the local node re-executed a checkpoint proposal for this slot @@ -135,25 +122,13 @@ function statusToCategory(status: ValidatorStatusInSlot): ValidatorStatusType { * (no history entries for that slot) and per-epoch evaluation writes an empty performance map * (no slashing). */ -export class Sentinel extends (EventEmitter as new () => WatcherEmitter) implements L2BlockStreamEventHandler, Watcher { +export class Sentinel extends (EventEmitter as new () => WatcherEmitter) implements Watcher { protected runningPromise: RunningPromise; - protected blockStream!: L2BlockStream; - protected l2TipsStore: L2TipsStore; protected initialSlot: SlotNumber | undefined; protected lastProcessedSlot: SlotNumber | undefined; /** Largest epoch number for which the end-of-epoch aggregator has run. */ protected lastEvaluatedEpoch: EpochNumber | undefined; - protected slotNumberToCheckpoint: Map< - SlotNumber, - { - checkpointNumber: CheckpointNumber; - archive: string; - /** Hex keccak256 of the consensus payload bytes; used to fetch matching p2p attestations. */ - proposalPayloadHash: CheckpointProposalHash; - attestors: EthAddress[]; - } - > = new Map(); constructor( protected epochCache: EpochCache, @@ -165,7 +140,6 @@ export class Sentinel extends (EventEmitter as new () => WatcherEmitter) impleme protected logger = createLogger('node:sentinel'), ) { super(); - this.l2TipsStore = new L2TipsMemoryStore(archiver.getGenesisBlockHash()); const interval = (epochCache.getL1Constants().ethereumSlotDuration * 1000) / 4; this.runningPromise = new RunningPromise(this.work.bind(this), logger, interval); } @@ -187,17 +161,14 @@ export class Sentinel extends (EventEmitter as new () => WatcherEmitter) impleme } /** - * Loads initial slot and initializes blockstream. We will not process anything at or before - * the initial slot. Floors at the archiver's synced L2 slot so the sentinel keeps making - * forward progress when L1 is advancing but L2 has no activity (the synced slot is driven by - * L1 sync, not by L2 blocks). Falls back to the wallclock if the archiver isn't ready yet - * (cold start). + * Loads the initial slot. We will not process anything at or before the initial slot. Floors at the + * archiver's synced L2 slot so the sentinel keeps making forward progress when L1 is advancing but L2 has no + * activity (the synced slot is driven by L1 sync, not by L2 blocks). Falls back to the wallclock if the + * archiver isn't ready yet (cold start). */ protected async init() { this.initialSlot = await this.getCurrentSlot(); - const startingBlock = BlockNumber(await this.archiver.getBlockNumber()); - this.logger.info(`Starting validator sentinel with initial slot ${this.initialSlot} and block ${startingBlock}`); - this.blockStream = new L2BlockStream(this.archiver, this.l2TipsStore, this, this.logger, { startingBlock }); + this.logger.info(`Starting validator sentinel with initial slot ${this.initialSlot}`); } /** @@ -215,44 +186,38 @@ export class Sentinel extends (EventEmitter as new () => WatcherEmitter) impleme return this.runningPromise.stop(); } - public async handleBlockStreamEvent(event: L2BlockStreamEvent): Promise { - await this.l2TipsStore.handleBlockStreamEvent(event); - if (event.type === 'chain-checkpointed') { - this.handleCheckpoint(event); - } - } - - protected handleCheckpoint(event: L2BlockStreamEvent) { - if (event.type !== 'chain-checkpointed') { - return; + /** + * Fetches the L1-confirmed checkpoint covering a slot (if any) and derives the slot-level data the + * activity classifier needs: the checkpoint number, archive root, consensus payload hash (used to fetch + * matching p2p attestations regardless of feeAssetPriceModifier variants), and the recovered attestor set. + * Reads on demand so the result is always against the canonical chain — a reorged-out checkpoint simply + * stops being returned, with no stale mapping to clean up. + */ + protected async getCheckpointForSlot(slot: SlotNumber): Promise< + | { + checkpointNumber: CheckpointNumber; + archive: string; + proposalPayloadHash: CheckpointProposalHash; + attestors: EthAddress[]; + } + | undefined + > { + const checkpoint = await this.archiver.getCheckpoint({ slot }); + if (!checkpoint) { + return undefined; } - const checkpoint = event.checkpoint; - - // Store mapping from slot to archive, checkpoint number, attestors, and the consensus payload - // hash (used to query matching p2p attestations regardless of feeAssetPriceModifier variants). const signatureContext = this.getSignatureContext(); const proposalPayloadHash = CheckpointProposalHash.fromBuffer( ConsensusPayload.fromCheckpoint(checkpoint.checkpoint, signatureContext).getPayloadHash(), ); - this.slotNumberToCheckpoint.set(checkpoint.checkpoint.header.slotNumber, { + return { checkpointNumber: checkpoint.checkpoint.number, archive: checkpoint.checkpoint.archive.root.toString(), proposalPayloadHash, attestors: getAttestationInfoFromPublishedCheckpoint(checkpoint, signatureContext) .filter(a => a.status === 'recovered-from-signature') .map(a => a.address!), - }); - - // Prune the archive map to only keep at most N entries - const historyLength = this.store.getHistoryLength(); - if (this.slotNumberToCheckpoint.size > historyLength) { - const toDelete = Array.from(this.slotNumberToCheckpoint.keys()) - .sort((a, b) => Number(a - b)) - .slice(0, this.slotNumberToCheckpoint.size - historyLength); - for (const key of toDelete) { - this.slotNumberToCheckpoint.delete(key); - } - } + }; } /** @@ -387,10 +352,6 @@ export class Sentinel extends (EventEmitter as new () => WatcherEmitter) impleme public async work() { const currentSlot = await this.getCurrentSlot(); try { - // Manually sync the block stream to ensure we have the latest data. - // Note we never `start` the blockstream, so it loops at the same pace as we do. - await this.blockStream.sync(); - // Per-slot activity recording (lag = 2 slots for P2P attestation settlement). const targetSlot = await this.isReadyToProcess(currentSlot); if (targetSlot !== false) { @@ -478,7 +439,7 @@ export class Sentinel extends (EventEmitter as new () => WatcherEmitter) impleme return false; } - const archiverLastBlockHash = await this.l2TipsStore.getL2Tips().then(tip => tip.proposed.hash); + const archiverLastBlockHash = await this.archiver.getL2Tips().then(tip => tip.proposed.hash); const p2pLastBlockHash = await this.p2p.getL2Tips().then(tips => tips.proposed.hash); const isP2pSynced = archiverLastBlockHash === p2pLastBlockHash; if (!isP2pSynced) { @@ -534,8 +495,9 @@ export class Sentinel extends (EventEmitter as new () => WatcherEmitter) impleme this.logger.debug(`Computing stats for slot ${slot} at epoch ${epoch}`, { slot, epoch, proposer, committee }); // Gather attestors from both p2p (live attestations) and the archiver (signers on the - // checkpoint if one has landed on L1). Used regardless of which case applies. - const checkpoint = this.slotNumberToCheckpoint.get(slot); + // checkpoint if one has landed on L1). Fetched on demand so it always reflects the canonical chain. + // Used regardless of which case applies. + const checkpoint = await this.getCheckpointForSlot(slot); const p2pAttested = await this.p2p.getCheckpointAttestationsForSlot(slot, checkpoint?.proposalPayloadHash); const p2pAttestors = p2pAttested.map(a => a.getSender()).filter((s): s is EthAddress => s !== undefined); const attestors = new Set( From f3f8a3d1d3d0854561c055ba3d213eab6eb9320c Mon Sep 17 00:00:00 2001 From: Santiago Palladino Date: Thu, 11 Jun 2026 20:12:29 -0300 Subject: [PATCH 4/4] fix(prover-node): cap checkpoint catch-up fetch at two epochs When resyncing far behind the checkpointed tip, processCheckpointJump fetched the entire gap of checkpoints between its cursor and the tip, which could load thousands of checkpoints after a long time offline. Cap the catch-up at the two most recent epochs' worth (2 * epochDuration checkpoints, since an epoch spans at most epochDuration checkpoints); older checkpoints are past their proof- submission window and cannot be proven anyway, so skip them and jump the cursor forward. Also rewrite the L2BlockStream post-prune tier-reconciliation comment to explain the stale-snapshot reasoning without referencing an external finding number. --- .../prover-node/src/prover-node.test.ts | 30 +++++++++++++++++++ yarn-project/prover-node/src/prover-node.ts | 23 ++++++++++++-- .../block/l2_block_stream/l2_block_stream.ts | 5 ++-- 3 files changed, 54 insertions(+), 4 deletions(-) diff --git a/yarn-project/prover-node/src/prover-node.test.ts b/yarn-project/prover-node/src/prover-node.test.ts index ba7cdd7f553c..1725c0c13f3e 100644 --- a/yarn-project/prover-node/src/prover-node.test.ts +++ b/yarn-project/prover-node/src/prover-node.test.ts @@ -98,6 +98,32 @@ describe('ProverNode', () => { expect(sessionManager.onCheckpointAdded).toHaveBeenCalledWith(EpochNumber(1)); }); + it('caps the catch-up fetch at two epochs when resyncing far behind the checkpointed tip', async () => { + // epochDuration=1 ⇒ two epochs' worth of checkpoints is 2. Cursor sits at checkpoint 0 while the + // checkpointed tip has jumped to 100 (e.g. the prover node was offline for a long time). We must not + // fetch all 100 checkpoints: epochs that far back are past their proof-submission window and cannot be + // proven anyway, so the catch-up should fetch only the most recent two epochs' worth (checkpoints 99 and + // 100) and skip the rest, leaving the cursor advanced past them so they are never retried. + setupNotFullyProven(); + const fetchSpy = l2BlockSource.getCheckpointsData; + mineCheckpoint(makeCheckpoint(99, 99, 99)); + const event = mineCheckpoint(makeCheckpoint(100, 100, 100)); + proverNode.setLastProcessedCheckpoint(CheckpointNumber.ZERO); + + await proverNode.handleBlockStreamEvent(event); + + // Only the most recent two epochs' worth were fetched and registered; the cursor lands at the tip. + const fetchRanges = fetchSpy.mock.calls.map(([q]) => q as any).filter(q => 'from' in q); + expect(fetchRanges).toEqual([{ from: CheckpointNumber(99), limit: 2 }]); + expect( + proverNode + .getCheckpointStore() + .listAll() + .map(p => p.id), + ).toHaveLength(2); + expect(proverNode.getLastProcessedCheckpoint()).toEqual(CheckpointNumber(100)); + }); + it('dispatches chain-pruned through markPrunedAfter and notifies the session manager only when affected', async () => { // No registered checkpoints — nothing to prune. await proverNode.handleBlockStreamEvent({ @@ -674,4 +700,8 @@ class TestProverNode extends ProverNode { public getLastProcessedCheckpoint(): CheckpointNumber { return this.lastProcessedCheckpoint; } + + public setLastProcessedCheckpoint(checkpoint: CheckpointNumber): void { + this.lastProcessedCheckpoint = checkpoint; + } } diff --git a/yarn-project/prover-node/src/prover-node.ts b/yarn-project/prover-node/src/prover-node.ts index 400388286974..296ec231a724 100644 --- a/yarn-project/prover-node/src/prover-node.ts +++ b/yarn-project/prover-node/src/prover-node.ts @@ -266,10 +266,29 @@ export class ProverNode implements L2BlockStreamEventHandler, ProverNodeApi, Tra if (targetCheckpoint <= this.lastProcessedCheckpoint) { return; } - const from = CheckpointNumber(this.lastProcessedCheckpoint + 1); + const l1Constants = await this.getL1Constants(); + + // Cap the catch-up at the two most recent epochs' worth of checkpoints. With at most one checkpoint per + // slot, an epoch spans at most `epochDuration` checkpoints, so two epochs is `2 * epochDuration`. When the + // cursor is much further behind (e.g. resyncing after a long time offline), fetching the whole gap could + // load thousands of checkpoints we cannot act on: anything older than the last two epochs is already past + // its proof-submission window, so we skip it and jump the cursor forward to the start of the capped range. + const maxCheckpoints = 2 * l1Constants.epochDuration; + let from = CheckpointNumber(this.lastProcessedCheckpoint + 1); + if (Number(targetCheckpoint - from) + 1 > maxCheckpoints) { + const cappedFrom = CheckpointNumber(targetCheckpoint - maxCheckpoints + 1); + this.log.warn(`Skipping unprovable checkpoints during catch-up; the prover node is far behind`, { + from, + cappedFrom, + targetCheckpoint, + maxCheckpoints, + }); + // Advance the cursor past the skipped checkpoints so they are never retried. + this.lastProcessedCheckpoint = CheckpointNumber(cappedFrom - 1); + from = cappedFrom; + } const limit = Number(targetCheckpoint - from) + 1; const metadatas = await this.l2BlockSource.getCheckpointsData({ from, limit }); - const l1Constants = await this.getL1Constants(); // Per-epoch relevance is cached so a multi-checkpoint epoch resolves it once. Skipping is whole-epoch // only: the SessionManager requires an epoch's checkpoints fully covered before it opens a session, so we diff --git a/yarn-project/stdlib/src/block/l2_block_stream/l2_block_stream.ts b/yarn-project/stdlib/src/block/l2_block_stream/l2_block_stream.ts index cd8fd2bef027..33f648c03694 100644 --- a/yarn-project/stdlib/src/block/l2_block_stream/l2_block_stream.ts +++ b/yarn-project/stdlib/src/block/l2_block_stream/l2_block_stream.ts @@ -159,8 +159,9 @@ export class L2BlockStream { // End-of-pass tier reconciliation. For each tier, emit a single event iff the source tip differs from the // local one. All three source tips come from the SAME `sourceTips` snapshot, so no extra source fetches are - // needed. We re-read the local tips after a prune so the comparison reflects the cursors the prune handler - // just clamped (the initial `localTips` snapshot is stale post-prune — finding 2). + // needed. We re-read the local tips after a prune because the prune handler has already clamped the local + // cursors back; the `localTips` snapshot taken before the prune would be stale and would mis-drive the tier + // comparison (emitting events relative to cursors that no longer exist). const reconcileTips = pruned ? await this.localData.getL2Tips() : localTips; if (!this.opts.ignoreCheckpoints && this.tipDiffers(reconcileTips.checkpointed?.block, sourceTips.checkpointed)) { await this.emitEvent({