Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion yarn-project/foundation/src/config/env_var.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ export type EnvVar =
| 'P2P_BATCH_TX_REQUESTER_TX_BATCH_SIZE'
| 'P2P_BATCH_TX_REQUESTER_BAD_PEER_THRESHOLD'
| 'P2P_BLOCK_CHECK_INTERVAL_MS'
| 'P2P_SLOT_CHECK_INTERVAL_MS'
| 'P2P_BLOCK_REQUEST_BATCH_SIZE'
| 'P2P_BOOTSTRAP_NODE_ENR_VERSION_CHECK'
| 'P2P_BOOTSTRAP_NODES_AS_FULL_PEERS'
Expand Down
37 changes: 37 additions & 0 deletions yarn-project/p2p/src/client/p2p_client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,43 @@ describe('P2P Client', () => {
expect(await client.getSyncedLatestBlockNum()).toEqual(102);
});

it('prepares the pool for the last synced block slot after marking txs mined', async () => {
await client.start();

blockSource.addProposedBlocks([
await L2Block.random(BlockNumber(101), { slotNumber: SlotNumber(150) }),
await L2Block.random(BlockNumber(102), { slotNumber: SlotNumber(151) }),
]);
await client.sync();

// Release is driven by the synced block slot, not the wall clock: prepareForSlot is ultimately
// driven with the last synced block's slot (regardless of how the stream batches the blocks),
// never reads the epoch cache, and never targets a slot beyond what has synced.
expect(txPool.prepareForSlot).toHaveBeenLastCalledWith(SlotNumber(151));
const preparedSlots = txPool.prepareForSlot.mock.calls.map(([slot]) => Number(slot));
expect(Math.max(...preparedSlots)).toBe(151);
expect(epochCache.getCurrentAndNextSlot).not.toHaveBeenCalled();
// Mined-marking runs before the matching-slot release within the handler.
expect(txPool.handleMinedBlock.mock.invocationCallOrder.at(-1)!).toBeLessThan(
txPool.prepareForSlot.mock.invocationCallOrder.at(-1)!,
);
});

it('does not re-prepare for a slot that does not advance', async () => {
await client.start();

blockSource.addProposedBlocks([await L2Block.random(BlockNumber(101), { slotNumber: SlotNumber(150) })]);
await client.sync();
const callsAfterFirst = txPool.prepareForSlot.mock.calls.length;
expect(txPool.prepareForSlot).toHaveBeenLastCalledWith(SlotNumber(150));

// A later block at an earlier slot must not advance the prepared-for slot
blockSource.addProposedBlocks([await L2Block.random(BlockNumber(102), { slotNumber: SlotNumber(149) })]);
await client.sync();
expect(txPool.prepareForSlot.mock.calls.length).toBe(callsAfterFirst);
expect(txPool.prepareForSlot).not.toHaveBeenCalledWith(SlotNumber(149));
});

it('handles proven and finalized chain behind starting point', async () => {
blockSource.setProvenBlockNumber(0);
blockSource.setFinalizedBlockNumber(0);
Expand Down
41 changes: 10 additions & 31 deletions yarn-project/p2p/src/client/p2p_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import {
SlotNumber,
} from '@aztec/foundation/branded-types';
import { createLogger } from '@aztec/foundation/log';
import { RunningPromise } from '@aztec/foundation/promise';
import { DateProvider } from '@aztec/foundation/timer';
import type { AztecAsyncKVStore, AztecAsyncSingleton } from '@aztec/kv-store';
import { L2TipsKVStore } from '@aztec/kv-store/stores';
Expand Down Expand Up @@ -83,9 +82,6 @@ export class P2PClient extends WithTracer implements P2P {
/** Tracks the last slot for which we called prepareForSlot */
private lastSlotProcessed: SlotNumber = SlotNumber.ZERO;

/** Polls for slot changes and calls prepareForSlot on the tx pool */
private slotMonitor: RunningPromise | undefined;

constructor(
private store: AztecAsyncKVStore,
private l2BlockSource: L2BlockSource & ContractDataSource,
Expand Down Expand Up @@ -268,14 +264,6 @@ export class P2PClient extends WithTracer implements P2P {
this.blockStream.start();
this.txFileStore?.start();

// Start slot monitor to call prepareForSlot when the slot changes
this.slotMonitor = new RunningPromise(
() => this.maybeCallPrepareForSlot(),
this.log,
this.config.slotCheckIntervalMS,
);
this.slotMonitor.start();

return this.syncPromise;
}

Expand All @@ -302,8 +290,6 @@ export class P2PClient extends WithTracer implements P2P {
*/
public async stop() {
this.log.debug('Stopping p2p client...');
await this.slotMonitor?.stop();
this.log.debug('Stopped slot monitor');
await tryStop(this.txCollection);
this.log.debug('Stopped tx collection service');
await this.txFileStore?.stop();
Expand Down Expand Up @@ -642,11 +628,16 @@ export class P2PClient extends WithTracer implements P2P {
return;
}

const lastBlock = blocks.at(-1)!;
const lastSlot = lastBlock.header.getSlot();

// Mark txs mined before releasing protections: a block landing at this slot supersedes any
// protection for txs it includes, so mined-marking must run first to keep just-landed txs from
// being unprotected into pending where they could be evicted before they are recorded as mined.
await this.handleMinedBlocks(blocks);
await this.maybeCallPrepareForSlot();
await this.maybeCallPrepareForSlot(lastSlot);
await this.collectingMissingTxs(blocks);
const lastBlock = blocks.at(-1)!;
await this.synchedLatestSlot.set(BigInt(lastBlock.header.getSlot()));
await this.synchedLatestSlot.set(BigInt(lastSlot));
}

/** Request txs for unproven blocks so the prover node can prove. */
Expand Down Expand Up @@ -741,20 +732,8 @@ export class P2PClient extends WithTracer implements P2P {
return isEpochPrune;
}

/** Checks if the slot has changed and calls prepareForSlot if so. */
private async maybeCallPrepareForSlot(): Promise<void> {
// If we have a proposed checkpoint available, we want to prepare the target slot - otherwise we prepare the current slot
const l2Tips = await this.l2Tips.getL2Tips();
const hasProposedCheckpoint = l2Tips.proposedCheckpoint.checkpoint.number > l2Tips.checkpointed.checkpoint.number;

let slot;
if (hasProposedCheckpoint) {
const { targetSlot } = this.epochCache.getTargetAndNextSlot();
slot = targetSlot;
} else {
const { currentSlot } = this.epochCache.getCurrentAndNextSlot();
slot = currentSlot;
}
/** Calls prepareForSlot for the given slot if it advances past the last slot we prepared for. */
private async maybeCallPrepareForSlot(slot: SlotNumber): Promise<void> {
if (slot <= this.lastSlotProcessed) {
return;
}
Expand Down
8 changes: 0 additions & 8 deletions yarn-project/p2p/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,6 @@ export interface P2PConfig
/** The frequency in which to check for new L2 blocks. */
blockCheckIntervalMS: number;

/** The frequency in which to check for new L2 slots. */
slotCheckIntervalMS: number;

/** The number of blocks to fetch in a single batch. */
blockRequestBatchSize: number;

Expand Down Expand Up @@ -317,11 +314,6 @@ export const p2pConfigMappings: ConfigMappingsType<P2PConfig> = {
description: 'The frequency in which to check for new L2 blocks.',
...numberConfigHelper(100),
},
slotCheckIntervalMS: {
env: 'P2P_SLOT_CHECK_INTERVAL_MS',
description: 'The frequency in which to check for new L2 slots.',
...numberConfigHelper(1000),
},
debugDisableColocationPenalty: {
env: 'DEBUG_P2P_DISABLE_COLOCATION_PENALTY',
description: 'DEBUG: Disable colocation penalty - NEVER set to true in production',
Expand Down
11 changes: 11 additions & 0 deletions yarn-project/p2p/src/mem_pools/tx_pool_v2/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,17 @@ export interface TxPoolV2 extends TypedEventEmitter<TxPoolV2Events> {
*/
prepareForSlot(slotNumber: SlotNumber): Promise<void>;

/**
* Releases the protections a failed block proposal created and restores the txs to pending.
* Only clears protection entries still recorded at exactly the given slot: a tx that another,
* still-live proposal raised to a higher slot via {@link protectTxs} keeps its protection, and
* mined txs (which carry no protection entry) are left untouched. Restored txs are re-validated
* and resolved against nullifier conflicts before re-entering the pending indices.
* @param txHashes - Hashes of the proposal's txs to release.
* @param slotNumber - The slot the failed proposal targeted; protection is released only for this slot.
*/
unprotectTxs(txHashes: TxHash[], slotNumber: SlotNumber): Promise<void>;

/**
* Handles pruned blocks during a reorg.
* Un-mines all transactions mined in blocks beyond the given latest block
Expand Down
12 changes: 12 additions & 0 deletions yarn-project/p2p/src/mem_pools/tx_pool_v2/tx_pool_indices.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ export class TxPoolIndices {
meta.minedL2BlockId = blockId;
// Safe to call unconditionally - removeFromPendingIndices is idempotent
this.#removeFromPendingIndices(meta);
// A mined tx supersedes any protection: drop the stale entry so it can't linger in the map and
// be matched by later protection scans.
this.#protectedTransactions.delete(meta.txHash);
}

/** Clears the mined status from a transaction */
Expand Down Expand Up @@ -316,6 +319,15 @@ export class TxPoolIndices {
return result;
}

/**
* From the given hashes, returns those whose protection is recorded at exactly the given slot.
* Used to release the protections a single block proposal created without disturbing entries a
* later proposal raised to a higher slot via updateProtection.
*/
findProtectedTxsAtSlot(txHashes: string[], slotNumber: SlotNumber): string[] {
return txHashes.filter(txHash => this.#protectedTransactions.get(txHash) === slotNumber);
}

/** Filters out transactions that are currently protected */
filterUnprotected(txs: TxMetaData[]): TxMetaData[] {
return txs.filter(meta => !this.#protectedTransactions.has(meta.txHash));
Expand Down
Loading
Loading