From 1405dbf0bbca270c09fd5416d35d585f6a06b806 Mon Sep 17 00:00:00 2001 From: Santiago Palladino Date: Tue, 9 Jun 2026 21:51:20 -0300 Subject: [PATCH] fix(p2p): drive tx protection release from synced blocks instead of wall clock The tx pool protects txs referenced by an in-flight block proposal by keying them to the proposal's slot and removing them from the pending indices, then releases the protection once the slot has passed. Release used to be driven by a wall-clock slot monitor polling the epoch cache. That monitor could fire after a proposal's checkpoint landed on L1 but before the block stream delivered blocks-added: the just-landed txs were unprotected into pending, where eviction or nullifier-conflict resolution could delete them before mined-marking ran, and a later reorg had nothing to restore. Make the protection lifecycle fully event-driven and delete the slot monitor: - Release on local validation failure: a proposal that fails validation releases only the protection entries still keyed to its slot, so a tx also referenced by a live proposal at another slot stays protected. - Collect via synced block slots: prepareForSlot now runs inside the blocks-added handler with the last synced block's slot, after mined-marking, so the unprotect-before-mined race is impossible by construction. - Mining now clears a tx's protection entry, since a mined tx supersedes any protection. Protections are in-memory only and are not rehydrated on restart. Proposers are unaffected: the sequencer still calls prepareForSlot(targetSlot) directly. --- yarn-project/foundation/src/config/env_var.ts | 1 - .../p2p/src/client/p2p_client.test.ts | 37 ++++ yarn-project/p2p/src/client/p2p_client.ts | 41 +--- yarn-project/p2p/src/config.ts | 8 - .../src/mem_pools/tx_pool_v2/interfaces.ts | 11 ++ .../mem_pools/tx_pool_v2/tx_pool_indices.ts | 12 ++ .../mem_pools/tx_pool_v2/tx_pool_v2.test.ts | 176 +++++++++++++++--- .../src/mem_pools/tx_pool_v2/tx_pool_v2.ts | 4 + .../mem_pools/tx_pool_v2/tx_pool_v2_impl.ts | 63 +++++-- .../services/libp2p/libp2p_service.test.ts | 25 ++- .../p2p/src/services/libp2p/libp2p_service.ts | 3 + .../p2p/src/test-helpers/testbench-utils.ts | 4 + 12 files changed, 299 insertions(+), 86 deletions(-) diff --git a/yarn-project/foundation/src/config/env_var.ts b/yarn-project/foundation/src/config/env_var.ts index a885d0e6224b..d575c1906e24 100644 --- a/yarn-project/foundation/src/config/env_var.ts +++ b/yarn-project/foundation/src/config/env_var.ts @@ -118,7 +118,6 @@ export type EnvVar = | 'P2P_BATCH_TX_REQUESTER_TX_BATCH_SIZE' | 'P2P_BATCH_TX_REQUESTER_BAD_PEER_THRESHOLD' | 'P2P_BLOCK_CHECK_INTERVAL_MS' - | 'P2P_SLOT_CHECK_INTERVAL_MS' | 'P2P_BLOCK_REQUEST_BATCH_SIZE' | 'P2P_BOOTSTRAP_NODE_ENR_VERSION_CHECK' | 'P2P_BOOTSTRAP_NODES_AS_FULL_PEERS' diff --git a/yarn-project/p2p/src/client/p2p_client.test.ts b/yarn-project/p2p/src/client/p2p_client.test.ts index 01c03e0d206d..ac08036b58a6 100644 --- a/yarn-project/p2p/src/client/p2p_client.test.ts +++ b/yarn-project/p2p/src/client/p2p_client.test.ts @@ -404,6 +404,43 @@ describe('P2P Client', () => { expect(await client.getSyncedLatestBlockNum()).toEqual(102); }); + it('prepares the pool for the last synced block slot after marking txs mined', async () => { + await client.start(); + + blockSource.addProposedBlocks([ + await L2Block.random(BlockNumber(101), { slotNumber: SlotNumber(150) }), + await L2Block.random(BlockNumber(102), { slotNumber: SlotNumber(151) }), + ]); + await client.sync(); + + // Release is driven by the synced block slot, not the wall clock: prepareForSlot is ultimately + // driven with the last synced block's slot (regardless of how the stream batches the blocks), + // never reads the epoch cache, and never targets a slot beyond what has synced. + expect(txPool.prepareForSlot).toHaveBeenLastCalledWith(SlotNumber(151)); + const preparedSlots = txPool.prepareForSlot.mock.calls.map(([slot]) => Number(slot)); + expect(Math.max(...preparedSlots)).toBe(151); + expect(epochCache.getCurrentAndNextSlot).not.toHaveBeenCalled(); + // Mined-marking runs before the matching-slot release within the handler. + expect(txPool.handleMinedBlock.mock.invocationCallOrder.at(-1)!).toBeLessThan( + txPool.prepareForSlot.mock.invocationCallOrder.at(-1)!, + ); + }); + + it('does not re-prepare for a slot that does not advance', async () => { + await client.start(); + + blockSource.addProposedBlocks([await L2Block.random(BlockNumber(101), { slotNumber: SlotNumber(150) })]); + await client.sync(); + const callsAfterFirst = txPool.prepareForSlot.mock.calls.length; + expect(txPool.prepareForSlot).toHaveBeenLastCalledWith(SlotNumber(150)); + + // A later block at an earlier slot must not advance the prepared-for slot + blockSource.addProposedBlocks([await L2Block.random(BlockNumber(102), { slotNumber: SlotNumber(149) })]); + await client.sync(); + expect(txPool.prepareForSlot.mock.calls.length).toBe(callsAfterFirst); + expect(txPool.prepareForSlot).not.toHaveBeenCalledWith(SlotNumber(149)); + }); + it('handles proven and finalized chain behind starting point', async () => { blockSource.setProvenBlockNumber(0); blockSource.setFinalizedBlockNumber(0); diff --git a/yarn-project/p2p/src/client/p2p_client.ts b/yarn-project/p2p/src/client/p2p_client.ts index 2d07cfd79c92..c26aac46b805 100644 --- a/yarn-project/p2p/src/client/p2p_client.ts +++ b/yarn-project/p2p/src/client/p2p_client.ts @@ -6,7 +6,6 @@ import { SlotNumber, } from '@aztec/foundation/branded-types'; import { createLogger } from '@aztec/foundation/log'; -import { RunningPromise } from '@aztec/foundation/promise'; import { DateProvider } from '@aztec/foundation/timer'; import type { AztecAsyncKVStore, AztecAsyncSingleton } from '@aztec/kv-store'; import { L2TipsKVStore } from '@aztec/kv-store/stores'; @@ -83,9 +82,6 @@ export class P2PClient extends WithTracer implements P2P { /** Tracks the last slot for which we called prepareForSlot */ private lastSlotProcessed: SlotNumber = SlotNumber.ZERO; - /** Polls for slot changes and calls prepareForSlot on the tx pool */ - private slotMonitor: RunningPromise | undefined; - constructor( private store: AztecAsyncKVStore, private l2BlockSource: L2BlockSource & ContractDataSource, @@ -268,14 +264,6 @@ export class P2PClient extends WithTracer implements P2P { this.blockStream.start(); this.txFileStore?.start(); - // Start slot monitor to call prepareForSlot when the slot changes - this.slotMonitor = new RunningPromise( - () => this.maybeCallPrepareForSlot(), - this.log, - this.config.slotCheckIntervalMS, - ); - this.slotMonitor.start(); - return this.syncPromise; } @@ -302,8 +290,6 @@ export class P2PClient extends WithTracer implements P2P { */ public async stop() { this.log.debug('Stopping p2p client...'); - await this.slotMonitor?.stop(); - this.log.debug('Stopped slot monitor'); await tryStop(this.txCollection); this.log.debug('Stopped tx collection service'); await this.txFileStore?.stop(); @@ -642,11 +628,16 @@ export class P2PClient extends WithTracer implements P2P { return; } + const lastBlock = blocks.at(-1)!; + const lastSlot = lastBlock.header.getSlot(); + + // Mark txs mined before releasing protections: a block landing at this slot supersedes any + // protection for txs it includes, so mined-marking must run first to keep just-landed txs from + // being unprotected into pending where they could be evicted before they are recorded as mined. await this.handleMinedBlocks(blocks); - await this.maybeCallPrepareForSlot(); + await this.maybeCallPrepareForSlot(lastSlot); await this.collectingMissingTxs(blocks); - const lastBlock = blocks.at(-1)!; - await this.synchedLatestSlot.set(BigInt(lastBlock.header.getSlot())); + await this.synchedLatestSlot.set(BigInt(lastSlot)); } /** Request txs for unproven blocks so the prover node can prove. */ @@ -741,20 +732,8 @@ export class P2PClient extends WithTracer implements P2P { return isEpochPrune; } - /** Checks if the slot has changed and calls prepareForSlot if so. */ - private async maybeCallPrepareForSlot(): Promise { - // If we have a proposed checkpoint available, we want to prepare the target slot - otherwise we prepare the current slot - const l2Tips = await this.l2Tips.getL2Tips(); - const hasProposedCheckpoint = l2Tips.proposedCheckpoint.checkpoint.number > l2Tips.checkpointed.checkpoint.number; - - let slot; - if (hasProposedCheckpoint) { - const { targetSlot } = this.epochCache.getTargetAndNextSlot(); - slot = targetSlot; - } else { - const { currentSlot } = this.epochCache.getCurrentAndNextSlot(); - slot = currentSlot; - } + /** Calls prepareForSlot for the given slot if it advances past the last slot we prepared for. */ + private async maybeCallPrepareForSlot(slot: SlotNumber): Promise { if (slot <= this.lastSlotProcessed) { return; } diff --git a/yarn-project/p2p/src/config.ts b/yarn-project/p2p/src/config.ts index 5dfdfed7a915..33a147b2c75f 100644 --- a/yarn-project/p2p/src/config.ts +++ b/yarn-project/p2p/src/config.ts @@ -71,9 +71,6 @@ export interface P2PConfig /** The frequency in which to check for new L2 blocks. */ blockCheckIntervalMS: number; - /** The frequency in which to check for new L2 slots. */ - slotCheckIntervalMS: number; - /** The number of blocks to fetch in a single batch. */ blockRequestBatchSize: number; @@ -317,11 +314,6 @@ export const p2pConfigMappings: ConfigMappingsType = { description: 'The frequency in which to check for new L2 blocks.', ...numberConfigHelper(100), }, - slotCheckIntervalMS: { - env: 'P2P_SLOT_CHECK_INTERVAL_MS', - description: 'The frequency in which to check for new L2 slots.', - ...numberConfigHelper(1000), - }, debugDisableColocationPenalty: { env: 'DEBUG_P2P_DISABLE_COLOCATION_PENALTY', description: 'DEBUG: Disable colocation penalty - NEVER set to true in production', diff --git a/yarn-project/p2p/src/mem_pools/tx_pool_v2/interfaces.ts b/yarn-project/p2p/src/mem_pools/tx_pool_v2/interfaces.ts index c49f2fce3805..2f7fab151b30 100644 --- a/yarn-project/p2p/src/mem_pools/tx_pool_v2/interfaces.ts +++ b/yarn-project/p2p/src/mem_pools/tx_pool_v2/interfaces.ts @@ -170,6 +170,17 @@ export interface TxPoolV2 extends TypedEventEmitter { */ prepareForSlot(slotNumber: SlotNumber): Promise; + /** + * Releases the protections a failed block proposal created and restores the txs to pending. + * Only clears protection entries still recorded at exactly the given slot: a tx that another, + * still-live proposal raised to a higher slot via {@link protectTxs} keeps its protection, and + * mined txs (which carry no protection entry) are left untouched. Restored txs are re-validated + * and resolved against nullifier conflicts before re-entering the pending indices. + * @param txHashes - Hashes of the proposal's txs to release. + * @param slotNumber - The slot the failed proposal targeted; protection is released only for this slot. + */ + unprotectTxs(txHashes: TxHash[], slotNumber: SlotNumber): Promise; + /** * Handles pruned blocks during a reorg. * Un-mines all transactions mined in blocks beyond the given latest block diff --git a/yarn-project/p2p/src/mem_pools/tx_pool_v2/tx_pool_indices.ts b/yarn-project/p2p/src/mem_pools/tx_pool_v2/tx_pool_indices.ts index f39a4c4e5a09..d0dcc8200564 100644 --- a/yarn-project/p2p/src/mem_pools/tx_pool_v2/tx_pool_indices.ts +++ b/yarn-project/p2p/src/mem_pools/tx_pool_v2/tx_pool_indices.ts @@ -131,6 +131,9 @@ export class TxPoolIndices { meta.minedL2BlockId = blockId; // Safe to call unconditionally - removeFromPendingIndices is idempotent this.#removeFromPendingIndices(meta); + // A mined tx supersedes any protection: drop the stale entry so it can't linger in the map and + // be matched by later protection scans. + this.#protectedTransactions.delete(meta.txHash); } /** Clears the mined status from a transaction */ @@ -316,6 +319,15 @@ export class TxPoolIndices { return result; } + /** + * From the given hashes, returns those whose protection is recorded at exactly the given slot. + * Used to release the protections a single block proposal created without disturbing entries a + * later proposal raised to a higher slot via updateProtection. + */ + findProtectedTxsAtSlot(txHashes: string[], slotNumber: SlotNumber): string[] { + return txHashes.filter(txHash => this.#protectedTransactions.get(txHash) === slotNumber); + } + /** Filters out transactions that are currently protected */ filterUnprotected(txs: TxMetaData[]): TxMetaData[] { return txs.filter(meta => !this.#protectedTransactions.has(meta.txHash)); diff --git a/yarn-project/p2p/src/mem_pools/tx_pool_v2/tx_pool_v2.test.ts b/yarn-project/p2p/src/mem_pools/tx_pool_v2/tx_pool_v2.test.ts index a7bde8270d3a..fe9dbe572f09 100644 --- a/yarn-project/p2p/src/mem_pools/tx_pool_v2/tx_pool_v2.test.ts +++ b/yarn-project/p2p/src/mem_pools/tx_pool_v2/tx_pool_v2.test.ts @@ -1515,6 +1515,23 @@ describe('TxPoolV2', () => { expectNoCallbacks(); // State transition only, tx not removed from pool }); + it('a tx protected at an earlier slot stays mined when its block lands, not unprotected to pending', async () => { + // Models the event-driven release: the block stream handler marks txs mined for the landed + // block before releasing protections from earlier slots. A tx protected at slot 1 that lands + // in a block at slot 2 must end up mined, never bounced through pending where it could be lost. + const tx = await mockTx(1); + await pool.addProtectedTxs([tx], slot1Header); + expectAddedTxs(tx); + expect(await pool.getTxStatus(tx.getTxHash())).toBe('protected'); + + // Mark mined first (as the blocks-added handler does), then release earlier-slot protections. + await pool.handleMinedBlock(makeBlock([tx], slot2Header)); + await pool.prepareForSlot(SlotNumber(2)); + + expect(await pool.getTxStatus(tx.getTxHash())).toBe('mined'); + expect(await pool.getPendingTxCount()).toBe(0); + }); + it('handles empty block gracefully', async () => { // Should not throw when processing an empty block await pool.handleMinedBlock(makeEmptyBlock(slot1Header)); @@ -1800,6 +1817,97 @@ describe('TxPoolV2', () => { }); }); + describe('unprotectTxs', () => { + it('restores matching-slot protected txs to pending', async () => { + const tx = await mockTx(1); + await pool.addProtectedTxs([tx], slot1Header); + expectAddedTxs(tx); + expect(await pool.getTxStatus(tx.getTxHash())).toBe('protected'); + + await pool.unprotectTxs([tx.getTxHash()], SlotNumber(1)); + + expect(await pool.getTxStatus(tx.getTxHash())).toBe('pending'); + expect(await pool.getPendingTxCount()).toBe(1); + expectNoCallbacks(); // state transition only, tx not added or removed + }); + + it('leaves protections recorded at a later slot untouched', async () => { + const tx = await mockTx(1); + // Protected for slot 1, then raised to slot 2 by a second live proposal + await pool.addProtectedTxs([tx], slot1Header); + expectAddedTxs(tx); + await pool.addProtectedTxs([tx], slot2Header); + expect(await pool.getTxStatus(tx.getTxHash())).toBe('protected'); + + // The failed slot-1 proposal releases its protection, but the slot-2 protection survives + await pool.unprotectTxs([tx.getTxHash()], SlotNumber(1)); + + expect(await pool.getTxStatus(tx.getTxHash())).toBe('protected'); + expectNoCallbacks(); + }); + + it('only releases the requested hashes that match the slot', async () => { + const txReleased = await mockTx(1); + const txKeptLaterSlot = await mockTx(2); + const txUnrelated = await mockTx(3); + + await pool.addProtectedTxs([txReleased], slot1Header); + await pool.addProtectedTxs([txKeptLaterSlot], slot2Header); + await pool.addProtectedTxs([txUnrelated], slot1Header); + clearCallbackTracking(); + + // Failed proposal at slot 1 referenced only txReleased and txKeptLaterSlot + await pool.unprotectTxs([txReleased.getTxHash(), txKeptLaterSlot.getTxHash()], SlotNumber(1)); + + expect(await pool.getTxStatus(txReleased.getTxHash())).toBe('pending'); + expect(await pool.getTxStatus(txKeptLaterSlot.getTxHash())).toBe('protected'); // slot 2, not released + expect(await pool.getTxStatus(txUnrelated.getTxHash())).toBe('protected'); // not in the hash list + }); + + it('leaves mined txs untouched', async () => { + const tx = await mockTx(1); + // Protected, then mined at slot 1 (mining clears the protection entry) + await pool.addProtectedTxs([tx], slot1Header); + await pool.handleMinedBlock(makeBlock([tx], slot1Header)); + expect(await pool.getTxStatus(tx.getTxHash())).toBe('mined'); + clearCallbackTracking(); + + await pool.unprotectTxs([tx.getTxHash()], SlotNumber(1)); + + expect(await pool.getTxStatus(tx.getTxHash())).toBe('mined'); + expect(await pool.getPendingTxCount()).toBe(0); + expectNoCallbacks(); + }); + + it('is a no-op when no protection matches the slot', async () => { + const tx = await mockTx(1); + await pool.addProtectedTxs([tx], slot2Header); + clearCallbackTracking(); + + await pool.unprotectTxs([tx.getTxHash()], SlotNumber(1)); + + expect(await pool.getTxStatus(tx.getTxHash())).toBe('protected'); + expectNoCallbacks(); + }); + + it('resolves nullifier conflicts when restoring to pending', async () => { + const txPending = await mockPublicTx(1, 5); + const txProtected = await mockPublicTx(2, 20); + // Protected tx shares a nullifier with the pending tx but has higher priority + setNullifier(txProtected, 0, getNullifier(txPending, 0)); + + await pool.addPendingTxs([txPending]); + await pool.addProtectedTxs([txProtected], slot1Header); + clearCallbackTracking(); + + await pool.unprotectTxs([txProtected.getTxHash()], SlotNumber(1)); + + // Higher-priority unprotected tx wins the conflict; the pending loser is evicted + expect(await pool.getTxStatus(txProtected.getTxHash())).toBe('pending'); + expect(await pool.getTxStatus(txPending.getTxHash())).toBe('deleted'); + }); + }); + describe('handlePrunedBlocks', () => { it('un-mines transactions from pruned block', async () => { const tx = await mockTx(1); @@ -2108,6 +2216,23 @@ describe('TxPoolV2', () => { expect(await poolWithValidator.getPendingTxCount()).toBe(0); }); + it('unprotectTxs deletes tx that fails validation when restoring', async () => { + const tx = await mockTx(1); + + await poolWithValidator.addProtectedTxs([tx], slot1Header); + expect(await poolWithValidator.getTxStatus(tx.getTxHash())).toBe('protected'); + + mockValidator.validateTx.mockResolvedValue({ + result: 'invalid', + reason: ['tx expired'], + }); + + await poolWithValidator.unprotectTxs([tx.getTxHash()], SlotNumber(1)); + + expect(await poolWithValidator.getTxStatus(tx.getTxHash())).toBe('deleted'); + expect(await poolWithValidator.getPendingTxCount()).toBe(0); + }); + it('prepareForSlot keeps tx that passes validation when unprotecting', async () => { const tx = await mockTx(1); @@ -2783,60 +2908,50 @@ describe('TxPoolV2', () => { }); describe('protected tx in pruned block', () => { - it('protected tx during prune that later fails validation should be soft-deleted', async () => { + it('mined tx from pruned block that fails revalidation on un-mine should be soft-deleted', async () => { const tx = await mockTx(1); - // Add, protect, and mine the tx + // Add, protect, and mine the tx. Mining clears the protection entry. await poolWithValidator.addPendingTxs([tx]); await poolWithValidator.addProtectedTxs([tx], slot1Header); await poolWithValidator.handleMinedBlock(makeBlock([tx], slot1Header)); expect(await poolWithValidator.getTxStatus(tx.getTxHash())).toBe('mined'); - // Prune - tx is un-mined but stays protected (validator passes at this point) - await poolWithValidator.handlePrunedBlocks(block0Id); - expect(await poolWithValidator.getTxStatus(tx.getTxHash())).toBe('protected'); - - // Now make validator reject this tx + // Make validator reject this tx, then prune. handlePrunedBlocks revalidates un-mined txs. mockValidator.validateTx.mockResolvedValue({ result: 'invalid', reason: ['timestamp expired'], }); - - // Unprotect (prepareForSlot) - tx fails validation - await poolWithValidator.prepareForSlot(SlotNumber(2)); + await poolWithValidator.handlePrunedBlocks(block0Id); // The tx was in a pruned block, so it should be SOFT-deleted, not hard-deleted expect(await poolWithValidator.getTxStatus(tx.getTxHash())).toBe('deleted'); expect(await poolWithValidator.getTxByHash(tx.getTxHash())).toBeDefined(); }); - it('protected tx during prune that later loses nullifier conflict should be soft-deleted', async () => { - const txProtected = await mockPublicTx(1, 5); + it('mined tx from pruned block that loses nullifier conflict on un-mine should be soft-deleted', async () => { + const txMined = await mockPublicTx(1, 5); const txHigherPriority = await mockPublicTx(2, 10); // Give them the same nullifier - setNullifier(txHigherPriority, 0, getNullifier(txProtected, 0)); + setNullifier(txHigherPriority, 0, getNullifier(txMined, 0)); - // Add, protect, and mine txProtected - await poolWithValidator.addPendingTxs([txProtected]); - await poolWithValidator.addProtectedTxs([txProtected], slot1Header); - await poolWithValidator.handleMinedBlock(makeBlock([txProtected], slot1Header)); - expect(await poolWithValidator.getTxStatus(txProtected.getTxHash())).toBe('mined'); + // Add, protect, and mine txMined. Mining clears the protection entry. + await poolWithValidator.addPendingTxs([txMined]); + await poolWithValidator.addProtectedTxs([txMined], slot1Header); + await poolWithValidator.handleMinedBlock(makeBlock([txMined], slot1Header)); + expect(await poolWithValidator.getTxStatus(txMined.getTxHash())).toBe('mined'); - // Prune - txProtected is un-mined but stays protected - await poolWithValidator.handlePrunedBlocks(block0Id); - expect(await poolWithValidator.getTxStatus(txProtected.getTxHash())).toBe('protected'); - - // Now add a higher priority tx with same nullifier + // Add a higher priority pending tx with the same nullifier await poolWithValidator.addPendingTxs([txHigherPriority]); expect(await poolWithValidator.getTxStatus(txHigherPriority.getTxHash())).toBe('pending'); - // Unprotect (prepareForSlot) - txProtected loses nullifier conflict - await poolWithValidator.prepareForSlot(SlotNumber(2)); + // Prune - txMined is un-mined and loses the nullifier conflict during handlePrunedBlocks + await poolWithValidator.handlePrunedBlocks(block0Id); // The tx was in a pruned block, so it should be SOFT-deleted, not hard-deleted - expect(await poolWithValidator.getTxStatus(txProtected.getTxHash())).toBe('deleted'); - expect(await poolWithValidator.getTxByHash(txProtected.getTxHash())).toBeDefined(); + expect(await poolWithValidator.getTxStatus(txMined.getTxHash())).toBe('deleted'); + expect(await poolWithValidator.getTxByHash(txMined.getTxHash())).toBeDefined(); // Higher priority tx should be pending expect(await poolWithValidator.getTxStatus(txHigherPriority.getTxHash())).toBe('pending'); @@ -2971,7 +3086,7 @@ describe('TxPoolV2', () => { expect(await pool.getTxStatus(tx.getTxHash())).toBe('pending'); }); - it('pending -> protected -> mined -> protected (reorg, still valid)', async () => { + it('pending -> protected -> mined -> pending (reorg, still valid)', async () => { const tx = await mockTx(1); await pool.addPendingTxs([tx]); @@ -2982,10 +3097,11 @@ describe('TxPoolV2', () => { expectNoCallbacks(); expect(await pool.getTxStatus(tx.getTxHash())).toBe('mined'); - // After reorg, tx retains its protection status (protection is managed by prepareForSlot) + // Mining supersedes protection and clears its entry, so a later reorg un-mines the tx back to + // pending (not protected). Event-driven release then handles it like any other pending tx. await pool.handlePrunedBlocks(block0Id); expectNoCallbacks(); // State transition only - expect(await pool.getTxStatus(tx.getTxHash())).toBe('protected'); + expect(await pool.getTxStatus(tx.getTxHash())).toBe('pending'); }); it('N/A -> protected -> mined -> deleted (req/resp flow)', async () => { diff --git a/yarn-project/p2p/src/mem_pools/tx_pool_v2/tx_pool_v2.ts b/yarn-project/p2p/src/mem_pools/tx_pool_v2/tx_pool_v2.ts index f3ab87678659..9fc60e7c2c3b 100644 --- a/yarn-project/p2p/src/mem_pools/tx_pool_v2/tx_pool_v2.ts +++ b/yarn-project/p2p/src/mem_pools/tx_pool_v2/tx_pool_v2.ts @@ -110,6 +110,10 @@ export class AztecKVTxPoolV2 extends (EventEmitter as new () => TypedEventEmitte return this.#queue.put(() => this.#impl.prepareForSlot(slotNumber)); } + unprotectTxs(txHashes: TxHash[], slotNumber: SlotNumber): Promise { + return this.#queue.put(() => this.#impl.unprotectTxs(txHashes, slotNumber)); + } + handlePrunedBlocks(latestBlock: L2BlockId, options?: { deleteAllTxs?: boolean }): Promise { return this.#queue.put(() => this.#impl.handlePrunedBlocks(latestBlock, options)); } diff --git a/yarn-project/p2p/src/mem_pools/tx_pool_v2/tx_pool_v2_impl.ts b/yarn-project/p2p/src/mem_pools/tx_pool_v2/tx_pool_v2_impl.ts index 90053a0fb3e6..8cfb4a7c24ab 100644 --- a/yarn-project/p2p/src/mem_pools/tx_pool_v2/tx_pool_v2_impl.ts +++ b/yarn-project/p2p/src/mem_pools/tx_pool_v2/tx_pool_v2_impl.ts @@ -585,29 +585,62 @@ export class TxPoolV2Impl { } this.#log.info(`Preparing for slot ${slotNumber}: unprotecting ${txsToRestore.length} txs`); + await this.#restoreUnprotectedToPending(txsToRestore, 'during prepareForSlot'); + }); + } - // Step 4: Validate for pending pool - const { valid, invalid } = await this.#revalidateMetadata(txsToRestore, 'during prepareForSlot'); + async unprotectTxs(txHashes: TxHash[], slotNumber: SlotNumber): Promise { + const hashStrs = txHashes.map(h => h.toString()); - // Step 5: Resolve nullifier conflicts and add winners to pending indices - const { added, toEvict } = this.#applyNullifierConflictResolution(valid); + await this.#store.transactionAsync(async () => { + // Only release entries still recorded at this exact slot. A tx that another, still-live proposal + // raised to a higher slot via updateProtection must keep that protection. Mined txs have no + // protection entry and so are never matched here. + const matching = this.#indices.findProtectedTxsAtSlot(hashStrs, slotNumber); + if (matching.length === 0) { + this.#log.debug(`Unprotecting txs for slot ${slotNumber}: no matching protections to release`); + return; + } - // Step 6: Delete invalid txs and evict conflict losers - await this.#deleteTxsBatch(invalid); - await this.#evictTxs(toEvict, 'NullifierConflict'); + this.#indices.clearProtection(matching); - // Step 7: Run eviction rules (enforce pool size limit) - if (added.length > 0) { - const feePayers = added.map(meta => meta.feePayer); - const uniqueFeePayers = new Set(feePayers); - await this.#evictionManager.evictAfterNewTxs( - added.map(m => m.txHash), - [...uniqueFeePayers], - ); + const txsToRestore = this.#indices.filterRestorable(matching); + if (txsToRestore.length === 0) { + return; } + + this.#log.info(`Unprotecting ${txsToRestore.length} txs from failed proposal at slot ${slotNumber}`); + await this.#restoreUnprotectedToPending(txsToRestore, 'during unprotectTxs'); }); } + /** + * Returns just-unprotected txs to the pending pool: revalidates them, resolves nullifier + * conflicts, deletes losers, then enforces the pool size limit. Must run inside a store + * transaction; callers clear the protection entries before invoking. + */ + async #restoreUnprotectedToPending(txsToRestore: TxMetaData[], context: string): Promise { + // Step 1: Validate for pending pool + const { valid, invalid } = await this.#revalidateMetadata(txsToRestore, context); + + // Step 2: Resolve nullifier conflicts and add winners to pending indices + const { added, toEvict } = this.#applyNullifierConflictResolution(valid); + + // Step 3: Delete invalid txs and evict conflict losers + await this.#deleteTxsBatch(invalid); + await this.#evictTxs(toEvict, 'NullifierConflict'); + + // Step 4: Run eviction rules (enforce pool size limit) + if (added.length > 0) { + const feePayers = added.map(meta => meta.feePayer); + const uniqueFeePayers = new Set(feePayers); + await this.#evictionManager.evictAfterNewTxs( + added.map(m => m.txHash), + [...uniqueFeePayers], + ); + } + } + async handlePrunedBlocks(latestBlock: L2BlockId, options?: { deleteAllTxs?: boolean }): Promise { // Step 1: Find transactions mined after the prune point const txsToUnmine = this.#indices.findTxsMinedAfter(latestBlock.number); diff --git a/yarn-project/p2p/src/services/libp2p/libp2p_service.test.ts b/yarn-project/p2p/src/services/libp2p/libp2p_service.test.ts index 276debeeba3e..b688db763ad7 100644 --- a/yarn-project/p2p/src/services/libp2p/libp2p_service.test.ts +++ b/yarn-project/p2p/src/services/libp2p/libp2p_service.test.ts @@ -586,7 +586,7 @@ describe('LibP2PService', () => { mockEpochCache, ); - blockReceivedCallback = jest.fn().mockImplementation(() => Promise.resolve(true)); + blockReceivedCallback = jest.fn().mockImplementation(() => Promise.resolve(true)); duplicateProposalCallback = jest.fn(); service.registerBlockReceivedCallback(blockReceivedCallback as any); service.registerDuplicateProposalCallback(duplicateProposalCallback); @@ -785,6 +785,29 @@ describe('LibP2PService', () => { expect(reportMessageValidationResultSpy).toHaveBeenCalledWith('msg-1', MOCK_PEER_ID, TopicValidatorResult.Reject); }); + it('local validation failure releases the protections it created', async () => { + const header = makeBlockHeader(1, { slotNumber: targetSlot }); + const proposal = await makeBlockProposal({ signer, blockHeader: header }); + blockReceivedCallback.mockImplementationOnce(() => Promise.resolve(false)); + + await service.processBlockFromPeer(proposal.toBuffer(), 'msg-1', mockPeerId); + + expect(mockTxPool.protectTxs).toHaveBeenCalledTimes(1); + // The failed proposal releases exactly the txs it protected, keyed to its slot. + expect(mockTxPool.unprotectTxs).toHaveBeenCalledTimes(1); + expect(mockTxPool.unprotectTxs).toHaveBeenCalledWith(proposal.txHashes, targetSlot); + }); + + it('successful local validation does not release protections', async () => { + const header = makeBlockHeader(1, { slotNumber: targetSlot }); + const proposal = await makeBlockProposal({ signer, blockHeader: header }); + + await service.processBlockFromPeer(proposal.toBuffer(), 'msg-1', mockPeerId); + + expect(mockTxPool.protectTxs).toHaveBeenCalledTimes(1); + expect(mockTxPool.unprotectTxs).not.toHaveBeenCalled(); + }); + // Regression for A-1013: payloads sharing (slot, position, archive) but differing on another // signed field (e.g. inHash) used to dedup by archive only and silently drop the second one. // The pool now dedups by signed-payload hash, so the equivocation surfaces. diff --git a/yarn-project/p2p/src/services/libp2p/libp2p_service.ts b/yarn-project/p2p/src/services/libp2p/libp2p_service.ts index 148a6e3a5b2b..fe7b5249fbc1 100644 --- a/yarn-project/p2p/src/services/libp2p/libp2p_service.ts +++ b/yarn-project/p2p/src/services/libp2p/libp2p_service.ts @@ -1363,6 +1363,9 @@ export class LibP2PService extends WithTracer implements P2PService { const isValid = await this.blockReceivedCallback(block, sender); if (!isValid) { this.logger.info(`Block proposal validation failed for block ${block.blockNumber}`, block.toBlockInfo()); + // Release the protections this proposal created so its txs return to pending. Only entries still + // keyed to this slot are cleared, so a tx referenced by a live proposal at another slot stays protected. + await this.mempools.txPool.unprotectTxs(block.txHashes, slot); } } diff --git a/yarn-project/p2p/src/test-helpers/testbench-utils.ts b/yarn-project/p2p/src/test-helpers/testbench-utils.ts index a9671f1b78e9..26fb73ff3b89 100644 --- a/yarn-project/p2p/src/test-helpers/testbench-utils.ts +++ b/yarn-project/p2p/src/test-helpers/testbench-utils.ts @@ -119,6 +119,10 @@ export class InMemoryTxPool extends EventEmitter implements TxPoolV2 { return Promise.resolve(); } + unprotectTxs(_txHashes: TxHash[], _slotNumber: SlotNumber): Promise { + return Promise.resolve(); + } + handlePrunedBlocks(_latestBlock: L2BlockId, _options?: { deleteAllTxs?: boolean }): Promise { return Promise.resolve(); }