Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions yarn-project/aztec-node/src/sentinel/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ The sentinel is one of several watchers registered with the slasher; it does not
| Source | What it provides |
|---|---|
| `EpochCache` | Slot/epoch helpers, committee + proposer for a slot, escape-hatch state |
| `L2BlockSource` (archiver) | Synced slot, `chain-checkpointed` events, block headers |
| `L2BlockSource` (archiver) | Synced slot, `getCheckpoint({ slot })`, `getL2Tips()`, block headers |
| `P2PClient` | `getCheckpointAttestationsForSlot(slot, payloadHash)`, `hasBlockProposalsForSlot(slot)` |
| `CheckpointReexecutionTracker` | Local re-execution outcome for the proposal at each slot (`valid` / `invalid` / `unvalidated`) — populated by the validator client's `ProposalHandler` |
| L1-checkpointed events | `chain-checkpointed` populates `slotNumberToCheckpoint` with the canonical attestor set |
| L1-confirmed checkpoints | Fetched on demand per slot via `archiver.getCheckpoint({ slot })`, yielding the canonical attestor set |

## Two cadences

Expand Down Expand Up @@ -49,7 +49,7 @@ For each slot, the proposer is assigned one of six statuses, ranked highest-conf

| # | Status | Trigger | Inactive party |
|---|---|---|---|
| 6 | `checkpoint-mined` | `slotNumberToCheckpoint.has(slot)` (a checkpoint covering this slot has landed on L1) | Attestors who didn't attest |
| 6 | `checkpoint-mined` | `archiver.getCheckpoint({ slot })` returns a checkpoint (one covering this slot has landed on L1) | Attestors who didn't attest |
| 5 | `checkpoint-valid` | `tracker.getOutcomeForSlot(slot) === 'valid'` | Attestors who didn't attest |
| 4 | `checkpoint-invalid` | `tracker.getOutcomeForSlot(slot) === 'invalid'` (re-executed and rejected) | Proposer |
| 3 | `checkpoint-unvalidated` | `tracker.getOutcomeForSlot(slot) === 'unvalidated'` (validation aborted: missing data, timeout, etc.) | Proposer |
Expand Down
43 changes: 17 additions & 26 deletions yarn-project/aztec-node/src/sentinel/sentinel.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import {
GENESIS_BLOCK_HEADER_HASH,
L2Block,
type L2BlockSource,
type L2BlockStream,
getAttestationInfoFromPublishedCheckpoint,
} from '@aztec/stdlib/block';
import {
Expand Down Expand Up @@ -44,7 +43,6 @@ describe('sentinel', () => {
let epochCache: MockProxy<EpochCache>;
let archiver: MockProxy<L2BlockSource>;
let p2p: MockProxy<P2PClient>;
let blockStream: MockProxy<L2BlockStream>;
let reexecutionTracker: CheckpointReexecutionTracker;

let kvStore: AztecLMDBStoreV2;
Expand All @@ -70,7 +68,6 @@ describe('sentinel', () => {
archiver = mock<L2BlockSource>();
archiver.getGenesisBlockHash.mockReturnValue(GENESIS_BLOCK_HEADER_HASH);
p2p = mock<P2PClient>();
blockStream = mock<L2BlockStream>();
reexecutionTracker = new CheckpointReexecutionTracker();

kvStore = await openTmpStore('sentinel-test');
Expand Down Expand Up @@ -100,7 +97,7 @@ describe('sentinel', () => {
epochCache.getEpochNow.mockReturnValue(epoch);
epochCache.getL1Constants.mockReturnValue(l1Constants);

sentinel = new TestSentinel(epochCache, archiver, p2p, store, reexecutionTracker, config, blockStream);
sentinel = new TestSentinel(epochCache, archiver, p2p, store, reexecutionTracker, config);
});

afterEach(async () => {
Expand Down Expand Up @@ -142,16 +139,22 @@ describe('sentinel', () => {
let proposer: EthAddress;
let committee: EthAddress[];

/** Helper to create and emit a chain-checkpointed event */
const emitCheckpointEvent = async (checkpoint: Checkpoint, checkpointAttestations: CommitteeAttestation[] = []) => {
/**
* Stubs the archiver so the slot's on-demand `getCheckpoint({ slot })` lookup returns this checkpoint, mirroring
* a checkpoint that has landed on L1 covering the slot.
*/
const mineCheckpointForSlot = (checkpoint: Checkpoint, checkpointAttestations: CommitteeAttestation[] = []) => {
const published = new PublishedCheckpoint(checkpoint, L1PublishedData.random(), checkpointAttestations);
const lastBlock = checkpoint.blocks.at(-1)!;
const block = { number: lastBlock.number, hash: (await lastBlock.hash()).toString() };
await sentinel.handleBlockStreamEvent({ type: 'chain-checkpointed', checkpoint: published, block });
const checkpointSlot = checkpoint.header.slotNumber;
archiver.getCheckpoint.mockImplementation(query =>
Promise.resolve('slot' in query && query.slot === checkpointSlot ? published : undefined),
);
return published;
};

beforeEach(async () => {
// No L1 checkpoint by default; individual tests mine one via mineCheckpointForSlot.
archiver.getCheckpoint.mockResolvedValue(undefined);
signers = times(4, Secp256k1Signer.random);
validators = signers.map(signer => signer.address);
block = await L2Block.random(BlockNumber(1), { slotNumber: slot });
Expand All @@ -165,7 +168,7 @@ describe('sentinel', () => {
it('flags checkpoint as mined when L1 has it (case 6)', async () => {
// Create a checkpoint with a block at the target slot and emit chain-checkpointed event
const checkpoint = await Checkpoint.random(CheckpointNumber(1), { numBlocks: 1, slotNumber: slot });
await emitCheckpointEvent(checkpoint);
mineCheckpointForSlot(checkpoint);

const activity = await sentinel.getSlotActivity(slot, epoch, proposer, committee);
expect(activity[proposer.toString()]).toEqual('checkpoint-mined');
Expand Down Expand Up @@ -208,7 +211,7 @@ describe('sentinel', () => {
it('prefers L1-mined over tracker outcome', async () => {
reexecutionTracker.recordOutcome(slot, block.archive.root, 'invalid', CheckpointNumber(1));
const checkpoint = await Checkpoint.random(CheckpointNumber(1), { numBlocks: 1, slotNumber: slot });
await emitCheckpointEvent(checkpoint);
mineCheckpointForSlot(checkpoint);

const activity = await sentinel.getSlotActivity(slot, epoch, proposer, committee);
expect(activity[proposer.toString()]).toEqual('checkpoint-mined');
Expand All @@ -224,7 +227,7 @@ describe('sentinel', () => {
});

// Emit the chain-checkpointed event with attestations from signers 0 and 1
publishedCheckpoint = await emitCheckpointEvent(checkpoint, checkpointAttestations);
publishedCheckpoint = mineCheckpointForSlot(checkpoint, checkpointAttestations);

const attestorsFromCheckpoint = compactArray(
getAttestationInfoFromPublishedCheckpoint(publishedCheckpoint, TEST_COORDINATION_SIGNATURE_CONTEXT).map(info =>
Expand Down Expand Up @@ -267,7 +270,7 @@ describe('sentinel', () => {

// Emit chain-checkpointed event with both signed and placeholder attestations
// The Sentinel should only count the recovered-from-signature ones
publishedCheckpoint = await emitCheckpointEvent(checkpoint, allAttestations);
publishedCheckpoint = mineCheckpointForSlot(checkpoint, allAttestations);

// Verify that getAttestationInfoFromPublishedCheckpoint returns 4 entries total:
// - 2 with status 'recovered-from-signature' (actual attestations with valid signatures)
Expand Down Expand Up @@ -299,7 +302,7 @@ describe('sentinel', () => {
it('identifies missed attestors if block is mined', async () => {
// Create checkpoint with a block at the target slot
const checkpoint = await Checkpoint.random(CheckpointNumber(1), { numBlocks: 1, slotNumber: slot });
await emitCheckpointEvent(checkpoint);
mineCheckpointForSlot(checkpoint);

// P2P provides attestations from validators 0, 1, 2 (not validator 3)
p2p.getCheckpointAttestationsForSlot.mockResolvedValue(attestations.slice(0, -1));
Expand Down Expand Up @@ -1030,18 +1033,6 @@ describe('sentinel', () => {
});

class TestSentinel extends Sentinel {
constructor(
epochCache: EpochCache,
archiver: L2BlockSource,
p2p: P2PClient,
store: SentinelStore,
reexecutionTracker: CheckpointReexecutionTracker,
config: SentinelRuntimeConfig,
protected override blockStream: L2BlockStream,
) {
super(epochCache, archiver, p2p, store, reexecutionTracker, config);
}

public override init() {
this.initialSlot = this.epochCache.getEpochAndSlotNow().slot;
return Promise.resolve();
Expand Down
106 changes: 34 additions & 72 deletions yarn-project/aztec-node/src/sentinel/sentinel.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,9 @@
import type { EpochCache } from '@aztec/epoch-cache';
import {
BlockNumber,
CheckpointNumber,
CheckpointProposalHash,
EpochNumber,
SlotNumber,
} from '@aztec/foundation/branded-types';
import { CheckpointNumber, CheckpointProposalHash, EpochNumber, SlotNumber } from '@aztec/foundation/branded-types';
import { countWhile, filterAsync, fromEntries, getEntries, mapValues } from '@aztec/foundation/collection';
import { EthAddress } from '@aztec/foundation/eth-address';
import { createLogger } from '@aztec/foundation/log';
import { RunningPromise } from '@aztec/foundation/running-promise';
import { L2TipsMemoryStore, type L2TipsStore } from '@aztec/kv-store/stores';
import type { P2PClient } from '@aztec/p2p';
import {
OffenseType,
Expand All @@ -21,13 +14,7 @@ import {
getOffenseTypeName,
} from '@aztec/slasher';
import type { SlasherConfig } from '@aztec/slasher/config';
import {
type L2BlockSource,
L2BlockStream,
type L2BlockStreamEvent,
type L2BlockStreamEventHandler,
getAttestationInfoFromPublishedCheckpoint,
} from '@aztec/stdlib/block';
import { type L2BlockSource, getAttestationInfoFromPublishedCheckpoint } from '@aztec/stdlib/block';
import type { CheckpointReexecutionTracker } from '@aztec/stdlib/checkpoint';
import type { ChainConfig } from '@aztec/stdlib/config';
import { getEpochAtSlot, getSlotRangeForEpoch, getTimestampForSlot } from '@aztec/stdlib/epoch-helpers';
Expand Down Expand Up @@ -97,7 +84,7 @@ function statusToCategory(status: ValidatorStatusInSlot): ValidatorStatusType {
* first:
*
* - `checkpoint-mined` — a checkpoint covering this slot has landed on L1
* (`slotNumberToCheckpoint` populated from `chain-checkpointed`).
* (fetched on demand via `archiver.getCheckpoint({ slot })`).
* - `checkpoint-valid` — the local node re-executed a checkpoint proposal for this slot
* successfully (consulted via `CheckpointReexecutionTracker`).
* - `checkpoint-invalid` — the local node re-executed a checkpoint proposal for this slot
Expand Down Expand Up @@ -135,25 +122,13 @@ function statusToCategory(status: ValidatorStatusInSlot): ValidatorStatusType {
* (no history entries for that slot) and per-epoch evaluation writes an empty performance map
* (no slashing).
*/
export class Sentinel extends (EventEmitter as new () => WatcherEmitter) implements L2BlockStreamEventHandler, Watcher {
export class Sentinel extends (EventEmitter as new () => WatcherEmitter) implements Watcher {
protected runningPromise: RunningPromise;
protected blockStream!: L2BlockStream;
protected l2TipsStore: L2TipsStore;

protected initialSlot: SlotNumber | undefined;
protected lastProcessedSlot: SlotNumber | undefined;
/** Largest epoch number for which the end-of-epoch aggregator has run. */
protected lastEvaluatedEpoch: EpochNumber | undefined;
protected slotNumberToCheckpoint: Map<
SlotNumber,
{
checkpointNumber: CheckpointNumber;
archive: string;
/** Hex keccak256 of the consensus payload bytes; used to fetch matching p2p attestations. */
proposalPayloadHash: CheckpointProposalHash;
attestors: EthAddress[];
}
> = new Map();

constructor(
protected epochCache: EpochCache,
Expand All @@ -165,7 +140,6 @@ export class Sentinel extends (EventEmitter as new () => WatcherEmitter) impleme
protected logger = createLogger('node:sentinel'),
) {
super();
this.l2TipsStore = new L2TipsMemoryStore(archiver.getGenesisBlockHash());
const interval = (epochCache.getL1Constants().ethereumSlotDuration * 1000) / 4;
this.runningPromise = new RunningPromise(this.work.bind(this), logger, interval);
}
Expand All @@ -187,17 +161,14 @@ export class Sentinel extends (EventEmitter as new () => WatcherEmitter) impleme
}

/**
* Loads initial slot and initializes blockstream. We will not process anything at or before
* the initial slot. Floors at the archiver's synced L2 slot so the sentinel keeps making
* forward progress when L1 is advancing but L2 has no activity (the synced slot is driven by
* L1 sync, not by L2 blocks). Falls back to the wallclock if the archiver isn't ready yet
* (cold start).
* Loads the initial slot. We will not process anything at or before the initial slot. Floors at the
* archiver's synced L2 slot so the sentinel keeps making forward progress when L1 is advancing but L2 has no
* activity (the synced slot is driven by L1 sync, not by L2 blocks). Falls back to the wallclock if the
* archiver isn't ready yet (cold start).
*/
protected async init() {
this.initialSlot = await this.getCurrentSlot();
const startingBlock = BlockNumber(await this.archiver.getBlockNumber());
this.logger.info(`Starting validator sentinel with initial slot ${this.initialSlot} and block ${startingBlock}`);
this.blockStream = new L2BlockStream(this.archiver, this.l2TipsStore, this, this.logger, { startingBlock });
this.logger.info(`Starting validator sentinel with initial slot ${this.initialSlot}`);
}

/**
Expand All @@ -215,44 +186,38 @@ export class Sentinel extends (EventEmitter as new () => WatcherEmitter) impleme
return this.runningPromise.stop();
}

public async handleBlockStreamEvent(event: L2BlockStreamEvent): Promise<void> {
await this.l2TipsStore.handleBlockStreamEvent(event);
if (event.type === 'chain-checkpointed') {
this.handleCheckpoint(event);
}
}

protected handleCheckpoint(event: L2BlockStreamEvent) {
if (event.type !== 'chain-checkpointed') {
return;
/**
* Fetches the L1-confirmed checkpoint covering a slot (if any) and derives the slot-level data the
* activity classifier needs: the checkpoint number, archive root, consensus payload hash (used to fetch
* matching p2p attestations regardless of feeAssetPriceModifier variants), and the recovered attestor set.
* Reads on demand so the result is always against the canonical chain — a reorged-out checkpoint simply
* stops being returned, with no stale mapping to clean up.
*/
protected async getCheckpointForSlot(slot: SlotNumber): Promise<
| {
checkpointNumber: CheckpointNumber;
archive: string;
proposalPayloadHash: CheckpointProposalHash;
attestors: EthAddress[];
}
| undefined
> {
const checkpoint = await this.archiver.getCheckpoint({ slot });
if (!checkpoint) {
return undefined;
}
const checkpoint = event.checkpoint;

// Store mapping from slot to archive, checkpoint number, attestors, and the consensus payload
// hash (used to query matching p2p attestations regardless of feeAssetPriceModifier variants).
const signatureContext = this.getSignatureContext();
const proposalPayloadHash = CheckpointProposalHash.fromBuffer(
ConsensusPayload.fromCheckpoint(checkpoint.checkpoint, signatureContext).getPayloadHash(),
);
this.slotNumberToCheckpoint.set(checkpoint.checkpoint.header.slotNumber, {
return {
checkpointNumber: checkpoint.checkpoint.number,
archive: checkpoint.checkpoint.archive.root.toString(),
proposalPayloadHash,
attestors: getAttestationInfoFromPublishedCheckpoint(checkpoint, signatureContext)
.filter(a => a.status === 'recovered-from-signature')
.map(a => a.address!),
});

// Prune the archive map to only keep at most N entries
const historyLength = this.store.getHistoryLength();
if (this.slotNumberToCheckpoint.size > historyLength) {
const toDelete = Array.from(this.slotNumberToCheckpoint.keys())
.sort((a, b) => Number(a - b))
.slice(0, this.slotNumberToCheckpoint.size - historyLength);
for (const key of toDelete) {
this.slotNumberToCheckpoint.delete(key);
}
}
};
}

/**
Expand Down Expand Up @@ -387,10 +352,6 @@ export class Sentinel extends (EventEmitter as new () => WatcherEmitter) impleme
public async work() {
const currentSlot = await this.getCurrentSlot();
try {
// Manually sync the block stream to ensure we have the latest data.
// Note we never `start` the blockstream, so it loops at the same pace as we do.
await this.blockStream.sync();

// Per-slot activity recording (lag = 2 slots for P2P attestation settlement).
const targetSlot = await this.isReadyToProcess(currentSlot);
if (targetSlot !== false) {
Expand Down Expand Up @@ -478,7 +439,7 @@ export class Sentinel extends (EventEmitter as new () => WatcherEmitter) impleme
return false;
}

const archiverLastBlockHash = await this.l2TipsStore.getL2Tips().then(tip => tip.proposed.hash);
const archiverLastBlockHash = await this.archiver.getL2Tips().then(tip => tip.proposed.hash);
const p2pLastBlockHash = await this.p2p.getL2Tips().then(tips => tips.proposed.hash);
const isP2pSynced = archiverLastBlockHash === p2pLastBlockHash;
if (!isP2pSynced) {
Expand Down Expand Up @@ -534,8 +495,9 @@ export class Sentinel extends (EventEmitter as new () => WatcherEmitter) impleme
this.logger.debug(`Computing stats for slot ${slot} at epoch ${epoch}`, { slot, epoch, proposer, committee });

// Gather attestors from both p2p (live attestations) and the archiver (signers on the
// checkpoint if one has landed on L1). Used regardless of which case applies.
const checkpoint = this.slotNumberToCheckpoint.get(slot);
// checkpoint if one has landed on L1). Fetched on demand so it always reflects the canonical chain.
// Used regardless of which case applies.
const checkpoint = await this.getCheckpointForSlot(slot);
const p2pAttested = await this.p2p.getCheckpointAttestationsForSlot(slot, checkpoint?.proposalPayloadHash);
const p2pAttestors = p2pAttested.map(a => a.getSender()).filter((s): s is EthAddress => s !== undefined);
const attestors = new Set(
Expand Down
Loading
Loading