diff --git a/Cargo.lock b/Cargo.lock index bca7477a4..25659a7bf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1271,6 +1271,7 @@ dependencies = [ "tokio", "tracing", "tracing-subscriber", + "verification", ] [[package]] @@ -13818,6 +13819,7 @@ dependencies = [ "alloy", "amp-client", "anyhow", + "arrow", "arrow-flight", "clap", "futures", @@ -13825,6 +13827,7 @@ dependencies = [ "reqwest 0.13.2", "serde", "serde_json", + "thiserror 2.0.18", "tokio", "tonic", "typed-arrow", diff --git a/crates/bin/ampctl/src/cmd/dataset/deploy.rs b/crates/bin/ampctl/src/cmd/dataset/deploy.rs index 7bee64011..fb766ff5d 100644 --- a/crates/bin/ampctl/src/cmd/dataset/deploy.rs +++ b/crates/bin/ampctl/src/cmd/dataset/deploy.rs @@ -64,6 +64,16 @@ pub struct Args { /// The worker must be active (has sent heartbeats recently) for the deployment to succeed. #[arg(long, value_parser = clap::value_parser!(NodeSelector))] pub worker_id: Option, + + /// Enable cryptographic verification of EVM block data during extraction + /// + /// When enabled, verifies block hashes, transaction roots, and receipt roots + /// before writing data to storage. Only applicable to EVM raw datasets. + /// Verification failures are retryable errors. + /// + /// Defaults to false if not specified. + #[arg(long)] + pub verify: bool, } /// Deploy a dataset to start syncing blockchain data. @@ -81,6 +91,7 @@ pub async fn run( end_block, parallelism, worker_id, + verify, }: Args, ) -> Result<(), Error> { tracing::debug!( @@ -88,10 +99,19 @@ pub async fn run( ?end_block, %parallelism, ?worker_id, + %verify, "Deploying dataset" ); - let job_id = deploy_dataset(&global, &dataset_ref, end_block, parallelism, worker_id).await?; + let job_id = deploy_dataset( + &global, + &dataset_ref, + end_block, + parallelism, + worker_id, + verify, + ) + .await?; let result = DeployResult { job_id }; global.print(&result).map_err(Error::JsonSerialization)?; @@ -102,18 +122,19 @@ pub async fn run( /// /// POSTs to the versioned `/datasets/{namespace}/{name}/versions/{version}/deploy` endpoint /// and returns the job ID. -#[tracing::instrument(skip_all, fields(%dataset_ref, ?end_block, %parallelism, ?worker_id))] +#[tracing::instrument(skip_all, fields(%dataset_ref, ?end_block, %parallelism, ?worker_id, %verify))] async fn deploy_dataset( global: &GlobalArgs, dataset_ref: &Reference, end_block: Option, parallelism: u16, worker_id: Option, + verify: bool, ) -> Result { let client = global.build_client().map_err(Error::ClientBuild)?; let job_id = client .datasets() - .deploy(dataset_ref, end_block, parallelism, worker_id) + .deploy(dataset_ref, end_block, parallelism, worker_id, verify) .await .map_err(Error::Deploy)?; diff --git a/crates/bin/ampctl/src/cmd/dataset/deploy__after_help.md b/crates/bin/ampctl/src/cmd/dataset/deploy__after_help.md index a1555eef0..22bd34bd3 100644 --- a/crates/bin/ampctl/src/cmd/dataset/deploy__after_help.md +++ b/crates/bin/ampctl/src/cmd/dataset/deploy__after_help.md @@ -20,6 +20,11 @@ Deploy dataset, staying 100 blocks behind chain tip: ampctl dataset deploy my_namespace/my_dataset@1.0.0 --end-block -100 ``` +Deploy EVM dataset with cryptographic verification: +``` +ampctl dataset deploy my_namespace/my_dataset@1.0.0 --verify +``` + Use custom admin URL: ``` ampctl dataset deploy my_namespace/my_dataset@1.0.0 --admin-url http://production:1610 diff --git a/crates/clients/admin/src/datasets.rs b/crates/clients/admin/src/datasets.rs index 83eb12425..af536be30 100644 --- a/crates/clients/admin/src/datasets.rs +++ b/crates/clients/admin/src/datasets.rs @@ -268,6 +268,7 @@ impl<'a> DatasetsClient<'a> { end_block: Option, parallelism: u16, worker_id: Option, + verify: bool, ) -> Result { let namespace = dataset_ref.namespace(); let name = dataset_ref.name(); @@ -285,6 +286,7 @@ impl<'a> DatasetsClient<'a> { end_block, parallelism, worker_id, + verify, }; let response = self @@ -1283,6 +1285,8 @@ struct DeployRequest { parallelism: u16, #[serde(skip_serializing_if = "Option::is_none")] worker_id: Option, + #[serde(skip_serializing_if = "std::ops::Not::not")] + verify: bool, } /// Input type for dataset registration manifest parameter. diff --git a/crates/core/datasets-raw/src/rows.rs b/crates/core/datasets-raw/src/rows.rs index a3527f43a..2a517a667 100644 --- a/crates/core/datasets-raw/src/rows.rs +++ b/crates/core/datasets-raw/src/rows.rs @@ -35,6 +35,15 @@ impl IntoIterator for Rows { } } +impl<'a> IntoIterator for &'a Rows { + type Item = &'a TableRows; + type IntoIter = std::slice::Iter<'a, TableRows>; + + fn into_iter(self) -> Self::IntoIter { + self.0.iter() + } +} + /// A record batch associated with a single block of chain data, for populating raw datasets. pub struct TableRows { pub table: Table, diff --git a/crates/core/verification/Cargo.toml b/crates/core/verification/Cargo.toml index 0ac53f74c..f96255d10 100644 --- a/crates/core/verification/Cargo.toml +++ b/crates/core/verification/Cargo.toml @@ -8,6 +8,7 @@ license-file.workspace = true alloy.workspace = true amp-client = { path = "../../clients/flight" } anyhow.workspace = true +arrow.workspace = true arrow-flight.workspace = true clap.workspace = true futures.workspace = true @@ -15,6 +16,7 @@ indicatif.workspace = true reqwest.workspace = true serde.workspace = true serde_json.workspace = true +thiserror.workspace = true tokio.workspace = true tonic.workspace = true typed-arrow.workspace = true diff --git a/crates/core/verification/src/client.rs b/crates/core/verification/src/client.rs index 9a70ae536..0bdb80362 100644 --- a/crates/core/verification/src/client.rs +++ b/crates/core/verification/src/client.rs @@ -5,6 +5,7 @@ use alloy::primitives::{ }; use amp_client::AmpClient; use anyhow::Context as _; +use arrow::array::RecordBatch; use futures::StreamExt as _; use typed_arrow::{AsViewsIterator as _, Decimal128, List}; @@ -297,6 +298,135 @@ impl Transaction { } } +// Arrow record view types for parsing EVM data from RecordBatches +#[derive(typed_arrow::Record)] +struct BlockRecordView { + _block_num: u64, + block_num: u64, + timestamp: typed_arrow::Timestamp, + hash: [u8; 32], + parent_hash: [u8; 32], + ommers_hash: [u8; 32], + miner: [u8; 20], + state_root: [u8; 32], + transactions_root: [u8; 32], + receipt_root: [u8; 32], + logs_bloom: Vec, + difficulty: Decimal128<38, 0>, + total_difficulty: Option>, + gas_limit: u64, + gas_used: u64, + extra_data: Vec, + mix_hash: [u8; 32], + nonce: u64, + base_fee_per_gas: Option>, + withdrawals_root: Option<[u8; 32]>, + blob_gas_used: Option, + excess_blob_gas: Option, + parent_beacon_root: Option<[u8; 32]>, + requests_hash: Option<[u8; 32]>, +} + +#[derive(typed_arrow::Record)] +struct TransactionRecordView { + _block_num: u64, + block_hash: [u8; 32], + block_num: u64, + timestamp: typed_arrow::Timestamp, + tx_index: u32, + tx_hash: [u8; 32], + to: Option<[u8; 20]>, + nonce: u64, + gas_price: Option>, + gas_limit: u64, + value: String, + input: Vec, + r: [u8; 32], + s: [u8; 32], + v_parity: bool, + chain_id: Option, + gas_used: u64, + r#type: i32, + max_fee_per_gas: Option>, + max_priority_fee_per_gas: Option>, + max_fee_per_blob_gas: Option>, + from: [u8; 20], + status: bool, + state_root: Option<[u8; 32]>, + access_list: Option>, + blob_versioned_hashes: Option>, + authorization_list: Option>, +} + +#[derive(typed_arrow::Record)] +struct AccessListItemView { + address: [u8; 20], + storage_keys: List<[u8; 32]>, +} + +#[derive(typed_arrow::Record)] +struct AuthorizationTupleView { + chain_id: u64, + address: [u8; 20], + nonce: u64, + y_parity: bool, + r: [u8; 32], + s: [u8; 32], +} + +#[derive(typed_arrow::Record)] +struct LogRecordView { + _block_num: u64, + block_hash: [u8; 32], + block_num: u64, + timestamp: typed_arrow::Timestamp, + tx_hash: [u8; 32], + tx_index: u32, + log_index: u32, + address: [u8; 20], + topic0: Option<[u8; 32]>, + topic1: Option<[u8; 32]>, + topic2: Option<[u8; 32]>, + topic3: Option<[u8; 32]>, + data: Vec, +} + +/// Error converting Arrow RecordBatches to verification types. +#[derive(Debug, thiserror::Error)] +pub enum ConversionError { + /// RecordBatch schema doesn't match expected EVM table schema + #[error("schema mismatch")] + SchemaMismatch(#[source] Box), + + /// Failed to read a record view from the batch + #[error("failed to read record view")] + ReadView(#[source] Box), + + /// Integer overflow when converting to u128 + #[error("{0} does not fit in u128")] + InvalidU128(&'static str), + + /// Integer overflow when converting to u64 + #[error("{0} does not fit in u64")] + InvalidU64(&'static str), + + /// Value field is not a valid decimal string + #[error("value is not a valid decimal string")] + InvalidDecimalString, + + /// Negative value where unsigned integer was expected + #[error("{0} is negative")] + NegativeValue(&'static str), + + /// Block batch doesn't contain exactly one row + #[error("block batch must contain exactly one row, got {0}")] + InvalidBlockBatchSize(usize), + + /// Block batch is empty + #[error("block batch is empty")] + EmptyBlockBatch, +} + /// Escape and quote dataset name for SQL identifier safety. /// Replaces " with "" and wraps the result in double quotes. fn escape_dataset_name(dataset: &str) -> String { @@ -308,98 +438,6 @@ pub async fn fetch_blocks( dataset: &str, range: RangeInclusive, ) -> anyhow::Result> { - #[derive(typed_arrow::Record)] - struct BlockRecordView { - _block_num: u64, - block_num: u64, - timestamp: typed_arrow::Timestamp, - hash: [u8; 32], - parent_hash: [u8; 32], - ommers_hash: [u8; 32], - miner: [u8; 20], - state_root: [u8; 32], - transactions_root: [u8; 32], - receipt_root: [u8; 32], - logs_bloom: Vec, - difficulty: Decimal128<38, 0>, - total_difficulty: Option>, - gas_limit: u64, - gas_used: u64, - extra_data: Vec, - mix_hash: [u8; 32], - nonce: u64, - base_fee_per_gas: Option>, - withdrawals_root: Option<[u8; 32]>, - blob_gas_used: Option, - excess_blob_gas: Option, - parent_beacon_root: Option<[u8; 32]>, - requests_hash: Option<[u8; 32]>, - } - - #[derive(typed_arrow::Record)] - struct TransactionRecordView { - _block_num: u64, - block_hash: [u8; 32], - block_num: u64, - timestamp: typed_arrow::Timestamp, - tx_index: u32, - tx_hash: [u8; 32], - to: Option<[u8; 20]>, - nonce: u64, - gas_price: Option>, - gas_limit: u64, - value: String, - input: Vec, - r: [u8; 32], - s: [u8; 32], - v_parity: bool, - chain_id: Option, - gas_used: u64, - r#type: i32, - max_fee_per_gas: Option>, - max_priority_fee_per_gas: Option>, - max_fee_per_blob_gas: Option>, - from: [u8; 20], - status: bool, - state_root: Option<[u8; 32]>, - access_list: Option>, - blob_versioned_hashes: Option>, - authorization_list: Option>, - } - - #[derive(typed_arrow::Record)] - struct AccessListItem { - address: [u8; 20], - storage_keys: List<[u8; 32]>, - } - - #[derive(typed_arrow::Record)] - struct AuthorizationTuple { - chain_id: u64, - address: [u8; 20], - nonce: u64, - y_parity: bool, - r: [u8; 32], - s: [u8; 32], - } - - #[derive(typed_arrow::Record)] - struct LogRecordView { - _block_num: u64, - block_hash: [u8; 32], - block_num: u64, - timestamp: typed_arrow::Timestamp, - tx_hash: [u8; 32], - tx_index: u32, - log_index: u32, - address: [u8; 20], - topic0: Option<[u8; 32]>, - topic1: Option<[u8; 32]>, - topic2: Option<[u8; 32]>, - topic3: Option<[u8; 32]>, - data: Vec, - } - let start_block = *range.start(); let end_block = *range.end(); @@ -609,3 +647,204 @@ pub async fn fetch_blocks( Ok(block_data) } + +/// Convert Arrow RecordBatches (block, transactions, logs) to a verification Block. +/// +/// This is used for inline verification during extraction, where data is already +/// in RecordBatch format. The function expects: +/// - `block_batch`: A single-row RecordBatch containing one block's header data +/// - `transactions_batch`: RecordBatch containing all transactions for this block +/// - `logs_batch`: RecordBatch containing all logs for this block +/// +/// # Errors +/// +/// Returns an error if: +/// - The RecordBatch schema doesn't match expected EVM table schema +/// - Type conversions fail (e.g., overflow, missing required fields) +/// - The block batch doesn't contain exactly one row +pub fn block_from_record_batches( + block_batch: &RecordBatch, + transactions_batch: &RecordBatch, + logs_batch: &RecordBatch, +) -> Result { + // Parse logs + let mut logs = Vec::new(); + for view in logs_batch + .iter_views::() + .map_err(|e| ConversionError::SchemaMismatch(Box::new(e)))? + { + let view = view.map_err(|e| ConversionError::ReadView(Box::new(e)))?; + let view: LogRecordView = view + .try_into() + .map_err(|e| ConversionError::ReadView(Box::new(e)))?; + + // Collapse topic0-topic3 into single Vec + let topics: Vec = [view.topic0, view.topic1, view.topic2, view.topic3] + .into_iter() + .flatten() + .map(B256::from) + .collect(); + + let log = Log { + block_number: view.block_num, + tx_index: view.tx_index, + log_index: view.log_index, + address: Address::from(view.address), + topics, + data: Bytes::from(view.data), + }; + + logs.push(log); + } + + // Sort logs by (tx_index, log_index) + logs.sort_unstable_by_key(|log| (log.tx_index, log.log_index)); + + // Parse transactions + let mut transactions = Vec::new(); + for view in transactions_batch + .iter_views::() + .map_err(|e| ConversionError::SchemaMismatch(Box::new(e)))? + { + let view = view.map_err(|e| ConversionError::ReadView(Box::new(e)))?; + let view: TransactionRecordView = view + .try_into() + .map_err(|e| ConversionError::ReadView(Box::new(e)))?; + + let transaction = Transaction { + block_number: view.block_num, + tx_index: view.tx_index, + tx_hash: B256::from(view.tx_hash), + tx_type: view.r#type, + nonce: view.nonce, + gas_price: view + .gas_price + .map(|d| u128::try_from(d.into_value())) + .transpose() + .map_err(|_| ConversionError::InvalidU128("gas_price"))?, + max_fee_per_gas: view + .max_fee_per_gas + .map(|d| u128::try_from(d.into_value())) + .transpose() + .map_err(|_| ConversionError::InvalidU128("max_fee_per_gas"))?, + max_priority_fee_per_gas: view + .max_priority_fee_per_gas + .map(|d| u128::try_from(d.into_value())) + .transpose() + .map_err(|_| ConversionError::InvalidU128("max_priority_fee_per_gas"))?, + max_fee_per_blob_gas: view + .max_fee_per_blob_gas + .map(|d| u128::try_from(d.into_value())) + .transpose() + .map_err(|_| ConversionError::InvalidU128("max_fee_per_blob_gas"))?, + gas_limit: view.gas_limit, + to: view.to.map(Address::from), + value: U256::from_str_radix(&view.value, 10) + .map_err(|_| ConversionError::InvalidDecimalString)?, + input: Bytes::from(view.input), + r: view.r.into(), + s: view.s.into(), + v_parity: view.v_parity, + chain_id: view.chain_id, + from: Address::from(view.from), + access_list: view.access_list.map(|list| { + list.into_inner() + .into_iter() + .map(|item| { + let address = Address::from(item.address); + let storage_keys = item + .storage_keys + .into_inner() + .into_iter() + .map(B256::from) + .collect(); + (address, storage_keys) + }) + .collect() + }), + blob_versioned_hashes: view + .blob_versioned_hashes + .map(|hashes| hashes.into_inner().into_iter().map(B256::from).collect()), + gas_used: view.gas_used, + status: view.status, + state_root: view.state_root.map(B256::from), + authorization_list: view.authorization_list.map(|list| { + list.into_inner() + .into_iter() + .map(|item| { + ( + item.chain_id, + Address::from(item.address), + item.nonce, + item.y_parity, + B256::from(item.r), + B256::from(item.s), + ) + }) + .collect() + }), + }; + + transactions.push(transaction); + } + + transactions.sort_unstable_by_key(|tx| tx.tx_index); + + // Parse block header - should be exactly one row + if block_batch.num_rows() != 1 { + return Err(ConversionError::InvalidBlockBatchSize( + block_batch.num_rows(), + )); + } + + let mut block_iter = block_batch + .iter_views::() + .map_err(|e| ConversionError::SchemaMismatch(Box::new(e)))?; + let view = block_iter + .next() + .ok_or(ConversionError::EmptyBlockBatch)? + .map_err(|e| ConversionError::ReadView(Box::new(e)))?; + let view: BlockRecordView = view + .try_into() + .map_err(|e| ConversionError::ReadView(Box::new(e)))?; + + let block_num = view.block_num; + let block = Block { + number: block_num, + timestamp: (view.timestamp.value() / 1_000_000_000) as u64, + hash: BlockHash::from(view.hash), + parent_hash: BlockHash::from(view.parent_hash), + ommers_hash: BlockHash::from(view.ommers_hash), + miner: Address::from(view.miner), + state_root: BlockHash::from(view.state_root), + transactions_root: BlockHash::from(view.transactions_root), + receipts_root: BlockHash::from(view.receipt_root), + logs_bloom: view.logs_bloom.into(), + difficulty: U256::try_from(view.difficulty.into_value()) + .map_err(|_| ConversionError::NegativeValue("difficulty"))?, + total_difficulty: view + .total_difficulty + .map(|d| U256::try_from(d.into_value())) + .transpose() + .map_err(|_| ConversionError::NegativeValue("total_difficulty"))?, + gas_limit: view.gas_limit, + gas_used: view.gas_used, + extra_data: view.extra_data.into(), + mix_hash: BlockHash::from(view.mix_hash), + nonce: view.nonce, + base_fee_per_gas: view + .base_fee_per_gas + .map(|d| u64::try_from(d.into_value())) + .transpose() + .map_err(|_| ConversionError::InvalidU64("base_fee_per_gas"))?, + withdrawals_root: view.withdrawals_root.map(BlockHash::from), + blob_gas_used: view.blob_gas_used, + excess_blob_gas: view.excess_blob_gas, + parent_beacon_root: view.parent_beacon_root.map(BlockHash::from), + requests_hash: view.requests_hash.map(BlockHash::from), + transactions, + logs, + }; + + Ok(block) +} diff --git a/crates/core/verification/src/lib.rs b/crates/core/verification/src/lib.rs index a2ee4c9e1..e8314195f 100644 --- a/crates/core/verification/src/lib.rs +++ b/crates/core/verification/src/lib.rs @@ -1,4 +1,4 @@ -mod client; +pub mod client; mod diff; mod rpc; @@ -21,7 +21,49 @@ use reqwest::Url; use tokio::sync::Mutex; use tonic::transport::{ClientTlsConfig, Endpoint}; -use crate::client::{Block, Log, fetch_blocks}; +use crate::client::fetch_blocks; +pub use crate::client::{Block, ConversionError, Log, Transaction}; + +/// Error from cryptographic verification of block data. +#[derive(Debug, thiserror::Error)] +pub enum VerificationError { + /// Block hash computed from header fields doesn't match stored hash + #[error("block hash mismatch: computed {computed}, expected {expected}")] + BlockHashMismatch { + computed: alloy::primitives::B256, + expected: alloy::primitives::B256, + }, + + /// Transactions root computed from transactions doesn't match header + #[error("transactions root mismatch: computed {computed}, expected {expected}")] + TransactionsRootMismatch { + computed: alloy::primitives::B256, + expected: alloy::primitives::B256, + }, + + /// Receipts root computed from receipts doesn't match header + #[error("receipts root mismatch: computed {computed}, expected {expected}")] + ReceiptsRootMismatch { + computed: alloy::primitives::B256, + expected: alloy::primitives::B256, + }, + + /// Failed to build transaction envelope for root computation + #[error("failed to build transaction envelope")] + TxEnvelope(#[source] anyhow::Error), + + /// Failed to convert record batches to verification types + #[error("failed to convert record batches")] + Conversion(#[from] ConversionError), + + /// Missing required table for verification + #[error("verification enabled but '{0}' table not found in dataset")] + MissingTable(&'static str), + + /// Unsupported transaction type + #[error("unsupported transaction type: {0}")] + UnsupportedTxType(i32), +} #[derive(clap::Parser, Debug)] #[command(name = "verify", long_about = None)] @@ -199,7 +241,7 @@ async fn fetch_and_verify_batch( match verification_result { (Ok(_), _) => {} (Err(err), _) if rpc_url.is_none() => { - return Err(err).context(anyhow!("verify block {}", block_number)); + return Err(anyhow!(err).context(anyhow!("verify block {}", block_number))); } (Err(err), amp_block) => { let Some(rpc_url) = rpc_url else { @@ -210,7 +252,7 @@ async fn fetch_and_verify_batch( Ok(rpc_block) => diff::create_block_diff(&_block, &rpc_block), Err(err) => format!("RPC verification also failed: {err:#}"), }; - return Err(err + return Err(anyhow!(err) .context(rpc_verification_message) .context(anyhow!("verify block {}", block_number))); } @@ -222,32 +264,39 @@ async fn fetch_and_verify_batch( Ok(()) } -fn verify_block(block: &Block) -> anyhow::Result<()> { +pub fn verify_block(block: &Block) -> Result<(), VerificationError> { let computed_block_hash = alloy::consensus::Header::from(block).hash_slow(); - anyhow::ensure!(computed_block_hash == block.hash); + if computed_block_hash != block.hash { + return Err(VerificationError::BlockHashMismatch { + computed: computed_block_hash, + expected: block.hash, + }); + } - verify_transactions_root(block).context("transactions root")?; - verify_receipts_root(block).context("receipts root (logs)")?; + verify_transactions_root(block)?; + verify_receipts_root(block)?; Ok(()) } -fn verify_transactions_root(block: &Block) -> anyhow::Result<()> { +pub fn verify_transactions_root(block: &Block) -> Result<(), VerificationError> { let mut tx_envelopes = Vec::with_capacity(block.transactions.len()); for tx in &block.transactions { - tx_envelopes.push(tx.to_tx_envelope()?); + tx_envelopes.push(tx.to_tx_envelope().map_err(VerificationError::TxEnvelope)?); } - let transactions_root = alloy::consensus::proofs::calculate_transaction_root(&tx_envelopes); - anyhow::ensure!( - transactions_root == block.transactions_root, - "computed transactions root does not match transactions_root from block header", - ); + let computed_root = alloy::consensus::proofs::calculate_transaction_root(&tx_envelopes); + if computed_root != block.transactions_root { + return Err(VerificationError::TransactionsRootMismatch { + computed: computed_root, + expected: block.transactions_root, + }); + } Ok(()) } -fn verify_receipts_root(block: &Block) -> anyhow::Result<()> { +pub fn verify_receipts_root(block: &Block) -> Result<(), VerificationError> { let mut logs_by_tx: BTreeMap> = BTreeMap::new(); for log in &block.logs { logs_by_tx.entry(log.tx_index).or_default().push(log); @@ -293,16 +342,18 @@ fn verify_receipts_root(block: &Block) -> anyhow::Result<()> { 2 => alloy::consensus::ReceiptEnvelope::Eip1559(receipt.with_bloom()), 3 => alloy::consensus::ReceiptEnvelope::Eip4844(receipt.with_bloom()), 4 => alloy::consensus::ReceiptEnvelope::Eip7702(receipt.with_bloom()), - _ => anyhow::bail!("unsupported transaction type: {}", tx.tx_type), + _ => return Err(VerificationError::UnsupportedTxType(tx.tx_type)), }; receipts.push(receipt_envelope); } - let receipts_root = alloy::consensus::proofs::calculate_receipt_root(&receipts); - anyhow::ensure!( - receipts_root == block.receipts_root, - "computed receipts root does not match receipts_root from block header", - ); + let computed_root = alloy::consensus::proofs::calculate_receipt_root(&receipts); + if computed_root != block.receipts_root { + return Err(VerificationError::ReceiptsRootMismatch { + computed: computed_root, + expected: block.receipts_root, + }); + } Ok(()) } @@ -314,7 +365,7 @@ async fn fetch_and_verify_rpc_block( let block = rpc::fetch_rpc_block(rpc_url, block_number) .await .context("fetch block from RPC")?; - verify_block(&block).context("verify block from RPC")?; + verify_block(&block).map_err(|e| anyhow!(e).context("verify block from RPC"))?; Ok(block) } diff --git a/crates/core/worker-datasets-raw/Cargo.toml b/crates/core/worker-datasets-raw/Cargo.toml index 6542a76bf..77d27e07f 100644 --- a/crates/core/worker-datasets-raw/Cargo.toml +++ b/crates/core/worker-datasets-raw/Cargo.toml @@ -8,11 +8,11 @@ license-file.workspace = true amp-data-store = { path = "../data-store" } amp-parquet = { path = "../amp-parquet" } amp-providers-registry = { path = "../providers-registry" } +amp-worker-core = { path = "../worker-core" } common = { path = "../common" } +datafusion.workspace = true datasets-common = { path = "../datasets-common" } datasets-raw = { path = "../datasets-raw" } -amp-worker-core = { path = "../worker-core" } -datafusion.workspace = true futures.workspace = true metadata-db = { path = "../metadata-db" } monitoring = { path = "../monitoring" } @@ -21,6 +21,7 @@ serde_json.workspace = true thiserror.workspace = true tokio.workspace = true tracing.workspace = true +verification = { path = "../verification" } [dev-dependencies] rstest.workspace = true diff --git a/crates/core/worker-datasets-raw/src/job_descriptor.rs b/crates/core/worker-datasets-raw/src/job_descriptor.rs index 9c2e90f9c..7e6b61f58 100644 --- a/crates/core/worker-datasets-raw/src/job_descriptor.rs +++ b/crates/core/worker-datasets-raw/src/job_descriptor.rs @@ -18,6 +18,15 @@ pub struct JobDescriptor { pub dataset_name: Name, /// The hash of the dataset manifest that defines the extraction. pub manifest_hash: Hash, + /// Enable cryptographic verification of EVM block data during extraction. + /// + /// When enabled, verifies block hashes, transaction roots, and receipt roots + /// before writing data. Requires the dataset to have `blocks`, `transactions`, + /// and `logs` tables with EVM schema. Verification failures are retryable errors. + /// + /// Defaults to `false`. + #[serde(default)] + pub verify: bool, } fn default_max_writers() -> u16 { diff --git a/crates/core/worker-datasets-raw/src/job_impl.rs b/crates/core/worker-datasets-raw/src/job_impl.rs index ee9cfc3bb..1217002fd 100644 --- a/crates/core/worker-datasets-raw/src/job_impl.rs +++ b/crates/core/worker-datasets-raw/src/job_impl.rs @@ -362,6 +362,7 @@ pub async fn execute( start, end, latest_block, + desc.verify, ) .await .map_err(Error::PartitionTask)?; diff --git a/crates/core/worker-datasets-raw/src/job_impl/ranges.rs b/crates/core/worker-datasets-raw/src/job_impl/ranges.rs index e0f3810ab..18ba48df8 100644 --- a/crates/core/worker-datasets-raw/src/job_impl/ranges.rs +++ b/crates/core/worker-datasets-raw/src/job_impl/ranges.rs @@ -61,6 +61,8 @@ pub(super) async fn materialize_ranges( job_end_block: Option, // Current chain head block, used for percentage calculation in continuous mode chain_head: BlockNum, + // Enable cryptographic verification of EVM block data + verify: bool, ) -> Result<(), TryWaitAllError> { tracing::info!( "materializing ranges {}", @@ -117,6 +119,7 @@ pub(super) async fn materialize_ranges( id: i as u32, metrics: ctx.metrics.clone(), progress_tracker: progress_tracker.clone(), + verify, }); // Spawn the writers, starting them with a 1-second delay between each. @@ -426,6 +429,8 @@ struct MaterializePartition { metrics: Option>, /// A progress tracker which logs the overall progress of all partitions. progress_tracker: Arc, + /// Enable cryptographic verification of EVM block data + verify: bool, } impl MaterializePartition { /// Consumes the instance returning a future that runs the partition, processing all assigned block ranges sequentially. @@ -535,6 +540,11 @@ impl MaterializePartition { dataset_rows: Rows, writer: &mut RawDatasetWriter, ) -> Result<(), RunRangeError> { + // Verify block data if verification is enabled + if self.verify { + self.verify_block_data(&dataset_rows)?; + } + for table_rows in dataset_rows { if let Some(ref metrics) = self.metrics { let num_rows: u64 = table_rows.rows.num_rows().try_into().unwrap(); @@ -560,6 +570,61 @@ impl MaterializePartition { self.progress_tracker.block_covered(block_num); Ok(()) } + + /// Verify block data using cryptographic verification. + /// + /// Expects the dataset to contain blocks, transactions, and logs tables. + /// Converts the Arrow RecordBatches to verification types and performs: + /// - Block hash verification + /// - Transaction root verification + /// - Receipt root verification + #[expect(clippy::result_large_err)] + fn verify_block_data(&self, dataset_rows: &Rows) -> Result<(), RunRangeError> { + // Collect the RecordBatches for blocks, transactions, and logs + let mut block_batch = None; + let mut transactions_batch = None; + let mut logs_batch = None; + + for table_rows in dataset_rows { + let table_name = table_rows.table.name().to_string(); + match table_name.as_str() { + "blocks" => block_batch = Some(&table_rows.rows), + "transactions" => transactions_batch = Some(&table_rows.rows), + "logs" => logs_batch = Some(&table_rows.rows), + _ => {} // Ignore other tables + } + } + + // Ensure all required tables are present + let block_batch = block_batch.ok_or_else(|| { + RunRangeError::verification_failed(verification::VerificationError::MissingTable( + "blocks", + )) + })?; + let transactions_batch = transactions_batch.ok_or_else(|| { + RunRangeError::verification_failed(verification::VerificationError::MissingTable( + "transactions", + )) + })?; + let logs_batch = logs_batch.ok_or_else(|| { + RunRangeError::verification_failed(verification::VerificationError::MissingTable( + "logs", + )) + })?; + + // Convert RecordBatches to verification Block + let block = verification::client::block_from_record_batches( + block_batch, + transactions_batch, + logs_batch, + ) + .map_err(|err| RunRangeError::verification_failed(err.into()))?; + + // Run verification + verification::verify_block(&block).map_err(RunRangeError::verification_failed)?; + + Ok(()) + } } /// Errors that occur when running a block range materialize operation. @@ -615,6 +680,13 @@ impl RunRangeError { block_range: None, } } + + pub(super) fn verification_failed(err: verification::VerificationError) -> Self { + Self { + kind: RunRangeErrorKind::VerificationFailed(err), + block_range: None, + } + } } impl std::fmt::Display for RunRangeError { @@ -708,6 +780,22 @@ enum RunRangeErrorKind { /// - Compaction task failure #[error("Failed to close")] Close(#[source] RawDatasetWriterCloseError), + + /// Block verification failed + /// + /// This occurs when cryptographic verification of block data fails during extraction. + /// The block hash, transaction root, or receipt root does not match the computed value + /// from the extracted data. + /// + /// Possible causes: + /// - Data corruption during extraction + /// - Provider returned invalid data + /// - Missing required tables (blocks, transactions, logs) + /// - Schema mismatch between extraction and verification expectations + /// + /// This error is retryable as the issue may be transient. + #[error("Verification failed")] + VerificationFailed(#[source] verification::VerificationError), } impl RetryableErrorExt for RunRangeErrorKind { @@ -719,6 +807,7 @@ impl RetryableErrorExt for RunRangeErrorKind { Self::NonIncreasingBlockNum { .. } => false, Self::Write(err) => err.is_retryable(), Self::Close(err) => err.is_retryable(), + Self::VerificationFailed(_) => true, } } } diff --git a/crates/services/admin-api/src/handlers/datasets/deploy.rs b/crates/services/admin-api/src/handlers/datasets/deploy.rs index 1f0df5d7b..b11307078 100644 --- a/crates/services/admin-api/src/handlers/datasets/deploy.rs +++ b/crates/services/admin-api/src/handlers/datasets/deploy.rs @@ -36,6 +36,7 @@ use crate::{ /// - `end_block`: End block configuration (null for continuous, "latest", number, or negative offset) /// - `parallelism`: Number of parallel workers (default: 1, only for raw datasets) /// - `worker_id`: Optional worker selector (exact ID or glob pattern) +/// - `verify`: Enable cryptographic verification of EVM block data (default: false) /// /// ## Response /// - **202 Accepted**: Job successfully scheduled @@ -107,6 +108,7 @@ pub async fn handler( end_block, parallelism, worker_id, + verify, } = match json { Ok(Json(request)) => request, Err(err) => { @@ -162,6 +164,7 @@ pub async fn handler( dataset_namespace: reference.namespace().clone(), dataset_name: reference.name().clone(), manifest_hash: reference.hash().clone(), + verify, } .into() }; @@ -252,6 +255,16 @@ pub struct DeployRequest { #[serde(default)] #[cfg_attr(feature = "utoipa", schema(value_type = Option))] pub worker_id: Option, + + /// Enable cryptographic verification of EVM block data during extraction. + /// + /// When enabled, verifies block hashes, transaction roots, and receipt roots + /// before writing data to storage. Only applicable to EVM raw datasets. + /// Verification failures are retryable errors. + /// + /// Defaults to false if not specified. + #[serde(default)] + pub verify: bool, } fn default_parallelism() -> u16 { diff --git a/docs/feat/admin-dataset.md b/docs/feat/admin-dataset.md index 3f74ed3b5..10833eae4 100644 --- a/docs/feat/admin-dataset.md +++ b/docs/feat/admin-dataset.md @@ -69,6 +69,9 @@ ampctl dataset deploy my_namespace/my_dataset@1.0.0 --parallelism 4 # Assign to a specific worker ampctl dataset deploy my_namespace/my_dataset@1.0.0 --worker-id my-worker + +# Enable cryptographic verification for EVM datasets +ampctl dataset deploy my_namespace/my_dataset@1.0.0 --verify ``` **List registered datasets:** diff --git a/docs/feat/verification.md b/docs/feat/verification.md index 13ef1258e..d8315a96d 100644 --- a/docs/feat/verification.md +++ b/docs/feat/verification.md @@ -51,15 +51,27 @@ When all verifications pass, the dataset provides these guarantees: ## What is Not Verified -| Item | Reason | -| ---------------- | ------------------------------------------------------------------------------------------------------------ | -| EVM execution | Transactions are not re-executed to verify state transitions or gas calculations | -| Consensus rules | Proof-of-work/proof-of-stake, block timestamps, gas limits, and other consensus parameters are not validated | -| Canonical chain | Block headers are verified internally but not proven to belong to the chain agreed upon by network consensus | +| Item | Reason | +| --------------- | ------------------------------------------------------------------------------------------------------------ | +| EVM execution | Transactions are not re-executed to verify state transitions or gas calculations | +| Consensus rules | Proof-of-work/proof-of-stake, block timestamps, gas limits, and other consensus parameters are not validated | +| Canonical chain | Block headers are verified internally but not proven to belong to the chain agreed upon by network consensus | ## Usage -Run verification against a block range: +### Extraction-Time Verification + +Enable verification during extraction to catch data corruption immediately: + +```bash +ampctl dataset deploy my_namespace/my_dataset@1.0.0 --verify +``` + +When `--verify` is enabled, each block is cryptographically verified before being written to storage. This is EVM-only and requires the dataset to have `blocks`, `transactions`, and `logs` tables. Verification failures are retryable errors, allowing the extraction to recover from transient issues. + +### Post-Extraction Verification + +Run verification against an already-extracted block range: ```bash ampctl verify --dataset=edgeandnode/ethereum_mainnet --start-block=0 --end-block=100000 diff --git a/docs/openapi-specs/admin.spec.json b/docs/openapi-specs/admin.spec.json index bac9c92f5..d6c3b6998 100644 --- a/docs/openapi-specs/admin.spec.json +++ b/docs/openapi-specs/admin.spec.json @@ -295,7 +295,7 @@ "datasets" ], "summary": "Handler for the `POST /datasets/{namespace}/{name}/versions/{revision}/deploy` endpoint", - "description": "Schedules a data extraction job for the specified dataset revision.\n\n## Path Parameters\n- `namespace`: Dataset namespace\n- `name`: Dataset name\n- `revision`: Revision (version, hash, \"latest\", or \"dev\")\n\n## Request Body\n- `end_block`: End block configuration (null for continuous, \"latest\", number, or negative offset)\n- `parallelism`: Number of parallel workers (default: 1, only for raw datasets)\n- `worker_id`: Optional worker selector (exact ID or glob pattern)\n\n## Response\n- **202 Accepted**: Job successfully scheduled\n- **400 Bad Request**: Invalid path parameters or request body\n- **404 Not Found**: Dataset or revision not found\n- **500 Internal Server Error**: Database or scheduler error\n\n## Error Codes\n- `INVALID_PATH`: Invalid path parameters (namespace, name, or revision)\n- `INVALID_BODY`: Invalid request body (malformed JSON or missing required fields)\n- `DATASET_NOT_FOUND`: The specified dataset or revision does not exist\n- `LIST_VERSION_TAGS_ERROR`: Failed to list version tags from dataset store\n- `RESOLVE_REVISION_ERROR`: Failed to resolve revision to manifest hash\n- `GET_DATASET_ERROR`: Failed to load dataset from store\n- `WORKER_NOT_AVAILABLE`: Specified worker not found or inactive\n- `SCHEDULER_ERROR`: Failed to schedule extraction job\n\n## Behavior\nThis endpoint schedules a data extraction job for a dataset:\n1. Resolves the revision to find the corresponding version tag\n2. Loads the full dataset configuration from the dataset store\n3. Schedules an extraction job with the specified parameters\n4. Returns job ID for tracking\n\nThe revision parameter supports four types:\n- Semantic version (e.g., \"1.2.3\") - uses that specific version\n- \"latest\" - resolves to the highest semantic version\n- \"dev\" - resolves to the development version tag\n- Manifest hash (SHA256 hash) - finds the version that points to this hash\n\nJobs are executed asynchronously by worker nodes. Use the returned job ID\nto track progress via the jobs endpoints.", + "description": "Schedules a data extraction job for the specified dataset revision.\n\n## Path Parameters\n- `namespace`: Dataset namespace\n- `name`: Dataset name\n- `revision`: Revision (version, hash, \"latest\", or \"dev\")\n\n## Request Body\n- `end_block`: End block configuration (null for continuous, \"latest\", number, or negative offset)\n- `parallelism`: Number of parallel workers (default: 1, only for raw datasets)\n- `worker_id`: Optional worker selector (exact ID or glob pattern)\n- `verify`: Enable cryptographic verification of EVM block data (default: false)\n\n## Response\n- **202 Accepted**: Job successfully scheduled\n- **400 Bad Request**: Invalid path parameters or request body\n- **404 Not Found**: Dataset or revision not found\n- **500 Internal Server Error**: Database or scheduler error\n\n## Error Codes\n- `INVALID_PATH`: Invalid path parameters (namespace, name, or revision)\n- `INVALID_BODY`: Invalid request body (malformed JSON or missing required fields)\n- `DATASET_NOT_FOUND`: The specified dataset or revision does not exist\n- `LIST_VERSION_TAGS_ERROR`: Failed to list version tags from dataset store\n- `RESOLVE_REVISION_ERROR`: Failed to resolve revision to manifest hash\n- `GET_DATASET_ERROR`: Failed to load dataset from store\n- `WORKER_NOT_AVAILABLE`: Specified worker not found or inactive\n- `SCHEDULER_ERROR`: Failed to schedule extraction job\n\n## Behavior\nThis endpoint schedules a data extraction job for a dataset:\n1. Resolves the revision to find the corresponding version tag\n2. Loads the full dataset configuration from the dataset store\n3. Schedules an extraction job with the specified parameters\n4. Returns job ID for tracking\n\nThe revision parameter supports four types:\n- Semantic version (e.g., \"1.2.3\") - uses that specific version\n- \"latest\" - resolves to the highest semantic version\n- \"dev\" - resolves to the development version tag\n- Manifest hash (SHA256 hash) - finds the version that points to this hash\n\nJobs are executed asynchronously by worker nodes. Use the returned job ID\nto track progress via the jobs endpoints.", "operationId": "deploy_dataset", "parameters": [ { @@ -2630,6 +2630,10 @@ "description": "Number of parallel workers to run\n\nEach worker will be responsible for an equal number of blocks.\nFor example, if extracting blocks 0-10,000,000 with parallelism=10,\neach worker will handle a contiguous section of 1 million blocks.\n\nOnly applicable to raw datasets (EVM RPC, Firehose, etc.).\nDerived datasets ignore this parameter.\n\nDefaults to 1 if not specified.", "minimum": 0 }, + "verify": { + "type": "boolean", + "description": "Enable cryptographic verification of EVM block data during extraction.\n\nWhen enabled, verifies block hashes, transaction roots, and receipt roots\nbefore writing data to storage. Only applicable to EVM raw datasets.\nVerification failures are retryable errors.\n\nDefaults to false if not specified." + }, "worker_id": { "type": [ "string", diff --git a/tests/src/main.rs b/tests/src/main.rs index 717fb7955..355d571ad 100644 --- a/tests/src/main.rs +++ b/tests/src/main.rs @@ -385,7 +385,7 @@ async fn bless( // Dump the dataset tracing::debug!(%dataset, end_block=end, "Dumping dataset"); - test_helpers::deploy_and_wait(ampctl, &dataset, Some(end), Duration::from_secs(30)) + test_helpers::deploy_and_wait(ampctl, &dataset, Some(end), Duration::from_secs(30), false) .await .map_err(|err| { anyhow!( diff --git a/tests/src/steps/dump.rs b/tests/src/steps/dump.rs index 1eed1e743..d72bd0425 100644 --- a/tests/src/steps/dump.rs +++ b/tests/src/steps/dump.rs @@ -46,6 +46,7 @@ impl Step { &self.dataset, Some(self.end), Duration::from_secs(30), + false, ) .await .expect("Failed to dump dataset via worker"); diff --git a/tests/src/testlib/fixtures/ampctl.rs b/tests/src/testlib/fixtures/ampctl.rs index ad50c3943..29c0986fe 100644 --- a/tests/src/testlib/fixtures/ampctl.rs +++ b/tests/src/testlib/fixtures/ampctl.rs @@ -194,6 +194,7 @@ impl Ampctl { end_block: Option, parallelism: Option, worker_id: Option, + verify: bool, ) -> Result { let reference: Reference = dataset_ref.parse().map_err(|err| { anyhow!( @@ -212,6 +213,7 @@ impl Ampctl { end_block_param, parallelism.unwrap_or(1), worker_id, + verify, ) .await .map_err(Into::into) diff --git a/tests/src/testlib/helpers.rs b/tests/src/testlib/helpers.rs index a6b737b3b..4d7778d25 100644 --- a/tests/src/testlib/helpers.rs +++ b/tests/src/testlib/helpers.rs @@ -127,6 +127,7 @@ pub async fn deploy_and_wait( dataset_ref: &Reference, end_block: Option, timeout: Duration, + verify: bool, ) -> Result { let is_continuous = end_block.is_none(); let job_id = ampctl @@ -135,6 +136,7 @@ pub async fn deploy_and_wait( end_block, Some(1), // parallelism None, // worker_id (auto-select) + verify, ) .await?; diff --git a/tests/src/tests/it_admin_api_datasets_list_jobs.rs b/tests/src/tests/it_admin_api_datasets_list_jobs.rs index 93b59769f..c9b06d18c 100644 --- a/tests/src/tests/it_admin_api_datasets_list_jobs.rs +++ b/tests/src/tests/it_admin_api_datasets_list_jobs.rs @@ -211,7 +211,7 @@ impl TestCtx { ); self.ampctl_client .datasets() - .deploy(&reference, None, 1, None) + .deploy(&reference, None, 1, None, false) .await } diff --git a/tests/src/tests/it_admin_api_datasets_stop_job.rs b/tests/src/tests/it_admin_api_datasets_stop_job.rs index a263f4f55..6d9688212 100644 --- a/tests/src/tests/it_admin_api_datasets_stop_job.rs +++ b/tests/src/tests/it_admin_api_datasets_stop_job.rs @@ -301,7 +301,7 @@ impl TestCtx { ) -> Result { self.ampctl_client .datasets() - .deploy(&self.dataset_ref, end_block, 1, None) + .deploy(&self.dataset_ref, end_block, 1, None, false) .await } @@ -330,7 +330,7 @@ impl TestCtx { ); self.ampctl_client .datasets() - .deploy(&reference, None, 1, None) + .deploy(&reference, None, 1, None, false) .await } diff --git a/tests/src/tests/it_admin_api_jobs_progress.rs b/tests/src/tests/it_admin_api_jobs_progress.rs index dbe9429bd..308e2d577 100644 --- a/tests/src/tests/it_admin_api_jobs_progress.rs +++ b/tests/src/tests/it_admin_api_jobs_progress.rs @@ -321,7 +321,7 @@ impl TestCtx { let reference = format!("{}/{}@{}", namespace, name, revision); let job_id = ampctl - .dataset_deploy(&reference, end_block, None, None) + .dataset_deploy(&reference, end_block, None, None, false) .await .expect("failed to deploy dataset"); diff --git a/tests/src/tests/it_admin_api_revisions.rs b/tests/src/tests/it_admin_api_revisions.rs index 75e5106cd..d6ec75669 100644 --- a/tests/src/tests/it_admin_api_revisions.rs +++ b/tests/src/tests/it_admin_api_revisions.rs @@ -656,7 +656,7 @@ async fn delete_revision_with_active_writer_returns_409() { // Deploy dataset — schedules a job, worker creates revisions and assigns writer ctx.ampctl_client - .dataset_deploy("_/anvil_rpc@0.0.0", None, None, None) + .dataset_deploy("_/anvil_rpc@0.0.0", None, None, None, false) .await .expect("failed to deploy dataset"); @@ -710,7 +710,7 @@ async fn delete_revision_with_stopped_writer_succeeds() { // Deploy dataset — schedules a job, worker creates revisions and assigns writer let job_id = ctx .ampctl_client - .dataset_deploy("_/anvil_rpc@0.0.0", None, None, None) + .dataset_deploy("_/anvil_rpc@0.0.0", None, None, None, false) .await .expect("failed to deploy dataset"); @@ -892,7 +892,7 @@ async fn truncate_revision_with_active_writer_returns_409() { // Deploy dataset — schedules a job, worker creates revisions and assigns writer ctx.ampctl_client - .dataset_deploy("_/anvil_rpc@0.0.0", None, None, None) + .dataset_deploy("_/anvil_rpc@0.0.0", None, None, None, false) .await .expect("failed to deploy dataset"); @@ -946,7 +946,7 @@ async fn truncate_revision_with_stopped_writer_succeeds() { // Deploy dataset — schedules a job, worker creates revisions and assigns writer let job_id = ctx .ampctl_client - .dataset_deploy("_/anvil_rpc@0.0.0", None, None, None) + .dataset_deploy("_/anvil_rpc@0.0.0", None, None, None, false) .await .expect("failed to deploy dataset"); @@ -1110,7 +1110,7 @@ async fn prune_revision_with_active_writer_returns_409() { // Deploy dataset - schedules a job, worker creates revisions and assigns writer ctx.ampctl_client - .dataset_deploy("_/anvil_rpc@0.0.0", None, None, None) + .dataset_deploy("_/anvil_rpc@0.0.0", None, None, None, false) .await .expect("failed to deploy dataset"); @@ -1151,7 +1151,7 @@ async fn prune_revision_with_stopped_writer_succeeds() { // Deploy dataset - schedules a job, worker creates revisions and assigns writer let job_id = ctx .ampctl_client - .dataset_deploy("_/anvil_rpc@0.0.0", None, None, None) + .dataset_deploy("_/anvil_rpc@0.0.0", None, None, None, false) .await .expect("failed to deploy dataset"); @@ -1216,6 +1216,7 @@ async fn prune_revision_with_reorg_schedules_non_canonical_files() { &dataset_ref, Some(10), std::time::Duration::from_secs(30), + false, ) .await .expect("failed to deploy dataset"); @@ -1252,6 +1253,7 @@ async fn prune_revision_with_reorg_schedules_non_canonical_files() { &dataset_ref, Some(13), std::time::Duration::from_secs(30), + false, ) .await .expect("failed to deploy dataset after reorg"); diff --git a/tests/src/tests/it_client.rs b/tests/src/tests/it_client.rs index 12edcda30..014e236d7 100644 --- a/tests/src/tests/it_client.rs +++ b/tests/src/tests/it_client.rs @@ -157,9 +157,15 @@ impl TestCtx { async fn dump(&self, dataset: &str, end: BlockNum) { let ampctl = self.ctx.new_ampctl(); let dataset_ref: Reference = dataset.parse().expect("failed to parse dataset reference"); - test_helpers::deploy_and_wait(&ctl, &dataset_ref, Some(end), Duration::from_secs(30)) - .await - .expect("Failed to dump dataset via worker"); + test_helpers::deploy_and_wait( + &ctl, + &dataset_ref, + Some(end), + Duration::from_secs(30), + false, + ) + .await + .expect("Failed to dump dataset via worker"); } /// Query blocks from the specified dataset. diff --git a/tests/src/tests/it_dump.rs b/tests/src/tests/it_dump.rs index c1a5df49b..11337149f 100644 --- a/tests/src/tests/it_dump.rs +++ b/tests/src/tests/it_dump.rs @@ -18,7 +18,7 @@ async fn evm_rpc_single_dump() { test.deactivate_all_revisions(&tables).await; //* When - test.dump_and_create_snapshot(block).await; + test.dump_and_create_snapshot(block, false).await; //* Then // Validate table consistency @@ -55,7 +55,7 @@ async fn evm_rpc_single_dump_fetch_receipts_per_tx() { test.deactivate_all_revisions(&tables).await; //* When - test.dump_and_create_snapshot(block).await; + test.dump_and_create_snapshot(block, false).await; //* Then // Validate table consistency @@ -92,7 +92,7 @@ async fn evm_rpc_base_single_dump() { test.deactivate_all_revisions(&tables).await; //* When - test.dump_and_create_snapshot(block).await; + test.dump_and_create_snapshot(block, false).await; //* Then // Validate table consistency @@ -129,7 +129,7 @@ async fn evm_rpc_base_single_dump_fetch_receipts_per_tx() { test.deactivate_all_revisions(&tables).await; //* When - test.dump_and_create_snapshot(block).await; + test.dump_and_create_snapshot(block, false).await; //* Then // Validate table consistency @@ -166,7 +166,7 @@ async fn eth_firehose_single_dump() { test.deactivate_all_revisions(&tables).await; //* When - test.dump_and_create_snapshot(block).await; + test.dump_and_create_snapshot(block, false).await; //* Then // Validate table consistency @@ -203,7 +203,7 @@ async fn base_firehose_single_dump() { test.deactivate_all_revisions(&tables).await; //* When - test.dump_and_create_snapshot(block).await; + test.dump_and_create_snapshot(block, false).await; //* Then // Validate table consistency @@ -293,7 +293,7 @@ impl TestCtx { /// /// This method exercises the full production code path by scheduling a job /// via the Admin API and waiting for the worker to complete it. - async fn dump_and_create_snapshot(&self, block: u64) -> Vec> { + async fn dump_and_create_snapshot(&self, block: u64, verify: bool) -> Vec> { let ampctl = self.ctx.new_ampctl(); // Deploy via scheduler/worker @@ -302,6 +302,7 @@ impl TestCtx { &self.dataset_ref, Some(block), Duration::from_secs(60), + verify, ) .await .expect("Failed to dump dataset via worker"); @@ -393,3 +394,30 @@ fn assert_json_eq_ignoring_nulls(reference: &Value, dumped: &Value) { } } } + +#[tokio::test] +async fn evm_rpc_single_dump_with_verify() { + //* Given + let test = TestCtx::setup( + "evm_rpc_single_dump_with_verify", + "_/eth_rpc@0.0.0", + "rpc_eth_mainnet", + ) + .await; + + let block = test.get_dataset_start_block().await; + let tables = test.restore_reference_snapshot().await; + test.deactivate_all_revisions(&tables).await; + + //* When - Extract with verification enabled + test.dump_and_create_snapshot(block, true).await; + + //* Then - If we get here, verification passed during extraction + // Validate table consistency + for table in &tables { + test_helpers::check_table_consistency(table, test.ctx.daemon_server().data_store()) + .await + .expect("Table consistency check failed"); + } + // Test succeeds if the job completes without verification errors +} diff --git a/tests/src/tests/it_functions_eth_call.rs b/tests/src/tests/it_functions_eth_call.rs index 6a15a9a8a..098eb3f8f 100644 --- a/tests/src/tests/it_functions_eth_call.rs +++ b/tests/src/tests/it_functions_eth_call.rs @@ -24,7 +24,7 @@ async fn eth_call_reads_counter_value_after_increments() { // Deploy anvil_rpc dataset (dependency for amp_demo) using ampctl ampctl - .dataset_deploy("_/anvil_rpc@0.0.0", None, Some(1), None) + .dataset_deploy("_/anvil_rpc@0.0.0", None, Some(1), None, false) .await .expect("Failed to deploy anvil_rpc dataset"); diff --git a/tests/src/tests/it_multi_table_continuous.rs b/tests/src/tests/it_multi_table_continuous.rs index 8207136d5..eda46a667 100644 --- a/tests/src/tests/it_multi_table_continuous.rs +++ b/tests/src/tests/it_multi_table_continuous.rs @@ -41,6 +41,7 @@ async fn dump_multi_table_derived_dataset_in_continuous_mode_populates_all_table &anvil_dataset_ref, Some(5), Duration::from_secs(30), + false, ) .await .expect("Failed to dump dataset anvil_rpc"); @@ -63,6 +64,7 @@ async fn dump_multi_table_derived_dataset_in_continuous_mode_populates_all_table .expect("failed to parse dataset reference"), None, Duration::from_secs(30), + false, ) .await .expect("Failed to dump dataset"); @@ -84,6 +86,7 @@ async fn dump_multi_table_derived_dataset_in_continuous_mode_populates_all_table &anvil_dataset_ref, Some(8), Duration::from_secs(30), + false, ) .await .expect("Failed to dump anvil_rpc with new blocks"); diff --git a/tests/src/tests/it_reorg.rs b/tests/src/tests/it_reorg.rs index 00f1cbdd2..94d5c2b7f 100644 --- a/tests/src/tests/it_reorg.rs +++ b/tests/src/tests/it_reorg.rs @@ -437,7 +437,7 @@ impl ReorgTestCtx { async fn dump(&self, dataset: &str, end: Option) { let ampctl = self.ctx.new_ampctl(); let dataset_ref: Reference = format!("_/{}@0.0.0", dataset).parse().unwrap(); - test_helpers::deploy_and_wait(&ctl, &dataset_ref, end, Duration::from_secs(30)) + test_helpers::deploy_and_wait(&ctl, &dataset_ref, end, Duration::from_secs(30), false) .await .expect("Failed to dump dataset"); } diff --git a/tests/src/tests/it_server_sql_references.rs b/tests/src/tests/it_server_sql_references.rs index 4c8a27877..a7656a84d 100644 --- a/tests/src/tests/it_server_sql_references.rs +++ b/tests/src/tests/it_server_sql_references.rs @@ -311,9 +311,15 @@ impl TestCtx { .parse() .expect("should parse multi_version reference"); let ampctl = self.ctx.new_ampctl(); - test_helpers::deploy_and_wait(&ctl, &dump_ref, Some(15000000), Duration::from_secs(60)) - .await - .expect("failed to dump multi_version dataset"); + test_helpers::deploy_and_wait( + &ctl, + &dump_ref, + Some(15000000), + Duration::from_secs(60), + false, + ) + .await + .expect("failed to dump multi_version dataset"); } /// Register the `basic_function` package with a semver tag. diff --git a/tests/src/tests/it_sql_dataset_batch_size.rs b/tests/src/tests/it_sql_dataset_batch_size.rs index 539ce432c..9fbc8a468 100644 --- a/tests/src/tests/it_sql_dataset_batch_size.rs +++ b/tests/src/tests/it_sql_dataset_batch_size.rs @@ -132,9 +132,15 @@ impl TestCtx { async fn dump_dataset(&self, dataset: &str, end: u64) { let ampctl = self.ctx.new_ampctl(); let dataset_ref: Reference = dataset.parse().expect("failed to parse dataset reference"); - test_helpers::deploy_and_wait(&ctl, &dataset_ref, Some(end), Duration::from_secs(30)) - .await - .expect("Failed to dump dataset"); + test_helpers::deploy_and_wait( + &ctl, + &dataset_ref, + Some(end), + Duration::from_secs(30), + false, + ) + .await + .expect("Failed to dump dataset"); } /// Create a catalog for the specified dataset. diff --git a/tests/src/tests/it_worker_events.rs b/tests/src/tests/it_worker_events.rs index 00d7dcf7a..df81c65af 100644 --- a/tests/src/tests/it_worker_events.rs +++ b/tests/src/tests/it_worker_events.rs @@ -404,7 +404,7 @@ async fn e2e_anvil_sync_captures_lifecycle_events() { //* When: Deploy the dataset with an end block so the job completes let ampctl = ctx.new_ampctl(); let job_id = ampctl - .dataset_deploy("_/anvil_rpc@0.0.0", Some(2000), None, None) + .dataset_deploy("_/anvil_rpc@0.0.0", Some(2000), None, None, false) .await .expect("failed to deploy dataset");