diff --git a/yarn-project/foundation/src/config/env_var.ts b/yarn-project/foundation/src/config/env_var.ts index a885d0e6224b..d575c1906e24 100644 --- a/yarn-project/foundation/src/config/env_var.ts +++ b/yarn-project/foundation/src/config/env_var.ts @@ -118,7 +118,6 @@ export type EnvVar = | 'P2P_BATCH_TX_REQUESTER_TX_BATCH_SIZE' | 'P2P_BATCH_TX_REQUESTER_BAD_PEER_THRESHOLD' | 'P2P_BLOCK_CHECK_INTERVAL_MS' - | 'P2P_SLOT_CHECK_INTERVAL_MS' | 'P2P_BLOCK_REQUEST_BATCH_SIZE' | 'P2P_BOOTSTRAP_NODE_ENR_VERSION_CHECK' | 'P2P_BOOTSTRAP_NODES_AS_FULL_PEERS' diff --git a/yarn-project/p2p/src/client/p2p_client.test.ts b/yarn-project/p2p/src/client/p2p_client.test.ts index 01c03e0d206d..ac08036b58a6 100644 --- a/yarn-project/p2p/src/client/p2p_client.test.ts +++ b/yarn-project/p2p/src/client/p2p_client.test.ts @@ -404,6 +404,43 @@ describe('P2P Client', () => { expect(await client.getSyncedLatestBlockNum()).toEqual(102); }); + it('prepares the pool for the last synced block slot after marking txs mined', async () => { + await client.start(); + + blockSource.addProposedBlocks([ + await L2Block.random(BlockNumber(101), { slotNumber: SlotNumber(150) }), + await L2Block.random(BlockNumber(102), { slotNumber: SlotNumber(151) }), + ]); + await client.sync(); + + // Release is driven by the synced block slot, not the wall clock: prepareForSlot is ultimately + // driven with the last synced block's slot (regardless of how the stream batches the blocks), + // never reads the epoch cache, and never targets a slot beyond what has synced. + expect(txPool.prepareForSlot).toHaveBeenLastCalledWith(SlotNumber(151)); + const preparedSlots = txPool.prepareForSlot.mock.calls.map(([slot]) => Number(slot)); + expect(Math.max(...preparedSlots)).toBe(151); + expect(epochCache.getCurrentAndNextSlot).not.toHaveBeenCalled(); + // Mined-marking runs before the matching-slot release within the handler. + expect(txPool.handleMinedBlock.mock.invocationCallOrder.at(-1)!).toBeLessThan( + txPool.prepareForSlot.mock.invocationCallOrder.at(-1)!, + ); + }); + + it('does not re-prepare for a slot that does not advance', async () => { + await client.start(); + + blockSource.addProposedBlocks([await L2Block.random(BlockNumber(101), { slotNumber: SlotNumber(150) })]); + await client.sync(); + const callsAfterFirst = txPool.prepareForSlot.mock.calls.length; + expect(txPool.prepareForSlot).toHaveBeenLastCalledWith(SlotNumber(150)); + + // A later block at an earlier slot must not advance the prepared-for slot + blockSource.addProposedBlocks([await L2Block.random(BlockNumber(102), { slotNumber: SlotNumber(149) })]); + await client.sync(); + expect(txPool.prepareForSlot.mock.calls.length).toBe(callsAfterFirst); + expect(txPool.prepareForSlot).not.toHaveBeenCalledWith(SlotNumber(149)); + }); + it('handles proven and finalized chain behind starting point', async () => { blockSource.setProvenBlockNumber(0); blockSource.setFinalizedBlockNumber(0); diff --git a/yarn-project/p2p/src/client/p2p_client.ts b/yarn-project/p2p/src/client/p2p_client.ts index 2d07cfd79c92..c26aac46b805 100644 --- a/yarn-project/p2p/src/client/p2p_client.ts +++ b/yarn-project/p2p/src/client/p2p_client.ts @@ -6,7 +6,6 @@ import { SlotNumber, } from '@aztec/foundation/branded-types'; import { createLogger } from '@aztec/foundation/log'; -import { RunningPromise } from '@aztec/foundation/promise'; import { DateProvider } from '@aztec/foundation/timer'; import type { AztecAsyncKVStore, AztecAsyncSingleton } from '@aztec/kv-store'; import { L2TipsKVStore } from '@aztec/kv-store/stores'; @@ -83,9 +82,6 @@ export class P2PClient extends WithTracer implements P2P { /** Tracks the last slot for which we called prepareForSlot */ private lastSlotProcessed: SlotNumber = SlotNumber.ZERO; - /** Polls for slot changes and calls prepareForSlot on the tx pool */ - private slotMonitor: RunningPromise | undefined; - constructor( private store: AztecAsyncKVStore, private l2BlockSource: L2BlockSource & ContractDataSource, @@ -268,14 +264,6 @@ export class P2PClient extends WithTracer implements P2P { this.blockStream.start(); this.txFileStore?.start(); - // Start slot monitor to call prepareForSlot when the slot changes - this.slotMonitor = new RunningPromise( - () => this.maybeCallPrepareForSlot(), - this.log, - this.config.slotCheckIntervalMS, - ); - this.slotMonitor.start(); - return this.syncPromise; } @@ -302,8 +290,6 @@ export class P2PClient extends WithTracer implements P2P { */ public async stop() { this.log.debug('Stopping p2p client...'); - await this.slotMonitor?.stop(); - this.log.debug('Stopped slot monitor'); await tryStop(this.txCollection); this.log.debug('Stopped tx collection service'); await this.txFileStore?.stop(); @@ -642,11 +628,16 @@ export class P2PClient extends WithTracer implements P2P { return; } + const lastBlock = blocks.at(-1)!; + const lastSlot = lastBlock.header.getSlot(); + + // Mark txs mined before releasing protections: a block landing at this slot supersedes any + // protection for txs it includes, so mined-marking must run first to keep just-landed txs from + // being unprotected into pending where they could be evicted before they are recorded as mined. await this.handleMinedBlocks(blocks); - await this.maybeCallPrepareForSlot(); + await this.maybeCallPrepareForSlot(lastSlot); await this.collectingMissingTxs(blocks); - const lastBlock = blocks.at(-1)!; - await this.synchedLatestSlot.set(BigInt(lastBlock.header.getSlot())); + await this.synchedLatestSlot.set(BigInt(lastSlot)); } /** Request txs for unproven blocks so the prover node can prove. */ @@ -741,20 +732,8 @@ export class P2PClient extends WithTracer implements P2P { return isEpochPrune; } - /** Checks if the slot has changed and calls prepareForSlot if so. */ - private async maybeCallPrepareForSlot(): Promise { - // If we have a proposed checkpoint available, we want to prepare the target slot - otherwise we prepare the current slot - const l2Tips = await this.l2Tips.getL2Tips(); - const hasProposedCheckpoint = l2Tips.proposedCheckpoint.checkpoint.number > l2Tips.checkpointed.checkpoint.number; - - let slot; - if (hasProposedCheckpoint) { - const { targetSlot } = this.epochCache.getTargetAndNextSlot(); - slot = targetSlot; - } else { - const { currentSlot } = this.epochCache.getCurrentAndNextSlot(); - slot = currentSlot; - } + /** Calls prepareForSlot for the given slot if it advances past the last slot we prepared for. */ + private async maybeCallPrepareForSlot(slot: SlotNumber): Promise { if (slot <= this.lastSlotProcessed) { return; } diff --git a/yarn-project/p2p/src/config.ts b/yarn-project/p2p/src/config.ts index 5dfdfed7a915..33a147b2c75f 100644 --- a/yarn-project/p2p/src/config.ts +++ b/yarn-project/p2p/src/config.ts @@ -71,9 +71,6 @@ export interface P2PConfig /** The frequency in which to check for new L2 blocks. */ blockCheckIntervalMS: number; - /** The frequency in which to check for new L2 slots. */ - slotCheckIntervalMS: number; - /** The number of blocks to fetch in a single batch. */ blockRequestBatchSize: number; @@ -317,11 +314,6 @@ export const p2pConfigMappings: ConfigMappingsType = { description: 'The frequency in which to check for new L2 blocks.', ...numberConfigHelper(100), }, - slotCheckIntervalMS: { - env: 'P2P_SLOT_CHECK_INTERVAL_MS', - description: 'The frequency in which to check for new L2 slots.', - ...numberConfigHelper(1000), - }, debugDisableColocationPenalty: { env: 'DEBUG_P2P_DISABLE_COLOCATION_PENALTY', description: 'DEBUG: Disable colocation penalty - NEVER set to true in production', diff --git a/yarn-project/p2p/src/mem_pools/tx_pool_v2/interfaces.ts b/yarn-project/p2p/src/mem_pools/tx_pool_v2/interfaces.ts index c49f2fce3805..2f7fab151b30 100644 --- a/yarn-project/p2p/src/mem_pools/tx_pool_v2/interfaces.ts +++ b/yarn-project/p2p/src/mem_pools/tx_pool_v2/interfaces.ts @@ -170,6 +170,17 @@ export interface TxPoolV2 extends TypedEventEmitter { */ prepareForSlot(slotNumber: SlotNumber): Promise; + /** + * Releases the protections a failed block proposal created and restores the txs to pending. + * Only clears protection entries still recorded at exactly the given slot: a tx that another, + * still-live proposal raised to a higher slot via {@link protectTxs} keeps its protection, and + * mined txs (which carry no protection entry) are left untouched. Restored txs are re-validated + * and resolved against nullifier conflicts before re-entering the pending indices. + * @param txHashes - Hashes of the proposal's txs to release. + * @param slotNumber - The slot the failed proposal targeted; protection is released only for this slot. + */ + unprotectTxs(txHashes: TxHash[], slotNumber: SlotNumber): Promise; + /** * Handles pruned blocks during a reorg. * Un-mines all transactions mined in blocks beyond the given latest block diff --git a/yarn-project/p2p/src/mem_pools/tx_pool_v2/tx_pool_indices.ts b/yarn-project/p2p/src/mem_pools/tx_pool_v2/tx_pool_indices.ts index f39a4c4e5a09..d0dcc8200564 100644 --- a/yarn-project/p2p/src/mem_pools/tx_pool_v2/tx_pool_indices.ts +++ b/yarn-project/p2p/src/mem_pools/tx_pool_v2/tx_pool_indices.ts @@ -131,6 +131,9 @@ export class TxPoolIndices { meta.minedL2BlockId = blockId; // Safe to call unconditionally - removeFromPendingIndices is idempotent this.#removeFromPendingIndices(meta); + // A mined tx supersedes any protection: drop the stale entry so it can't linger in the map and + // be matched by later protection scans. + this.#protectedTransactions.delete(meta.txHash); } /** Clears the mined status from a transaction */ @@ -316,6 +319,15 @@ export class TxPoolIndices { return result; } + /** + * From the given hashes, returns those whose protection is recorded at exactly the given slot. + * Used to release the protections a single block proposal created without disturbing entries a + * later proposal raised to a higher slot via updateProtection. + */ + findProtectedTxsAtSlot(txHashes: string[], slotNumber: SlotNumber): string[] { + return txHashes.filter(txHash => this.#protectedTransactions.get(txHash) === slotNumber); + } + /** Filters out transactions that are currently protected */ filterUnprotected(txs: TxMetaData[]): TxMetaData[] { return txs.filter(meta => !this.#protectedTransactions.has(meta.txHash)); diff --git a/yarn-project/p2p/src/mem_pools/tx_pool_v2/tx_pool_v2.test.ts b/yarn-project/p2p/src/mem_pools/tx_pool_v2/tx_pool_v2.test.ts index a7bde8270d3a..fe9dbe572f09 100644 --- a/yarn-project/p2p/src/mem_pools/tx_pool_v2/tx_pool_v2.test.ts +++ b/yarn-project/p2p/src/mem_pools/tx_pool_v2/tx_pool_v2.test.ts @@ -1515,6 +1515,23 @@ describe('TxPoolV2', () => { expectNoCallbacks(); // State transition only, tx not removed from pool }); + it('a tx protected at an earlier slot stays mined when its block lands, not unprotected to pending', async () => { + // Models the event-driven release: the block stream handler marks txs mined for the landed + // block before releasing protections from earlier slots. A tx protected at slot 1 that lands + // in a block at slot 2 must end up mined, never bounced through pending where it could be lost. + const tx = await mockTx(1); + await pool.addProtectedTxs([tx], slot1Header); + expectAddedTxs(tx); + expect(await pool.getTxStatus(tx.getTxHash())).toBe('protected'); + + // Mark mined first (as the blocks-added handler does), then release earlier-slot protections. + await pool.handleMinedBlock(makeBlock([tx], slot2Header)); + await pool.prepareForSlot(SlotNumber(2)); + + expect(await pool.getTxStatus(tx.getTxHash())).toBe('mined'); + expect(await pool.getPendingTxCount()).toBe(0); + }); + it('handles empty block gracefully', async () => { // Should not throw when processing an empty block await pool.handleMinedBlock(makeEmptyBlock(slot1Header)); @@ -1800,6 +1817,97 @@ describe('TxPoolV2', () => { }); }); + describe('unprotectTxs', () => { + it('restores matching-slot protected txs to pending', async () => { + const tx = await mockTx(1); + await pool.addProtectedTxs([tx], slot1Header); + expectAddedTxs(tx); + expect(await pool.getTxStatus(tx.getTxHash())).toBe('protected'); + + await pool.unprotectTxs([tx.getTxHash()], SlotNumber(1)); + + expect(await pool.getTxStatus(tx.getTxHash())).toBe('pending'); + expect(await pool.getPendingTxCount()).toBe(1); + expectNoCallbacks(); // state transition only, tx not added or removed + }); + + it('leaves protections recorded at a later slot untouched', async () => { + const tx = await mockTx(1); + // Protected for slot 1, then raised to slot 2 by a second live proposal + await pool.addProtectedTxs([tx], slot1Header); + expectAddedTxs(tx); + await pool.addProtectedTxs([tx], slot2Header); + expect(await pool.getTxStatus(tx.getTxHash())).toBe('protected'); + + // The failed slot-1 proposal releases its protection, but the slot-2 protection survives + await pool.unprotectTxs([tx.getTxHash()], SlotNumber(1)); + + expect(await pool.getTxStatus(tx.getTxHash())).toBe('protected'); + expectNoCallbacks(); + }); + + it('only releases the requested hashes that match the slot', async () => { + const txReleased = await mockTx(1); + const txKeptLaterSlot = await mockTx(2); + const txUnrelated = await mockTx(3); + + await pool.addProtectedTxs([txReleased], slot1Header); + await pool.addProtectedTxs([txKeptLaterSlot], slot2Header); + await pool.addProtectedTxs([txUnrelated], slot1Header); + clearCallbackTracking(); + + // Failed proposal at slot 1 referenced only txReleased and txKeptLaterSlot + await pool.unprotectTxs([txReleased.getTxHash(), txKeptLaterSlot.getTxHash()], SlotNumber(1)); + + expect(await pool.getTxStatus(txReleased.getTxHash())).toBe('pending'); + expect(await pool.getTxStatus(txKeptLaterSlot.getTxHash())).toBe('protected'); // slot 2, not released + expect(await pool.getTxStatus(txUnrelated.getTxHash())).toBe('protected'); // not in the hash list + }); + + it('leaves mined txs untouched', async () => { + const tx = await mockTx(1); + // Protected, then mined at slot 1 (mining clears the protection entry) + await pool.addProtectedTxs([tx], slot1Header); + await pool.handleMinedBlock(makeBlock([tx], slot1Header)); + expect(await pool.getTxStatus(tx.getTxHash())).toBe('mined'); + clearCallbackTracking(); + + await pool.unprotectTxs([tx.getTxHash()], SlotNumber(1)); + + expect(await pool.getTxStatus(tx.getTxHash())).toBe('mined'); + expect(await pool.getPendingTxCount()).toBe(0); + expectNoCallbacks(); + }); + + it('is a no-op when no protection matches the slot', async () => { + const tx = await mockTx(1); + await pool.addProtectedTxs([tx], slot2Header); + clearCallbackTracking(); + + await pool.unprotectTxs([tx.getTxHash()], SlotNumber(1)); + + expect(await pool.getTxStatus(tx.getTxHash())).toBe('protected'); + expectNoCallbacks(); + }); + + it('resolves nullifier conflicts when restoring to pending', async () => { + const txPending = await mockPublicTx(1, 5); + const txProtected = await mockPublicTx(2, 20); + // Protected tx shares a nullifier with the pending tx but has higher priority + setNullifier(txProtected, 0, getNullifier(txPending, 0)); + + await pool.addPendingTxs([txPending]); + await pool.addProtectedTxs([txProtected], slot1Header); + clearCallbackTracking(); + + await pool.unprotectTxs([txProtected.getTxHash()], SlotNumber(1)); + + // Higher-priority unprotected tx wins the conflict; the pending loser is evicted + expect(await pool.getTxStatus(txProtected.getTxHash())).toBe('pending'); + expect(await pool.getTxStatus(txPending.getTxHash())).toBe('deleted'); + }); + }); + describe('handlePrunedBlocks', () => { it('un-mines transactions from pruned block', async () => { const tx = await mockTx(1); @@ -2108,6 +2216,23 @@ describe('TxPoolV2', () => { expect(await poolWithValidator.getPendingTxCount()).toBe(0); }); + it('unprotectTxs deletes tx that fails validation when restoring', async () => { + const tx = await mockTx(1); + + await poolWithValidator.addProtectedTxs([tx], slot1Header); + expect(await poolWithValidator.getTxStatus(tx.getTxHash())).toBe('protected'); + + mockValidator.validateTx.mockResolvedValue({ + result: 'invalid', + reason: ['tx expired'], + }); + + await poolWithValidator.unprotectTxs([tx.getTxHash()], SlotNumber(1)); + + expect(await poolWithValidator.getTxStatus(tx.getTxHash())).toBe('deleted'); + expect(await poolWithValidator.getPendingTxCount()).toBe(0); + }); + it('prepareForSlot keeps tx that passes validation when unprotecting', async () => { const tx = await mockTx(1); @@ -2783,60 +2908,50 @@ describe('TxPoolV2', () => { }); describe('protected tx in pruned block', () => { - it('protected tx during prune that later fails validation should be soft-deleted', async () => { + it('mined tx from pruned block that fails revalidation on un-mine should be soft-deleted', async () => { const tx = await mockTx(1); - // Add, protect, and mine the tx + // Add, protect, and mine the tx. Mining clears the protection entry. await poolWithValidator.addPendingTxs([tx]); await poolWithValidator.addProtectedTxs([tx], slot1Header); await poolWithValidator.handleMinedBlock(makeBlock([tx], slot1Header)); expect(await poolWithValidator.getTxStatus(tx.getTxHash())).toBe('mined'); - // Prune - tx is un-mined but stays protected (validator passes at this point) - await poolWithValidator.handlePrunedBlocks(block0Id); - expect(await poolWithValidator.getTxStatus(tx.getTxHash())).toBe('protected'); - - // Now make validator reject this tx + // Make validator reject this tx, then prune. handlePrunedBlocks revalidates un-mined txs. mockValidator.validateTx.mockResolvedValue({ result: 'invalid', reason: ['timestamp expired'], }); - - // Unprotect (prepareForSlot) - tx fails validation - await poolWithValidator.prepareForSlot(SlotNumber(2)); + await poolWithValidator.handlePrunedBlocks(block0Id); // The tx was in a pruned block, so it should be SOFT-deleted, not hard-deleted expect(await poolWithValidator.getTxStatus(tx.getTxHash())).toBe('deleted'); expect(await poolWithValidator.getTxByHash(tx.getTxHash())).toBeDefined(); }); - it('protected tx during prune that later loses nullifier conflict should be soft-deleted', async () => { - const txProtected = await mockPublicTx(1, 5); + it('mined tx from pruned block that loses nullifier conflict on un-mine should be soft-deleted', async () => { + const txMined = await mockPublicTx(1, 5); const txHigherPriority = await mockPublicTx(2, 10); // Give them the same nullifier - setNullifier(txHigherPriority, 0, getNullifier(txProtected, 0)); + setNullifier(txHigherPriority, 0, getNullifier(txMined, 0)); - // Add, protect, and mine txProtected - await poolWithValidator.addPendingTxs([txProtected]); - await poolWithValidator.addProtectedTxs([txProtected], slot1Header); - await poolWithValidator.handleMinedBlock(makeBlock([txProtected], slot1Header)); - expect(await poolWithValidator.getTxStatus(txProtected.getTxHash())).toBe('mined'); + // Add, protect, and mine txMined. Mining clears the protection entry. + await poolWithValidator.addPendingTxs([txMined]); + await poolWithValidator.addProtectedTxs([txMined], slot1Header); + await poolWithValidator.handleMinedBlock(makeBlock([txMined], slot1Header)); + expect(await poolWithValidator.getTxStatus(txMined.getTxHash())).toBe('mined'); - // Prune - txProtected is un-mined but stays protected - await poolWithValidator.handlePrunedBlocks(block0Id); - expect(await poolWithValidator.getTxStatus(txProtected.getTxHash())).toBe('protected'); - - // Now add a higher priority tx with same nullifier + // Add a higher priority pending tx with the same nullifier await poolWithValidator.addPendingTxs([txHigherPriority]); expect(await poolWithValidator.getTxStatus(txHigherPriority.getTxHash())).toBe('pending'); - // Unprotect (prepareForSlot) - txProtected loses nullifier conflict - await poolWithValidator.prepareForSlot(SlotNumber(2)); + // Prune - txMined is un-mined and loses the nullifier conflict during handlePrunedBlocks + await poolWithValidator.handlePrunedBlocks(block0Id); // The tx was in a pruned block, so it should be SOFT-deleted, not hard-deleted - expect(await poolWithValidator.getTxStatus(txProtected.getTxHash())).toBe('deleted'); - expect(await poolWithValidator.getTxByHash(txProtected.getTxHash())).toBeDefined(); + expect(await poolWithValidator.getTxStatus(txMined.getTxHash())).toBe('deleted'); + expect(await poolWithValidator.getTxByHash(txMined.getTxHash())).toBeDefined(); // Higher priority tx should be pending expect(await poolWithValidator.getTxStatus(txHigherPriority.getTxHash())).toBe('pending'); @@ -2971,7 +3086,7 @@ describe('TxPoolV2', () => { expect(await pool.getTxStatus(tx.getTxHash())).toBe('pending'); }); - it('pending -> protected -> mined -> protected (reorg, still valid)', async () => { + it('pending -> protected -> mined -> pending (reorg, still valid)', async () => { const tx = await mockTx(1); await pool.addPendingTxs([tx]); @@ -2982,10 +3097,11 @@ describe('TxPoolV2', () => { expectNoCallbacks(); expect(await pool.getTxStatus(tx.getTxHash())).toBe('mined'); - // After reorg, tx retains its protection status (protection is managed by prepareForSlot) + // Mining supersedes protection and clears its entry, so a later reorg un-mines the tx back to + // pending (not protected). Event-driven release then handles it like any other pending tx. await pool.handlePrunedBlocks(block0Id); expectNoCallbacks(); // State transition only - expect(await pool.getTxStatus(tx.getTxHash())).toBe('protected'); + expect(await pool.getTxStatus(tx.getTxHash())).toBe('pending'); }); it('N/A -> protected -> mined -> deleted (req/resp flow)', async () => { diff --git a/yarn-project/p2p/src/mem_pools/tx_pool_v2/tx_pool_v2.ts b/yarn-project/p2p/src/mem_pools/tx_pool_v2/tx_pool_v2.ts index f3ab87678659..9fc60e7c2c3b 100644 --- a/yarn-project/p2p/src/mem_pools/tx_pool_v2/tx_pool_v2.ts +++ b/yarn-project/p2p/src/mem_pools/tx_pool_v2/tx_pool_v2.ts @@ -110,6 +110,10 @@ export class AztecKVTxPoolV2 extends (EventEmitter as new () => TypedEventEmitte return this.#queue.put(() => this.#impl.prepareForSlot(slotNumber)); } + unprotectTxs(txHashes: TxHash[], slotNumber: SlotNumber): Promise { + return this.#queue.put(() => this.#impl.unprotectTxs(txHashes, slotNumber)); + } + handlePrunedBlocks(latestBlock: L2BlockId, options?: { deleteAllTxs?: boolean }): Promise { return this.#queue.put(() => this.#impl.handlePrunedBlocks(latestBlock, options)); } diff --git a/yarn-project/p2p/src/mem_pools/tx_pool_v2/tx_pool_v2_impl.ts b/yarn-project/p2p/src/mem_pools/tx_pool_v2/tx_pool_v2_impl.ts index 90053a0fb3e6..8cfb4a7c24ab 100644 --- a/yarn-project/p2p/src/mem_pools/tx_pool_v2/tx_pool_v2_impl.ts +++ b/yarn-project/p2p/src/mem_pools/tx_pool_v2/tx_pool_v2_impl.ts @@ -585,29 +585,62 @@ export class TxPoolV2Impl { } this.#log.info(`Preparing for slot ${slotNumber}: unprotecting ${txsToRestore.length} txs`); + await this.#restoreUnprotectedToPending(txsToRestore, 'during prepareForSlot'); + }); + } - // Step 4: Validate for pending pool - const { valid, invalid } = await this.#revalidateMetadata(txsToRestore, 'during prepareForSlot'); + async unprotectTxs(txHashes: TxHash[], slotNumber: SlotNumber): Promise { + const hashStrs = txHashes.map(h => h.toString()); - // Step 5: Resolve nullifier conflicts and add winners to pending indices - const { added, toEvict } = this.#applyNullifierConflictResolution(valid); + await this.#store.transactionAsync(async () => { + // Only release entries still recorded at this exact slot. A tx that another, still-live proposal + // raised to a higher slot via updateProtection must keep that protection. Mined txs have no + // protection entry and so are never matched here. + const matching = this.#indices.findProtectedTxsAtSlot(hashStrs, slotNumber); + if (matching.length === 0) { + this.#log.debug(`Unprotecting txs for slot ${slotNumber}: no matching protections to release`); + return; + } - // Step 6: Delete invalid txs and evict conflict losers - await this.#deleteTxsBatch(invalid); - await this.#evictTxs(toEvict, 'NullifierConflict'); + this.#indices.clearProtection(matching); - // Step 7: Run eviction rules (enforce pool size limit) - if (added.length > 0) { - const feePayers = added.map(meta => meta.feePayer); - const uniqueFeePayers = new Set(feePayers); - await this.#evictionManager.evictAfterNewTxs( - added.map(m => m.txHash), - [...uniqueFeePayers], - ); + const txsToRestore = this.#indices.filterRestorable(matching); + if (txsToRestore.length === 0) { + return; } + + this.#log.info(`Unprotecting ${txsToRestore.length} txs from failed proposal at slot ${slotNumber}`); + await this.#restoreUnprotectedToPending(txsToRestore, 'during unprotectTxs'); }); } + /** + * Returns just-unprotected txs to the pending pool: revalidates them, resolves nullifier + * conflicts, deletes losers, then enforces the pool size limit. Must run inside a store + * transaction; callers clear the protection entries before invoking. + */ + async #restoreUnprotectedToPending(txsToRestore: TxMetaData[], context: string): Promise { + // Step 1: Validate for pending pool + const { valid, invalid } = await this.#revalidateMetadata(txsToRestore, context); + + // Step 2: Resolve nullifier conflicts and add winners to pending indices + const { added, toEvict } = this.#applyNullifierConflictResolution(valid); + + // Step 3: Delete invalid txs and evict conflict losers + await this.#deleteTxsBatch(invalid); + await this.#evictTxs(toEvict, 'NullifierConflict'); + + // Step 4: Run eviction rules (enforce pool size limit) + if (added.length > 0) { + const feePayers = added.map(meta => meta.feePayer); + const uniqueFeePayers = new Set(feePayers); + await this.#evictionManager.evictAfterNewTxs( + added.map(m => m.txHash), + [...uniqueFeePayers], + ); + } + } + async handlePrunedBlocks(latestBlock: L2BlockId, options?: { deleteAllTxs?: boolean }): Promise { // Step 1: Find transactions mined after the prune point const txsToUnmine = this.#indices.findTxsMinedAfter(latestBlock.number); diff --git a/yarn-project/p2p/src/services/libp2p/libp2p_service.test.ts b/yarn-project/p2p/src/services/libp2p/libp2p_service.test.ts index 276debeeba3e..b688db763ad7 100644 --- a/yarn-project/p2p/src/services/libp2p/libp2p_service.test.ts +++ b/yarn-project/p2p/src/services/libp2p/libp2p_service.test.ts @@ -586,7 +586,7 @@ describe('LibP2PService', () => { mockEpochCache, ); - blockReceivedCallback = jest.fn().mockImplementation(() => Promise.resolve(true)); + blockReceivedCallback = jest.fn().mockImplementation(() => Promise.resolve(true)); duplicateProposalCallback = jest.fn(); service.registerBlockReceivedCallback(blockReceivedCallback as any); service.registerDuplicateProposalCallback(duplicateProposalCallback); @@ -785,6 +785,29 @@ describe('LibP2PService', () => { expect(reportMessageValidationResultSpy).toHaveBeenCalledWith('msg-1', MOCK_PEER_ID, TopicValidatorResult.Reject); }); + it('local validation failure releases the protections it created', async () => { + const header = makeBlockHeader(1, { slotNumber: targetSlot }); + const proposal = await makeBlockProposal({ signer, blockHeader: header }); + blockReceivedCallback.mockImplementationOnce(() => Promise.resolve(false)); + + await service.processBlockFromPeer(proposal.toBuffer(), 'msg-1', mockPeerId); + + expect(mockTxPool.protectTxs).toHaveBeenCalledTimes(1); + // The failed proposal releases exactly the txs it protected, keyed to its slot. + expect(mockTxPool.unprotectTxs).toHaveBeenCalledTimes(1); + expect(mockTxPool.unprotectTxs).toHaveBeenCalledWith(proposal.txHashes, targetSlot); + }); + + it('successful local validation does not release protections', async () => { + const header = makeBlockHeader(1, { slotNumber: targetSlot }); + const proposal = await makeBlockProposal({ signer, blockHeader: header }); + + await service.processBlockFromPeer(proposal.toBuffer(), 'msg-1', mockPeerId); + + expect(mockTxPool.protectTxs).toHaveBeenCalledTimes(1); + expect(mockTxPool.unprotectTxs).not.toHaveBeenCalled(); + }); + // Regression for A-1013: payloads sharing (slot, position, archive) but differing on another // signed field (e.g. inHash) used to dedup by archive only and silently drop the second one. // The pool now dedups by signed-payload hash, so the equivocation surfaces. diff --git a/yarn-project/p2p/src/services/libp2p/libp2p_service.ts b/yarn-project/p2p/src/services/libp2p/libp2p_service.ts index 148a6e3a5b2b..fe7b5249fbc1 100644 --- a/yarn-project/p2p/src/services/libp2p/libp2p_service.ts +++ b/yarn-project/p2p/src/services/libp2p/libp2p_service.ts @@ -1363,6 +1363,9 @@ export class LibP2PService extends WithTracer implements P2PService { const isValid = await this.blockReceivedCallback(block, sender); if (!isValid) { this.logger.info(`Block proposal validation failed for block ${block.blockNumber}`, block.toBlockInfo()); + // Release the protections this proposal created so its txs return to pending. Only entries still + // keyed to this slot are cleared, so a tx referenced by a live proposal at another slot stays protected. + await this.mempools.txPool.unprotectTxs(block.txHashes, slot); } } diff --git a/yarn-project/p2p/src/test-helpers/testbench-utils.ts b/yarn-project/p2p/src/test-helpers/testbench-utils.ts index a9671f1b78e9..26fb73ff3b89 100644 --- a/yarn-project/p2p/src/test-helpers/testbench-utils.ts +++ b/yarn-project/p2p/src/test-helpers/testbench-utils.ts @@ -119,6 +119,10 @@ export class InMemoryTxPool extends EventEmitter implements TxPoolV2 { return Promise.resolve(); } + unprotectTxs(_txHashes: TxHash[], _slotNumber: SlotNumber): Promise { + return Promise.resolve(); + } + handlePrunedBlocks(_latestBlock: L2BlockId, _options?: { deleteAllTxs?: boolean }): Promise { return Promise.resolve(); }