diff --git a/yarn-project/aztec-node/src/sentinel/README.md b/yarn-project/aztec-node/src/sentinel/README.md index 5ab68d8d8cff..9f114ea65c0f 100644 --- a/yarn-project/aztec-node/src/sentinel/README.md +++ b/yarn-project/aztec-node/src/sentinel/README.md @@ -17,10 +17,10 @@ The sentinel is one of several watchers registered with the slasher; it does not | Source | What it provides | |---|---| | `EpochCache` | Slot/epoch helpers, committee + proposer for a slot, escape-hatch state | -| `L2BlockSource` (archiver) | Synced slot, `chain-checkpointed` events, block headers | +| `L2BlockSource` (archiver) | Synced slot, `getCheckpoint({ slot })`, `getL2Tips()`, block headers | | `P2PClient` | `getCheckpointAttestationsForSlot(slot, payloadHash)`, `hasBlockProposalsForSlot(slot)` | | `CheckpointReexecutionTracker` | Local re-execution outcome for the proposal at each slot (`valid` / `invalid` / `unvalidated`) — populated by the validator client's `ProposalHandler` | -| L1-checkpointed events | `chain-checkpointed` populates `slotNumberToCheckpoint` with the canonical attestor set | +| L1-confirmed checkpoints | Fetched on demand per slot via `archiver.getCheckpoint({ slot })`, yielding the canonical attestor set | ## Two cadences @@ -49,7 +49,7 @@ For each slot, the proposer is assigned one of six statuses, ranked highest-conf | # | Status | Trigger | Inactive party | |---|---|---|---| -| 6 | `checkpoint-mined` | `slotNumberToCheckpoint.has(slot)` (a checkpoint covering this slot has landed on L1) | Attestors who didn't attest | +| 6 | `checkpoint-mined` | `archiver.getCheckpoint({ slot })` returns a checkpoint (one covering this slot has landed on L1) | Attestors who didn't attest | | 5 | `checkpoint-valid` | `tracker.getOutcomeForSlot(slot) === 'valid'` | Attestors who didn't attest | | 4 | `checkpoint-invalid` | `tracker.getOutcomeForSlot(slot) === 'invalid'` (re-executed and rejected) | Proposer | | 3 | `checkpoint-unvalidated` | `tracker.getOutcomeForSlot(slot) === 'unvalidated'` (validation aborted: missing data, timeout, etc.) | Proposer | diff --git a/yarn-project/aztec-node/src/sentinel/sentinel.test.ts b/yarn-project/aztec-node/src/sentinel/sentinel.test.ts index e2c489e37254..ee95f5af5187 100644 --- a/yarn-project/aztec-node/src/sentinel/sentinel.test.ts +++ b/yarn-project/aztec-node/src/sentinel/sentinel.test.ts @@ -11,7 +11,6 @@ import { GENESIS_BLOCK_HEADER_HASH, L2Block, type L2BlockSource, - type L2BlockStream, getAttestationInfoFromPublishedCheckpoint, } from '@aztec/stdlib/block'; import { @@ -44,7 +43,6 @@ describe('sentinel', () => { let epochCache: MockProxy; let archiver: MockProxy; let p2p: MockProxy; - let blockStream: MockProxy; let reexecutionTracker: CheckpointReexecutionTracker; let kvStore: AztecLMDBStoreV2; @@ -70,7 +68,6 @@ describe('sentinel', () => { archiver = mock(); archiver.getGenesisBlockHash.mockReturnValue(GENESIS_BLOCK_HEADER_HASH); p2p = mock(); - blockStream = mock(); reexecutionTracker = new CheckpointReexecutionTracker(); kvStore = await openTmpStore('sentinel-test'); @@ -100,7 +97,7 @@ describe('sentinel', () => { epochCache.getEpochNow.mockReturnValue(epoch); epochCache.getL1Constants.mockReturnValue(l1Constants); - sentinel = new TestSentinel(epochCache, archiver, p2p, store, reexecutionTracker, config, blockStream); + sentinel = new TestSentinel(epochCache, archiver, p2p, store, reexecutionTracker, config); }); afterEach(async () => { @@ -142,16 +139,22 @@ describe('sentinel', () => { let proposer: EthAddress; let committee: EthAddress[]; - /** Helper to create and emit a chain-checkpointed event */ - const emitCheckpointEvent = async (checkpoint: Checkpoint, checkpointAttestations: CommitteeAttestation[] = []) => { + /** + * Stubs the archiver so the slot's on-demand `getCheckpoint({ slot })` lookup returns this checkpoint, mirroring + * a checkpoint that has landed on L1 covering the slot. + */ + const mineCheckpointForSlot = (checkpoint: Checkpoint, checkpointAttestations: CommitteeAttestation[] = []) => { const published = new PublishedCheckpoint(checkpoint, L1PublishedData.random(), checkpointAttestations); - const lastBlock = checkpoint.blocks.at(-1)!; - const block = { number: lastBlock.number, hash: (await lastBlock.hash()).toString() }; - await sentinel.handleBlockStreamEvent({ type: 'chain-checkpointed', checkpoint: published, block }); + const checkpointSlot = checkpoint.header.slotNumber; + archiver.getCheckpoint.mockImplementation(query => + Promise.resolve('slot' in query && query.slot === checkpointSlot ? published : undefined), + ); return published; }; beforeEach(async () => { + // No L1 checkpoint by default; individual tests mine one via mineCheckpointForSlot. + archiver.getCheckpoint.mockResolvedValue(undefined); signers = times(4, Secp256k1Signer.random); validators = signers.map(signer => signer.address); block = await L2Block.random(BlockNumber(1), { slotNumber: slot }); @@ -165,7 +168,7 @@ describe('sentinel', () => { it('flags checkpoint as mined when L1 has it (case 6)', async () => { // Create a checkpoint with a block at the target slot and emit chain-checkpointed event const checkpoint = await Checkpoint.random(CheckpointNumber(1), { numBlocks: 1, slotNumber: slot }); - await emitCheckpointEvent(checkpoint); + mineCheckpointForSlot(checkpoint); const activity = await sentinel.getSlotActivity(slot, epoch, proposer, committee); expect(activity[proposer.toString()]).toEqual('checkpoint-mined'); @@ -208,7 +211,7 @@ describe('sentinel', () => { it('prefers L1-mined over tracker outcome', async () => { reexecutionTracker.recordOutcome(slot, block.archive.root, 'invalid', CheckpointNumber(1)); const checkpoint = await Checkpoint.random(CheckpointNumber(1), { numBlocks: 1, slotNumber: slot }); - await emitCheckpointEvent(checkpoint); + mineCheckpointForSlot(checkpoint); const activity = await sentinel.getSlotActivity(slot, epoch, proposer, committee); expect(activity[proposer.toString()]).toEqual('checkpoint-mined'); @@ -224,7 +227,7 @@ describe('sentinel', () => { }); // Emit the chain-checkpointed event with attestations from signers 0 and 1 - publishedCheckpoint = await emitCheckpointEvent(checkpoint, checkpointAttestations); + publishedCheckpoint = mineCheckpointForSlot(checkpoint, checkpointAttestations); const attestorsFromCheckpoint = compactArray( getAttestationInfoFromPublishedCheckpoint(publishedCheckpoint, TEST_COORDINATION_SIGNATURE_CONTEXT).map(info => @@ -267,7 +270,7 @@ describe('sentinel', () => { // Emit chain-checkpointed event with both signed and placeholder attestations // The Sentinel should only count the recovered-from-signature ones - publishedCheckpoint = await emitCheckpointEvent(checkpoint, allAttestations); + publishedCheckpoint = mineCheckpointForSlot(checkpoint, allAttestations); // Verify that getAttestationInfoFromPublishedCheckpoint returns 4 entries total: // - 2 with status 'recovered-from-signature' (actual attestations with valid signatures) @@ -299,7 +302,7 @@ describe('sentinel', () => { it('identifies missed attestors if block is mined', async () => { // Create checkpoint with a block at the target slot const checkpoint = await Checkpoint.random(CheckpointNumber(1), { numBlocks: 1, slotNumber: slot }); - await emitCheckpointEvent(checkpoint); + mineCheckpointForSlot(checkpoint); // P2P provides attestations from validators 0, 1, 2 (not validator 3) p2p.getCheckpointAttestationsForSlot.mockResolvedValue(attestations.slice(0, -1)); @@ -1030,18 +1033,6 @@ describe('sentinel', () => { }); class TestSentinel extends Sentinel { - constructor( - epochCache: EpochCache, - archiver: L2BlockSource, - p2p: P2PClient, - store: SentinelStore, - reexecutionTracker: CheckpointReexecutionTracker, - config: SentinelRuntimeConfig, - protected override blockStream: L2BlockStream, - ) { - super(epochCache, archiver, p2p, store, reexecutionTracker, config); - } - public override init() { this.initialSlot = this.epochCache.getEpochAndSlotNow().slot; return Promise.resolve(); diff --git a/yarn-project/aztec-node/src/sentinel/sentinel.ts b/yarn-project/aztec-node/src/sentinel/sentinel.ts index 78a50ff8c075..ef2fc3c69340 100644 --- a/yarn-project/aztec-node/src/sentinel/sentinel.ts +++ b/yarn-project/aztec-node/src/sentinel/sentinel.ts @@ -1,16 +1,9 @@ import type { EpochCache } from '@aztec/epoch-cache'; -import { - BlockNumber, - CheckpointNumber, - CheckpointProposalHash, - EpochNumber, - SlotNumber, -} from '@aztec/foundation/branded-types'; +import { CheckpointNumber, CheckpointProposalHash, EpochNumber, SlotNumber } from '@aztec/foundation/branded-types'; import { countWhile, filterAsync, fromEntries, getEntries, mapValues } from '@aztec/foundation/collection'; import { EthAddress } from '@aztec/foundation/eth-address'; import { createLogger } from '@aztec/foundation/log'; import { RunningPromise } from '@aztec/foundation/running-promise'; -import { L2TipsMemoryStore, type L2TipsStore } from '@aztec/kv-store/stores'; import type { P2PClient } from '@aztec/p2p'; import { OffenseType, @@ -21,13 +14,7 @@ import { getOffenseTypeName, } from '@aztec/slasher'; import type { SlasherConfig } from '@aztec/slasher/config'; -import { - type L2BlockSource, - L2BlockStream, - type L2BlockStreamEvent, - type L2BlockStreamEventHandler, - getAttestationInfoFromPublishedCheckpoint, -} from '@aztec/stdlib/block'; +import { type L2BlockSource, getAttestationInfoFromPublishedCheckpoint } from '@aztec/stdlib/block'; import type { CheckpointReexecutionTracker } from '@aztec/stdlib/checkpoint'; import type { ChainConfig } from '@aztec/stdlib/config'; import { getEpochAtSlot, getSlotRangeForEpoch, getTimestampForSlot } from '@aztec/stdlib/epoch-helpers'; @@ -97,7 +84,7 @@ function statusToCategory(status: ValidatorStatusInSlot): ValidatorStatusType { * first: * * - `checkpoint-mined` — a checkpoint covering this slot has landed on L1 - * (`slotNumberToCheckpoint` populated from `chain-checkpointed`). + * (fetched on demand via `archiver.getCheckpoint({ slot })`). * - `checkpoint-valid` — the local node re-executed a checkpoint proposal for this slot * successfully (consulted via `CheckpointReexecutionTracker`). * - `checkpoint-invalid` — the local node re-executed a checkpoint proposal for this slot @@ -135,25 +122,13 @@ function statusToCategory(status: ValidatorStatusInSlot): ValidatorStatusType { * (no history entries for that slot) and per-epoch evaluation writes an empty performance map * (no slashing). */ -export class Sentinel extends (EventEmitter as new () => WatcherEmitter) implements L2BlockStreamEventHandler, Watcher { +export class Sentinel extends (EventEmitter as new () => WatcherEmitter) implements Watcher { protected runningPromise: RunningPromise; - protected blockStream!: L2BlockStream; - protected l2TipsStore: L2TipsStore; protected initialSlot: SlotNumber | undefined; protected lastProcessedSlot: SlotNumber | undefined; /** Largest epoch number for which the end-of-epoch aggregator has run. */ protected lastEvaluatedEpoch: EpochNumber | undefined; - protected slotNumberToCheckpoint: Map< - SlotNumber, - { - checkpointNumber: CheckpointNumber; - archive: string; - /** Hex keccak256 of the consensus payload bytes; used to fetch matching p2p attestations. */ - proposalPayloadHash: CheckpointProposalHash; - attestors: EthAddress[]; - } - > = new Map(); constructor( protected epochCache: EpochCache, @@ -165,7 +140,6 @@ export class Sentinel extends (EventEmitter as new () => WatcherEmitter) impleme protected logger = createLogger('node:sentinel'), ) { super(); - this.l2TipsStore = new L2TipsMemoryStore(archiver.getGenesisBlockHash()); const interval = (epochCache.getL1Constants().ethereumSlotDuration * 1000) / 4; this.runningPromise = new RunningPromise(this.work.bind(this), logger, interval); } @@ -187,17 +161,14 @@ export class Sentinel extends (EventEmitter as new () => WatcherEmitter) impleme } /** - * Loads initial slot and initializes blockstream. We will not process anything at or before - * the initial slot. Floors at the archiver's synced L2 slot so the sentinel keeps making - * forward progress when L1 is advancing but L2 has no activity (the synced slot is driven by - * L1 sync, not by L2 blocks). Falls back to the wallclock if the archiver isn't ready yet - * (cold start). + * Loads the initial slot. We will not process anything at or before the initial slot. Floors at the + * archiver's synced L2 slot so the sentinel keeps making forward progress when L1 is advancing but L2 has no + * activity (the synced slot is driven by L1 sync, not by L2 blocks). Falls back to the wallclock if the + * archiver isn't ready yet (cold start). */ protected async init() { this.initialSlot = await this.getCurrentSlot(); - const startingBlock = BlockNumber(await this.archiver.getBlockNumber()); - this.logger.info(`Starting validator sentinel with initial slot ${this.initialSlot} and block ${startingBlock}`); - this.blockStream = new L2BlockStream(this.archiver, this.l2TipsStore, this, this.logger, { startingBlock }); + this.logger.info(`Starting validator sentinel with initial slot ${this.initialSlot}`); } /** @@ -215,44 +186,38 @@ export class Sentinel extends (EventEmitter as new () => WatcherEmitter) impleme return this.runningPromise.stop(); } - public async handleBlockStreamEvent(event: L2BlockStreamEvent): Promise { - await this.l2TipsStore.handleBlockStreamEvent(event); - if (event.type === 'chain-checkpointed') { - this.handleCheckpoint(event); - } - } - - protected handleCheckpoint(event: L2BlockStreamEvent) { - if (event.type !== 'chain-checkpointed') { - return; + /** + * Fetches the L1-confirmed checkpoint covering a slot (if any) and derives the slot-level data the + * activity classifier needs: the checkpoint number, archive root, consensus payload hash (used to fetch + * matching p2p attestations regardless of feeAssetPriceModifier variants), and the recovered attestor set. + * Reads on demand so the result is always against the canonical chain — a reorged-out checkpoint simply + * stops being returned, with no stale mapping to clean up. + */ + protected async getCheckpointForSlot(slot: SlotNumber): Promise< + | { + checkpointNumber: CheckpointNumber; + archive: string; + proposalPayloadHash: CheckpointProposalHash; + attestors: EthAddress[]; + } + | undefined + > { + const checkpoint = await this.archiver.getCheckpoint({ slot }); + if (!checkpoint) { + return undefined; } - const checkpoint = event.checkpoint; - - // Store mapping from slot to archive, checkpoint number, attestors, and the consensus payload - // hash (used to query matching p2p attestations regardless of feeAssetPriceModifier variants). const signatureContext = this.getSignatureContext(); const proposalPayloadHash = CheckpointProposalHash.fromBuffer( ConsensusPayload.fromCheckpoint(checkpoint.checkpoint, signatureContext).getPayloadHash(), ); - this.slotNumberToCheckpoint.set(checkpoint.checkpoint.header.slotNumber, { + return { checkpointNumber: checkpoint.checkpoint.number, archive: checkpoint.checkpoint.archive.root.toString(), proposalPayloadHash, attestors: getAttestationInfoFromPublishedCheckpoint(checkpoint, signatureContext) .filter(a => a.status === 'recovered-from-signature') .map(a => a.address!), - }); - - // Prune the archive map to only keep at most N entries - const historyLength = this.store.getHistoryLength(); - if (this.slotNumberToCheckpoint.size > historyLength) { - const toDelete = Array.from(this.slotNumberToCheckpoint.keys()) - .sort((a, b) => Number(a - b)) - .slice(0, this.slotNumberToCheckpoint.size - historyLength); - for (const key of toDelete) { - this.slotNumberToCheckpoint.delete(key); - } - } + }; } /** @@ -387,10 +352,6 @@ export class Sentinel extends (EventEmitter as new () => WatcherEmitter) impleme public async work() { const currentSlot = await this.getCurrentSlot(); try { - // Manually sync the block stream to ensure we have the latest data. - // Note we never `start` the blockstream, so it loops at the same pace as we do. - await this.blockStream.sync(); - // Per-slot activity recording (lag = 2 slots for P2P attestation settlement). const targetSlot = await this.isReadyToProcess(currentSlot); if (targetSlot !== false) { @@ -478,7 +439,7 @@ export class Sentinel extends (EventEmitter as new () => WatcherEmitter) impleme return false; } - const archiverLastBlockHash = await this.l2TipsStore.getL2Tips().then(tip => tip.proposed.hash); + const archiverLastBlockHash = await this.archiver.getL2Tips().then(tip => tip.proposed.hash); const p2pLastBlockHash = await this.p2p.getL2Tips().then(tips => tips.proposed.hash); const isP2pSynced = archiverLastBlockHash === p2pLastBlockHash; if (!isP2pSynced) { @@ -534,8 +495,9 @@ export class Sentinel extends (EventEmitter as new () => WatcherEmitter) impleme this.logger.debug(`Computing stats for slot ${slot} at epoch ${epoch}`, { slot, epoch, proposer, committee }); // Gather attestors from both p2p (live attestations) and the archiver (signers on the - // checkpoint if one has landed on L1). Used regardless of which case applies. - const checkpoint = this.slotNumberToCheckpoint.get(slot); + // checkpoint if one has landed on L1). Fetched on demand so it always reflects the canonical chain. + // Used regardless of which case applies. + const checkpoint = await this.getCheckpointForSlot(slot); const p2pAttested = await this.p2p.getCheckpointAttestationsForSlot(slot, checkpoint?.proposalPayloadHash); const p2pAttestors = p2pAttested.map(a => a.getSender()).filter((s): s is EthAddress => s !== undefined); const attestors = new Set( diff --git a/yarn-project/prover-node/src/prover-node.test.ts b/yarn-project/prover-node/src/prover-node.test.ts index b9e23f2846c0..1725c0c13f3e 100644 --- a/yarn-project/prover-node/src/prover-node.test.ts +++ b/yarn-project/prover-node/src/prover-node.test.ts @@ -77,6 +77,7 @@ describe('ProverNode', () => { // exercise. proverNode.setSessionManager(sessionManager); proverNode.setPublishingService(publishingService); + mined.clear(); }); // ---------------- event dispatch ---------------- @@ -87,14 +88,9 @@ describe('ProverNode', () => { checkpoint: { number: CheckpointNumber(n), hash: `0x0${n}` }, }); - it('dispatches chain-checkpointed to handleCheckpointEvent', async () => { + it('dispatches chain-checkpointed: catches up and registers the checkpoint', async () => { setupNotFullyProven(); - const checkpoint = makeCheckpoint(1, 1, 1); - const event: L2BlockStreamEvent = { - type: 'chain-checkpointed', - checkpoint: makePublishedCheckpoint(checkpoint), - block: { number: BlockNumber(1), hash: '0x01' }, - }; + const event = mineCheckpoint(makeCheckpoint(1, 1, 1)); await proverNode.handleBlockStreamEvent(event); @@ -102,6 +98,32 @@ describe('ProverNode', () => { expect(sessionManager.onCheckpointAdded).toHaveBeenCalledWith(EpochNumber(1)); }); + it('caps the catch-up fetch at two epochs when resyncing far behind the checkpointed tip', async () => { + // epochDuration=1 ⇒ two epochs' worth of checkpoints is 2. Cursor sits at checkpoint 0 while the + // checkpointed tip has jumped to 100 (e.g. the prover node was offline for a long time). We must not + // fetch all 100 checkpoints: epochs that far back are past their proof-submission window and cannot be + // proven anyway, so the catch-up should fetch only the most recent two epochs' worth (checkpoints 99 and + // 100) and skip the rest, leaving the cursor advanced past them so they are never retried. + setupNotFullyProven(); + const fetchSpy = l2BlockSource.getCheckpointsData; + mineCheckpoint(makeCheckpoint(99, 99, 99)); + const event = mineCheckpoint(makeCheckpoint(100, 100, 100)); + proverNode.setLastProcessedCheckpoint(CheckpointNumber.ZERO); + + await proverNode.handleBlockStreamEvent(event); + + // Only the most recent two epochs' worth were fetched and registered; the cursor lands at the tip. + const fetchRanges = fetchSpy.mock.calls.map(([q]) => q as any).filter(q => 'from' in q); + expect(fetchRanges).toEqual([{ from: CheckpointNumber(99), limit: 2 }]); + expect( + proverNode + .getCheckpointStore() + .listAll() + .map(p => p.id), + ).toHaveLength(2); + expect(proverNode.getLastProcessedCheckpoint()).toEqual(CheckpointNumber(100)); + }); + it('dispatches chain-pruned through markPrunedAfter and notifies the session manager only when affected', async () => { // No registered checkpoints — nothing to prune. await proverNode.handleBlockStreamEvent({ @@ -114,11 +136,7 @@ describe('ProverNode', () => { // Register a checkpoint, then prune. setupNotFullyProven(); - await proverNode.handleBlockStreamEvent({ - type: 'chain-checkpointed', - checkpoint: makePublishedCheckpoint(makeCheckpoint(2, 2, 2)), - block: { number: BlockNumber(2), hash: '0x02' }, - }); + await proverNode.handleBlockStreamEvent(mineCheckpoint(makeCheckpoint(2, 2, 2))); await proverNode.handleBlockStreamEvent({ type: 'chain-pruned', @@ -207,18 +225,16 @@ describe('ProverNode', () => { // the tips stay put for the L2BlockStream to retry. worldState.syncImmediate.mockRejectedValue(new Error('boom')); - const event: L2BlockStreamEvent = { - type: 'chain-checkpointed', - checkpoint: makePublishedCheckpoint(makeCheckpoint(1, 1, 1)), - block: { number: BlockNumber(1), hash: '0x01' }, - }; + const event = mineCheckpoint(makeCheckpoint(1, 1, 1)); await expect(proverNode.handleBlockStreamEvent(event)).rejects.toThrow('boom'); - // Tips left unadvanced; nothing was registered and the session manager wasn't notified. + // Tips left unadvanced; nothing was registered, the session manager wasn't notified, and the catch-up + // cursor stays behind so the next pass retries this checkpoint. expect(await proverNode.getTipsStore().getL2BlockHash(1)).toBeUndefined(); expect(proverNode.getCheckpointStore().listAll()).toHaveLength(0); expect(sessionManager.onCheckpointAdded).not.toHaveBeenCalled(); + expect(proverNode.getLastProcessedCheckpoint()).toEqual(CheckpointNumber.ZERO); }); it('leaves the tips store unadvanced when a handler propagates an error (A-1041)', async () => { @@ -227,11 +243,7 @@ describe('ProverNode', () => { // tips-store update, so the error surfaces to the L2BlockStream and the tips stay put. l2BlockSource.getSyncedL2SlotNumber.mockRejectedValue(new Error('archiver down')); - const event: L2BlockStreamEvent = { - type: 'chain-checkpointed', - checkpoint: makePublishedCheckpoint(makeCheckpoint(1, 1, 1)), - block: { number: BlockNumber(1), hash: '0x01' }, - }; + const event = mineCheckpoint(makeCheckpoint(1, 1, 1)); await expect(proverNode.handleBlockStreamEvent(event)).rejects.toThrow('archiver down'); @@ -251,25 +263,19 @@ describe('ProverNode', () => { ); l2BlockSource.isEpochComplete.mockResolvedValue(true); - await proverNode.handleBlockStreamEvent({ - type: 'chain-checkpointed', - checkpoint: makePublishedCheckpoint(makeCheckpoint(1, 1, 1)), - block: { number: BlockNumber(1), hash: '0x01' }, - }); + await proverNode.handleBlockStreamEvent(mineCheckpoint(makeCheckpoint(1, 1, 1))); expect(proverNode.getCheckpointStore().listAll().length).toBe(0); expect(sessionManager.onCheckpointAdded).not.toHaveBeenCalled(); + // The whole-epoch skip still advances the cursor so we don't re-evaluate it next pass. + expect(proverNode.getLastProcessedCheckpoint()).toEqual(CheckpointNumber(1)); }); it('content-addresses the prover by the checkpoint archive root', async () => { setupNotFullyProven(); const archiveRoot = Fr.random(); - await proverNode.handleBlockStreamEvent({ - type: 'chain-checkpointed', - checkpoint: makePublishedCheckpoint(makeCheckpoint(1, 1, 2, archiveRoot)), - block: { number: BlockNumber(2), hash: '0x02' }, - }); + await proverNode.handleBlockStreamEvent(mineCheckpoint(makeCheckpoint(1, 1, 2, archiveRoot))); const prover = proverNode.getCheckpointStore().listAll()[0]; expect(prover.id).toContain(archiveRoot.toString()); @@ -370,16 +376,8 @@ describe('ProverNode', () => { setupRegistrationSuccess(); // Register two checkpoints at slots 6 and 7 (both in epoch 3). - await proverNode.handleBlockStreamEvent({ - type: 'chain-checkpointed', - checkpoint: makePublishedCheckpoint(makeCheckpoint(1, 6, 6)), - block: { number: BlockNumber(6), hash: '0x06' }, - }); - await proverNode.handleBlockStreamEvent({ - type: 'chain-checkpointed', - checkpoint: makePublishedCheckpoint(makeCheckpoint(2, 7, 7)), - block: { number: BlockNumber(7), hash: '0x07' }, - }); + await proverNode.handleBlockStreamEvent(mineCheckpoint(makeCheckpoint(1, 6, 6))); + await proverNode.handleBlockStreamEvent(mineCheckpoint(makeCheckpoint(2, 7, 7))); expect(proverNode.getCheckpointStore().listAll().length).toBe(2); // Pruning above checkpoint 0 marks both as pruned — onPrune must receive [EpochNumber(3)], @@ -617,6 +615,44 @@ describe('ProverNode', () => { function makePublishedCheckpoint(checkpoint: Checkpoint): PublishedCheckpoint { return { checkpoint, attestations: [] } as unknown as PublishedCheckpoint; } + + /** Registry of mined checkpoints. */ + const mined = new Map(); + + /** + * Registers `checkpoint` with the block source mocks: its light metadata is returned by `getCheckpointsData` + * range queries, and its full payload by `getCheckpoint({ number })`. Returns the thin `chain-checkpointed` + * tip event that points at it — the block stream now delivers only the tip, and the prover-node fetches + * everything between its cursor and the tip itself. + */ + function mineCheckpoint(checkpoint: Checkpoint): L2BlockStreamEvent { + mined.set(Number(checkpoint.number), checkpoint); + l2BlockSource.getCheckpoint.mockImplementation((query: any) => + Promise.resolve('number' in query ? makeMaybePublished(mined.get(Number(query.number))) : undefined), + ); + l2BlockSource.getCheckpointsData.mockImplementation((query: any) => { + if (!('from' in query)) { + return Promise.resolve([]); + } + const data = []; + for (let n = Number(query.from); n < Number(query.from) + query.limit; n++) { + const cp = mined.get(n); + if (cp) { + data.push({ checkpointNumber: cp.number, header: cp.header } as any); + } + } + return Promise.resolve(data); + }); + return { + type: 'chain-checkpointed', + block: { number: BlockNumber(checkpoint.blocks[0].number), hash: '0x01' }, + checkpoint: { number: checkpoint.number, hash: checkpoint.hash().toString() }, + }; + } + + function makeMaybePublished(checkpoint: Checkpoint | undefined): PublishedCheckpoint | undefined { + return checkpoint ? makePublishedCheckpoint(checkpoint) : undefined; + } }); /** ProverNode subclass that exposes hooks for injecting a mocked SessionManager + reads. */ @@ -660,4 +696,12 @@ class TestProverNode extends ProverNode { public getLastExpiredEpoch(): EpochNumber | undefined { return this.lastExpiredEpoch; } + + public getLastProcessedCheckpoint(): CheckpointNumber { + return this.lastProcessedCheckpoint; + } + + public setLastProcessedCheckpoint(checkpoint: CheckpointNumber): void { + this.lastProcessedCheckpoint = checkpoint; + } } diff --git a/yarn-project/prover-node/src/prover-node.ts b/yarn-project/prover-node/src/prover-node.ts index 9e5b7fbd7508..296ec231a724 100644 --- a/yarn-project/prover-node/src/prover-node.ts +++ b/yarn-project/prover-node/src/prover-node.ts @@ -5,6 +5,7 @@ import { BlockNumber, CheckpointNumber, EpochNumber } from '@aztec/foundation/br import { assertRequired, compact, pick } from '@aztec/foundation/collection'; import { memoize } from '@aztec/foundation/decorators'; import { createLogger } from '@aztec/foundation/log'; +import { RunningPromise } from '@aztec/foundation/running-promise'; import { DateProvider, executeTimeout } from '@aztec/foundation/timer'; import type { EpochProverFactory } from '@aztec/prover-client'; import { getLastSiblingPath } from '@aztec/prover-client/helpers'; @@ -95,6 +96,17 @@ export class ProverNode implements L2BlockStreamEventHandler, ProverNodeApi, Tra */ protected lastExpiredEpoch: EpochNumber | undefined; + /** + * Highest checkpoint number whose proving-side handling has completed (or that was legitimately skipped). + * The catch-up loop walks from here to each `chain-checkpointed` tip event. Seeded at start() from the last + * checkpoint of the last fully-proven epoch (or 0), so a restart reprocesses the partially-proven epoch rather + * than trusting a checkpointed tip that may sit ahead of unproven checkpoints. Clamped down on a prune. + */ + protected lastProcessedCheckpoint: CheckpointNumber = CheckpointNumber.ZERO; + + /** Periodic tick that runs the epoch-expiry sweep during idle periods when no block-stream events arrive. */ + private expiryTicker: RunningPromise | undefined; + public readonly tracer: Tracer; protected publishingService: ProofPublishingService | undefined; @@ -219,7 +231,7 @@ export class ProverNode implements L2BlockStreamEventHandler, ProverNodeApi, Tra public async handleBlockStreamEvent(event: L2BlockStreamEvent): Promise { switch (event.type) { case 'chain-checkpointed': - await this.handleCheckpointEvent(event.checkpoint); + await this.processCheckpointJump(event.checkpoint.number); break; case 'chain-pruned': await this.handlePruneEvent(event.checkpointed.checkpoint); @@ -239,32 +251,83 @@ export class ProverNode implements L2BlockStreamEventHandler, ProverNodeApi, Tra await this.tipsStore.handleBlockStreamEvent(event); } - /** Register a new checkpoint with the store and notify the session manager. */ - private async handleCheckpointEvent(publishedCheckpoint: PublishedCheckpoint) { - const checkpoint = publishedCheckpoint.checkpoint; - const slotNumber = checkpoint.header.slotNumber; - const l1Constants = await this.getL1Constants(); - const epochNumber = getEpochAtSlot(slotNumber, l1Constants); - - if (await this.isEpochFullyProven(epochNumber, l1Constants)) { - this.log.debug(`Skipping checkpoint ${checkpoint.number} for already-proven epoch ${epochNumber}`); + /** + * Walks every checkpoint between the local cursor and the newly-reported checkpointed tip, registering + * each one that belongs to an epoch that can still be proven. The block stream now delivers a single thin + * `chain-checkpointed` tip event per pass rather than one fat event per checkpoint, so this drives the + * catch-up itself: light metadata first (`getCheckpointsData`) to decide relevance per epoch, then a heavy + * `getCheckpoints` fetch only for checkpoints in provable epochs. + * + * The cursor advances one checkpoint at a time and only after that checkpoint's proving-side handling has + * fully succeeded, preserving the A-1041 at-least-once semantics: a mid-jump failure leaves the cursor + * behind so the next pass retries from the first checkpoint that did not complete. + */ + private async processCheckpointJump(targetCheckpoint: CheckpointNumber): Promise { + if (targetCheckpoint <= this.lastProcessedCheckpoint) { return; } + const l1Constants = await this.getL1Constants(); - if (await this.isEpochPastProofSubmissionWindow(epochNumber, l1Constants)) { - this.log.debug( - `Skipping checkpoint ${checkpoint.number} for epoch ${epochNumber} past its proof-submission window`, - ); - return; + // Cap the catch-up at the two most recent epochs' worth of checkpoints. With at most one checkpoint per + // slot, an epoch spans at most `epochDuration` checkpoints, so two epochs is `2 * epochDuration`. When the + // cursor is much further behind (e.g. resyncing after a long time offline), fetching the whole gap could + // load thousands of checkpoints we cannot act on: anything older than the last two epochs is already past + // its proof-submission window, so we skip it and jump the cursor forward to the start of the capped range. + const maxCheckpoints = 2 * l1Constants.epochDuration; + let from = CheckpointNumber(this.lastProcessedCheckpoint + 1); + if (Number(targetCheckpoint - from) + 1 > maxCheckpoints) { + const cappedFrom = CheckpointNumber(targetCheckpoint - maxCheckpoints + 1); + this.log.warn(`Skipping unprovable checkpoints during catch-up; the prover node is far behind`, { + from, + cappedFrom, + targetCheckpoint, + maxCheckpoints, + }); + // Advance the cursor past the skipped checkpoints so they are never retried. + this.lastProcessedCheckpoint = CheckpointNumber(cappedFrom - 1); + from = cappedFrom; } + const limit = Number(targetCheckpoint - from) + 1; + const metadatas = await this.l2BlockSource.getCheckpointsData({ from, limit }); + + // Per-epoch relevance is cached so a multi-checkpoint epoch resolves it once. Skipping is whole-epoch + // only: the SessionManager requires an epoch's checkpoints fully covered before it opens a session, so we + // never drop an individual checkpoint inside an epoch we will prove. + const epochSkippable = new Map(); + for (const metadata of metadatas) { + const epochNumber = getEpochAtSlot(metadata.header.slotNumber, l1Constants); + let skippable = epochSkippable.get(epochNumber); + if (skippable === undefined) { + skippable = + (await this.isEpochFullyProven(epochNumber, l1Constants)) || + (await this.isEpochPastProofSubmissionWindow(epochNumber, l1Constants)); + epochSkippable.set(epochNumber, skippable); + } + if (skippable) { + this.log.debug(`Skipping checkpoint ${metadata.checkpointNumber} for unprovable epoch ${epochNumber}`); + } else { + await this.registerCheckpoint(metadata.checkpointNumber, epochNumber); + } + // Advance only after the checkpoint's handling succeeded (or it was legitimately skipped). registerCheckpoint + // throws on failure, which leaves the cursor here for the next pass to retry (A-1041). + this.lastProcessedCheckpoint = metadata.checkpointNumber; + } + } + /** Heavy-fetch a single checkpoint, register it with the store, and notify the session manager. */ + private async registerCheckpoint(checkpointNumber: CheckpointNumber, epochNumber: EpochNumber): Promise { + const published = await this.l2BlockSource.getCheckpoint({ number: checkpointNumber }); + if (!published) { + throw new Error(`Checkpoint ${checkpointNumber} not found in block source during catch-up`); + } + const checkpoint = published.checkpoint; this.log.info(`New checkpoint ${checkpoint.number} for epoch ${epochNumber}`, { checkpointNumber: checkpoint.number, epochNumber, - slotNumber, + slotNumber: checkpoint.header.slotNumber, }); - const registerData = await this.collectRegisterData(checkpoint, publishedCheckpoint.attestations); + const registerData = await this.collectRegisterData(checkpoint, published.attestations); await this.checkpointStore.addOrUpdate(checkpoint, registerData); await this.sessionManager?.onCheckpointAdded(epochNumber); } @@ -298,6 +361,11 @@ export class ProverNode implements L2BlockStreamEventHandler, ProverNodeApi, Tra /** Mark every prover above the prune threshold as pruned and notify the session manager. */ private async handlePruneEvent(prunedCheckpoint: { number: CheckpointNumber; hash: string }) { this.log.warn(`Chain pruned to checkpoint ${prunedCheckpoint.number}`, { prunedCheckpoint }); + // Clamp the catch-up cursor down to the (post-prune) checkpointed tip so reprocessing resumes from the + // first checkpoint above the prune target rather than from a stale, now-orphaned cursor. + if (this.lastProcessedCheckpoint > prunedCheckpoint.number) { + this.lastProcessedCheckpoint = prunedCheckpoint.number; + } const affected = this.checkpointStore.markPrunedAfter(prunedCheckpoint.number); if (affected.length === 0) { return; @@ -411,12 +479,22 @@ export class ProverNode implements L2BlockStreamEventHandler, ProverNodeApi, Tra const { startingBlock, lastFullyProvenEpoch } = await this.computeStartupState(); this.lastExpiredEpoch = lastFullyProvenEpoch; + this.lastProcessedCheckpoint = await this.computeStartingCheckpoint(lastFullyProvenEpoch); this.blockStream = new L2BlockStream(this.l2BlockSource, this.tipsStore, this, this.log, { pollIntervalMS: this.config.proverNodePollingIntervalMs, startingBlock, }); this.blockStream.start(); + // With thin once-per-pass tip events, the expiry sweep no longer fires once per checkpoint; drive it + // from a periodic tick so epochs still expire during idle/no-event periods. + this.expiryTicker = new RunningPromise( + () => this.checkEpochExpiry(), + this.log, + this.config.proverNodePollingIntervalMs, + ); + this.expiryTicker.start(); + await this.rewardsMetrics.start(); this.l1Metrics.start(); this.log.info(`Started Prover Node with prover id ${this.prover.getProverId().toString()}`, this.config); @@ -426,6 +504,7 @@ export class ProverNode implements L2BlockStreamEventHandler, ProverNodeApi, Tra this.log.info('Stopping ProverNode'); this.jobMetrics.stopObservingState(); await this.blockStream?.stop(); + await this.expiryTicker?.stop(); if (this.sessionManager) { await this.sessionManager.stop(); } @@ -586,6 +665,19 @@ export class ProverNode implements L2BlockStreamEventHandler, ProverNodeApi, Tra return { startingBlock: firstBlockOfEpoch, lastFullyProvenEpoch }; } + /** + * Resolves the catch-up cursor seed: the last checkpoint of the last fully-proven epoch, or 0 if none. Seeding + * from a checkpoint (rather than a checkpointed tip) guarantees a restart reprocesses every checkpoint of the + * partially-proven epoch, since the checkpointed tip can sit ahead of the last fully-proven checkpoint. + */ + protected async computeStartingCheckpoint(lastFullyProvenEpoch: EpochNumber | undefined): Promise { + if (lastFullyProvenEpoch === undefined) { + return CheckpointNumber.ZERO; + } + const checkpoints = await this.l2BlockSource.getCheckpointsData({ epoch: lastFullyProvenEpoch }); + return checkpoints.at(-1)?.checkpointNumber ?? CheckpointNumber.ZERO; + } + private async gatherPreviousBlockHeader(previousBlockNumber: number) { const data = await this.l2BlockSource.getBlockData({ number: BlockNumber(previousBlockNumber) }); if (!data?.header) { diff --git a/yarn-project/pxe/src/block_synchronizer/block_stream_source.ts b/yarn-project/pxe/src/block_synchronizer/block_stream_source.ts index 2009d350b2a1..cbd6ed780cb2 100644 --- a/yarn-project/pxe/src/block_synchronizer/block_stream_source.ts +++ b/yarn-project/pxe/src/block_synchronizer/block_stream_source.ts @@ -1,24 +1,15 @@ -import { Fr } from '@aztec/foundation/curves/bn254'; -import { - type BlockData, - type BlockQuery, - type BlocksQuery, - type CheckpointsQuery, - L2Block, - type L2BlockSource, -} from '@aztec/stdlib/block'; -import { Checkpoint, L1PublishedData, PublishedCheckpoint } from '@aztec/stdlib/checkpoint'; +import { type BlockData, type BlockQuery, type BlocksQuery, L2Block, type L2BlockSource } from '@aztec/stdlib/block'; import type { AztecNode } from '@aztec/stdlib/interfaces/client'; /** * Lifts an {@link AztecNode} RPC client into the shape {@link L2BlockStream} expects. - * `getBlocks` requests transaction bodies so that real `L2Block` instances can be constructed; - * `getCheckpoints` requests blocks + L1 info + attestations so that `PublishedCheckpoint` - * instances are fully populated. + * `getBlocks` requests transaction bodies so that real `L2Block` instances can be constructed. The stream no + * longer fetches checkpoint payloads (the `chain-checkpointed` event is a thin tip event), so `getCheckpoints` + * is not part of the lifted shape. */ export function blockStreamSourceFromAztecNode( node: AztecNode, -): Pick { +): Pick { return { getL2Tips: async () => { const tips = await node.getChainTips(); @@ -50,32 +41,5 @@ export function blockStreamSourceFromAztecNode( const responses = await node.getBlocks(query.from, query.limit, { includeTransactions: true }); return responses.map(r => new L2Block(r.archive, r.header, r.body!, r.checkpointNumber, r.indexWithinCheckpoint)); }, - - async getCheckpoints(query: CheckpointsQuery): Promise { - if (!('from' in query)) { - throw new Error('getCheckpoints with epoch query not supported via AztecNode RPC'); - } - const { from, limit } = query; - const responses = await node.getCheckpoints(from, limit, { - includeBlocks: true, - includeTransactions: true, - includeL1PublishInfo: true, - includeAttestations: true, - }); - return responses.map(r => { - const checkpoint = new Checkpoint( - r.archive, - r.header, - r.blocks!.map(b => new L2Block(b.archive, b.header, b.body!, b.checkpointNumber, b.indexWithinCheckpoint)), - r.number, - r.feeAssetPriceModifier, - ); - const l1 = - r.l1?.published === true - ? new L1PublishedData(r.l1.blockNumber, r.l1.timestamp, r.l1.blockHash) - : new L1PublishedData(0n, 0n, Fr.ZERO.toString()); - return new PublishedCheckpoint(checkpoint, l1, r.attestations ?? []); - }); - }, }; } diff --git a/yarn-project/pxe/src/block_synchronizer/block_synchronizer.test.ts b/yarn-project/pxe/src/block_synchronizer/block_synchronizer.test.ts index bbd6d2ae9729..c15898ba2109 100644 --- a/yarn-project/pxe/src/block_synchronizer/block_synchronizer.test.ts +++ b/yarn-project/pxe/src/block_synchronizer/block_synchronizer.test.ts @@ -17,7 +17,6 @@ import { makeL2BlockId, makeL2CheckpointId, } from '@aztec/stdlib/block'; -import { Checkpoint, L1PublishedData, PublishedCheckpoint } from '@aztec/stdlib/checkpoint'; import type { AztecNode, BlockResponse } from '@aztec/stdlib/interfaces/client'; import { NoteDao, NoteStatus } from '@aztec/stdlib/note'; import { TxHash } from '@aztec/stdlib/tx'; @@ -376,24 +375,53 @@ describe('BlockSynchronizer', () => { const initialBlock = await L2Block.random(BlockNumber(0)); await anchorBlockStore.setHeader(initialBlock.header); - // Create a checkpoint with a block + // The checkpointed tip block, fetched by hash from the node when the thin event arrives. const checkpointBlock = await L2Block.random(BlockNumber(1)); - const checkpoint = await Checkpoint.random(CheckpointNumber(1), { numBlocks: 1 }); - // Replace the random block with our known block - checkpoint.blocks[0] = checkpointBlock; - - const publishedCheckpoint = new PublishedCheckpoint(checkpoint, L1PublishedData.random(), []); + const checkpointBlockHash = await checkpointBlock.hash(); + aztecNode.getBlockData.mockImplementation(query => + Promise.resolve( + query instanceof BlockHash && query.equals(checkpointBlockHash) + ? ({ + header: checkpointBlock.header, + archive: checkpointBlock.archive, + blockHash: checkpointBlockHash, + checkpointNumber: checkpointBlock.checkpointNumber, + indexWithinCheckpoint: checkpointBlock.indexWithinCheckpoint, + } as BlockData) + : undefined, + ), + ); await synchronizer.handleBlockStreamEvent({ type: 'chain-checkpointed', - checkpoint: publishedCheckpoint, - block: { number: BlockNumber(1), hash: '0x456' }, + block: { number: BlockNumber(1), hash: checkpointBlockHash.toString() }, + checkpoint: makeL2CheckpointId(CheckpointNumber(1), Fr.random().toString()), }); const obtainedHeader = await anchorBlockStore.getBlockHeader(); expect(obtainedHeader.equals(checkpointBlock.header)).toBe(true); }); + it('skips the anchor update on chain-checkpointed when the block was reorged out (missing by hash)', async () => { + synchronizer = createSynchronizer({ syncChainTip: 'checkpointed' }); + + const initialBlock = await L2Block.random(BlockNumber(0)); + await anchorBlockStore.setHeader(initialBlock.header); + + // The node no longer serves the checkpointed block at that hash (transient reorg). + aztecNode.getBlockData.mockResolvedValue(undefined); + + await synchronizer.handleBlockStreamEvent({ + type: 'chain-checkpointed', + block: { number: BlockNumber(1), hash: Fr.random().toString() }, + checkpoint: makeL2CheckpointId(CheckpointNumber(1), Fr.random().toString()), + }); + + // Anchor is left untouched; a later event corrects it. + const obtainedHeader = await anchorBlockStore.getBlockHeader(); + expect(obtainedHeader.equals(initialBlock.header)).toBe(true); + }); + it('does not update anchor on chain-checkpointed when syncChainTip is proposed', async () => { synchronizer = createSynchronizer({ syncChainTip: 'proposed' }); @@ -401,14 +429,10 @@ describe('BlockSynchronizer', () => { const initialBlock = await L2Block.random(BlockNumber(1)); await synchronizer.handleBlockStreamEvent({ type: 'blocks-added', blocks: [initialBlock] }); - // Create a different checkpoint - const checkpoint = await Checkpoint.random(CheckpointNumber(1), { numBlocks: 1 }); - const publishedCheckpoint = new PublishedCheckpoint(checkpoint, L1PublishedData.random(), []); - await synchronizer.handleBlockStreamEvent({ type: 'chain-checkpointed', - checkpoint: publishedCheckpoint, block: { number: BlockNumber(1), hash: '0x456' }, + checkpoint: makeL2CheckpointId(CheckpointNumber(1), Fr.random().toString()), }); // Anchor should still be the initial block, not the checkpoint block diff --git a/yarn-project/pxe/src/block_synchronizer/block_synchronizer.ts b/yarn-project/pxe/src/block_synchronizer/block_synchronizer.ts index f0e0735f9a0d..3bfdc6141def 100644 --- a/yarn-project/pxe/src/block_synchronizer/block_synchronizer.ts +++ b/yarn-project/pxe/src/block_synchronizer/block_synchronizer.ts @@ -74,9 +74,17 @@ export class BlockSynchronizer implements L2BlockStreamEventHandler { } case 'chain-checkpointed': { if (this.config.syncChainTip === 'checkpointed') { - // Get the last block header from the checkpoint - const lastBlock = event.checkpoint.checkpoint.blocks.at(-1)!; - await this.updateAnchorBlockHeader(lastBlock.header); + // Fetch the checkpointed tip header by hash. By-hash is safer than by-number against a + // same-height reorg; a missing result means the block was reorged out between the event + // and this fetch, so we skip the anchor update and let a later event correct it. + const block = await this.node.getBlockData(BlockHash.fromString(event.block.hash)); + if (block) { + await this.updateAnchorBlockHeader(block.header); + } else { + this.log.warn( + `Block header not found for checkpointed block ${event.block.number}, skipping anchor update`, + ); + } } break; } diff --git a/yarn-project/pxe/src/storage/backwards_compatibility_tests/schema_tests.ts b/yarn-project/pxe/src/storage/backwards_compatibility_tests/schema_tests.ts index e6e6fc8ac71d..9b9f037de3e8 100644 --- a/yarn-project/pxe/src/storage/backwards_compatibility_tests/schema_tests.ts +++ b/yarn-project/pxe/src/storage/backwards_compatibility_tests/schema_tests.ts @@ -261,7 +261,10 @@ export const SCHEMA_TESTS: readonly SchemaTest[] = [ await l2TipsStore.handleBlockStreamEvent({ type: 'chain-checkpointed', block: { number: BlockNumber(71), hash: new Fr(73n).toString() }, - checkpoint: publishedCheckpoint, + checkpoint: { + number: publishedCheckpoint.checkpoint.number, + hash: publishedCheckpoint.checkpoint.hash().toString(), + }, }); // `'chain-proven'` writes the 'proven' tag. `'finalized'` is omitted because its handler runs delete-before // logic that would depend on the order of preceding events. diff --git a/yarn-project/stdlib/src/block/l2_block_stream/interfaces.ts b/yarn-project/stdlib/src/block/l2_block_stream/interfaces.ts index e9e13deca97a..6ed09b019739 100644 --- a/yarn-project/stdlib/src/block/l2_block_stream/interfaces.ts +++ b/yarn-project/stdlib/src/block/l2_block_stream/interfaces.ts @@ -1,6 +1,5 @@ import type { BlockNumber } from '@aztec/foundation/branded-types'; -import type { PublishedCheckpoint } from '../../checkpoint/published_checkpoint.js'; import type { L2Block } from '../l2_block.js'; import type { CheckpointId, L2BlockId, L2TipId, LocalL2Tips } from '../l2_block_source.js'; @@ -21,7 +20,7 @@ export type LocalL2BlockId = { number: BlockNumber; hash?: string }; */ export type LocalChainTips = { proposed: LocalL2BlockId; - checkpointed?: { checkpoint: CheckpointId }; + checkpointed?: { block: LocalL2BlockId; checkpoint: CheckpointId }; proven: { block: LocalL2BlockId }; finalized: { block: LocalL2BlockId }; }; @@ -46,10 +45,14 @@ export type L2BlockStreamEvent = type: 'blocks-added'; blocks: L2Block[]; } - | /** Emits checkpoints published to L1. */ { + | /** + * Reports a new checkpointed tip. Emitted at most once per sync pass when the source's checkpointed tip + * leads the local one. Carries only the block + checkpoint ids; consumers that need the full checkpoint + * payload fetch it on demand from the block source. + */ { type: 'chain-checkpointed'; - checkpoint: PublishedCheckpoint; block: L2BlockId; + checkpoint: CheckpointId; } | /** * Reports last correct block (new tip of the proposed chain). Note that this is not necessarily the anchor block diff --git a/yarn-project/stdlib/src/block/l2_block_stream/l2_block_stream.test.ts b/yarn-project/stdlib/src/block/l2_block_stream/l2_block_stream.test.ts index 1b17a151c4f5..a123e9160682 100644 --- a/yarn-project/stdlib/src/block/l2_block_stream/l2_block_stream.test.ts +++ b/yarn-project/stdlib/src/block/l2_block_stream/l2_block_stream.test.ts @@ -3,22 +3,14 @@ import { compactArray } from '@aztec/foundation/collection'; import { Fr } from '@aztec/foundation/curves/bn254'; import type { Logger } from '@aztec/foundation/log'; -import { jest } from '@jest/globals'; import { type MockProxy, mock } from 'jest-mock-extended'; import times from 'lodash.times'; -import type { PublishedCheckpoint } from '../../checkpoint/published_checkpoint.js'; import type { BlockHeader } from '../../tx/block_header.js'; import type { BlockData } from '../block_data.js'; -import { BlockHash, GENESIS_BLOCK_HEADER_HASH } from '../block_hash.js'; +import { BlockHash } from '../block_hash.js'; import type { L2Block } from '../l2_block.js'; -import { - type BlocksQuery, - GENESIS_CHECKPOINT_HEADER_HASH, - type L2BlockId, - type L2BlockSource, - type LocalL2Tips, -} from '../l2_block_source.js'; +import type { BlocksQuery, L2BlockId, L2BlockSource, LocalL2Tips } from '../l2_block_source.js'; import type { L2BlockStreamEvent, L2BlockStreamEventHandler, @@ -32,7 +24,6 @@ describe('L2BlockStream', () => { let blockSource: MockProxy; let latest: number = 0; - let checkpointed: number = 0; const makeHash = (number: number) => new Fr(number).toString(); @@ -43,15 +34,6 @@ describe('L2BlockStream', () => { indexWithinCheckpoint: 0, }) as L2Block; - /** Makes a block with hash method (for use in mocks that need hash) */ - const makeBlockWithHash = (number: number) => - ({ - number: BlockNumber(number), - checkpointNumber: CheckpointNumber(number), - indexWithinCheckpoint: 0, - hash: () => Promise.resolve(new BlockHash(new Fr(number))), - }) as L2Block; - const makeBlockData = (number: number, checkpointNum: number): BlockData => ({ header: makeHeader(number), @@ -64,29 +46,6 @@ describe('L2BlockStream', () => { const makeBlockId = (number: number): L2BlockId => ({ number: BlockNumber(number), hash: makeHash(number) }); - /** Helper to match a blocks-added event with blocks that may have extra properties like hash */ - const expectBlocksAdded = (blockNumbers: number[]) => - expect.objectContaining({ - type: 'blocks-added', - blocks: blockNumbers.map(n => - expect.objectContaining({ - number: BlockNumber(n), - }), - ), - }); - - /** Helper to match a chain-checkpointed event */ - const expectCheckpointed = (checkpointNumber?: number) => - checkpointNumber !== undefined - ? expect.objectContaining({ - type: 'chain-checkpointed', - checkpoint: expect.objectContaining({ - checkpoint: expect.objectContaining({ number: checkpointNumber }), - }), - block: expect.objectContaining({ number: expect.any(Number) }), - }) - : expect.objectContaining({ type: 'chain-checkpointed' }); - const makeCheckpointId = (number: number) => ({ number: CheckpointNumber(number), hash: makeHash(number) }); const makeTipId = (number: number) => ({ @@ -94,6 +53,13 @@ describe('L2BlockStream', () => { checkpoint: { number: CheckpointNumber(number), hash: makeHash(number) }, }); + /** A thin chain-checkpointed event for the source's checkpointed tip at `number`. */ + const checkpointedEvent = (number: number): L2BlockStreamEvent => ({ + type: 'chain-checkpointed', + block: makeBlockId(number), + checkpoint: makeCheckpointId(number), + }); + /** Sets the remote tips. All tips default to 0 except latest. */ const setRemoteTips = ( latest_: number, @@ -107,7 +73,6 @@ describe('L2BlockStream', () => { proven = proven ?? 0; finalized = finalized ?? 0; latest = latest_; - checkpointed = checkpointed_; blockSource.getL2Tips.mockResolvedValue({ proposed: { number: BlockNumber(latest), hash: makeHash(latest) }, @@ -121,7 +86,7 @@ describe('L2BlockStream', () => { beforeEach(() => { blockSource = mock(); - // Returns blocks up until what was reported as the latest block (for uncheckpointed blocks) + // Returns blocks up until what was reported as the latest block. blockSource.getBlocks.mockImplementation((query: BlocksQuery) => 'from' in query ? Promise.resolve( @@ -137,31 +102,6 @@ describe('L2BlockStream', () => { } return Promise.resolve(query.number > latest ? undefined : makeBlockData(query.number, query.number)); }); - - // Returns published checkpoints - each checkpoint contains just the one block for simplicity - // Respects the limit parameter and returns up to `limit` checkpoints - blockSource.getCheckpoints.mockImplementation(query => { - if (!('from' in query)) { - return Promise.resolve([]); - } - const { from: checkpointNumber, limit } = query; - return Promise.resolve( - compactArray( - times(limit, i => { - const cpNum = checkpointNumber + i; - return cpNum > checkpointed - ? undefined - : ({ - checkpoint: { - number: cpNum, - hash: () => new Fr(cpNum), - blocks: [makeBlockWithHash(cpNum)], - }, - } as unknown as PublishedCheckpoint); - }), - ), - ); - }); }); describe('with mock local data provider', () => { @@ -172,10 +112,7 @@ describe('L2BlockStream', () => { beforeEach(() => { localData = new TestL2BlockStreamLocalDataProvider(); handler = new TestL2BlockStreamEventHandler(); - blockStream = new TestL2BlockStream(blockSource, localData, handler, undefined, { - batchSize: 10, - checkpointPrefetchLimit: 1, - }); + blockStream = new TestL2BlockStream(blockSource, localData, handler, undefined, { batchSize: 10 }); }); it('pulls new blocks from start', async () => { @@ -267,50 +204,41 @@ describe('L2BlockStream', () => { ] satisfies L2BlockStreamEvent[]); }); - it('fetches checkpointed blocks and emits chain-checkpointed events', async () => { - // All blocks are checkpointed (checkpointed=5, proposed=5) + it('emits a single chain-checkpointed event carrying the source checkpointed tip', async () => { + // All blocks are checkpointed (checkpointed=5, proposed=5). Download all 5 blocks in one batch, then a + // single thin checkpointed event for the source tip. setRemoteTips(5, 5); await blockStream.work(); - // Each checkpointed block triggers a blocks-added and chain-checkpointed event - // (since each checkpoint contains one block in our mock) expect(handler.events).toEqual([ - expectBlocksAdded([1]), - expectCheckpointed(), - expectBlocksAdded([2]), - expectCheckpointed(), - expectBlocksAdded([3]), - expectCheckpointed(), - expectBlocksAdded([4]), - expectCheckpointed(), - expectBlocksAdded([5]), - expectCheckpointed(), + { type: 'blocks-added', blocks: times(5, i => makeBlock(i + 1)) }, + checkpointedEvent(5), ]); - // 2 calls: one for block 0 in reorg detection (hash compare at genesis), one for block 1 in loop 2. - expect(blockSource.getBlockData).toHaveBeenCalledTimes(2); - expect(blockSource.getBlocks).not.toHaveBeenCalled(); + // No checkpoint payloads are fetched anymore. + expect(blockSource.getBlocks).toHaveBeenCalledWith({ from: BlockNumber(1), limit: 5 }); }); - it('fetches checkpointed blocks first, then uncheckpointed blocks', async () => { - // Blocks 1-3 are checkpointed, blocks 4-5 are uncheckpointed + it('emits checkpointed once even when the checkpointed tip trails the proposed tip', async () => { + // Blocks 1-3 checkpointed, blocks 4-5 uncheckpointed. setRemoteTips(5, 3); await blockStream.work(); - // First 3 blocks come via checkpoints, last 2 via getBlocks + // Download all 5 blocks, then a single checkpointed event for checkpoint 3. expect(handler.events).toEqual([ - expectBlocksAdded([1]), - expectCheckpointed(), - expectBlocksAdded([2]), - expectCheckpointed(), - expectBlocksAdded([3]), - expectCheckpointed(), - expectBlocksAdded([4, 5]), + { type: 'blocks-added', blocks: times(5, i => makeBlock(i + 1)) }, + checkpointedEvent(3), ]); - // 2 calls: one for block 0 in reorg detection (hash compare at genesis), one for block 1 in loop 2. - expect(blockSource.getBlockData).toHaveBeenCalledTimes(2); - expect(blockSource.getBlocks).toHaveBeenCalledWith({ from: BlockNumber(4), limit: 2 }); + }); + + it('does not re-emit the checkpointed event once the local tip matches the source', async () => { + setRemoteTips(5, 5); + localData.setProposed(5); + localData.setCheckpointed(5, 5); + + await blockStream.work(); + expect(handler.events.filter(e => e.type === 'chain-checkpointed')).toEqual([]); }); it('handles reorg with uncheckpointed reason when pruned to checkpointed tip', async () => { @@ -346,22 +274,60 @@ describe('L2BlockStream', () => { // The reorg-search loop must NOT walk past block 0; it should throw a clear error // pointing at the genesis-hash mismatch instead of cascading into "block hash not found - // for -1" further down. The error is caught and logged by `work` rather than rethrown, - // so we assert via the logged error and ensure no events were emitted. - const errorSpy = jest.spyOn( - (blockStream as unknown as { log: { error: (...args: any[]) => void } }).log, - 'error', - ); + // for -1" further down. The error is caught and logged by `work` rather than rethrown. + const log = mock(); + blockStream = new TestL2BlockStream(blockSource, localData, handler, log, { batchSize: 10 }); await blockStream.work(); expect(handler.events).toEqual([]); - expect(errorSpy).toHaveBeenCalledWith( - expect.stringContaining('Error processing block stream'), - expect.objectContaining({ - message: expect.stringContaining('Genesis block hash mismatch'), - }), - ); + expect(log.error).toHaveBeenCalledWith(expect.stringContaining('Genesis block hash mismatch'), expect.anything()); + }); + }); + + describe('A-1061 regression: startingBlock past the source checkpointed tip', () => { + let localData: TestL2BlockStreamLocalDataProvider; + let handler: TestL2BlockStreamEventHandler; + let blockStream: TestL2BlockStream; + + beforeEach(() => { + localData = new TestL2BlockStreamLocalDataProvider(); + handler = new TestL2BlockStreamEventHandler(); + }); + + it('emits the source checkpointed tip on the FIRST pass even when startingBlock is past it', async () => { + // Source: checkpointed=30, proven=25, proposed=35. The node restarts with startingBlock=33 (past the + // checkpointed tip of block 30). Pre-rewrite, the startingBlock fast-forward suppressed all checkpoint + // emission, leaving the local checkpointed cursor stuck at genesis while proven still advanced. + setRemoteTips(35, 30, 25, 10); + blockStream = new TestL2BlockStream(blockSource, localData, handler, undefined, { + batchSize: 10, + startingBlock: 33, + }); + + await blockStream.work(); + + const checkpointEvents = handler.events.filter(e => e.type === 'chain-checkpointed'); + expect(checkpointEvents).toEqual([checkpointedEvent(30)]); + + // proven is resolvable (block 25), and the checkpointed cursor is NOT stuck at genesis. + const provenEvents = handler.events.filter(e => e.type === 'chain-proven'); + expect(provenEvents).toEqual([ + { type: 'chain-proven', block: makeBlockId(25), checkpoint: makeCheckpointId(25) }, + ]); + }); + + it('downloads blocks from startingBlock, not from genesis', async () => { + setRemoteTips(35, 30, 25, 10); + blockStream = new TestL2BlockStream(blockSource, localData, handler, undefined, { + batchSize: 10, + startingBlock: 33, + }); + + await blockStream.work(); + + // First block download begins at startingBlock (33), skipping 1..32. + expect(blockSource.getBlocks).toHaveBeenCalledWith({ from: BlockNumber(33), limit: 3 }); }); }); @@ -384,13 +350,12 @@ describe('L2BlockStream', () => { startingBlock: 30, }); - // We first seed a few blocks into the blockstream - // Block 30 comes via checkpoint, blocks 31-35 via uncheckpointed + // We first seed a few blocks into the blockstream: blocks 30-35 (startingBlock=30), plus the + // checkpointed/proven/finalized reconciliation events. await blockStream.work(); expect(handler.events).toEqual([ - expectBlocksAdded([30]), - expectCheckpointed(30), - { type: 'blocks-added', blocks: times(5, i => makeBlock(i + 31)) }, + { type: 'blocks-added', blocks: times(6, i => makeBlock(i + 30)) }, + checkpointedEvent(30), { type: 'chain-proven', block: makeBlockId(25), checkpoint: makeCheckpointId(25) }, { type: 'chain-finalized', block: makeBlockId(10), checkpoint: makeCheckpointId(10) }, ]); @@ -404,19 +369,16 @@ describe('L2BlockStream', () => { ]); }); - // Regression test for the checkpoint-replay storm: pruning to an uncheckpointed block ahead of - // the checkpointed tip must not reset the checkpointed cursor, otherwise the next work() replays - // every checkpoint from 1 to the source tip. - it('does not replay checkpoints after pruning to an uncheckpointed block ahead of the checkpointed tip', async () => { - // Sync blocks 1-7: blocks 1-5 are checkpointed (checkpoints 1-5), blocks 6-7 uncheckpointed. + // Regression: pruning to an uncheckpointed block ahead of the checkpointed tip must not reset the + // checkpointed cursor, otherwise the next work() re-emits the checkpointed tip. + it('does not re-emit the checkpointed tip after pruning to a block ahead of it', async () => { + // Sync blocks 1-7: blocks 1-5 are checkpointed (checkpoint 5), blocks 6-7 uncheckpointed. setRemoteTips(7, 5); await blockStream.work(); - const checkpointEventsOnSync = handler.events.filter(e => e.type === 'chain-checkpointed'); - expect(checkpointEventsOnSync).toHaveLength(5); + expect(handler.events.filter(e => e.type === 'chain-checkpointed')).toEqual([checkpointedEvent(5)]); handler.clearEvents(); // Source drops its proposed tip to block 6 (uncheckpointed, still ahead of checkpointed=5). - // The stream prunes the local proposed tip from 7 back to 6. setRemoteTips(6, 5); await blockStream.work(); expect(handler.events).toEqual([ @@ -424,1231 +386,161 @@ describe('L2BlockStream', () => { ]); handler.clearEvents(); - // The next sync must NOT re-emit any chain-checkpointed events: the checkpointed cursor was - // left at block 5 / checkpoint 5, so there is nothing to replay. + // The next sync must NOT re-emit a chain-checkpointed event: the checkpointed cursor was left at + // block 5 / checkpoint 5. await blockStream.work(); expect(handler.events.filter(e => e.type === 'chain-checkpointed')).toEqual([]); }); - }); - - describe('multiple blocks per checkpoint', () => { - let localData: TestL2BlockStreamLocalDataProvider; - let handler: TestL2BlockStreamEventHandler; - let blockStream: TestL2BlockStream; - - // Configuration for checkpoint structure: each checkpoint contains 3 blocks - const blocksPerCheckpoint = 3; - - /** Gets the checkpoint number for a given block number */ - const getCheckpointForBlock = (blockNum: number) => Math.ceil(blockNum / blocksPerCheckpoint); - - /** Gets the first block number in a checkpoint */ - const getFirstBlockInCheckpoint = (checkpointNum: number) => (checkpointNum - 1) * blocksPerCheckpoint + 1; - - /** Gets the last block number in a checkpoint */ - const getLastBlockInCheckpoint = (checkpointNum: number) => checkpointNum * blocksPerCheckpoint; - - /** Makes a block with hash method (for use in mocks that need hash) */ - const makeBlockInCheckpointWithHash = (blockNum: number) => { - const checkpointNum = getCheckpointForBlock(blockNum); - const firstBlockInCheckpoint = getFirstBlockInCheckpoint(checkpointNum); - return { - number: BlockNumber(blockNum), - checkpointNumber: CheckpointNumber(checkpointNum), - indexWithinCheckpoint: blockNum - firstBlockInCheckpoint, - hash: () => Promise.resolve(new BlockHash(new Fr(blockNum))), - } as L2Block; - }; - - /** Makes block data for a checkpointed block */ - const makeBlockDataInCheckpoint = (blockNum: number): BlockData => - ({ - header: makeHeader(blockNum), - checkpointNumber: CheckpointNumber(getCheckpointForBlock(blockNum)), - indexWithinCheckpoint: blockNum - getFirstBlockInCheckpoint(getCheckpointForBlock(blockNum)), - }) as unknown as BlockData; - - /** Sets the remote tips with correct checkpoint numbers for multi-block checkpoints. */ - const setRemoteTipsMultiBlock = ( - latest_: number, - checkpointedBlock?: number, - proven?: number, - finalized?: number, - proposedCheckpointBlock?: number, - ) => { - checkpointedBlock = checkpointedBlock ?? 0; - proven = proven ?? 0; - finalized = finalized ?? 0; - proposedCheckpointBlock = proposedCheckpointBlock ?? 0; - latest = latest_; - checkpointed = checkpointedBlock; - - const checkpointedCheckpointNum = checkpointedBlock > 0 ? getCheckpointForBlock(checkpointedBlock) : 0; - const provenCheckpointNum = proven > 0 ? getCheckpointForBlock(proven) : 0; - const finalizedCheckpointNum = finalized > 0 ? getCheckpointForBlock(finalized) : 0; - const proposedCheckpointNum = proposedCheckpointBlock > 0 ? getCheckpointForBlock(proposedCheckpointBlock) : 0; - - blockSource.getL2Tips.mockResolvedValue({ - proposed: { number: BlockNumber(latest), hash: makeHash(latest) }, - checkpointed: { - block: { number: BlockNumber(checkpointedBlock), hash: makeHash(checkpointedBlock) }, - checkpoint: { - number: CheckpointNumber(checkpointedCheckpointNum), - hash: makeHash(checkpointedCheckpointNum), - }, - }, - proposedCheckpoint: { - block: { number: BlockNumber(proposedCheckpointBlock), hash: makeHash(proposedCheckpointBlock) }, - checkpoint: { - number: CheckpointNumber(proposedCheckpointNum), - hash: makeHash(proposedCheckpointNum), - }, - }, - proven: { - block: { number: BlockNumber(proven), hash: makeHash(proven) }, - checkpoint: { number: CheckpointNumber(provenCheckpointNum), hash: makeHash(provenCheckpointNum) }, - }, - finalized: { - block: { number: BlockNumber(finalized), hash: makeHash(finalized) }, - checkpoint: { number: CheckpointNumber(finalizedCheckpointNum), hash: makeHash(finalizedCheckpointNum) }, - }, - }); - }; - - beforeEach(() => { - localData = new TestL2BlockStreamLocalDataProvider(); - handler = new TestL2BlockStreamEventHandler(); - blockStream = new TestL2BlockStream(blockSource, localData, handler, undefined, { batchSize: 10 }); - - // Override the mock to support multiple blocks per checkpoint - blockSource.getBlockData.mockImplementation(query => { - if (!('number' in query)) { - return Promise.resolve(undefined); - } - return Promise.resolve(query.number > latest ? undefined : makeBlockDataInCheckpoint(query.number)); - }); - - // Returns published checkpoints with multiple blocks each, respecting the limit parameter - blockSource.getCheckpoints.mockImplementation(query => { - if (!('from' in query)) { - return Promise.resolve([]); - } - const { from: checkpointNumber, limit } = query; - const checkpoints: PublishedCheckpoint[] = []; - for (let i = 0; i < limit; i++) { - const cpNum = CheckpointNumber(checkpointNumber + i); - const firstBlock = getFirstBlockInCheckpoint(cpNum); - const lastBlock = getLastBlockInCheckpoint(cpNum); - // Only include checkpoints that are within the checkpointed range - if (lastBlock > checkpointed) { - break; - } - checkpoints.push({ - checkpoint: { - number: cpNum, - hash: () => new Fr(cpNum), - blocks: times(blocksPerCheckpoint, j => makeBlockInCheckpointWithHash(firstBlock + j)), - }, - } as unknown as PublishedCheckpoint); - } - return Promise.resolve(checkpoints); - }); - }); - - it('emits all blocks in a checkpoint before chain-checkpointed event', async () => { - // Set up: 6 blocks in 2 checkpoints (blocks 1-3 in checkpoint 1, blocks 4-6 in checkpoint 2) - setRemoteTipsMultiBlock(6, 6); + // prune + same-pass reconciliation: a prune walk-back and the catch-up tier events fire in one pass. + it('emits the prune event and the new tier events in the same pass', async () => { + // Sync up to a checkpointed/proven chain: proposed=9, checkpointed=9, proven=6, finalized=3. + setRemoteTips(9, 9, 6, 3); await blockStream.work(); + handler.clearEvents(); - // Should emit blocks 1-3, then checkpoint 1, then blocks 4-6, then checkpoint 2 - expect(handler.events).toEqual([ - expectBlocksAdded([1, 2, 3]), - expectCheckpointed(1), - expectBlocksAdded([4, 5, 6]), - expectCheckpointed(2), - ]); - }); - - it('handles partial checkpoint at the end (uncheckpointed blocks)', async () => { - // Set up: 5 blocks total, but only first 3 are checkpointed (checkpoint 1 complete) - // Blocks 4-5 are uncheckpointed - setRemoteTipsMultiBlock(5, 3); - + // Reorg: the source drops its proposed/checkpointed tip to block 6 (the memory store still holds 7-9, + // which the source no longer serves) and finalized advances to 6 within the same snapshot. + setRemoteTips(6, 6, 6, 6); await blockStream.work(); - // Should emit checkpoint 1 blocks, then checkpoint event, then uncheckpointed blocks 4-5 - expect(handler.events).toEqual([expectBlocksAdded([1, 2, 3]), expectCheckpointed(1), expectBlocksAdded([4, 5])]); + // First the prune to block 6, then the finalized reconciliation event for the advanced finalized tip. + expect(handler.events[0]).toEqual({ + type: 'chain-pruned', + block: makeBlockId(6), + checkpointed: makeTipId(6), + proven: makeTipId(6), + }); + const finalizedEvents = handler.events.filter(e => e.type === 'chain-finalized'); + expect(finalizedEvents).toEqual([ + { type: 'chain-finalized', block: makeBlockId(6), checkpoint: makeCheckpointId(6) }, + ]); }); + }); - it('handles starting from middle of a checkpoint', async () => { - // Set up: 9 blocks in 3 checkpoints, but we start from block 5 (middle of checkpoint 2) - // Local has blocks 1-4, local checkpointed = 0 - setRemoteTipsMultiBlock(9, 9); - localData.proposed.number = BlockNumber(4); + describe('hash-gated tier reconciliation', () => { + // World-state-shaped provider: reports `undefined` block hashes for its proven/finalized tips. The + // reconciliation must skip the hash comparison so it does not re-emit on every poll. + class WorldStateShapedProvider implements L2BlockStreamLocalDataProvider { + public proposedNumber = BlockNumber.ZERO; + public provenNumber = BlockNumber.ZERO; + public finalizedNumber = BlockNumber.ZERO; - await blockStream.work(); + public getL2BlockHash(number: number): Promise { + return Promise.resolve(number > this.proposedNumber ? undefined : new Fr(number).toString()); + } - // Should first emit checkpoint 1 (blocks 1-4 already local) - // Then continue from block 5, which is in checkpoint 2 - // Blocks 5-6 complete checkpoint 2, then blocks 7-9 complete checkpoint 3 - expect(handler.events).toEqual([ - expectCheckpointed(1), // checkpoint 1 for already-local blocks 1-4 - expectBlocksAdded([5, 6]), - expectCheckpointed(2), // checkpoint 2 - expectBlocksAdded([7, 8, 9]), - expectCheckpointed(3), // checkpoint 3 - ]); + public getL2Tips(): Promise { + return Promise.resolve({ + proposed: { number: this.proposedNumber, hash: new Fr(this.proposedNumber).toString() }, + // proven/finalized hashes are intentionally undefined, as world-state reports them. + proven: { block: { number: this.provenNumber } }, + finalized: { block: { number: this.finalizedNumber } }, + }); + } + } - // Verify checkpoint order - const checkpointEvents = handler.events.filter(e => e.type === 'chain-checkpointed'); - expect(checkpointEvents).toHaveLength(3); - expect((checkpointEvents[0] as any).checkpoint.checkpoint.number).toBe(CheckpointNumber(1)); - expect((checkpointEvents[1] as any).checkpoint.checkpoint.number).toBe(CheckpointNumber(2)); - expect((checkpointEvents[2] as any).checkpoint.checkpoint.number).toBe(CheckpointNumber(3)); - }); + it('does not re-emit proven/finalized when the local hash is undefined and numbers match', async () => { + const localData = new WorldStateShapedProvider(); + const handler = new TestL2BlockStreamEventHandler(); + const blockStream = new TestL2BlockStream(blockSource, localData, handler, undefined, { + batchSize: 10, + ignoreCheckpoints: true, + }); - it('correctly identifies checkpoint number in chain-checkpointed events', async () => { - // Set up: 6 blocks in 2 checkpoints - setRemoteTipsMultiBlock(6, 6); + // Source proven/finalized are at the same numbers the local provider already tracks. + setRemoteTips(9, 0, 6, 3); + localData.proposedNumber = BlockNumber(9); + localData.provenNumber = BlockNumber(6); + localData.finalizedNumber = BlockNumber(3); await blockStream.work(); - // Extract the chain-checkpointed events - const checkpointEvents = handler.events.filter(e => e.type === 'chain-checkpointed'); - expect(checkpointEvents).toHaveLength(2); - expect((checkpointEvents[0] as any).checkpoint.checkpoint.number).toBe(CheckpointNumber(1)); - expect((checkpointEvents[1] as any).checkpoint.checkpoint.number).toBe(CheckpointNumber(2)); + // Numbers match and local hashes are undefined ⇒ no re-emission. + expect(handler.events.filter(e => e.type === 'chain-proven')).toEqual([]); + expect(handler.events.filter(e => e.type === 'chain-finalized')).toEqual([]); }); - it('handles many checkpoints with batching', async () => { - // Set up: 12 blocks in 4 checkpoints (3 blocks each), with batch size of 5 - // Batch size doesn't align with checkpoint boundaries, so the stream must - // respect checkpoint boundaries and emit events correctly - blockStream = new TestL2BlockStream(blockSource, localData, handler, undefined, { batchSize: 5 }); - setRemoteTipsMultiBlock(12, 12); - - await blockStream.work(); - - // Even though batch size is 5, checkpoint boundaries (every 3 blocks) take precedence - // Expected sequence: - // - Blocks 1-3 (checkpoint 1), then checkpoint 1 event - // - Blocks 4-6 (checkpoint 2), then checkpoint 2 event - // - Blocks 7-9 (checkpoint 3), then checkpoint 3 event - // - Blocks 10-12 (checkpoint 4), then checkpoint 4 event - expect(handler.events).toEqual([ - expectBlocksAdded([1, 2, 3]), - expectCheckpointed(1), - expectBlocksAdded([4, 5, 6]), - expectCheckpointed(2), - expectBlocksAdded([7, 8, 9]), - expectCheckpointed(3), - expectBlocksAdded([10, 11, 12]), - expectCheckpointed(4), - ]); - }); + // Finding 3: a same-number, different-hash proven tip IS re-emitted. + it('re-emits the proven tip when numbers match but the known local hash differs', async () => { + const localData = new TestL2BlockStreamLocalDataProvider(); + const handler = new TestL2BlockStreamEventHandler(); + const blockStream = new TestL2BlockStream(blockSource, localData, handler, undefined, { + batchSize: 10, + ignoreCheckpoints: true, + }); - it('does not emit more than batchSize blocks in a single blocks-added event for checkpointed blocks', async () => { - // Set up: 12 blocks in 4 checkpoints (3 blocks each), but batch size is 2 - // Each checkpoint has 3 blocks, but we should never emit more than 2 at once - blockStream = new TestL2BlockStream(blockSource, localData, handler, undefined, { batchSize: 2 }); - setRemoteTipsMultiBlock(12, 12); + setRemoteTips(9, 0, 6, 3); + localData.proposed.number = BlockNumber(9); + // Local proven sits at the same block number but a stale hash (e.g. a same-height reorg). + localData.proven.block.number = BlockNumber(6); + localData.proven.block.hash = '0xstale6'; + localData.finalized.block.number = BlockNumber(3); + localData.finalized.block.hash = makeHash(3); await blockStream.work(); - // Verify no blocks-added event has more than 2 blocks - const blocksAddedEvents = handler.events.filter(e => e.type === 'blocks-added'); - for (const event of blocksAddedEvents) { - if (event.type === 'blocks-added') { - expect(event.blocks.length).toBeLessThanOrEqual(2); - } - } - - // Expected sequence with batchSize=2: - // - Blocks 1-2, then blocks 3, then checkpoint 1 - // - Blocks 4-5, then block 6, then checkpoint 2 - // etc. - expect(handler.events).toEqual([ - expectBlocksAdded([1, 2]), - expectBlocksAdded([3]), - expectCheckpointed(1), - expectBlocksAdded([4, 5]), - expectBlocksAdded([6]), - expectCheckpointed(2), - expectBlocksAdded([7, 8]), - expectBlocksAdded([9]), - expectCheckpointed(3), - expectBlocksAdded([10, 11]), - expectBlocksAdded([12]), - expectCheckpointed(4), + expect(handler.events.filter(e => e.type === 'chain-proven')).toEqual([ + { type: 'chain-proven', block: makeBlockId(6), checkpoint: makeCheckpointId(6) }, ]); + // Finalized matched on both number and hash ⇒ not re-emitted. + expect(handler.events.filter(e => e.type === 'chain-finalized')).toEqual([]); }); + }); - it('emits checkpoint event when blocks become checkpointed after being added as uncheckpointed', async () => { - // Phase 1: Start with 3 checkpointed blocks (checkpoint 1), then add blocks 4-6 as uncheckpointed - setRemoteTipsMultiBlock(6, 3); - - await blockStream.work(); - - // Expect: blocks 1-3 via checkpoint, then uncheckpointed blocks 4-6 - expect(handler.events).toEqual([ - expectBlocksAdded([1, 2, 3]), - expectCheckpointed(1), - expectBlocksAdded([4, 5, 6]), - ]); - - handler.clearEvents(); - - // Update local state to reflect what the handler would have stored - localData.proposed.number = BlockNumber(6); - localData.checkpointed.block.number = BlockNumber(3); - localData.checkpointed.checkpoint.number = CheckpointNumber(1); - - // Phase 2: Now checkpoint 2 completes (blocks 4-6 become checkpointed) - setRemoteTipsMultiBlock(6, 6); - - await blockStream.work(); + describe('ignoreCheckpoints', () => { + let localData: TestL2BlockStreamLocalDataProvider; + let handler: TestL2BlockStreamEventHandler; + let blockStream: TestL2BlockStream; - // Should emit a checkpoint event for checkpoint 2 (blocks 4-6), even though blocks were already added - const checkpointEvents = handler.events.filter(e => e.type === 'chain-checkpointed'); - expect(checkpointEvents).toHaveLength(1); - expect((checkpointEvents[0] as any).checkpoint.checkpoint.number).toBe(CheckpointNumber(2)); + beforeEach(() => { + localData = new TestL2BlockStreamLocalDataProvider(); + handler = new TestL2BlockStreamEventHandler(); + blockStream = new TestL2BlockStream(blockSource, localData, handler, undefined, { + batchSize: 10, + ignoreCheckpoints: true, + }); }); - it('emits checkpoint event BEFORE new uncheckpointed blocks when checkpoint completes', async () => { - // Phase 1: Start with 3 checkpointed blocks (checkpoint 1), then add blocks 4-6 as uncheckpointed - setRemoteTipsMultiBlock(6, 3); - - await blockStream.work(); - - // Expect: blocks 1-3 via checkpoint, then uncheckpointed blocks 4-6 - expect(handler.events).toEqual([ - expectBlocksAdded([1, 2, 3]), - expectCheckpointed(1), - expectBlocksAdded([4, 5, 6]), - ]); - - handler.clearEvents(); - - // Update local state to reflect what the handler would have stored - localData.proposed.number = BlockNumber(6); - localData.checkpointed.block.number = BlockNumber(3); - localData.checkpointed.checkpoint.number = CheckpointNumber(1); - - // Phase 2: Checkpoint 2 completes (blocks 4-6) AND a new block 7 arrives - setRemoteTipsMultiBlock(7, 6); + it('does not emit checkpoint events for new checkpointed blocks', async () => { + setRemoteTips(6, 6); await blockStream.work(); - // Should emit checkpoint 2 FIRST, then the new uncheckpointed block 7 - // NOT: block 7 first, then checkpoint 2 - expect(handler.events).toEqual([expectCheckpointed(2), expectBlocksAdded([7])]); + expect(handler.events).toEqual([{ type: 'blocks-added', blocks: times(6, i => makeBlock(i + 1)) }]); }); - it('emits checkpoint as soon as last block in checkpoint arrives', async () => { - // This tests the realistic scenario where checkpoints are published as blocks arrive. - // Uncheckpointed blocks are always just a partial checkpoint (the current incomplete one). - - // Sync 1: Source has checkpointed=6 (checkpoint 2), proposed=9 - // Client gets blocks 1-6 via checkpoints, blocks 7-9 as uncheckpointed - setRemoteTipsMultiBlock(9, 6); - + it('still emits prune events but no checkpoint events', async () => { + setRemoteTips(9, 9); await blockStream.work(); - - expect(handler.events).toEqual([ - expectBlocksAdded([1, 2, 3]), - expectCheckpointed(1), - expectBlocksAdded([4, 5, 6]), - expectCheckpointed(2), - expectBlocksAdded([7, 8, 9]), // uncheckpointed - ]); - handler.clearEvents(); - // Update local state localData.proposed.number = BlockNumber(9); - localData.checkpointed.block.number = BlockNumber(6); - localData.checkpointed.checkpoint.number = CheckpointNumber(2); - - // Sync 2: Checkpoint 3 is now published (blocks 7-9), new blocks 10-12 are uncheckpointed - setRemoteTipsMultiBlock(12, 9); - - await blockStream.work(); - - // Should emit checkpoint 3 for already-local blocks 7-9, then uncheckpointed blocks 10-12 - expect(handler.events).toEqual([ - expectCheckpointed(3), - expectBlocksAdded([10, 11, 12]), // uncheckpointed - ]); - - handler.clearEvents(); - - // Update local state - localData.proposed.number = BlockNumber(12); localData.checkpointed.block.number = BlockNumber(9); - localData.checkpointed.checkpoint.number = CheckpointNumber(3); - - // Sync 3: Checkpoint 4 is now published (blocks 10-12), no new blocks - setRemoteTipsMultiBlock(12, 12); - - await blockStream.work(); - - // Should emit checkpoint 4 for already-local blocks 10-12 - expect(handler.events).toEqual([expectCheckpointed(4)]); - }); - - it('emits all checkpoints when source jumps ahead with multiple new checkpoints', async () => { - // Phase 1: Start with checkpoint 1 complete (blocks 1-3), blocks 4-6 uncheckpointed - setRemoteTipsMultiBlock(6, 3); + localData.checkpointed.checkpoint.number = CheckpointNumber(9); + for (let i = 4; i <= 9; i++) { + localData.blockHashes[i] = `0xbad${i}`; + } + setRemoteTips(3, 3); await blockStream.work(); expect(handler.events).toEqual([ - expectBlocksAdded([1, 2, 3]), - expectCheckpointed(1), - expectBlocksAdded([4, 5, 6]), + { type: 'chain-pruned', block: makeBlockId(3), checkpointed: makeTipId(3), proven: makeTipId(0) }, ]); + }); - handler.clearEvents(); - - // Update local state - localData.proposed.number = BlockNumber(6); - localData.checkpointed.block.number = BlockNumber(3); - localData.checkpointed.checkpoint.number = CheckpointNumber(1); - - // Phase 2: Source jumps to block 12 with checkpoints at 6, 9, and 12 - // - Checkpoint 2 (blocks 4-6) - blocks already local, needs checkpoint event - // - Checkpoint 3 (blocks 7-9) - new blocks + checkpoint event - // - Checkpoint 4 (blocks 10-12) - new blocks + checkpoint event - setRemoteTipsMultiBlock(12, 12); + it('still emits proven and finalized events', async () => { + setRemoteTips(9, 9, 6, 3); await blockStream.work(); - // Should emit: - // 1. Checkpoint 2 event (blocks 4-6 were already local) - // 2. Blocks 7-9 + checkpoint 3 event - // 3. Blocks 10-12 + checkpoint 4 event expect(handler.events).toEqual([ - expectCheckpointed(2), - expectBlocksAdded([7, 8, 9]), - expectCheckpointed(3), - expectBlocksAdded([10, 11, 12]), - expectCheckpointed(4), + { type: 'blocks-added', blocks: times(9, i => makeBlock(i + 1)) }, + { type: 'chain-proven', block: makeBlockId(6), checkpoint: makeCheckpointId(6) }, + { type: 'chain-finalized', block: makeBlockId(3), checkpoint: makeCheckpointId(3) }, ]); }); - - describe('startingBlock with stale checkpoint state', () => { - // When a node restarts with startingBlock set and has local blocks but no checkpoint - // state (e.g. checkpoint tracking is new, or checkpoint state was reset), Loop 1 - // should not spam checkpoint events for all historical checkpoints. - - it('skips historical checkpoint events before startingBlock on restart with stale checkpoint state', async () => { - // node has blocks 1-15 locally (proposed=15) but no checkpoint state. - // Checkpoint 5 covers blocks 13-15 (the last checkpoint). - setRemoteTipsMultiBlock(15, 15); - localData.proposed.number = BlockNumber(15); - // localData.checkpointed starts at 0 - simulating stale/missing checkpoint state - - blockStream = new TestL2BlockStream(blockSource, localData, handler, undefined, { - batchSize: 10, - startingBlock: 13, // start from checkpoint 5 (blocks 13-15) - }); - - await blockStream.work(); - - // Should only emit checkpoint 5 (the one containing startingBlock=13), not all 5 checkpoints - expect(handler.events).toEqual([expectCheckpointed(5)]); - // Verify we don't spam checkpoints 1-4 - const checkpointEvents = handler.events.filter(e => e.type === 'chain-checkpointed'); - expect(checkpointEvents).toHaveLength(1); - }); - - it('without startingBlock emits all historical checkpoints for already-local blocks', async () => { - // Same scenario without startingBlock: should emit all 5 checkpoints (correct catch-up behavior) - setRemoteTipsMultiBlock(15, 15); - localData.proposed.number = BlockNumber(15); - // localData.checkpointed starts at 0 - - await blockStream.work(); - - // All 5 checkpoints should be emitted since they're all for already-local blocks - const checkpointEvents = handler.events.filter(e => e.type === 'chain-checkpointed'); - expect(checkpointEvents).toHaveLength(5); - }); - - it('skips Loop 1 entirely when startingBlock is past the checkpointed tip', async () => { - // proposed=15, checkpointed=9 (ckpt 3 covers blocks 7-9). startingBlock=12 is past the - // checkpointed tip, in the proposed range. Loop 1 must skip without an RPC for block 12. - setRemoteTipsMultiBlock(15, 9); - blockStream = new TestL2BlockStream(blockSource, localData, handler, undefined, { - batchSize: 10, - startingBlock: 12, - }); - - await blockStream.work(); - - // No chain-checkpointed events because startingBlock is past the checkpointed tip. - const checkpointEvents = handler.events.filter(e => e.type === 'chain-checkpointed'); - expect(checkpointEvents).toHaveLength(0); - // Loop 1 must not query block 12 — past-the-tip is decided from sourceTips alone. - const loop1Calls = blockSource.getBlockData.mock.calls.filter( - c => 'number' in c[0] && (c[0] as { number: number }).number === 12, - ); - expect(loop1Calls).toHaveLength(0); - }); - - it('calls getBlockData for block 0 only for reorg detection, not checkpoint lookup, when startingBlock is 0', async () => { - // With startingBlock=0, the stream skips the checkpoint-number lookup (line 121 path) - // so getBlockData is called for block 0 only once: for the genesis reorg detection. - setRemoteTipsMultiBlock(15, 15); - blockStream = new TestL2BlockStream(blockSource, localData, handler, undefined, { - batchSize: 10, - startingBlock: 0, - }); - - await blockStream.work(); - - const calls = blockSource.getBlockData.mock.calls; - const block0Calls = calls.filter(c => 'number' in c[0] && (c[0] as { number: number }).number === 0); - // Only the genesis reorg-detection call — not an additional checkpoint-lookup call. - expect(block0Calls).toHaveLength(1); - }); - }); - - describe('checkpoint prefetching', () => { - it('prefetches multiple checkpoints in a single RPC call', async () => { - // Set up: 9 blocks in 3 checkpoints - setRemoteTipsMultiBlock(9, 9); - - // Create a stream with prefetch limit of 10 (will fetch all 3 checkpoints in one call) - // This also tests that we handle getting fewer checkpoints (3) than requested (10) - const prefetchStream = new TestL2BlockStream(blockSource, localData, handler, undefined, { - batchSize: 10, - checkpointPrefetchLimit: 10, - }); - - await prefetchStream.work(); - - // Should have fetched all 3 checkpoints in a single call (Loop 2 makes 1 call with limit 10) - // Even though we requested 10, only 3 exist - verify we handle this correctly - const calls = blockSource.getCheckpoints.mock.calls; - const loop2Calls = calls.filter(([query]) => 'limit' in query && query.limit === 10); - expect(loop2Calls.length).toBe(1); - expect((loop2Calls[0][0] as { from: number }).from).toBe(1); // Starting from checkpoint 1 - - // All 3 checkpoints should be emitted correctly (not 10) - const checkpointEvents = handler.events.filter(e => e.type === 'chain-checkpointed'); - expect(checkpointEvents).toHaveLength(3); - - // Verify correct event order - expect(handler.events).toEqual([ - expectBlocksAdded([1, 2, 3]), - expectCheckpointed(1), - expectBlocksAdded([4, 5, 6]), - expectCheckpointed(2), - expectBlocksAdded([7, 8, 9]), - expectCheckpointed(3), - ]); - }); - - it('prefetches checkpoints correctly when starting from an offset', async () => { - // Set up: 15 blocks in 5 checkpoints, but we already have blocks 1-6 locally (checkpoints 1-2) - setRemoteTipsMultiBlock(15, 15); - localData.proposed.number = BlockNumber(6); - localData.checkpointed.checkpoint.number = CheckpointNumber(2); - - // Create a stream with prefetch limit of 10 - const prefetchStream = new TestL2BlockStream(blockSource, localData, handler, undefined, { - batchSize: 10, - checkpointPrefetchLimit: 10, - }); - - await prefetchStream.work(); - - // Loop 2 should start fetching from checkpoint 3 (block 7 is in checkpoint 3) - const calls = blockSource.getCheckpoints.mock.calls; - const loop2Calls = calls.filter(([query]) => 'limit' in query && query.limit === 10); - expect(loop2Calls.length).toBe(1); - expect((loop2Calls[0][0] as { from: number }).from).toBe(3); // Starting from checkpoint 3, not 1 - - // Should only emit blocks 7-15 and checkpoints 3-5 (not 1-2, those are already local) - expect(handler.events).toEqual([ - expectBlocksAdded([7, 8, 9]), - expectCheckpointed(3), - expectBlocksAdded([10, 11, 12]), - expectCheckpointed(4), - expectBlocksAdded([13, 14, 15]), - expectCheckpointed(5), - ]); - - // Verify only 3 new checkpoints emitted - const checkpointEvents = handler.events.filter(e => e.type === 'chain-checkpointed'); - expect(checkpointEvents).toHaveLength(3); - }); - - it('prefetches correctly when starting from middle of a checkpoint', async () => { - // Local has blocks 1-7: checkpoints 1-2 complete, block 7 is first block of checkpoint 3 - setRemoteTipsMultiBlock(15, 15); - localData.proposed.number = BlockNumber(7); - localData.checkpointed.checkpoint.number = CheckpointNumber(2); - - const prefetchStream = new TestL2BlockStream(blockSource, localData, handler, undefined, { - batchSize: 10, - checkpointPrefetchLimit: 10, - }); - - await prefetchStream.work(); - - // Should start prefetching from checkpoint 3 - const calls = blockSource.getCheckpoints.mock.calls; - const loop2Calls = calls.filter(([query]) => 'limit' in query && query.limit === 10); - expect(loop2Calls.length).toBe(1); - expect((loop2Calls[0][0] as { from: number }).from).toBe(3); // Starting from checkpoint 3 - - // Should emit only blocks 8-9 from checkpoint 3 (block 7 is already local) - expect(handler.events).toEqual([ - expectBlocksAdded([8, 9]), // Rest of checkpoint 3 - expectCheckpointed(3), - expectBlocksAdded([10, 11, 12]), - expectCheckpointed(4), - expectBlocksAdded([13, 14, 15]), - expectCheckpointed(5), - ]); - - // Verify only 3 checkpoints emitted - const checkpointEvents = handler.events.filter(e => e.type === 'chain-checkpointed'); - expect(checkpointEvents).toHaveLength(3); - }); - - it('refills prefetch buffer when exhausted', async () => { - // Set up: 15 blocks in 5 checkpoints - setRemoteTipsMultiBlock(15, 15); - - // Create a stream with prefetch limit of 2 (will need 3 calls to fetch 5 checkpoints) - const prefetchStream = new TestL2BlockStream(blockSource, localData, handler, undefined, { - batchSize: 10, - checkpointPrefetchLimit: 2, - }); - - await prefetchStream.work(); - - // Should have made 3 calls with limit 2 to fetch 5 checkpoints - const calls = blockSource.getCheckpoints.mock.calls; - const loop2Calls = calls.filter(([query]) => 'limit' in query && query.limit === 2); - expect(loop2Calls.length).toBe(3); // ceil(5/2) = 3 - expect((loop2Calls[0][0] as { from: number }).from).toBe(1); // First batch: checkpoints 1-2 - expect((loop2Calls[1][0] as { from: number }).from).toBe(3); // Second batch: checkpoints 3-4 - expect((loop2Calls[2][0] as { from: number }).from).toBe(5); // Third batch: checkpoint 5 - - // All 5 checkpoints should be emitted - const checkpointEvents = handler.events.filter(e => e.type === 'chain-checkpointed'); - expect(checkpointEvents).toHaveLength(5); - }); - - it('uses default prefetch limit when not specified', async () => { - // Set up: 9 blocks in 3 checkpoints - setRemoteTipsMultiBlock(9, 9); - - // Create a stream without specifying checkpointPrefetchLimit (should use default of 50) - const defaultPrefetchStream = new TestL2BlockStream(blockSource, localData, handler, undefined, { - batchSize: 10, - }); - - await defaultPrefetchStream.work(); - - // Should have used default limit of 50 for Loop 2 calls - const calls = blockSource.getCheckpoints.mock.calls; - const loop2Calls = calls.filter(([query]) => 'limit' in query && query.limit === 50); - expect(loop2Calls.length).toBeGreaterThanOrEqual(1); - }); - }); - - describe('prune scenarios', () => { - it('prunes proposed chain back to checkpointed tip, then continues', async () => { - // Phase 1: Sync blocks 1-9 with checkpoints 1-2, blocks 7-9 uncheckpointed - setRemoteTipsMultiBlock(9, 6); - - await blockStream.work(); - - expect(handler.events).toEqual([ - expectBlocksAdded([1, 2, 3]), - expectCheckpointed(1), - expectBlocksAdded([4, 5, 6]), - expectCheckpointed(2), - expectBlocksAdded([7, 8, 9]), // uncheckpointed - ]); - - handler.clearEvents(); - - // Update local state to reflect what the handler stored - localData.proposed.number = BlockNumber(9); - localData.checkpointed.block.number = BlockNumber(6); - localData.checkpointed.checkpoint.number = CheckpointNumber(2); - - // Phase 2: Prune - proposed chain pruned back to checkpointed tip (block 6) - // This happens when uncheckpointed blocks (7-9) are invalid - // Mess up hashes for blocks 7-9 to simulate reorg - localData.blockHashes[7] = '0xbad7'; - localData.blockHashes[8] = '0xbad8'; - localData.blockHashes[9] = '0xbad9'; - - // Source now has proposed=6, checkpointed=6 - setRemoteTipsMultiBlock(6, 6); - - await blockStream.work(); - - // Should emit chain-pruned back to block 6 - expect(handler.events).toEqual([ - { - type: 'chain-pruned', - block: makeBlockId(6), - checkpointed: expect.objectContaining({ - checkpoint: expect.objectContaining({ number: CheckpointNumber(2) }), - }), - proven: expect.objectContaining({ - block: expect.objectContaining({ number: BlockNumber(0) }), - }), - }, - ]); - - handler.clearEvents(); - - // Update local state after prune - localData.proposed.number = BlockNumber(6); - delete localData.blockHashes[7]; - delete localData.blockHashes[8]; - delete localData.blockHashes[9]; - - // Phase 3: Chain continues - new blocks 7-12 arrive with checkpoints 3-4 - setRemoteTipsMultiBlock(12, 12); - - await blockStream.work(); - - // Should continue normally: blocks 7-9 + checkpoint 3, blocks 10-12 + checkpoint 4 - expect(handler.events).toEqual([ - expectBlocksAdded([7, 8, 9]), - expectCheckpointed(3), - expectBlocksAdded([10, 11, 12]), - expectCheckpointed(4), - ]); - }); - - it('prunes proposed and checkpointed chains back to proven tip, then continues', async () => { - // Phase 1: Sync blocks 1-12 with checkpoints 1-3, proven at checkpoint 2 (block 6) - setRemoteTipsMultiBlock(12, 9, 6); - - await blockStream.work(); - - expect(handler.events).toEqual([ - expectBlocksAdded([1, 2, 3]), - expectCheckpointed(1), - expectBlocksAdded([4, 5, 6]), - expectCheckpointed(2), - expectBlocksAdded([7, 8, 9]), - expectCheckpointed(3), - expectBlocksAdded([10, 11, 12]), - { type: 'chain-proven', block: makeBlockId(6), checkpoint: makeCheckpointId(2) }, - ]); - - handler.clearEvents(); - - // Update local state - localData.proposed.number = BlockNumber(12); - localData.checkpointed.block.number = BlockNumber(9); - localData.checkpointed.checkpoint.number = CheckpointNumber(3); - localData.proven.block.number = BlockNumber(6); - localData.proven.checkpoint.number = CheckpointNumber(2); - - // Phase 2: Prune - checkpoint 3 failed to prove, prune back to proven tip (block 6) - // Mess up hashes for blocks 7-12 to simulate reorg - for (let i = 7; i <= 12; i++) { - localData.blockHashes[i] = `0xbad${i}`; - } - - // Source now has proposed=6, checkpointed=6, proven=6 - setRemoteTipsMultiBlock(6, 6, 6); - - await blockStream.work(); - - // Should emit chain-pruned back to block 6, carrying the source proven tip (block 6 / ckpt 2) - expect(handler.events).toEqual([ - { - type: 'chain-pruned', - block: makeBlockId(6), - checkpointed: expect.objectContaining({ - checkpoint: expect.objectContaining({ number: CheckpointNumber(2) }), - }), - proven: expect.objectContaining({ - block: expect.objectContaining({ number: BlockNumber(6) }), - checkpoint: expect.objectContaining({ number: CheckpointNumber(2) }), - }), - }, - ]); - - handler.clearEvents(); - - // Update local state after prune - localData.proposed.number = BlockNumber(6); - localData.checkpointed.block.number = BlockNumber(6); - localData.checkpointed.checkpoint.number = CheckpointNumber(2); - for (let i = 7; i <= 12; i++) { - delete localData.blockHashes[i]; - } - - // Phase 3: Chain continues with new blocks and checkpoints - // New blocks 7-15 arrive with checkpoints 3-5, proven advances to checkpoint 3 - setRemoteTipsMultiBlock(15, 15, 9); - - await blockStream.work(); - - // Should continue normally with new blocks and checkpoints - expect(handler.events).toEqual([ - expectBlocksAdded([7, 8, 9]), - expectCheckpointed(3), - expectBlocksAdded([10, 11, 12]), - expectCheckpointed(4), - expectBlocksAdded([13, 14, 15]), - expectCheckpointed(5), - { type: 'chain-proven', block: makeBlockId(9), checkpoint: makeCheckpointId(3) }, - ]); - }); - - it('prunes uncheckpointed blocks and immediately receives new ones', async () => { - // Phase 1: Sync blocks 1-6 with checkpoint 1, blocks 4-6 uncheckpointed - setRemoteTipsMultiBlock(6, 3); - - await blockStream.work(); - - expect(handler.events).toEqual([ - expectBlocksAdded([1, 2, 3]), - expectCheckpointed(1), - expectBlocksAdded([4, 5, 6]), - ]); - - handler.clearEvents(); - - // Update local state - localData.proposed.number = BlockNumber(6); - localData.checkpointed.block.number = BlockNumber(3); - localData.checkpointed.checkpoint.number = CheckpointNumber(1); - - // Phase 2: Prune blocks 4-6 due to bad hashes - localData.blockHashes[4] = '0xbad4'; - localData.blockHashes[5] = '0xbad5'; - localData.blockHashes[6] = '0xbad6'; - - // Source still at checkpointed=3 (no new checkpoints yet) - setRemoteTipsMultiBlock(3, 3); - - await blockStream.work(); - - // Should emit prune back to block 3 - expect(handler.events).toEqual([ - { - type: 'chain-pruned', - block: makeBlockId(3), - checkpointed: expect.objectContaining({ - checkpoint: expect.objectContaining({ number: CheckpointNumber(1) }), - }), - proven: expect.objectContaining({ - block: expect.objectContaining({ number: BlockNumber(0) }), - }), - }, - ]); - - handler.clearEvents(); - - // Update local state after prune - localData.proposed.number = BlockNumber(3); - delete localData.blockHashes[4]; - delete localData.blockHashes[5]; - delete localData.blockHashes[6]; - - // Phase 3: New blocks 4-9 arrive with checkpoints 2-3 - setRemoteTipsMultiBlock(9, 9); - - await blockStream.work(); - - // Should continue normally with new blocks and checkpoints - expect(handler.events).toEqual([ - expectBlocksAdded([4, 5, 6]), - expectCheckpointed(2), - expectBlocksAdded([7, 8, 9]), - expectCheckpointed(3), - ]); - }); - - it('prunes proposed chain back to genesis when no checkpoints exist', async () => { - // Phase 1: Sync blocks 1-6, no checkpoints (checkpointed=0, proven=0, finalized=0) - setRemoteTipsMultiBlock(6, 0); - - await blockStream.work(); - - // All blocks come as uncheckpointed - expect(handler.events).toEqual([expectBlocksAdded([1, 2, 3, 4, 5, 6])]); - - handler.clearEvents(); - - // Update local state - localData.proposed.number = BlockNumber(6); - - // Phase 2: All blocks are invalid, prune back to genesis (block 0) - for (let i = 1; i <= 6; i++) { - localData.blockHashes[i] = `0xbad${i}`; - } - - // Source now has proposed=0, checkpointed=0 - setRemoteTipsMultiBlock(0, 0); - - await blockStream.work(); - - // Should emit chain-pruned back to block 0 - expect(handler.events).toEqual([ - { - type: 'chain-pruned', - block: makeBlockId(0), - checkpointed: expect.objectContaining({ - checkpoint: expect.objectContaining({ number: CheckpointNumber(0) }), - }), - proven: expect.objectContaining({ - block: expect.objectContaining({ number: BlockNumber(0) }), - }), - }, - ]); - - handler.clearEvents(); - - // Update local state after prune - localData.proposed.number = BlockNumber(0); - for (let i = 1; i <= 6; i++) { - delete localData.blockHashes[i]; - } - - // Phase 3: New blocks 1-6 arrive with checkpoints 1-2 - setRemoteTipsMultiBlock(6, 6); - - await blockStream.work(); - - // Should continue normally from genesis - expect(handler.events).toEqual([ - expectBlocksAdded([1, 2, 3]), - expectCheckpointed(1), - expectBlocksAdded([4, 5, 6]), - expectCheckpointed(2), - ]); - }); - - it('prunes both proposed and checkpointed chains back to genesis', async () => { - // Phase 1: Sync blocks 1-6 with checkpoint 1 (blocks 1-3), blocks 4-6 uncheckpointed - setRemoteTipsMultiBlock(6, 3); - - await blockStream.work(); - - expect(handler.events).toEqual([ - expectBlocksAdded([1, 2, 3]), - expectCheckpointed(1), - expectBlocksAdded([4, 5, 6]), - ]); - - handler.clearEvents(); - - // Update local state - localData.proposed.number = BlockNumber(6); - localData.checkpointed.block.number = BlockNumber(3); - localData.checkpointed.checkpoint.number = CheckpointNumber(1); - - // Phase 2: All blocks are invalid (even checkpointed ones), prune back to genesis - for (let i = 1; i <= 6; i++) { - localData.blockHashes[i] = `0xbad${i}`; - } - - // Source now has proposed=0, checkpointed=0 (full chain reset) - setRemoteTipsMultiBlock(0, 0); - - await blockStream.work(); - - // Should emit chain-pruned back to block 0 - expect(handler.events).toEqual([ - { - type: 'chain-pruned', - block: makeBlockId(0), - checkpointed: expect.objectContaining({ - checkpoint: expect.objectContaining({ number: CheckpointNumber(0) }), - }), - proven: expect.objectContaining({ - block: expect.objectContaining({ number: BlockNumber(0) }), - }), - }, - ]); - - handler.clearEvents(); - - // Update local state after prune - localData.proposed.number = BlockNumber(0); - localData.checkpointed.block.number = BlockNumber(0); - localData.checkpointed.checkpoint.number = CheckpointNumber(0); - for (let i = 1; i <= 6; i++) { - delete localData.blockHashes[i]; - } - - // Phase 3: New chain starts fresh with blocks 1-9 and checkpoints 1-3 - setRemoteTipsMultiBlock(9, 9); - - await blockStream.work(); - - // Should continue normally from genesis - expect(handler.events).toEqual([ - expectBlocksAdded([1, 2, 3]), - expectCheckpointed(1), - expectBlocksAdded([4, 5, 6]), - expectCheckpointed(2), - expectBlocksAdded([7, 8, 9]), - expectCheckpointed(3), - ]); - }); - }); - - describe('ignoreCheckpoints', () => { - beforeEach(() => { - blockStream = new TestL2BlockStream(blockSource, localData, handler, undefined, { - batchSize: 10, - ignoreCheckpoints: true, - }); - }); - - it('does not emit checkpoint events for new checkpointed blocks', async () => { - // 6 blocks in 2 checkpoints (blocks 1-3 in checkpoint 1, blocks 4-6 in checkpoint 2) - setRemoteTipsMultiBlock(6, 6); - - await blockStream.work(); - - // Should emit blocks-added events but NO chain-checkpointed events - expect(handler.events).toEqual([expectBlocksAdded([1, 2, 3]), expectBlocksAdded([4, 5, 6])]); - }); - - it('does not emit checkpoint events when blocks become checkpointed after being added', async () => { - // Phase 1: Blocks 1-3 checkpointed (checkpoint 1), blocks 4-6 uncheckpointed - setRemoteTipsMultiBlock(6, 3); - - await blockStream.work(); - - // Blocks 1-3 via checkpoint, blocks 4-6 as uncheckpointed, no checkpoint events - expect(handler.events).toEqual([expectBlocksAdded([1, 2, 3]), expectBlocksAdded([4, 5, 6])]); - - handler.clearEvents(); - - // Update local state - localData.proposed.number = BlockNumber(6); - localData.checkpointed.block.number = BlockNumber(3); - localData.checkpointed.checkpoint.number = CheckpointNumber(1); - - // Phase 2: Now blocks 4-6 become checkpointed too (checkpoint 2) - setRemoteTipsMultiBlock(6, 6); - - await blockStream.work(); - - // Checkpoint 2 event would normally be emitted for blocks 4-6, but not with ignoreCheckpoints - expect(handler.events).toEqual([]); - }); - - it('does not emit checkpoint events but still emits prune events for uncheckpointed blocks', async () => { - // Phase 1: Sync blocks 1-9, blocks 1-6 checkpointed (checkpoints 1-2), blocks 7-9 uncheckpointed - setRemoteTipsMultiBlock(9, 6); - - await blockStream.work(); - - expect(handler.events).toEqual([ - expectBlocksAdded([1, 2, 3]), - expectBlocksAdded([4, 5, 6]), - expectBlocksAdded([7, 8, 9]), - ]); - - handler.clearEvents(); - - // Update local state - localData.proposed.number = BlockNumber(9); - localData.checkpointed.block.number = BlockNumber(6); - localData.checkpointed.checkpoint.number = CheckpointNumber(2); - - // Phase 2: Prune uncheckpointed blocks 7-9 (bad hashes) - localData.blockHashes[7] = '0xbad7'; - localData.blockHashes[8] = '0xbad8'; - localData.blockHashes[9] = '0xbad9'; - - setRemoteTipsMultiBlock(6, 6); - - await blockStream.work(); - - // Should emit chain-pruned event (prune events are always emitted), no checkpoint events - expect(handler.events).toEqual([ - { - type: 'chain-pruned', - block: makeBlockId(6), - checkpointed: expect.objectContaining({ - checkpoint: expect.objectContaining({ number: CheckpointNumber(2) }), - }), - proven: expect.objectContaining({ - block: expect.objectContaining({ number: BlockNumber(0) }), - }), - }, - ]); - }); - - it('does not emit checkpoint events but still emits prune events for checkpointed blocks', async () => { - // Phase 1: Sync blocks 1-9, all checkpointed (checkpoints 1-3) - setRemoteTipsMultiBlock(9, 9); - - await blockStream.work(); - - expect(handler.events).toEqual([ - expectBlocksAdded([1, 2, 3]), - expectBlocksAdded([4, 5, 6]), - expectBlocksAdded([7, 8, 9]), - ]); - - handler.clearEvents(); - - // Update local state - localData.proposed.number = BlockNumber(9); - localData.checkpointed.block.number = BlockNumber(9); - localData.checkpointed.checkpoint.number = CheckpointNumber(3); - - // Phase 2: Prune checkpointed blocks (reorg of checkpointed chain back to checkpoint 1) - localData.blockHashes[4] = '0xbad4'; - localData.blockHashes[5] = '0xbad5'; - localData.blockHashes[6] = '0xbad6'; - localData.blockHashes[7] = '0xbad7'; - localData.blockHashes[8] = '0xbad8'; - localData.blockHashes[9] = '0xbad9'; - - setRemoteTipsMultiBlock(3, 3); - - await blockStream.work(); - - // Should emit chain-pruned event - expect(handler.events).toEqual([ - { - type: 'chain-pruned', - block: makeBlockId(3), - checkpointed: expect.objectContaining({ - checkpoint: expect.objectContaining({ number: CheckpointNumber(1) }), - }), - proven: expect.objectContaining({ - block: expect.objectContaining({ number: BlockNumber(0) }), - }), - }, - ]); - }); - - it('does not emit checkpoint events but still emits proven and finalized events', async () => { - // 9 blocks in 3 checkpoints, proven at block 6 (checkpoint 2), finalized at block 3 (checkpoint 1) - setRemoteTipsMultiBlock(9, 9, 6, 3); - - await blockStream.work(); - - // Should have blocks-added, chain-proven, chain-finalized, but NO checkpoint events - expect(handler.events).toEqual([ - expectBlocksAdded([1, 2, 3]), - expectBlocksAdded([4, 5, 6]), - expectBlocksAdded([7, 8, 9]), - { type: 'chain-proven', block: makeBlockId(6), checkpoint: makeCheckpointId(2) }, - { type: 'chain-finalized', block: makeBlockId(3), checkpoint: makeCheckpointId(1) }, - ]); - }); - - it('does not emit checkpoint events but still emits proven and finalized events with skipFinalized', async () => { - // Use skipFinalized in addition to ignoreCheckpoints - blockStream = new TestL2BlockStream(blockSource, localData, handler, undefined, { - batchSize: 10, - ignoreCheckpoints: true, - skipFinalized: true, - }); - - // 12 blocks in 4 checkpoints, proven at block 9 (checkpoint 3), finalized at block 6 (checkpoint 2) - setRemoteTipsMultiBlock(12, 12, 9, 6); - - // Local is behind - at block 3, finalized at block 3 - localData.proposed.number = BlockNumber(3); - localData.checkpointed.block.number = BlockNumber(3); - localData.checkpointed.checkpoint.number = CheckpointNumber(1); - localData.proven.block.number = BlockNumber(3); - localData.proven.checkpoint.number = CheckpointNumber(1); - localData.finalized.block.number = BlockNumber(3); - localData.finalized.checkpoint.number = CheckpointNumber(1); - - await blockStream.work(); - - // With skipFinalized, we skip to the finalized tip (block 6), then sync from there - // Should emit blocks 6-12, proven, finalized, but NO checkpoint events - expect(handler.events).toEqual([ - expectBlocksAdded([6]), - expectBlocksAdded([7, 8, 9]), - expectBlocksAdded([10, 11, 12]), - { type: 'chain-proven', block: makeBlockId(9), checkpoint: makeCheckpointId(3) }, - { type: 'chain-finalized', block: makeBlockId(6), checkpoint: makeCheckpointId(2) }, - ]); - }); - - it('does not emit checkpoint events after prune and re-sync with new blocks', async () => { - // Phase 1: Sync blocks 1-9, all checkpointed (checkpoints 1-3) - setRemoteTipsMultiBlock(9, 9); - - await blockStream.work(); - - expect(handler.events).toEqual([ - expectBlocksAdded([1, 2, 3]), - expectBlocksAdded([4, 5, 6]), - expectBlocksAdded([7, 8, 9]), - ]); - - handler.clearEvents(); - - // Update local state - localData.proposed.number = BlockNumber(9); - localData.checkpointed.block.number = BlockNumber(9); - localData.checkpointed.checkpoint.number = CheckpointNumber(3); - - // Phase 2: Prune back to block 6 (checkpoint 2) - localData.blockHashes[7] = '0xbad7'; - localData.blockHashes[8] = '0xbad8'; - localData.blockHashes[9] = '0xbad9'; - - setRemoteTipsMultiBlock(6, 6); - - await blockStream.work(); - - expect(handler.events).toEqual([ - { - type: 'chain-pruned', - block: makeBlockId(6), - checkpointed: expect.objectContaining({ - checkpoint: expect.objectContaining({ number: CheckpointNumber(2) }), - }), - proven: expect.objectContaining({ - block: expect.objectContaining({ number: BlockNumber(0) }), - }), - }, - ]); - - handler.clearEvents(); - - // Update local state after prune - localData.proposed.number = BlockNumber(6); - localData.checkpointed.block.number = BlockNumber(6); - localData.checkpointed.checkpoint.number = CheckpointNumber(2); - delete localData.blockHashes[7]; - delete localData.blockHashes[8]; - delete localData.blockHashes[9]; - - // Phase 3: New blocks 7-12 arrive, all checkpointed (checkpoints 3-4) - setRemoteTipsMultiBlock(12, 12); - - await blockStream.work(); - - // Should have new blocks-added events but still no checkpoint events - expect(handler.events).toEqual([expectBlocksAdded([7, 8, 9]), expectBlocksAdded([10, 11, 12])]); - }); - }); }); describe('skipFinalized', () => { @@ -1668,9 +560,9 @@ describe('L2BlockStream', () => { it('skips ahead to the latest finalized block', async () => { setRemoteTips(40, 0, 38, 35); - localData.proposed.number = BlockNumber(5); - localData.proven.block.number = BlockNumber(2); - localData.finalized.block.number = BlockNumber(2); + localData.setProposed(5); + localData.setProven(2); + localData.setFinalized(2); await blockStream.work(); @@ -1685,133 +577,17 @@ describe('L2BlockStream', () => { it('does not skip if already ahead of finalized', async () => { setRemoteTips(40, 0, 38, 35); - localData.proposed.number = BlockNumber(38); - localData.proven.block.number = BlockNumber(38); - localData.finalized.block.number = BlockNumber(35); + localData.setProposed(38); + localData.setProven(38); + localData.setFinalized(35); await blockStream.work(); + // proven and finalized tips already match the source on (number, hash), so only new blocks are emitted. expect(handler.events).toEqual([ { type: 'blocks-added', blocks: times(2, i => makeBlock(i + 39)) }, ] satisfies L2BlockStreamEvent[]); }); - - it('emits the finalized checkpoint when fetching the finalized block (inconsistency)', async () => { - // This test demonstrates an inconsistency: Loop 1 skips finalized checkpoints via - // nextCheckpointToEmit adjustment, but Loop 2 still emits the finalized checkpoint - // when we fetch the finalized block. - // - // Scenario: Fresh start with skipFinalized=true - // - Checkpoint 6 (block 6) is finalized - // - Checkpoints 7-9 are checkpointed - // - Blocks 10-12 are uncheckpointed - // - // Expected (if consistent): Skip checkpoint 6, only emit checkpoints 7+ - // Actual: Checkpoint 6 IS emitted because Loop 2 fetches block 6 and emits its checkpoint - - // 12 blocks total, checkpointed up to 9, proven at 9, finalized at 6 - setRemoteTips(12, 9, 9, 6); - - // Fresh start - no local blocks - localData.proposed.number = BlockNumber(0); - localData.checkpointed.block.number = BlockNumber(0); - localData.checkpointed.checkpoint.number = CheckpointNumber(0); - localData.proven.block.number = BlockNumber(0); - localData.proven.checkpoint.number = CheckpointNumber(0); - localData.finalized.block.number = BlockNumber(0); - localData.finalized.checkpoint.number = CheckpointNumber(0); - - await blockStream.work(); - - // With skipFinalized, nextCheckpointToEmit starts at checkpoint 6 (finalized checkpoint) - // Loop 1 skips immediately (no local blocks) - // Loop 2 starts at block 6 (finalized block), finds checkpoint 6, emits block 6, then emits checkpoint 6 - // This IS the inconsistency: we emit checkpoint 6 even though it's finalized - expect(handler.events).toEqual([ - expectBlocksAdded([6]), - expectCheckpointed(6), // <-- This is the finalized checkpoint being emitted! - expectBlocksAdded([7]), - expectCheckpointed(7), - expectBlocksAdded([8]), - expectCheckpointed(8), - expectBlocksAdded([9]), - expectCheckpointed(9), - { type: 'blocks-added', blocks: times(3, i => makeBlock(i + 10)) }, - { type: 'chain-proven', block: makeBlockId(9), checkpoint: makeCheckpointId(9) }, - { type: 'chain-finalized', block: makeBlockId(6), checkpoint: makeCheckpointId(6) }, - ]); - }); - - it('does not emit finalized checkpointed blocks when skipFinalized is true', async () => { - // Source: finalized=35, proven=38, checkpointed=38, proposed=40 - // All blocks up to 38 are checkpointed (each block is its own checkpoint), blocks 39-40 are uncheckpointed - setRemoteTips(40, 38, 38, 35); - - // Local is at block 5, finalized at 2 - localData.proposed.number = BlockNumber(5); - localData.checkpointed.block.number = BlockNumber(2); - localData.checkpointed.checkpoint.number = CheckpointNumber(2); - localData.proven.block.number = BlockNumber(2); - localData.proven.checkpoint.number = CheckpointNumber(2); - localData.finalized.block.number = BlockNumber(2); - localData.finalized.checkpoint.number = CheckpointNumber(2); - - await blockStream.work(); - - // With skipFinalized=true, we should skip to block 35 (finalized tip) - // We should NOT emit blocks 6-34 since they are finalized - // We should only emit blocks from 35 onwards, and checkpoint events from checkpoint 36 onwards - // (checkpoint 35 is the finalized checkpoint, so we skip it too) - expect(handler.events).toEqual([ - expect.objectContaining({ - type: 'blocks-added', - blocks: [expect.objectContaining({ number: BlockNumber(35) })], - }), - expect.objectContaining({ - type: 'chain-checkpointed', - block: expect.objectContaining({ number: BlockNumber(35) }), - checkpoint: expect.objectContaining({ - checkpoint: expect.objectContaining({ number: CheckpointNumber(35) }), - }), - }), - expect.objectContaining({ - type: 'blocks-added', - blocks: [expect.objectContaining({ number: BlockNumber(36) })], - }), - expect.objectContaining({ - type: 'chain-checkpointed', - block: expect.objectContaining({ number: BlockNumber(36) }), - checkpoint: expect.objectContaining({ - checkpoint: expect.objectContaining({ number: CheckpointNumber(36) }), - }), - }), - expect.objectContaining({ - type: 'blocks-added', - blocks: [expect.objectContaining({ number: BlockNumber(37) })], - }), - expect.objectContaining({ - type: 'chain-checkpointed', - block: expect.objectContaining({ number: BlockNumber(37) }), - checkpoint: expect.objectContaining({ - checkpoint: expect.objectContaining({ number: CheckpointNumber(37) }), - }), - }), - expect.objectContaining({ - type: 'blocks-added', - blocks: [expect.objectContaining({ number: BlockNumber(38) })], - }), - expect.objectContaining({ - type: 'chain-checkpointed', - block: expect.objectContaining({ number: BlockNumber(38) }), - checkpoint: expect.objectContaining({ - checkpoint: expect.objectContaining({ number: CheckpointNumber(38) }), - }), - }), - { type: 'blocks-added', blocks: times(2, i => makeBlock(i + 39)) }, - { type: 'chain-proven', block: makeBlockId(38), checkpoint: makeCheckpointId(38) }, - { type: 'chain-finalized', block: makeBlockId(35), checkpoint: makeCheckpointId(35) }, - ]); - }); }); describe('local provider without checkpointed tip', () => { @@ -1832,14 +608,8 @@ describe('L2BlockStream', () => { await blockStream.work(); - // All 5 blocks are synced (one per checkpoint in the mock) and no checkpoint events are emitted. - expect(handler.events).toEqual([ - expectBlocksAdded([1]), - expectBlocksAdded([2]), - expectBlocksAdded([3]), - expectBlocksAdded([4]), - expectBlocksAdded([5]), - ]); + // All 5 blocks are synced and no checkpoint events are emitted. + expect(handler.events).toEqual([{ type: 'blocks-added', blocks: times(5, i => makeBlock(i + 1)) }]); expect(handler.events.every(e => e.type === 'blocks-added')).toBe(true); }); @@ -1859,6 +629,11 @@ describe('L2BlockStream', () => { }); }); +/** Builds a checkpoint id from a plain number, isolated so the branded-type lint rule sees no BlockNumber flow. */ +function makeTipCheckpointId(checkpointNumber: number) { + return { number: CheckpointNumber(checkpointNumber), hash: new Fr(checkpointNumber).toString() }; +} + class TestL2BlockStreamEventHandler implements L2BlockStreamEventHandler { public readonly events: L2BlockStreamEvent[] = []; public throwing: boolean = false; @@ -1885,20 +660,48 @@ class TestL2BlockStreamEventHandler implements L2BlockStreamEventHandler { class TestL2BlockStreamLocalDataProvider implements L2BlockStreamLocalDataProvider { public readonly blockHashes: Record = {}; - public proposed = { number: BlockNumber.ZERO, hash: GENESIS_BLOCK_HEADER_HASH.toString() }; + // Genesis tip hashes match `getL2BlockHash(0)` (`new Fr(0)`) and the mock source's genesis tips + // (`makeHash(0)`), so the tier reconciliation finds no spurious difference at genesis. + public proposed = { number: BlockNumber.ZERO, hash: new Fr(0).toString() }; public checkpointed = { - block: { number: BlockNumber.ZERO, hash: GENESIS_BLOCK_HEADER_HASH.toString() }, - checkpoint: { number: CheckpointNumber.ZERO, hash: GENESIS_CHECKPOINT_HEADER_HASH.toString() }, + block: { number: BlockNumber.ZERO, hash: new Fr(0).toString() }, + checkpoint: { number: CheckpointNumber.ZERO, hash: new Fr(0).toString() }, }; public proven = { - block: { number: BlockNumber.ZERO, hash: GENESIS_BLOCK_HEADER_HASH.toString() }, - checkpoint: { number: CheckpointNumber.ZERO, hash: GENESIS_CHECKPOINT_HEADER_HASH.toString() }, + block: { number: BlockNumber.ZERO, hash: new Fr(0).toString() }, + checkpoint: { number: CheckpointNumber.ZERO, hash: new Fr(0).toString() }, }; public finalized = { - block: { number: BlockNumber.ZERO, hash: GENESIS_BLOCK_HEADER_HASH.toString() }, - checkpoint: { number: CheckpointNumber.ZERO, hash: GENESIS_CHECKPOINT_HEADER_HASH.toString() }, + block: { number: BlockNumber.ZERO, hash: new Fr(0).toString() }, + checkpoint: { number: CheckpointNumber.ZERO, hash: new Fr(0).toString() }, }; + /** Sets a tip's number and a matching hash, so the tier reconciliation sees consistent (number, hash) pairs. */ + public setProposed(number: number) { + this.proposed = { number: BlockNumber(number), hash: new Fr(number).toString() }; + } + + public setCheckpointed(blockNumber: number, checkpointNumber: number) { + this.checkpointed = { + block: { number: BlockNumber(blockNumber), hash: new Fr(blockNumber).toString() }, + checkpoint: makeTipCheckpointId(checkpointNumber), + }; + } + + public setProven(blockNumber: number) { + this.proven = { + block: { number: BlockNumber(blockNumber), hash: new Fr(blockNumber).toString() }, + checkpoint: makeTipCheckpointId(blockNumber), + }; + } + + public setFinalized(blockNumber: number) { + this.finalized = { + block: { number: BlockNumber(blockNumber), hash: new Fr(blockNumber).toString() }, + checkpoint: makeTipCheckpointId(blockNumber), + }; + } + public getL2BlockHash(number: number): Promise { return Promise.resolve( number > this.proposed.number ? undefined : (this.blockHashes[number] ?? new Fr(number).toString()), @@ -1919,9 +722,9 @@ class TestL2BlockStreamLocalDataProvider implements L2BlockStreamLocalDataProvid class TestLocalChainTipsProvider implements L2BlockStreamLocalDataProvider { public readonly blockHashes: Record = {}; - public proposed = { number: BlockNumber.ZERO, hash: GENESIS_BLOCK_HEADER_HASH.toString() }; - public proven = { block: { number: BlockNumber.ZERO, hash: GENESIS_BLOCK_HEADER_HASH.toString() } }; - public finalized = { block: { number: BlockNumber.ZERO, hash: GENESIS_BLOCK_HEADER_HASH.toString() } }; + public proposed = { number: BlockNumber.ZERO, hash: new Fr(0).toString() }; + public proven = { block: { number: BlockNumber.ZERO, hash: new Fr(0).toString() } }; + public finalized = { block: { number: BlockNumber.ZERO, hash: new Fr(0).toString() } }; public getL2BlockHash(number: number): Promise { return Promise.resolve( diff --git a/yarn-project/stdlib/src/block/l2_block_stream/l2_block_stream.ts b/yarn-project/stdlib/src/block/l2_block_stream/l2_block_stream.ts index 3d5280e1684d..33f648c03694 100644 --- a/yarn-project/stdlib/src/block/l2_block_stream/l2_block_stream.ts +++ b/yarn-project/stdlib/src/block/l2_block_stream/l2_block_stream.ts @@ -1,14 +1,18 @@ -import { BlockNumber, CheckpointNumber } from '@aztec/foundation/branded-types'; +import { BlockNumber } from '@aztec/foundation/branded-types'; import { AbortError } from '@aztec/foundation/error'; import { createLogger } from '@aztec/foundation/log'; import { RunningPromise } from '@aztec/foundation/running-promise'; -import type { PublishedCheckpoint } from '../../checkpoint/published_checkpoint.js'; -import { type L2BlockId, type L2BlockSource, makeL2BlockId } from '../l2_block_source.js'; -import type { L2BlockStreamEvent, L2BlockStreamEventHandler, L2BlockStreamLocalDataProvider } from './interfaces.js'; +import { type L2BlockId, type L2BlockSource, type L2TipId, makeL2BlockId } from '../l2_block_source.js'; +import type { + L2BlockStreamEvent, + L2BlockStreamEventHandler, + L2BlockStreamLocalDataProvider, + LocalL2BlockId, +} from './interfaces.js'; -/** Maximum number of checkpoints to prefetch at once during sync. Matches MAX_RPC_CHECKPOINTS_LEN. */ -export const CHECKPOINT_PREFETCH_LIMIT = 50; +/** Subset of the block source the stream depends on. Checkpoint payloads are no longer fetched here. */ +type L2BlockStreamSource = Pick; /** Creates a stream of events for new blocks, chain tips updates, and reorgs, out of polling an archiver or a node. */ export class L2BlockStream { @@ -17,7 +21,7 @@ export class L2BlockStream { private hasStarted = false; constructor( - private l2BlockSource: Pick, + private l2BlockSource: L2BlockStreamSource, private localData: L2BlockStreamLocalDataProvider, private handler: L2BlockStreamEventHandler, private readonly log = createLogger('types:block_stream'), @@ -27,10 +31,8 @@ export class L2BlockStream { startingBlock?: number; /** Instead of downloading all blocks, only fetch the smallest subset that results in reliable reorg detection. */ skipFinalized?: boolean; - /** When true, checkpoint events will not be emitted. Blocks are still fetched via checkpoints but only blocks-added events are emitted. */ + /** When true, checkpoint events will not be emitted. Blocks are still fetched but only blocks-added events are emitted. */ ignoreCheckpoints?: boolean; - /** Maximum number of checkpoints to prefetch at once during sync. Defaults to CHECKPOINT_PREFETCH_LIMIT (50). */ - checkpointPrefetchLimit?: number; } = {}, ) { // Note that RunningPromise is in stopped state by default. This promise won't run until someone invokes `start`, @@ -100,6 +102,7 @@ export class L2BlockStream { latestBlockNumber--; } + let pruned = false; if (latestBlockNumber < localTips.proposed.number) { latestBlockNumber = BlockNumber(Math.min(latestBlockNumber, sourceTips.proposed.number)); // see #13471 const hash = sourceCache.get(latestBlockNumber) ?? (await this.getBlockHashFromSource(latestBlockNumber)); @@ -115,149 +118,34 @@ export class L2BlockStream { checkpointed: sourceTips.checkpointed, proven: sourceTips.proven, }); + pruned = true; } - // If we are just starting, use the starting block number from the options. - const startingBlock = this.opts.startingBlock !== undefined ? BlockNumber(this.opts.startingBlock) : undefined; - if (latestBlockNumber === 0 && startingBlock !== undefined) { - latestBlockNumber = BlockNumber(Math.max(startingBlock - 1, 0)); - } - - // Only log this entry once (for sanity) - if (!this.hasStarted) { - this.log.verbose(`Starting sync from block number ${latestBlockNumber}`); - this.hasStarted = true; - } - + // The post-prune cursor: the highest block number both sides agree on. Block downloads resume from here. let nextBlockNumber = latestBlockNumber + 1; - // When checkpoints are ignored the local provider may omit `checkpointed`; in that case the fallback to - // CheckpointNumber.ZERO is harmless because `nextCheckpointToEmit` is never consumed for emission (Loop 1 and - // the startingBlock/skipFinalized adjustments below only feed checkpoint emission, which is gated off). - let nextCheckpointToEmit = CheckpointNumber( - (localTips.checkpointed?.checkpoint.number ?? CheckpointNumber.ZERO) + 1, - ); - // When startingBlock is set, also skip ahead for checkpoints. - if ( - startingBlock !== undefined && - startingBlock >= 1 && - nextCheckpointToEmit <= sourceTips.checkpointed.checkpoint.number - ) { - if (startingBlock > sourceTips.checkpointed.block.number) { - // startingBlock is past all checkpointed blocks; skip Loop 1 entirely. - nextCheckpointToEmit = CheckpointNumber(sourceTips.checkpointed.checkpoint.number + 1); - } else { - const startingBlockData = await this.l2BlockSource.getBlockData({ number: startingBlock }); - if (startingBlockData) { - nextCheckpointToEmit = CheckpointNumber(Math.max(nextCheckpointToEmit, startingBlockData.checkpointNumber)); - } - } + // If we are just starting from a fresh local store, fast-forward the download cursor to the configured + // starting block so we skip the history the consumer doesn't care about. + const startingBlock = this.opts.startingBlock !== undefined ? BlockNumber(this.opts.startingBlock) : undefined; + if (latestBlockNumber === 0 && startingBlock !== undefined) { + nextBlockNumber = Math.max(startingBlock, 1); } if (this.opts.skipFinalized) { // When skipping finalized blocks we need to provide reliable reorg detection while fetching as few blocks as // possible. Finalized blocks cannot be reorged by definition, so we can skip most of them. We do need the very // last finalized block however in order to guarantee that we will eventually find a block in which our local - // store matches the source. - // If the last finalized block is behind our local tip, there is nothing to skip. + // store matches the source. If the last finalized block is behind our local tip, there is nothing to skip. nextBlockNumber = Math.max(sourceTips.finalized.block.number, nextBlockNumber); - // If the next checkpoint to emit is behind the finalized tip then skip forward - nextCheckpointToEmit = CheckpointNumber(Math.max(nextCheckpointToEmit, sourceTips.finalized.checkpoint.number)); - } - - // Loop 1: Emit checkpoint events for checkpoints whose blocks are already in local storage. - // This handles the case where blocks were synced as uncheckpointed and later became checkpointed. - // The guard `lastBlockInCheckpoint.number > localTips.proposed.number` ensures we don't emit - // checkpoints for blocks we don't have (e.g., when startingBlock skips earlier blocks). - // Since only one checkpoint can ever be uncheckpointed, this loop should iterate at most once. - if (!this.opts.ignoreCheckpoints) { - let loop1Iterations = 0; - while (nextCheckpointToEmit <= sourceTips.checkpointed.checkpoint.number) { - const checkpoints = await this.l2BlockSource.getCheckpoints({ from: nextCheckpointToEmit, limit: 1 }); - if (checkpoints.length === 0) { - break; - } - const lastBlockInCheckpoint = checkpoints[0].checkpoint.blocks.at(-1)!; - // If this checkpoint has blocks we haven't seen yet, stop - they need to be fetched first - if (lastBlockInCheckpoint.number > localTips.proposed.number) { - break; - } - loop1Iterations++; - if (loop1Iterations > 1) { - this.log.warn( - `Emitting multiple checkpoints (${loop1Iterations}) for already-local blocks. ` + - `Next checkpoint: ${nextCheckpointToEmit}, source checkpointed: ${sourceTips.checkpointed.checkpoint.number}`, - ); - } - const lastBlockHash = await lastBlockInCheckpoint.hash(); - await this.emitEvent({ - type: 'chain-checkpointed', - checkpoint: checkpoints[0], - block: makeL2BlockId(lastBlockInCheckpoint.number, lastBlockHash.toString()), - }); - nextCheckpointToEmit = CheckpointNumber(nextCheckpointToEmit + 1); - } } - // Loop 2: Fetch new checkpointed blocks. For each checkpoint, emit all blocks - // from that checkpoint that we need, then emit the checkpoint event. - // We prefetch multiple checkpoints, then process them one by one. - let prefetchedCheckpoints: PublishedCheckpoint[] = []; - let prefetchIdx = 0; - let nextCheckpointNumber: CheckpointNumber | undefined; - - // Find the starting checkpoint number - if (nextBlockNumber <= sourceTips.checkpointed.block.number) { - const blockData = await this.l2BlockSource.getBlockData({ number: BlockNumber(nextBlockNumber) }); - if (blockData) { - nextCheckpointNumber = blockData.checkpointNumber; - } - } - - while (nextBlockNumber <= sourceTips.checkpointed.block.number && nextCheckpointNumber !== undefined) { - // Refill the prefetch buffer when exhausted - if (prefetchIdx >= prefetchedCheckpoints.length) { - const prefetchLimit = this.opts.checkpointPrefetchLimit ?? CHECKPOINT_PREFETCH_LIMIT; - prefetchedCheckpoints = await this.l2BlockSource.getCheckpoints({ - from: nextCheckpointNumber, - limit: prefetchLimit, - }); - prefetchIdx = 0; - if (prefetchedCheckpoints.length === 0) { - break; - } - } - - const checkpoint = prefetchedCheckpoints[prefetchIdx]!; - - // Get all blocks from this checkpoint that we need, respecting batchSize - const limit = Math.min(this.opts.batchSize ?? 50, sourceTips.checkpointed.block.number - nextBlockNumber + 1); - const blocksForCheckpoint = checkpoint.checkpoint.blocks - .filter(b => b.number >= nextBlockNumber) - .slice(0, limit); - if (blocksForCheckpoint.length === 0) { - break; - } - await this.emitEvent({ type: 'blocks-added', blocks: blocksForCheckpoint }); - nextBlockNumber = blocksForCheckpoint.at(-1)!.number + 1; - - // If we've reached the end of this checkpoint, emit the checkpoint event and move to next - const lastBlockInCheckpoint = checkpoint.checkpoint.blocks.at(-1)!; - if (nextBlockNumber > lastBlockInCheckpoint.number) { - if (!this.opts.ignoreCheckpoints) { - const lastBlockHash = await lastBlockInCheckpoint.hash(); - await this.emitEvent({ - type: 'chain-checkpointed', - checkpoint, - block: makeL2BlockId(lastBlockInCheckpoint.number, lastBlockHash.toString()), - }); - } - prefetchIdx++; - nextCheckpointNumber = CheckpointNumber(nextCheckpointNumber + 1); - } + // Only log this entry once (for sanity) + if (!this.hasStarted) { + this.log.verbose(`Starting sync from block number ${nextBlockNumber - 1}`); + this.hasStarted = true; } - // Loop 3: Fetch any remaining uncheckpointed (proposed) blocks. + // Download every block up to the source's proposed tip, batched by `batchSize`. while (nextBlockNumber <= sourceTips.proposed.number) { const limit = Math.min(this.opts.batchSize ?? 50, sourceTips.proposed.number - nextBlockNumber + 1); this.log.trace(`Requesting blocks from ${nextBlockNumber} limit ${limit}`); @@ -269,15 +157,30 @@ export class L2BlockStream { nextBlockNumber = blocks.at(-1)!.number + 1; } - // Update the proven and finalized tips. - if (localTips.proven !== undefined && sourceTips.proven.block.number !== localTips.proven.block.number) { + // End-of-pass tier reconciliation. For each tier, emit a single event iff the source tip differs from the + // local one. All three source tips come from the SAME `sourceTips` snapshot, so no extra source fetches are + // needed. We re-read the local tips after a prune because the prune handler has already clamped the local + // cursors back; the `localTips` snapshot taken before the prune would be stale and would mis-drive the tier + // comparison (emitting events relative to cursors that no longer exist). + const reconcileTips = pruned ? await this.localData.getL2Tips() : localTips; + if (!this.opts.ignoreCheckpoints && this.tipDiffers(reconcileTips.checkpointed?.block, sourceTips.checkpointed)) { + await this.emitEvent({ + type: 'chain-checkpointed', + block: sourceTips.checkpointed.block, + checkpoint: sourceTips.checkpointed.checkpoint, + }); + } + if (reconcileTips.proven !== undefined && this.tipDiffers(reconcileTips.proven.block, sourceTips.proven)) { await this.emitEvent({ type: 'chain-proven', block: sourceTips.proven.block, checkpoint: sourceTips.proven.checkpoint, }); } - if (localTips.finalized !== undefined && sourceTips.finalized.block.number !== localTips.finalized.block.number) { + if ( + reconcileTips.finalized !== undefined && + this.tipDiffers(reconcileTips.finalized.block, sourceTips.finalized) + ) { await this.emitEvent({ type: 'chain-finalized', block: sourceTips.finalized.block, @@ -292,6 +195,25 @@ export class L2BlockStream { } } + /** + * Returns whether the source tip differs from the local one and therefore warrants a tier event. Compares block + * number and, when both hashes are known, block hash. The hash comparison is skipped when the local hash is + * undefined or missing: world-state legitimately reports `undefined` hashes for tips ahead of its synced range, + * and comparing against an undefined hash would re-emit the event on every poll. + */ + private tipDiffers(localBlock: LocalL2BlockId | undefined, sourceTip: L2TipId): boolean { + if (localBlock === undefined) { + return true; + } + if (sourceTip.block.number !== localBlock.number) { + return true; + } + if (localBlock.hash === undefined) { + return false; + } + return sourceTip.block.hash !== localBlock.hash; + } + /** * Returns whether the source and local agree on the block hash at a given height. * @param blockNumber - The block number to test. @@ -327,7 +249,13 @@ export class L2BlockStream { private async emitEvent(event: L2BlockStreamEvent) { this.log.debug( - `Emitting ${event.type} (${event.type === 'blocks-added' ? event.blocks.length : event.type === 'chain-checkpointed' ? event.checkpoint.checkpoint.number : event.block.number})`, + `Emitting ${event.type} (${ + event.type === 'blocks-added' + ? event.blocks.length + : event.type === 'chain-checkpointed' + ? event.checkpoint.number + : event.block.number + })`, ); await this.handler.handleBlockStreamEvent(event); if (!this.isRunning() && !this.isSyncing) { diff --git a/yarn-project/stdlib/src/block/l2_block_stream/l2_tips_store_base.ts b/yarn-project/stdlib/src/block/l2_block_stream/l2_tips_store_base.ts index 37a576424b29..4a2a317e18bb 100644 --- a/yarn-project/stdlib/src/block/l2_block_stream/l2_tips_store_base.ts +++ b/yarn-project/stdlib/src/block/l2_block_stream/l2_tips_store_base.ts @@ -153,12 +153,8 @@ export abstract class L2TipsStoreBase implements L2BlockStreamEventHandler, L2Bl return; } await this.runInTransaction(async () => { - const checkpointId: CheckpointId = { - number: event.checkpoint.checkpoint.number, - hash: event.checkpoint.checkpoint.hash().toString(), - }; await this.saveTag('checkpointed', event.block); - await this.setTipCheckpoint('checkpointed', checkpointId); + await this.setTipCheckpoint('checkpointed', event.checkpoint); }); } diff --git a/yarn-project/stdlib/src/block/test/l2_tips_store_test_suite.ts b/yarn-project/stdlib/src/block/test/l2_tips_store_test_suite.ts index 7335058c452f..e8a928e5fb3d 100644 --- a/yarn-project/stdlib/src/block/test/l2_tips_store_test_suite.ts +++ b/yarn-project/stdlib/src/block/test/l2_tips_store_test_suite.ts @@ -94,14 +94,18 @@ export function testL2TipsStore(makeTipsStore: () => Promise) { return new PublishedCheckpoint(checkpoint, L1PublishedData.random(), []); }; - /** Creates a chain-checkpointed event with the required block field */ - const makeCheckpointedEvent = async (checkpoint: PublishedCheckpoint) => { - const lastBlock = checkpoint.checkpoint.blocks.at(-1)!; + /** Creates a thin chain-checkpointed event carrying the block + checkpoint ids of the checkpoint's last block. */ + const makeCheckpointedEvent = async (published: PublishedCheckpoint) => { + const lastBlock = published.checkpoint.blocks.at(-1)!; const blockId: L2BlockId = { number: lastBlock.number, hash: (await lastBlock.hash()).toString(), }; - return { type: 'chain-checkpointed' as const, checkpoint, block: blockId }; + const checkpointId = { + number: published.checkpoint.number, + hash: published.checkpoint.hash().toString(), + }; + return { type: 'chain-checkpointed' as const, checkpoint: checkpointId, block: blockId }; }; it('returns zero if no tips are stored', async () => { diff --git a/yarn-project/telemetry-client/src/wrappers/l2_block_stream.ts b/yarn-project/telemetry-client/src/wrappers/l2_block_stream.ts index 90b58b6a5a62..c6f49966cdb5 100644 --- a/yarn-project/telemetry-client/src/wrappers/l2_block_stream.ts +++ b/yarn-project/telemetry-client/src/wrappers/l2_block_stream.ts @@ -11,7 +11,7 @@ import { type Traceable, type Tracer, trackSpan } from '@aztec/telemetry-client' /** Extends an L2BlockStream with a tracer to create a new trace per iteration. */ export class TraceableL2BlockStream extends L2BlockStream implements Traceable { constructor( - l2BlockSource: Pick, + l2BlockSource: Pick, localData: L2BlockStreamLocalDataProvider, handler: L2BlockStreamEventHandler, public readonly tracer: Tracer,