From b93217c2595ed170a259d106c1c775e1cf352db7 Mon Sep 17 00:00:00 2001 From: Sergey Timoshin Date: Sat, 14 Mar 2026 16:32:06 +0000 Subject: [PATCH 1/2] cleanup --- forester/src/processor/v2/proof_worker.rs | 8 ++++---- sdk-libs/client/src/indexer/types/queue.rs | 13 +++---------- 2 files changed, 7 insertions(+), 14 deletions(-) diff --git a/forester/src/processor/v2/proof_worker.rs b/forester/src/processor/v2/proof_worker.rs index 72b9d318ae..ba9f133aee 100644 --- a/forester/src/processor/v2/proof_worker.rs +++ b/forester/src/processor/v2/proof_worker.rs @@ -132,8 +132,8 @@ struct ProofClients { } impl ProofClients { - fn new(config: &ProverConfig) -> crate::Result { - Ok(Self { + fn new(config: &ProverConfig) -> Self { + Self { append_client: ProofClient::with_config( config.append_url.clone(), config.polling_interval, @@ -152,7 +152,7 @@ impl ProofClients { config.max_wait_time, config.api_key.clone(), ), - }) + } } fn get_client(&self, input: &ProofInput) -> &ProofClient { @@ -168,7 +168,7 @@ pub fn spawn_proof_workers( config: &ProverConfig, ) -> crate::Result> { let (job_tx, job_rx) = async_channel::bounded::(256); - let clients = Arc::new(ProofClients::new(config)?); + let clients = Arc::new(ProofClients::new(config)); tokio::spawn(async move { run_proof_pipeline(job_rx, clients).await }); Ok(job_tx) } diff --git a/sdk-libs/client/src/indexer/types/queue.rs b/sdk-libs/client/src/indexer/types/queue.rs index 940ba4fa9e..8a9cd479a3 100644 --- a/sdk-libs/client/src/indexer/types/queue.rs +++ b/sdk-libs/client/src/indexer/types/queue.rs @@ -88,7 +88,7 @@ impl AddressQueueData { address_range: std::ops::Range, ) -> Result, IndexerError> { self.validate_proof_height::()?; - let available = self.proof_count()?; + let available = self.proof_count(); if address_range.start > address_range.end { return Err(IndexerError::InvalidParameters(format!( "invalid address proof range {}..{}", @@ -127,15 +127,8 @@ impl AddressQueueData { lookup } - fn proof_count(&self) -> Result { - let addr_len = self.addresses.len(); - let idx_len = self.low_element_indices.len(); - if addr_len != idx_len { - return Err(IndexerError::InvalidParameters(format!( - "address queue length mismatch: addresses.len()={addr_len} != low_element_indices.len()={idx_len}" - ))); - } - Ok(addr_len) + fn proof_count(&self) -> usize { + self.addresses.len().min(self.low_element_indices.len()) } fn reconstruct_proof_with_lookup( From 895b6463e24400f31600f215e63496f01fd79886 Mon Sep 17 00:00:00 2001 From: Sergey Timoshin Date: Fri, 10 Apr 2026 15:59:23 -0700 Subject: [PATCH 2/2] cleanup --- .../src/compressible/ctoken/compressor.rs | 10 ++-- forester/src/compressible/ctoken/state.rs | 10 +++- forester/src/compressible/mint/compressor.rs | 59 +++++++++++-------- forester/src/compressible/pda/compressor.rs | 56 +++++++++++------- forester/src/compressible/pda/state.rs | 16 +++-- forester/src/compressible/traits.rs | 14 ++--- forester/src/epoch_manager.rs | 44 +++++++++----- 7 files changed, 130 insertions(+), 79 deletions(-) diff --git a/forester/src/compressible/ctoken/compressor.rs b/forester/src/compressible/ctoken/compressor.rs index 80e5ca4382..96b1d71778 100644 --- a/forester/src/compressible/ctoken/compressor.rs +++ b/forester/src/compressible/ctoken/compressor.rs @@ -30,16 +30,16 @@ use crate::{ pub struct CTokenCompressor { rpc_pool: Arc>, tracker: Arc, - payer_keypair: Keypair, + payer_keypair: Arc, transaction_policy: TransactionPolicy, } impl Clone for CTokenCompressor { fn clone(&self) -> Self { Self { - rpc_pool: Arc::clone(&self.rpc_pool), - tracker: Arc::clone(&self.tracker), - payer_keypair: self.payer_keypair.insecure_clone(), + rpc_pool: self.rpc_pool.clone(), + tracker: self.tracker.clone(), + payer_keypair: self.payer_keypair.clone(), transaction_policy: self.transaction_policy, } } @@ -49,7 +49,7 @@ impl CTokenCompressor { pub fn new( rpc_pool: Arc>, tracker: Arc, - payer_keypair: Keypair, + payer_keypair: Arc, transaction_policy: TransactionPolicy, ) -> Self { Self { diff --git a/forester/src/compressible/ctoken/state.rs b/forester/src/compressible/ctoken/state.rs index 1c71e785f3..4da2b8a431 100644 --- a/forester/src/compressible/ctoken/state.rs +++ b/forester/src/compressible/ctoken/state.rs @@ -62,9 +62,13 @@ impl CTokenAccountTracker { /// Returns all tracked token accounts (not mints), ignoring compressible_slot. /// Use `get_ready_to_compress(current_slot)` to get only accounts ready for compression. pub fn get_all_token_accounts(&self) -> Vec { - self.get_ready_to_compress(u64::MAX) - .into_iter() - .filter(|state| state.account.is_token_account()) + let pending = self.pending(); + self.accounts() + .iter() + .filter(|entry| { + entry.value().account.is_token_account() && !pending.contains(entry.key()) + }) + .map(|entry| entry.value().clone()) .collect() } diff --git a/forester/src/compressible/mint/compressor.rs b/forester/src/compressible/mint/compressor.rs index 889a28d5e1..408059f134 100644 --- a/forester/src/compressible/mint/compressor.rs +++ b/forester/src/compressible/mint/compressor.rs @@ -30,16 +30,16 @@ use crate::{ pub struct MintCompressor { rpc_pool: Arc>, tracker: Arc, - payer_keypair: Keypair, + payer_keypair: Arc, transaction_policy: TransactionPolicy, } impl Clone for MintCompressor { fn clone(&self) -> Self { Self { - rpc_pool: Arc::clone(&self.rpc_pool), - tracker: Arc::clone(&self.tracker), - payer_keypair: self.payer_keypair.insecure_clone(), + rpc_pool: self.rpc_pool.clone(), + tracker: self.tracker.clone(), + payer_keypair: self.payer_keypair.clone(), transaction_policy: self.transaction_policy, } } @@ -49,7 +49,7 @@ impl MintCompressor { pub fn new( rpc_pool: Arc>, tracker: Arc, - payer_keypair: Keypair, + payer_keypair: Arc, transaction_policy: TransactionPolicy, ) -> Self { Self { @@ -133,21 +133,20 @@ impl MintCompressor { /// Use this when you need fine-grained control over individual compressions. pub async fn compress_batch_concurrent( &self, - mint_states: &[MintAccountState], + pubkeys: &[Pubkey], max_concurrent: usize, cancelled: Arc, - ) -> CompressionOutcomes { - if mint_states.is_empty() { + ) -> CompressionOutcomes { + if pubkeys.is_empty() { return Vec::new(); } // Guard against max_concurrent == 0 to avoid buffer_unordered panic if max_concurrent == 0 { - return mint_states + return pubkeys .iter() - .cloned() - .map(|mint_state| CompressionOutcome::Failed { - state: mint_state, + .map(|&pubkey| CompressionOutcome::Failed { + pubkey, error: CompressionTaskError::Failed(anyhow::anyhow!( "max_concurrent must be > 0" )), @@ -156,30 +155,44 @@ impl MintCompressor { } // Mark all as pending upfront - let all_pubkeys: Vec = mint_states.iter().map(|s| s.pubkey).collect(); - self.tracker.mark_pending(&all_pubkeys); + self.tracker.mark_pending(pubkeys); // Create futures for each mint - let compression_futures = mint_states.iter().cloned().map(|mint_state| { + let compression_futures = pubkeys.iter().copied().map(|pubkey| { let compressor = self.clone(); let cancelled = cancelled.clone(); async move { // Check cancellation before processing if cancelled.load(Ordering::Relaxed) { - compressor.tracker.unmark_pending(&[mint_state.pubkey]); + compressor.tracker.unmark_pending(&[pubkey]); return CompressionOutcome::Failed { - state: mint_state, + pubkey, error: CompressionTaskError::Cancelled, }; } + let mint_state = + match compressor.tracker.accounts().get(&pubkey).map(|r| r.clone()) { + Some(state) => state, + None => { + compressor.tracker.unmark_pending(&[pubkey]); + return CompressionOutcome::Failed { + pubkey, + error: CompressionTaskError::Failed(anyhow::anyhow!( + "mint {} removed from tracker before compression", + pubkey + )), + }; + } + }; + match compressor.compress(&mint_state).await { Ok(sig) => CompressionOutcome::Compressed { signature: sig, - state: mint_state, + pubkey, }, Err(e) => CompressionOutcome::Failed { - state: mint_state, + pubkey, error: e.into(), }, } @@ -195,11 +208,11 @@ impl MintCompressor { // Remove successfully compressed mints; unmark failed ones for result in &results { match result { - CompressionOutcome::Compressed { state, .. } => { - self.tracker.remove_compressed(&state.pubkey); + CompressionOutcome::Compressed { pubkey, .. } => { + self.tracker.remove_compressed(pubkey); } - CompressionOutcome::Failed { state, .. } => { - self.tracker.unmark_pending(&[state.pubkey]); + CompressionOutcome::Failed { pubkey, .. } => { + self.tracker.unmark_pending(&[*pubkey]); } } } diff --git a/forester/src/compressible/pda/compressor.rs b/forester/src/compressible/pda/compressor.rs index 941f0d3e53..964f700349 100644 --- a/forester/src/compressible/pda/compressor.rs +++ b/forester/src/compressible/pda/compressor.rs @@ -55,16 +55,16 @@ pub struct CachedProgramConfig { pub struct PdaCompressor { rpc_pool: Arc>, tracker: Arc, - payer_keypair: Keypair, + payer_keypair: Arc, transaction_policy: TransactionPolicy, } impl Clone for PdaCompressor { fn clone(&self) -> Self { Self { - rpc_pool: Arc::clone(&self.rpc_pool), - tracker: Arc::clone(&self.tracker), - payer_keypair: self.payer_keypair.insecure_clone(), + rpc_pool: self.rpc_pool.clone(), + tracker: self.tracker.clone(), + payer_keypair: self.payer_keypair.clone(), transaction_policy: self.transaction_policy, } } @@ -74,7 +74,7 @@ impl PdaCompressor { pub fn new( rpc_pool: Arc>, tracker: Arc, - payer_keypair: Keypair, + payer_keypair: Arc, transaction_policy: TransactionPolicy, ) -> Self { Self { @@ -156,22 +156,21 @@ impl PdaCompressor { /// Successfully compressed accounts are removed from the tracker. pub async fn compress_batch_concurrent( &self, - account_states: &[PdaAccountState], + pubkeys: &[Pubkey], program_config: &PdaProgramConfig, cached_config: &CachedProgramConfig, max_concurrent: usize, cancelled: Arc, - ) -> CompressionOutcomes { - if account_states.is_empty() { + ) -> CompressionOutcomes { + if pubkeys.is_empty() { return Vec::new(); } // Mark all accounts as pending upfront so concurrent cycles skip them - let all_pubkeys: Vec = account_states.iter().map(|s| s.pubkey).collect(); - self.tracker.mark_pending(&all_pubkeys); + self.tracker.mark_pending(pubkeys); // Create futures for each account - let compression_futures = account_states.iter().cloned().map(|account_state| { + let compression_futures = pubkeys.iter().copied().map(|pubkey| { let compressor = self.clone(); let program_config = program_config.clone(); let cached_config = cached_config.clone(); @@ -180,24 +179,39 @@ impl PdaCompressor { async move { // Check cancellation before processing if cancelled.load(Ordering::Relaxed) { - // Unmark since we won't process this account - compressor.tracker.unmark_pending(&[account_state.pubkey]); + compressor.tracker.unmark_pending(&[pubkey]); return CompressionOutcome::Failed { - state: account_state, + pubkey, error: CompressionTaskError::Cancelled, }; } + // Look up account state from tracker; it may have been removed + let account_state = + match compressor.tracker.accounts().get(&pubkey).map(|r| r.clone()) { + Some(state) => state, + None => { + compressor.tracker.unmark_pending(&[pubkey]); + return CompressionOutcome::Failed { + pubkey, + error: CompressionTaskError::Failed(anyhow::anyhow!( + "account {} removed from tracker before compression", + pubkey + )), + }; + } + }; + match compressor .compress(&account_state, &program_config, &cached_config) .await { Ok(sig) => CompressionOutcome::Compressed { signature: sig, - state: account_state, + pubkey, }, Err(e) => CompressionOutcome::Failed { - state: account_state, + pubkey, error: e.into(), }, } @@ -213,11 +227,11 @@ impl PdaCompressor { // Remove successfully compressed PDAs; unmark failed ones for result in &results { match result { - CompressionOutcome::Compressed { state, .. } => { - self.tracker.remove_compressed(&state.pubkey); + CompressionOutcome::Compressed { pubkey, .. } => { + self.tracker.remove_compressed(pubkey); } - CompressionOutcome::Failed { state, .. } => { - self.tracker.unmark_pending(&[state.pubkey]); + CompressionOutcome::Failed { pubkey, .. } => { + self.tracker.unmark_pending(&[*pubkey]); } } } @@ -396,7 +410,7 @@ impl PdaCompressor { ); let payer_pubkey = self.payer_keypair.pubkey(); - let signers = [&self.payer_keypair]; + let signers = [self.payer_keypair.as_ref()]; let instructions = vec![ix]; let priority_fee_accounts = collect_priority_fee_accounts(payer_pubkey, &instructions); let signature = send_transaction_with_policy( diff --git a/forester/src/compressible/pda/state.rs b/forester/src/compressible/pda/state.rs index 92f9fe0a61..8c04853ea8 100644 --- a/forester/src/compressible/pda/state.rs +++ b/forester/src/compressible/pda/state.rs @@ -13,7 +13,7 @@ use super::types::PdaAccountState; use crate::{ compressible::{ config::PdaProgramConfig, - traits::{CompressibleTracker, SubscriptionHandler}, + traits::{CompressibleState, CompressibleTracker, SubscriptionHandler}, }, Result, }; @@ -72,10 +72,16 @@ impl PdaAccountTracker { &self, program_id: &Pubkey, current_slot: u64, - ) -> Vec { - self.get_ready_to_compress(current_slot) - .into_iter() - .filter(|state| state.program_id == *program_id) + ) -> Vec { + let pending = self.pending(); + self.accounts() + .iter() + .filter(|entry| { + entry.value().program_id == *program_id + && entry.value().is_ready_to_compress(current_slot) + && !pending.contains(entry.key()) + }) + .map(|entry| *entry.key()) .collect() } diff --git a/forester/src/compressible/traits.rs b/forester/src/compressible/traits.rs index ccc7f43386..d9484f3b2e 100644 --- a/forester/src/compressible/traits.rs +++ b/forester/src/compressible/traits.rs @@ -47,20 +47,20 @@ pub enum CompressionTaskError { } #[derive(Debug)] -pub enum CompressionOutcome { +pub enum CompressionOutcome { Compressed { signature: Signature, - state: S, + pubkey: Pubkey, }, Failed { - state: S, + pubkey: Pubkey, error: CompressionTaskError, }, } -pub type CompressionOutcomes = Vec>; +pub type CompressionOutcomes = Vec; -pub trait CompressibleState: Clone + Send + Sync { +pub trait CompressibleState: Send + Sync { fn pubkey(&self) -> &Pubkey; fn lamports(&self) -> u64; fn compressible_slot(&self) -> u64; @@ -128,14 +128,14 @@ pub trait CompressibleTracker: Send + Sync { self.len() == 0 } - fn get_ready_to_compress(&self, current_slot: u64) -> Vec { + fn get_ready_to_compress(&self, current_slot: u64) -> Vec { let pending = self.pending(); self.accounts() .iter() .filter(|entry| { entry.value().is_ready_to_compress(current_slot) && !pending.contains(entry.key()) }) - .map(|entry| entry.value().clone()) + .map(|entry| *entry.key()) .collect() } } diff --git a/forester/src/epoch_manager.rs b/forester/src/epoch_manager.rs index dc19de5576..e3a5c6036a 100644 --- a/forester/src/epoch_manager.rs +++ b/forester/src/epoch_manager.rs @@ -46,7 +46,10 @@ use tracing::{debug, error, info, info_span, instrument, trace, warn}; use crate::{ compressible::{ - traits::{Cancelled, CompressibleTracker, CompressionOutcome, CompressionTaskError}, + traits::{ + Cancelled, CompressibleState, CompressibleTracker, CompressionOutcome, + CompressionTaskError, + }, CTokenAccountTracker, CTokenCompressor, CompressibleConfig, }, errors::{ @@ -2534,7 +2537,18 @@ impl EpochManager { .compressible_config .as_ref() .ok_or_else(|| anyhow!("Compressible config not set"))?; - let accounts = tracker.get_ready_to_compress(current_slot); + // CToken compress_batch needs full account state for instruction building, + // so collect states in a single pass rather than pubkeys-then-lookup. + let pending = tracker.pending(); + let accounts: Vec<_> = tracker + .accounts() + .iter() + .filter(|entry| { + entry.value().is_ready_to_compress(current_slot) && !pending.contains(entry.key()) + }) + .map(|entry| entry.value().clone()) + .collect(); + drop(pending); if accounts.is_empty() { trace!("No compressible accounts ready for compression"); @@ -2554,7 +2568,7 @@ impl EpochManager { let compressor = CTokenCompressor::new( self.rpc_pool.clone(), tracker.clone(), - self.config.payer_keypair.insecure_clone(), + self.authority.clone(), self.transaction_policy(), ); @@ -2791,7 +2805,7 @@ impl EpochManager { let pda_compressor = crate::compressible::pda::PdaCompressor::new( self.rpc_pool.clone(), pda_tracker.clone(), - self.config.payer_keypair.insecure_clone(), + self.authority.clone(), self.transaction_policy(), ); @@ -2840,26 +2854,26 @@ impl EpochManager { match result { CompressionOutcome::Compressed { signature: sig, - state: account_state, + pubkey, } => { debug!( "Compressed PDA {} for program {}: {}", - account_state.pubkey, program_config.program_id, sig + pubkey, program_config.program_id, sig ); total_compressed += 1; } CompressionOutcome::Failed { - state: _account_state, error: CompressionTaskError::Cancelled, + .. } => {} CompressionOutcome::Failed { - state: account_state, + pubkey, error: CompressionTaskError::Failed(e), } => { error!( event = "compression_pda_account_failed", run_id = %self.run_id, - account = %account_state.pubkey, + account = %pubkey, program = %program_config.program_id, error = ?e, "Failed to compress PDA account" @@ -2915,7 +2929,7 @@ impl EpochManager { let mint_compressor = crate::compressible::mint::MintCompressor::new( self.rpc_pool.clone(), mint_tracker.clone(), - self.config.payer_keypair.insecure_clone(), + self.authority.clone(), self.transaction_policy(), ); @@ -2933,23 +2947,23 @@ impl EpochManager { match result { CompressionOutcome::Compressed { signature: sig, - state: mint_state, + pubkey, } => { - debug!("Compressed Mint {}: {}", mint_state.pubkey, sig); + debug!("Compressed Mint {}: {}", pubkey, sig); total_compressed += 1; } CompressionOutcome::Failed { - state: _mint_state, error: CompressionTaskError::Cancelled, + .. } => {} CompressionOutcome::Failed { - state: mint_state, + pubkey, error: CompressionTaskError::Failed(e), } => { error!( event = "compression_mint_account_failed", run_id = %self.run_id, - mint = %mint_state.pubkey, + mint = %pubkey, error = ?e, "Failed to compress mint account" );