From a259ca14d16a7d274f5a5f9d1457be74924e3f20 Mon Sep 17 00:00:00 2001 From: Sergey Timoshin Date: Sat, 14 Mar 2026 16:32:06 +0000 Subject: [PATCH] feat: isolate and track forester worker concurrency --- forester/CHANGELOG.md | 10 ++ forester/src/epoch_manager.rs | 167 ++++++++++++------ forester/src/metrics.rs | 6 +- forester/src/priority_fee.rs | 2 +- forester/src/processor/v2/helpers.rs | 123 ++++++------- forester/src/processor/v2/proof_worker.rs | 6 +- forester/tests/e2e_test.rs | 9 +- .../batched_state_async_indexer_test.rs | 9 +- forester/tests/test_utils.rs | 9 +- prover/client/src/errors.rs | 1 - prover/client/src/helpers.rs | 15 +- prover/client/src/proof_client.rs | 6 +- .../batch_address_append/proof_inputs.rs | 128 ++------------ .../src/proof_types/combined/v2/json.rs | 14 +- .../src/proof_types/non_inclusion/v2/json.rs | 21 ++- prover/client/tests/batch_address_append.rs | 3 - prover/client/tests/init_merkle_tree.rs | 2 +- prover/server/prover/common/types.go | 2 +- .../prover/v1/inclusion_proving_system.go | 2 +- .../server/prover/v1/marshal_non_inclusion.go | 2 +- .../prover/v1/non_inclusion_proving_system.go | 2 +- prover/server/prover/v1/non_inclusion_test.go | 2 +- prover/server/prover/v1/test_data_helpers.go | 4 +- .../prover/v2/inclusion_proving_system.go | 2 +- prover/server/prover/v2/marshal_inclusion.go | 2 +- .../server/prover/v2/marshal_non_inclusion.go | 2 +- .../prover/v2/non_inclusion_proving_system.go | 2 +- prover/server/prover/v2/test_data_helpers.go | 4 +- sdk-libs/client/src/local_test_validator.rs | 58 +++--- .../program-test/src/indexer/test_indexer.rs | 3 +- 30 files changed, 295 insertions(+), 323 deletions(-) diff --git a/forester/CHANGELOG.md b/forester/CHANGELOG.md index 388f633a93..8b2af02dfc 100644 --- a/forester/CHANGELOG.md +++ b/forester/CHANGELOG.md @@ -2,6 +2,16 @@ ## [Unreleased] +### Added + +- **Graceful shutdown signaling** via `watch::channel`. Shutdown requests are now race-free regardless of when the run loop subscribes. +- **Panic isolation for `process_epoch`.** A panicking epoch no longer kills the run loop; the panic message is logged and processing continues. + +### Fixed + +- **`bigint_to_u8_32` now rejects negative `BigInt` inputs** (`light-prover-client`). Previously, negative inputs were silently converted to `[u8; 32]` using only the magnitude bytes, producing wrong-sign output that would cause silent proof-input corruption. +- **`pathIndex` widened from `u32` to `u64`** on both the Rust client and the Go prover server. The Gnark circuit already constrained by tree height (up to 40 bits for v2 address trees); only the JSON marshalling and runtime struct types were artificially narrow. This prevents proof generation failures once a v2 address tree exceeds ~4.3 billion entries. + ### Breaking Changes - **Removed `--photon-api-key` CLI arg and `PHOTON_API_KEY` env var.** The API key should now be included in `--indexer-url` as a query parameter: diff --git a/forester/src/epoch_manager.rs b/forester/src/epoch_manager.rs index f52efa1b13..dc19de5576 100644 --- a/forester/src/epoch_manager.rs +++ b/forester/src/epoch_manager.rs @@ -14,7 +14,6 @@ use forester_utils::{ forester_epoch::{get_epoch_phases, Epoch, ForesterSlot, TreeAccounts, TreeForesterSchedule}, rpc_pool::SolanaRpcPool, }; -use futures::future::join_all; use light_client::{ indexer::{Indexer, MerkleProof, NewAddressProofWithContext}, rpc::{LightClient, LightClientConfig, RetryConfig, Rpc, RpcError}, @@ -39,7 +38,7 @@ use solana_sdk::{ transaction::TransactionError, }; use tokio::{ - sync::{mpsc, oneshot, Mutex}, + sync::{mpsc, oneshot, watch, Mutex}, task::JoinHandle, time::{sleep, Instant, MissedTickBehavior}, }; @@ -94,8 +93,6 @@ type StateBatchProcessorMap = type AddressBatchProcessorMap = Arc>>)>>; type ProcessorInitLockMap = Arc>>>; -type TreeProcessingTask = JoinHandle>; - /// Coordinates re-finalization across parallel `process_queue` tasks when new /// foresters register mid-epoch. Only one task performs the on-chain /// `finalize_registration` tx; others wait for it to complete. @@ -280,6 +277,10 @@ pub struct EpochManager { run_id: Arc, /// Per-epoch registration trackers to coordinate re-finalization when new foresters register mid-epoch registration_trackers: Arc>>, + /// Set to `true` by `request_shutdown`; `run` observes it via a + /// `watch::Receiver` so a request that arrives before the subscriber + /// is still picked up on the next loop iteration. + shutdown_tx: watch::Sender, } impl Clone for EpochManager { @@ -310,6 +311,7 @@ impl Clone for EpochManager { heartbeat: self.heartbeat.clone(), run_id: self.run_id.clone(), registration_trackers: self.registration_trackers.clone(), + shutdown_tx: self.shutdown_tx.clone(), } } } @@ -359,23 +361,28 @@ impl EpochManager { heartbeat, run_id: Arc::::from(run_id), registration_trackers: Arc::new(DashMap::new()), + shutdown_tx: watch::channel(false).0, }) } + fn request_shutdown(&self) { + let _ = self.shutdown_tx.send(true); + } + pub async fn run(self: Arc) -> Result<()> { let (tx, mut rx) = mpsc::channel(100); let tx = Arc::new(tx); let mut monitor_handle = tokio::spawn({ - let self_clone = Arc::clone(&self); - let tx_clone = Arc::clone(&tx); + let self_clone = self.clone(); + let tx_clone = tx.clone(); async move { self_clone.monitor_epochs(tx_clone).await } }); // Process current and previous epochs let current_previous_handle = tokio::spawn({ - let self_clone = Arc::clone(&self); - let tx_clone = Arc::clone(&tx); + let self_clone = self.clone(); + let tx_clone = tx.clone(); async move { self_clone .process_current_and_previous_epochs(tx_clone) @@ -384,35 +391,80 @@ impl EpochManager { }); let tree_discovery_handle = tokio::spawn({ - let self_clone = Arc::clone(&self); + let self_clone = self.clone(); async move { self_clone.discover_trees_periodically().await } }); let balance_check_handle = tokio::spawn({ - let self_clone = Arc::clone(&self); + let self_clone = self.clone(); async move { self_clone.check_sol_balance_periodically().await } }); + // Abort all background tasks (including monitor) on any exit path, + // not just a clean break out of the loop. `monitor_handle` is still + // polled in the select! below — only its `AbortHandle` lives here. let _guard = scopeguard::guard( ( + monitor_handle.abort_handle(), current_previous_handle, tree_discovery_handle, balance_check_handle, ), - |(h2, h3, h4)| { + |(monitor, h2, h3, h4)| { info!( event = "background_tasks_aborting", run_id = %self.run_id, "Aborting EpochManager background tasks" ); + monitor.abort(); h2.abort(); h3.abort(); h4.abort(); }, ); + let mut shutdown_rx = self.shutdown_tx.subscribe(); + let mut epoch_tasks = tokio::task::JoinSet::new(); let result = loop { + // Check the flag at the top of the loop so a shutdown requested + // before we subscribed (or between iterations) is observed + // without depending on `changed()` firing. + if *shutdown_rx.borrow_and_update() { + info!( + event = "epoch_manager_shutdown_requested", + run_id = %self.run_id, + "Stopping EpochManager after shutdown request" + ); + break Ok(()); + } + tokio::select! { + _ = shutdown_rx.changed() => {} + Some(join_result) = epoch_tasks.join_next() => { + match join_result { + Ok(Ok(())) => debug!( + event = "epoch_processing_completed", + run_id = %self.run_id, + "Epoch processed successfully" + ), + Ok(Err(e)) => error!( + event = "epoch_processing_failed", + run_id = %self.run_id, + error = ?e, + "Error processing epoch" + ), + Err(join_error) => { + if join_error.is_panic() { + error!( + event = "epoch_processing_panicked", + run_id = %self.run_id, + error = %join_error, + "Epoch processing panicked" + ); + } + } + } + } epoch_opt = rx.recv() => { match epoch_opt { Some(epoch) => { @@ -422,17 +474,9 @@ impl EpochManager { epoch, "Received epoch from monitor" ); - let self_clone = Arc::clone(&self); - tokio::spawn(async move { - if let Err(e) = self_clone.process_epoch(epoch).await { - error!( - event = "epoch_processing_failed", - run_id = %self_clone.run_id, - epoch, - error = ?e, - "Error processing epoch" - ); - } + let self_clone = self.clone(); + epoch_tasks.spawn(async move { + self_clone.process_epoch(epoch).await }); } None => { @@ -486,8 +530,7 @@ impl EpochManager { } }; - // Abort monitor_handle on exit - monitor_handle.abort(); + // `_guard` aborts monitor_handle and the other background tasks. result } @@ -641,7 +684,7 @@ impl EpochManager { } } - async fn add_new_tree(&self, new_tree: TreeAccounts) -> Result<()> { + async fn add_new_tree(self: &Arc, new_tree: TreeAccounts) -> Result<()> { info!( event = "new_tree_add_started", run_id = %self.run_id, @@ -701,7 +744,7 @@ impl EpochManager { )?; epoch_info.trees.push(tree_schedule.clone()); - let self_clone = Arc::new(self.clone()); + let self_clone = self.clone(); let tracker = self .registration_trackers .entry(current_epoch) @@ -1037,7 +1080,7 @@ impl EpochManager { } #[instrument(level = "debug", skip(self), fields(forester = %self.config.payer_keypair.pubkey(), epoch = epoch))] - async fn process_epoch(&self, epoch: u64) -> Result<()> { + async fn process_epoch(self: Arc, epoch: u64) -> Result<()> { // Clone the Arc immediately to release the DashMap shard lock. // Without .clone(), the RefMut guard would be held across async operations, // blocking other epochs from accessing the DashMap if they hash to the same shard. @@ -1115,7 +1158,7 @@ impl EpochManager { // Perform work if self.sync_slot().await? < phases.active.end { - self.perform_active_work(®istration_info).await?; + self.clone().perform_active_work(®istration_info).await?; } // Wait for report work phase if self.sync_slot().await? < phases.report_work.start { @@ -1589,7 +1632,7 @@ impl EpochManager { skip(self, epoch_info), fields(forester = %self.config.payer_keypair.pubkey(), epoch = epoch_info.epoch.epoch ))] - async fn perform_active_work(&self, epoch_info: &ForesterEpochInfo) -> Result<()> { + async fn perform_active_work(self: Arc, epoch_info: &ForesterEpochInfo) -> Result<()> { self.heartbeat.increment_active_cycle(); let current_slot = self.slot_tracker.estimated_current_slot(); @@ -1615,6 +1658,17 @@ impl EpochManager { .cloned() .collect(); + if trees_to_process.is_empty() { + debug!( + event = "active_work_cycle_no_trees", + run_id = %self.run_id, + "No trees to process this cycle" + ); + let mut rpc = self.rpc_pool.get_connection().await?; + wait_until_slot_reached(&mut *rpc, &self.slot_tracker, active_phase_end).await?; + return Ok(()); + } + info!( event = "active_work_cycle_started", run_id = %self.run_id, @@ -1624,7 +1678,6 @@ impl EpochManager { "Starting active work cycle" ); - let self_arc = Arc::new(self.clone()); let registration_tracker = self .registration_trackers .entry(epoch_info.epoch.epoch) @@ -1636,7 +1689,7 @@ impl EpochManager { .value() .clone(); - let mut handles: Vec = Vec::with_capacity(trees_to_process.len()); + let mut tree_tasks = tokio::task::JoinSet::new(); for tree in trees_to_process { debug!( @@ -1648,27 +1701,22 @@ impl EpochManager { ); self.heartbeat.add_tree_tasks_spawned(1); - let self_clone = self_arc.clone(); + let self_clone = self.clone(); let epoch_clone = epoch_info.epoch.clone(); let forester_epoch_pda = epoch_info.forester_epoch_pda.clone(); let tracker = registration_tracker.clone(); - - let handle = tokio::spawn(async move { + tree_tasks.spawn(async move { self_clone .process_queue(&epoch_clone, forester_epoch_pda, tree, tracker) .await }); - - handles.push(handle); } - debug!("Waiting for {} tree processing tasks", handles.len()); - let results = join_all(handles).await; let mut success_count = 0usize; let mut error_count = 0usize; let mut panic_count = 0usize; - for result in results { - match result { + while let Some(join_result) = tree_tasks.join_next().await { + match join_result { Ok(Ok(())) => success_count += 1, Ok(Err(e)) => { error_count += 1; @@ -1679,14 +1727,16 @@ impl EpochManager { "Error processing queue" ); } - Err(e) => { - panic_count += 1; - error!( - event = "tree_processing_task_panicked", - run_id = %self.run_id, - error = ?e, - "Tree processing task panicked" - ); + Err(join_error) => { + if join_error.is_panic() { + panic_count += 1; + error!( + event = "tree_processing_task_panicked", + run_id = %self.run_id, + error = %join_error, + "Tree processing task panicked" + ); + } } } } @@ -4509,16 +4559,20 @@ pub async fn run_service( retry_count + 1 ); + let run_future = epoch_manager.clone().run(); + tokio::pin!(run_future); + let result = tokio::select! { - result = epoch_manager.run() => result, - _ = shutdown => { + result = &mut run_future => result, + _ = &mut shutdown => { info!( event = "shutdown_received", run_id = %run_id_for_logs, phase = "service_run", "Received shutdown signal. Stopping the service." ); - Ok(()) + epoch_manager.request_shutdown(); + run_future.await } }; @@ -4810,4 +4864,15 @@ mod tests { assert_eq!(report.metrics.total_proof_generation().as_secs(), 7); assert_eq!(report.metrics.total_round_trip().as_secs(), 27); } + + #[tokio::test] + async fn watch_shutdown_observed_after_request() { + // Mirrors the run-loop pattern: subscribe, then check via + // borrow_and_update so a value set before subscribe is still seen. + let (tx, _initial_rx) = watch::channel(false); + tx.send(true).expect("send"); + + let mut rx = tx.subscribe(); + assert!(*rx.borrow_and_update()); + } } diff --git a/forester/src/metrics.rs b/forester/src/metrics.rs index a7be12dd83..ece7d6a577 100644 --- a/forester/src/metrics.rs +++ b/forester/src/metrics.rs @@ -440,21 +440,19 @@ pub async fn metrics_handler() -> Result { if let Err(e) = encoder.encode(®ISTRY.gather(), &mut buffer) { error!("could not encode custom metrics: {}", e); }; - let mut res = String::from_utf8(buffer.clone()).unwrap_or_else(|e| { + let mut res = String::from_utf8(buffer).unwrap_or_else(|e| { error!("custom metrics could not be from_utf8'd: {}", e); String::new() }); - buffer.clear(); let mut buffer = Vec::new(); if let Err(e) = encoder.encode(&prometheus::gather(), &mut buffer) { error!("could not encode prometheus metrics: {}", e); }; - let res_prometheus = String::from_utf8(buffer.clone()).unwrap_or_else(|e| { + let res_prometheus = String::from_utf8(buffer).unwrap_or_else(|e| { error!("prometheus metrics could not be from_utf8'd: {}", e); String::new() }); - buffer.clear(); res.push_str(&res_prometheus); Ok(res) diff --git a/forester/src/priority_fee.rs b/forester/src/priority_fee.rs index c788712e1c..0b9ee4049c 100644 --- a/forester/src/priority_fee.rs +++ b/forester/src/priority_fee.rs @@ -208,7 +208,7 @@ pub async fn request_priority_fee_estimate( .map_err(|error| PriorityFeeEstimateError::ClientBuild(error.clone()))?; let response = http_client - .post(url.clone()) + .post(url.as_str()) .header("Content-Type", "application/json") .json(&rpc_request) .send() diff --git a/forester/src/processor/v2/helpers.rs b/forester/src/processor/v2/helpers.rs index a2a9ad7506..5173ae16c0 100644 --- a/forester/src/processor/v2/helpers.rs +++ b/forester/src/processor/v2/helpers.rs @@ -11,7 +11,10 @@ use light_client::{ }; use light_hasher::hash_chain::create_hash_chain_from_slice; -use crate::processor::v2::{common::clamp_to_u16, BatchContext}; +use crate::{ + logging::should_emit_rate_limited_warning, + processor::v2::{common::clamp_to_u16, BatchContext}, +}; pub(crate) fn lock_recover<'a, T>(mutex: &'a Mutex, name: &'static str) -> MutexGuard<'a, T> { match mutex.lock() { @@ -496,71 +499,57 @@ impl StreamingAddressQueue { if available < end || start >= end { return Ok(None); } - let actual_end = end; let data = lock_recover(&self.data, "streaming_address_queue.data"); - let min_len = [ - data.addresses.len(), - data.low_element_values.len(), - data.low_element_next_values.len(), - data.low_element_indices.len(), - data.low_element_next_indices.len(), - ] - .into_iter() - .min() - .unwrap_or(0); - if min_len < actual_end { - return Err(anyhow!( - "incomplete batch data: min field length {} < required end {}", - min_len, - actual_end - )); - } - - let addresses = data.addresses[start..actual_end].to_vec(); - if addresses.is_empty() { - return Ok(None); - } - let expected_len = addresses.len(); - let Some(low_element_values) = data - .low_element_values - .get(start..end) - .map(|slice| slice.to_vec()) - else { - return Ok(None); - }; - let Some(low_element_next_values) = data - .low_element_next_values - .get(start..end) - .map(|slice| slice.to_vec()) - else { - return Ok(None); - }; - let Some(low_element_indices) = data - .low_element_indices - .get(start..end) - .map(|slice| slice.to_vec()) + // `available` can be bumped before every parallel array is filled, + // so a missing range here means "not ready yet" — return Ok(None) + // and let the caller retry on the next tick. + let range = start..end; + let ( + Some(addresses), + Some(low_element_values), + Some(low_element_next_values), + Some(low_element_indices), + Some(low_element_next_indices), + ) = ( + data.addresses.get(range.clone()).map(<[_]>::to_vec), + data.low_element_values + .get(range.clone()) + .map(<[_]>::to_vec), + data.low_element_next_values + .get(range.clone()) + .map(<[_]>::to_vec), + data.low_element_indices + .get(range.clone()) + .map(<[_]>::to_vec), + data.low_element_next_indices + .get(range.clone()) + .map(<[_]>::to_vec), + ) else { return Ok(None); }; - let Some(low_element_next_indices) = data - .low_element_next_indices - .get(start..end) - .map(|slice| slice.to_vec()) - else { - return Ok(None); + + // Proofs can also be unavailable if the indexer hasn't populated the + // merkle nodes for this range yet — return Ok(None) and retry. Log + // at warn (rate-limited) so persistent failures are still visible. + let low_element_proofs = match data.reconstruct_proofs::(range) { + Ok(proofs) => proofs, + Err(error) => { + if should_emit_rate_limited_warning( + "address_queue_proofs_not_ready", + std::time::Duration::from_secs(60), + ) { + tracing::warn!( + ?error, + start, + end, + "address proof reconstruction not ready, retrying" + ); + } + return Ok(None); + } }; - if [ - low_element_values.len(), - low_element_next_values.len(), - low_element_indices.len(), - low_element_next_indices.len(), - ] - .iter() - .any(|&len| len != expected_len) - { - return Ok(None); - } let leaves_hashchain = match data.leaves_hash_chains.get(hashchain_idx).copied() { Some(hashchain) => hashchain, @@ -582,15 +571,11 @@ impl StreamingAddressQueue { }; Ok(Some(AddressBatchSnapshot { - low_element_values: data.low_element_values[start..actual_end].to_vec(), - low_element_next_values: data.low_element_next_values[start..actual_end].to_vec(), - low_element_indices: data.low_element_indices[start..actual_end].to_vec(), - low_element_next_indices: data.low_element_next_indices[start..actual_end].to_vec(), - low_element_proofs: data - .reconstruct_proofs::(start..actual_end) - .map_err(|error| { - anyhow!("incomplete batch data: failed to reconstruct proofs: {error}") - })?, + low_element_values, + low_element_next_values, + low_element_indices, + low_element_next_indices, + low_element_proofs, addresses, leaves_hashchain, })) diff --git a/forester/src/processor/v2/proof_worker.rs b/forester/src/processor/v2/proof_worker.rs index 603fa3f19b..72b9d318ae 100644 --- a/forester/src/processor/v2/proof_worker.rs +++ b/forester/src/processor/v2/proof_worker.rs @@ -139,19 +139,19 @@ impl ProofClients { config.polling_interval, config.max_wait_time, config.api_key.clone(), - )?, + ), nullify_client: ProofClient::with_config( config.update_url.clone(), config.polling_interval, config.max_wait_time, config.api_key.clone(), - )?, + ), address_append_client: ProofClient::with_config( config.address_append_url.clone(), config.polling_interval, config.max_wait_time, config.api_key.clone(), - )?, + ), }) } diff --git a/forester/tests/e2e_test.rs b/forester/tests/e2e_test.rs index 1727ed108b..9314b818bd 100644 --- a/forester/tests/e2e_test.rs +++ b/forester/tests/e2e_test.rs @@ -227,16 +227,9 @@ async fn e2e_test() { slot_update_interval_seconds: 10, tree_discovery_interval_seconds: 5, enable_metrics: false, - skip_v1_state_trees: false, - skip_v2_state_trees: false, - skip_v1_address_trees: false, - skip_v2_address_trees: false, - tree_ids: vec![], sleep_after_processing_ms: 50, sleep_when_idle_ms: 100, - queue_polling_mode: Default::default(), - group_authority: None, - helius_rpc: false, + ..Default::default() }, rpc_pool_config: RpcPoolConfig { max_size: 50, diff --git a/forester/tests/legacy/batched_state_async_indexer_test.rs b/forester/tests/legacy/batched_state_async_indexer_test.rs index fe599a39a8..cffd8c92a2 100644 --- a/forester/tests/legacy/batched_state_async_indexer_test.rs +++ b/forester/tests/legacy/batched_state_async_indexer_test.rs @@ -23,9 +23,7 @@ use light_compressed_account::{ use light_compressed_token::process_transfer::{ transfer_sdk::create_transfer_instruction, TokenTransferOutputData, }; -use light_token::compat::TokenDataWithMerkleContext; use light_program_test::accounts::test_accounts::TestAccounts; -use light_prover_client::prover::spawn_prover; use light_registry::{ protocol_config::state::{ProtocolConfig, ProtocolConfigPda}, utils::get_protocol_config_pda_address, @@ -34,6 +32,7 @@ use light_test_utils::{ conversions::sdk_to_program_token_data, spl::create_mint_helper_with_keypair, system_program::create_invoke_instruction, }; +use light_token::compat::TokenDataWithMerkleContext; use rand::{prelude::SliceRandom, rngs::StdRng, Rng, SeedableRng}; use serial_test::serial; use solana_program::{native_token::LAMPORTS_PER_SOL, pubkey::Pubkey}; @@ -87,7 +86,6 @@ async fn test_state_indexer_async_batched() { validator_args: vec![], })) .await; - spawn_prover().await; let env = TestAccounts::get_local_test_validator_accounts(); let mut config = forester_config(); @@ -306,10 +304,7 @@ async fn wait_for_slot(rpc: &mut LightClient, target_slot: u64) { return; } Err(e) => { - println!( - "warp_to_slot unavailable ({}), falling back to polling", - e - ); + println!("warp_to_slot unavailable ({}), falling back to polling", e); } } while rpc.get_slot().await.unwrap() < target_slot { diff --git a/forester/tests/test_utils.rs b/forester/tests/test_utils.rs index 4225503a19..e429bc16a8 100644 --- a/forester/tests/test_utils.rs +++ b/forester/tests/test_utils.rs @@ -105,19 +105,12 @@ pub fn forester_config() -> ForesterConfig { indexer_config: Default::default(), transaction_config: Default::default(), general_config: GeneralConfig { - slot_update_interval_seconds: 10, tree_discovery_interval_seconds: 5, enable_metrics: false, - skip_v1_state_trees: false, - skip_v2_state_trees: false, - skip_v1_address_trees: false, - skip_v2_address_trees: false, - tree_ids: vec![], sleep_after_processing_ms: 50, sleep_when_idle_ms: 100, queue_polling_mode: QueuePollingMode::OnChain, - group_authority: None, - helius_rpc: false, + ..Default::default() }, rpc_pool_config: RpcPoolConfig { max_size: 50, diff --git a/prover/client/src/errors.rs b/prover/client/src/errors.rs index e095bf3579..859cae32b8 100644 --- a/prover/client/src/errors.rs +++ b/prover/client/src/errors.rs @@ -39,7 +39,6 @@ pub enum ProverClientError { #[error("Integer conversion failed: {0}")] IntegerConversion(String), - #[error("Hashchain mismatch: computed {computed:?} != expected {expected:?} (batch_size={batch_size}, next_index={next_index})")] HashchainMismatch { computed: [u8; 32], diff --git a/prover/client/src/helpers.rs b/prover/client/src/helpers.rs index 9a20b8958e..f1d77b3bfa 100644 --- a/prover/client/src/helpers.rs +++ b/prover/client/src/helpers.rs @@ -2,7 +2,7 @@ use std::process::Command; use light_hasher::{Hasher, Poseidon}; use light_sparse_merkle_tree::changelog::ChangelogEntry; -use num_bigint::{BigInt, BigUint}; +use num_bigint::{BigInt, BigUint, Sign}; use num_traits::{Num, ToPrimitive}; use serde::Serialize; @@ -35,10 +35,17 @@ pub fn convert_endianness_128(bytes: &[u8]) -> Vec { .collect::>() } -pub fn bigint_to_u8_32(n: &BigInt) -> Result<[u8; 32], Box> { - let (_, bytes_be) = n.to_bytes_be(); +pub fn bigint_to_u8_32(n: &BigInt) -> Result<[u8; 32], ProverClientError> { + let (sign, bytes_be) = n.to_bytes_be(); + if sign == Sign::Minus { + return Err(ProverClientError::InvalidProofData( + "negative integers are not valid field elements".to_string(), + )); + } if bytes_be.len() > 32 { - Err("Number too large to fit in [u8; 32]")?; + return Err(ProverClientError::InvalidProofData( + "number too large to fit in [u8; 32]".to_string(), + )); } let mut array = [0; 32]; let bytes = &bytes_be[..bytes_be.len()]; diff --git a/prover/client/src/proof_client.rs b/prover/client/src/proof_client.rs index a4f0e27aba..e6c5008c39 100644 --- a/prover/client/src/proof_client.rs +++ b/prover/client/src/proof_client.rs @@ -84,21 +84,21 @@ impl ProofClient { polling_interval: Duration, max_wait_time: Duration, api_key: Option, - ) -> Result { + ) -> Self { let initial_poll_delay = if api_key.is_some() { Duration::from_millis(INITIAL_POLL_DELAY_LARGE_CIRCUIT_MS) } else { Duration::from_millis(INITIAL_POLL_DELAY_SMALL_CIRCUIT_MS) }; - Ok(Self { + Self { client: build_http_client(), server_address, polling_interval, max_wait_time, api_key, initial_poll_delay, - }) + } } #[allow(unused)] diff --git a/prover/client/src/proof_types/batch_address_append/proof_inputs.rs b/prover/client/src/proof_types/batch_address_append/proof_inputs.rs index 477ee73140..bf414b160d 100644 --- a/prover/client/src/proof_types/batch_address_append/proof_inputs.rs +++ b/prover/client/src/proof_types/batch_address_append/proof_inputs.rs @@ -93,7 +93,6 @@ pub struct BatchAddressAppendInputs { pub hashchain_hash: BigUint, pub low_element_values: Vec, pub low_element_indices: Vec, - pub low_element_next_indices: Vec, pub low_element_next_values: Vec, pub low_element_proofs: Vec>, pub new_element_values: Vec, @@ -105,84 +104,6 @@ pub struct BatchAddressAppendInputs { pub tree_height: usize, } -impl BatchAddressAppendInputs { - #[allow(clippy::too_many_arguments)] - pub fn new( - batch_size: usize, - leaves_hashchain: [u8; 32], - low_element_values: &[[u8; 32]], - low_element_indices: &[u64], - low_element_next_indices: &[u64], - low_element_next_values: &[[u8; 32]], - low_element_proofs: Vec>, - new_element_values: &[[u8; 32]], - new_element_proofs: Vec>, - new_root: [u8; 32], - old_root: [u8; 32], - start_index: usize, - ) -> Result { - let hash_chain_inputs = [ - old_root, - new_root, - leaves_hashchain, - bigint_to_be_bytes_array::<32>(&start_index.into())?, - ]; - let public_input_hash = create_hash_chain_from_array(hash_chain_inputs)?; - - let low_element_proofs_bigint: Vec> = low_element_proofs - .into_iter() - .map(|proof| { - proof - .into_iter() - .map(|p| BigUint::from_bytes_be(&p)) - .collect() - }) - .collect(); - - let new_element_proofs_bigint: Vec> = new_element_proofs - .into_iter() - .map(|proof| { - proof - .into_iter() - .map(|p| BigUint::from_bytes_be(&p)) - .collect() - }) - .collect(); - - Ok(Self { - batch_size, - hashchain_hash: BigUint::from_bytes_be(&leaves_hashchain), - low_element_values: low_element_values - .iter() - .map(|v| BigUint::from_bytes_be(v)) - .collect(), - low_element_indices: low_element_indices - .iter() - .map(|&i| BigUint::from(i)) - .collect(), - low_element_next_indices: low_element_next_indices - .iter() - .map(|&i| BigUint::from(i)) - .collect(), - low_element_next_values: low_element_next_values - .iter() - .map(|v| BigUint::from_bytes_be(v)) - .collect(), - low_element_proofs: low_element_proofs_bigint, - new_element_values: new_element_values - .iter() - .map(|v| BigUint::from_bytes_be(v)) - .collect(), - new_element_proofs: new_element_proofs_bigint, - new_root: BigUint::from_bytes_be(&new_root), - old_root: BigUint::from_bytes_be(&old_root), - public_input_hash: BigUint::from_bytes_be(&public_input_hash), - start_index, - tree_height: HEIGHT, - }) - } -} - #[allow(clippy::too_many_arguments)] pub fn get_batch_address_append_circuit_inputs( next_index: usize, @@ -199,32 +120,31 @@ pub fn get_batch_address_append_circuit_inputs( changelog: &mut Vec>, indexed_changelog: &mut Vec>, ) -> Result { - if zkp_batch_size > new_element_values.len() - || zkp_batch_size > low_element_values.len() - || zkp_batch_size > low_element_indices.len() - || zkp_batch_size > low_element_next_indices.len() - || zkp_batch_size > low_element_next_values.len() - || zkp_batch_size > low_element_proofs.len() - { - return Err(ProverClientError::GenericError(format!( - "zkp_batch_size {} exceeds input slice lengths \ - (new_element_values={}, low_element_values={}, low_element_indices={}, \ - low_element_next_indices={}, low_element_next_values={}, low_element_proofs={})", - zkp_batch_size, - new_element_values.len(), - low_element_values.len(), - low_element_indices.len(), - low_element_next_indices.len(), - low_element_next_values.len(), - low_element_proofs.len(), - ))); + for (name, len) in [ + ("new_element_values", new_element_values.len()), + ("low_element_values", low_element_values.len()), + ("low_element_next_values", low_element_next_values.len()), + ("low_element_indices", low_element_indices.len()), + ("low_element_next_indices", low_element_next_indices.len()), + ("low_element_proofs", low_element_proofs.len()), + ] { + if len < zkp_batch_size { + return Err(ProverClientError::GenericError(format!( + "truncated batch from indexer: {} len {} < required batch size {}", + name, len, zkp_batch_size + ))); + } } + let new_element_values = &new_element_values[..zkp_batch_size]; + let mut staged_changelog = changelog.clone(); + let mut staged_indexed_changelog = indexed_changelog.clone(); + let mut staged_sparse_merkle_tree = sparse_merkle_tree.clone(); + let initial_changelog_len = staged_changelog.len(); let mut new_root = [0u8; 32]; let mut low_element_circuit_merkle_proofs = Vec::with_capacity(zkp_batch_size); let mut new_element_circuit_merkle_proofs = Vec::with_capacity(zkp_batch_size); let mut patched_low_element_next_values = Vec::with_capacity(zkp_batch_size); - let mut patched_low_element_next_indices = Vec::with_capacity(zkp_batch_size); let mut patched_low_element_values = Vec::with_capacity(zkp_batch_size); let mut patched_low_element_indices = Vec::with_capacity(zkp_batch_size); @@ -256,11 +176,6 @@ pub fn get_batch_address_append_circuit_inputs( next_index ); - let mut staged_changelog = changelog.clone(); - let mut staged_indexed_changelog = indexed_changelog.clone(); - let mut staged_sparse_merkle_tree = sparse_merkle_tree.clone(); - let initial_changelog_len = staged_changelog.len(); - let mut patcher = ChangelogProofPatcher::new::(&staged_changelog); let is_first_batch = staged_indexed_changelog.is_empty(); @@ -313,7 +228,6 @@ pub fn get_batch_address_append_circuit_inputs( })?; patched_low_element_next_values .push(bigint_to_be_bytes_array::<32>(&low_element_next_value)?); - patched_low_element_next_indices.push(low_element.next_index()); patched_low_element_indices.push(low_element.index); patched_low_element_values.push(bigint_to_be_bytes_array::<32>(&low_element.value)?); @@ -569,10 +483,6 @@ pub fn get_batch_address_append_circuit_inputs( .iter() .map(|&i| BigUint::from(i)) .collect(), - low_element_next_indices: patched_low_element_next_indices - .iter() - .map(|&i| BigUint::from(i)) - .collect(), low_element_next_values: patched_low_element_next_values .iter() .map(|v| BigUint::from_bytes_be(v)) diff --git a/prover/client/src/proof_types/combined/v2/json.rs b/prover/client/src/proof_types/combined/v2/json.rs index 322a5ee8ec..71de6b2ed3 100644 --- a/prover/client/src/proof_types/combined/v2/json.rs +++ b/prover/client/src/proof_types/combined/v2/json.rs @@ -2,6 +2,7 @@ use serde::Serialize; use crate::{ constants::{DEFAULT_BATCH_ADDRESS_TREE_HEIGHT, DEFAULT_BATCH_STATE_TREE_HEIGHT}, + errors::ProverClientError, helpers::{big_int_to_string, create_json_from_struct}, proof_types::{ circuit_type::CircuitType, @@ -29,21 +30,22 @@ pub struct CombinedJsonStruct { } impl CombinedJsonStruct { - pub fn from_combined_inputs(inputs: &CombinedProofInputs) -> Self { + pub fn from_combined_inputs(inputs: &CombinedProofInputs) -> Result { let inclusion_parameters = BatchInclusionJsonStruct::from_inclusion_proof_inputs(&inputs.inclusion_parameters); - let non_inclusion_parameters = BatchNonInclusionJsonStruct::from_non_inclusion_proof_inputs( - &inputs.non_inclusion_parameters, - ); + let non_inclusion_parameters = + BatchNonInclusionJsonStruct::from_non_inclusion_proof_inputs( + &inputs.non_inclusion_parameters, + )?; - Self { + Ok(Self { circuit_type: CircuitType::Combined.to_string(), state_tree_height: DEFAULT_BATCH_STATE_TREE_HEIGHT, address_tree_height: DEFAULT_BATCH_ADDRESS_TREE_HEIGHT, public_input_hash: big_int_to_string(&inputs.public_input_hash), inclusion: inclusion_parameters.inputs, non_inclusion: non_inclusion_parameters.inputs, - } + }) } #[allow(clippy::inherent_to_string)] diff --git a/prover/client/src/proof_types/non_inclusion/v2/json.rs b/prover/client/src/proof_types/non_inclusion/v2/json.rs index f6174e724d..9a556843af 100644 --- a/prover/client/src/proof_types/non_inclusion/v2/json.rs +++ b/prover/client/src/proof_types/non_inclusion/v2/json.rs @@ -2,6 +2,7 @@ use num_traits::ToPrimitive; use serde::Serialize; use crate::{ + errors::ProverClientError, helpers::{big_int_to_string, create_json_from_struct}, proof_types::{circuit_type::CircuitType, non_inclusion::v2::NonInclusionProofInputs}, }; @@ -24,7 +25,7 @@ pub struct NonInclusionJsonStruct { pub value: String, #[serde(rename(serialize = "pathIndex"))] - pub path_index: u32, + pub path_index: u64, #[serde(rename(serialize = "pathElements"))] pub path_elements: Vec, @@ -42,13 +43,23 @@ impl BatchNonInclusionJsonStruct { create_json_from_struct(&self) } - pub fn from_non_inclusion_proof_inputs(inputs: &NonInclusionProofInputs) -> Self { + pub fn from_non_inclusion_proof_inputs( + inputs: &NonInclusionProofInputs, + ) -> Result { let mut proof_inputs: Vec = Vec::new(); for input in inputs.inputs.iter() { let prof_input = NonInclusionJsonStruct { root: big_int_to_string(&input.root), value: big_int_to_string(&input.value), - path_index: input.index_hashed_indexed_element_leaf.to_u32().unwrap(), + path_index: input + .index_hashed_indexed_element_leaf + .to_u64() + .ok_or_else(|| { + ProverClientError::IntegerConversion(format!( + "failed to convert path index {} to u64", + input.index_hashed_indexed_element_leaf + )) + })?, path_elements: input .merkle_proof_hashed_indexed_element_leaf .iter() @@ -60,11 +71,11 @@ impl BatchNonInclusionJsonStruct { proof_inputs.push(prof_input); } - Self { + Ok(Self { circuit_type: CircuitType::NonInclusion.to_string(), address_tree_height: 40, public_input_hash: big_int_to_string(&inputs.public_input_hash), inputs: proof_inputs, - } + }) } } diff --git a/prover/client/tests/batch_address_append.rs b/prover/client/tests/batch_address_append.rs index 7b8ceaa5f9..8e354db75a 100644 --- a/prover/client/tests/batch_address_append.rs +++ b/prover/client/tests/batch_address_append.rs @@ -212,7 +212,6 @@ pub fn get_test_batch_address_append_inputs( let mut low_element_values = Vec::new(); let mut low_element_indices = Vec::new(); - let mut low_element_next_indices = Vec::new(); let mut low_element_next_values = Vec::new(); let mut low_element_proofs = Vec::new(); let mut new_element_values = Vec::new(); @@ -230,7 +229,6 @@ pub fn get_test_batch_address_append_inputs( &non_inclusion_proof.leaf_lower_range_value, )); low_element_indices.push(non_inclusion_proof.leaf_index.into()); - low_element_next_indices.push(non_inclusion_proof.next_index.into()); low_element_next_values.push(BigUint::from_bytes_be( &non_inclusion_proof.leaf_higher_range_value, )); @@ -275,7 +273,6 @@ pub fn get_test_batch_address_append_inputs( hashchain_hash: BigUint::from_bytes_be(&leaves_hashchain), low_element_values, low_element_indices, - low_element_next_indices, low_element_next_values, low_element_proofs, new_element_values, diff --git a/prover/client/tests/init_merkle_tree.rs b/prover/client/tests/init_merkle_tree.rs index 3bb5584cd3..1cba92ad6b 100644 --- a/prover/client/tests/init_merkle_tree.rs +++ b/prover/client/tests/init_merkle_tree.rs @@ -221,7 +221,7 @@ pub fn non_inclusion_new_with_public_inputs_v2( .collect(), path_index: merkle_inputs .index_hashed_indexed_element_leaf - .to_u32() + .to_u64() .unwrap(), leaf_lower_range_value: big_int_to_string(&merkle_inputs.leaf_lower_range_value), leaf_higher_range_value: big_int_to_string(&merkle_inputs.leaf_higher_range_value), diff --git a/prover/server/prover/common/types.go b/prover/server/prover/common/types.go index 65668a764e..2d077e77e7 100644 --- a/prover/server/prover/common/types.go +++ b/prover/server/prover/common/types.go @@ -14,7 +14,7 @@ const ( // JSON input structures (these are not in circuit_utils.go) type InclusionProofInputsJSON struct { Root string `json:"root"` - PathIndex uint32 `json:"pathIndex"` + PathIndex uint64 `json:"pathIndex"` PathElements []string `json:"pathElements"` Leaf string `json:"leaf"` } diff --git a/prover/server/prover/v1/inclusion_proving_system.go b/prover/server/prover/v1/inclusion_proving_system.go index ce5d30b253..a9f075f6fe 100644 --- a/prover/server/prover/v1/inclusion_proving_system.go +++ b/prover/server/prover/v1/inclusion_proving_system.go @@ -14,7 +14,7 @@ import ( type InclusionInputs struct { Root big.Int - PathIndex uint32 + PathIndex uint64 PathElements []big.Int Leaf big.Int } diff --git a/prover/server/prover/v1/marshal_non_inclusion.go b/prover/server/prover/v1/marshal_non_inclusion.go index 583a2348f5..336f928524 100644 --- a/prover/server/prover/v1/marshal_non_inclusion.go +++ b/prover/server/prover/v1/marshal_non_inclusion.go @@ -9,7 +9,7 @@ import ( type NonInclusionProofInputsJSON struct { Root string `json:"root"` Value string `json:"value"` - PathIndex uint32 `json:"pathIndex"` + PathIndex uint64 `json:"pathIndex"` PathElements []string `json:"pathElements"` LeafLowerRangeValue string `json:"leafLowerRangeValue"` LeafHigherRangeValue string `json:"leafHigherRangeValue"` diff --git a/prover/server/prover/v1/non_inclusion_proving_system.go b/prover/server/prover/v1/non_inclusion_proving_system.go index b31d800e93..b79fe9c8c2 100644 --- a/prover/server/prover/v1/non_inclusion_proving_system.go +++ b/prover/server/prover/v1/non_inclusion_proving_system.go @@ -15,7 +15,7 @@ import ( type NonInclusionInputs struct { Root big.Int Value big.Int - PathIndex uint32 + PathIndex uint64 PathElements []big.Int LeafLowerRangeValue big.Int diff --git a/prover/server/prover/v1/non_inclusion_test.go b/prover/server/prover/v1/non_inclusion_test.go index 48f77f0669..8a50708b92 100644 --- a/prover/server/prover/v1/non_inclusion_test.go +++ b/prover/server/prover/v1/non_inclusion_test.go @@ -226,7 +226,7 @@ func TestNonInclusionCircuit(t *testing.T) { LeafLowerRangeValue: *leafLowerRangeValue, LeafHigherRangeValue: *leafHigherRangeValue, NextIndex: uint32(0), - PathIndex: uint32(pathIndex), + PathIndex: uint64(pathIndex), PathElements: pathElements, } diff --git a/prover/server/prover/v1/test_data_helpers.go b/prover/server/prover/v1/test_data_helpers.go index f888422b02..f93d58ec00 100644 --- a/prover/server/prover/v1/test_data_helpers.go +++ b/prover/server/prover/v1/test_data_helpers.go @@ -29,7 +29,7 @@ func BuildTestTree(depth int, numberOfCompressedAccounts int, random bool) Inclu for i := 0; i < numberOfCompressedAccounts; i++ { inputs[i].Leaf = *leaf - inputs[i].PathIndex = uint32(pathIndex) + inputs[i].PathIndex = uint64(pathIndex) inputs[i].PathElements = tree.Update(pathIndex, *leaf) inputs[i].Root = tree.Root.Value() } @@ -96,7 +96,7 @@ func BuildTestNonInclusionTree(depth int, numberOfCompressedAccounts int, random inputs[i].LeafLowerRangeValue = *leafLower inputs[i].LeafHigherRangeValue = *leafUpper inputs[i].NextIndex = uint32(0) // Set NextIndex explicitly - inputs[i].PathIndex = uint32(pathIndex) + inputs[i].PathIndex = uint64(pathIndex) inputs[i].PathElements = pathElements } diff --git a/prover/server/prover/v2/inclusion_proving_system.go b/prover/server/prover/v2/inclusion_proving_system.go index c0c9627d16..1b53448b66 100644 --- a/prover/server/prover/v2/inclusion_proving_system.go +++ b/prover/server/prover/v2/inclusion_proving_system.go @@ -16,7 +16,7 @@ import ( type InclusionInputs struct { Root big.Int - PathIndex uint32 + PathIndex uint64 PathElements []big.Int Leaf big.Int PublicInputHash big.Int diff --git a/prover/server/prover/v2/marshal_inclusion.go b/prover/server/prover/v2/marshal_inclusion.go index 560eed5b5f..cc1b8465c0 100644 --- a/prover/server/prover/v2/marshal_inclusion.go +++ b/prover/server/prover/v2/marshal_inclusion.go @@ -9,7 +9,7 @@ import ( type InclusionProofInputsJSON struct { Root string `json:"root"` - PathIndex uint32 `json:"pathIndex"` + PathIndex uint64 `json:"pathIndex"` PathElements []string `json:"pathElements"` Leaf string `json:"leaf"` } diff --git a/prover/server/prover/v2/marshal_non_inclusion.go b/prover/server/prover/v2/marshal_non_inclusion.go index 9a4e1e2977..a2ae2b46b9 100644 --- a/prover/server/prover/v2/marshal_non_inclusion.go +++ b/prover/server/prover/v2/marshal_non_inclusion.go @@ -10,7 +10,7 @@ import ( type NonInclusionProofInputsJSON struct { Root string `json:"root"` Value string `json:"value"` - PathIndex uint32 `json:"pathIndex"` + PathIndex uint64 `json:"pathIndex"` PathElements []string `json:"pathElements"` LeafLowerRangeValue string `json:"leafLowerRangeValue"` LeafHigherRangeValue string `json:"leafHigherRangeValue"` diff --git a/prover/server/prover/v2/non_inclusion_proving_system.go b/prover/server/prover/v2/non_inclusion_proving_system.go index 952ea922bd..380c8d56f8 100644 --- a/prover/server/prover/v2/non_inclusion_proving_system.go +++ b/prover/server/prover/v2/non_inclusion_proving_system.go @@ -17,7 +17,7 @@ import ( type NonInclusionInputs struct { Root big.Int Value big.Int - PathIndex uint32 + PathIndex uint64 PathElements []big.Int LeafLowerRangeValue big.Int diff --git a/prover/server/prover/v2/test_data_helpers.go b/prover/server/prover/v2/test_data_helpers.go index d6c670f082..09caeee544 100644 --- a/prover/server/prover/v2/test_data_helpers.go +++ b/prover/server/prover/v2/test_data_helpers.go @@ -31,7 +31,7 @@ func BuildTestTree(depth int, numberOfCompressedAccounts int, random bool) Inclu for i := 0; i < numberOfCompressedAccounts; i++ { inputs[i].Leaf = *leaf - inputs[i].PathIndex = uint32(pathIndex) + inputs[i].PathIndex = uint64(pathIndex) inputs[i].PathElements = tree.Update(pathIndex, *leaf) inputs[i].Root = tree.Root.Value() leaves[i] = leaf @@ -92,7 +92,7 @@ func BuildTestNonInclusionTree(depth int, numberOfCompressedAccounts int, random } inputs[i].Value = *value - inputs[i].PathIndex = uint32(pathIndex) + inputs[i].PathIndex = uint64(pathIndex) inputs[i].PathElements = tree.Update(pathIndex, *leaf) inputs[i].Root = tree.Root.Value() inputs[i].LeafLowerRangeValue = *leafLower diff --git a/sdk-libs/client/src/local_test_validator.rs b/sdk-libs/client/src/local_test_validator.rs index b27daa6a25..bc6f0817da 100644 --- a/sdk-libs/client/src/local_test_validator.rs +++ b/sdk-libs/client/src/local_test_validator.rs @@ -58,53 +58,55 @@ impl Default for LightValidatorConfig { pub async fn spawn_validator(config: LightValidatorConfig) { if let Some(project_root) = get_project_root() { - let command = "cli/test_bin/run test-validator"; - let mut command = format!("{}/{}", project_root.trim(), command); + let project_root = project_root.trim_end_matches(['\n', '\r']); + let executable = format!("{}/cli/test_bin/run", project_root); + let mut args = vec!["test-validator".to_string()]; if !config.enable_indexer { - command.push_str(" --skip-indexer"); + args.push("--skip-indexer".to_string()); } if let Some(limit_ledger_size) = config.limit_ledger_size { - command.push_str(&format!(" --limit-ledger-size {}", limit_ledger_size)); + args.push("--limit-ledger-size".to_string()); + args.push(limit_ledger_size.to_string()); } for sbf_program in config.sbf_programs.iter() { - command.push_str(&format!( - " --sbf-program {} {}", - sbf_program.0, sbf_program.1 - )); + args.push("--sbf-program".to_string()); + args.push(sbf_program.0.clone()); + args.push(sbf_program.1.clone()); } for upgradeable_program in config.upgradeable_programs.iter() { - command.push_str(&format!( - " --upgradeable-program {} {} {}", - upgradeable_program.program_id, - upgradeable_program.program_path, - upgradeable_program.upgrade_authority - )); + args.push("--upgradeable-program".to_string()); + args.push(upgradeable_program.program_id.clone()); + args.push(upgradeable_program.program_path.clone()); + args.push(upgradeable_program.upgrade_authority.clone()); } if !config.enable_prover { - command.push_str(" --skip-prover"); + args.push("--skip-prover".to_string()); } if config.use_surfpool { - command.push_str(" --use-surfpool"); + args.push("--use-surfpool".to_string()); } for arg in config.validator_args.iter() { - command.push_str(&format!(" {}", arg)); + args.push(arg.clone()); } - println!("Starting validator with command: {}", command); + println!( + "Starting validator with command: {} {}", + executable, + args.join(" ") + ); if config.use_surfpool { // The CLI starts surfpool, prover, and photon, then exits once all // services are ready. Wait for it to finish so we know everything // is up before the test proceeds. - let mut child = Command::new("sh") - .arg("-c") - .arg(command) + let mut child = Command::new(&executable) + .args(&args) .stdin(Stdio::null()) .stdout(Stdio::inherit()) .stderr(Stdio::inherit()) @@ -113,17 +115,21 @@ pub async fn spawn_validator(config: LightValidatorConfig) { let status = child.wait().await.expect("Failed to wait for CLI process"); assert!(status.success(), "CLI exited with error: {}", status); } else { - let _child = Command::new("sh") - .arg("-c") - .arg(command) + let mut child = Command::new(&executable) + .args(&args) .stdin(Stdio::null()) .stdout(Stdio::null()) .stderr(Stdio::null()) .spawn() .expect("Failed to start server process"); - // Intentionally detaching the spawned child; the caller only waits - // for the validator services to become available. tokio::time::sleep(tokio::time::Duration::from_secs(config.wait_time)).await; + if let Some(status) = child.try_wait().expect("Failed to poll validator process") { + assert!( + status.success(), + "Validator exited early with error: {}", + status + ); + } } } } diff --git a/sdk-libs/program-test/src/indexer/test_indexer.rs b/sdk-libs/program-test/src/indexer/test_indexer.rs index d1b46c9148..9c23d9b488 100644 --- a/sdk-libs/program-test/src/indexer/test_indexer.rs +++ b/sdk-libs/program-test/src/indexer/test_indexer.rs @@ -2373,7 +2373,8 @@ impl TestIndexer { Some( BatchNonInclusionJsonStruct::from_non_inclusion_proof_inputs( &non_inclusion_proof_inputs, - ), + ) + .map_err(|error| IndexerError::CustomError(error.to_string()))?, ), None, )