Skip to content

Commit 986107b

Browse files
refactor: reduce clone pressure in compressible trackers
1 parent 2357022 commit 986107b

10 files changed

Lines changed: 52 additions & 39 deletions

File tree

forester/src/compressible/ctoken/compressor.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use crate::{
3030
pub struct CTokenCompressor<R: Rpc + Indexer> {
3131
rpc_pool: Arc<SolanaRpcPool<R>>,
3232
tracker: Arc<CTokenAccountTracker>,
33-
payer_keypair: Keypair,
33+
payer_keypair: Arc<Keypair>,
3434
transaction_policy: TransactionPolicy,
3535
}
3636

@@ -39,7 +39,7 @@ impl<R: Rpc + Indexer> Clone for CTokenCompressor<R> {
3939
Self {
4040
rpc_pool: Arc::clone(&self.rpc_pool),
4141
tracker: Arc::clone(&self.tracker),
42-
payer_keypair: self.payer_keypair.insecure_clone(),
42+
payer_keypair: Arc::clone(&self.payer_keypair),
4343
transaction_policy: self.transaction_policy,
4444
}
4545
}
@@ -55,7 +55,7 @@ impl<R: Rpc + Indexer> CTokenCompressor<R> {
5555
Self {
5656
rpc_pool,
5757
tracker,
58-
payer_keypair,
58+
payer_keypair: Arc::new(payer_keypair),
5959
transaction_policy,
6060
}
6161
}
@@ -253,7 +253,7 @@ impl<R: Rpc + Indexer> CTokenCompressor<R> {
253253
send_and_confirm_with_tracking(
254254
&mut *rpc,
255255
&[ix],
256-
&self.payer_keypair,
256+
self.payer_keypair.as_ref(),
257257
self.transaction_policy,
258258
&*self.tracker,
259259
&pubkeys,

forester/src/compressible/ctoken/state.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::sync::atomic::AtomicU64;
1+
use std::sync::{atomic::AtomicU64, Arc};
22

33
use borsh::BorshDeserialize;
44
use dashmap::{DashMap, DashSet};
@@ -126,7 +126,7 @@ impl CTokenAccountTracker {
126126

127127
let state = CTokenAccountState {
128128
pubkey,
129-
account: ctoken,
129+
account: Arc::new(ctoken),
130130
lamports,
131131
compressible_slot,
132132
is_ata,

forester/src/compressible/ctoken/types.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::sync::Arc;
2+
13
use light_token_interface::state::Token;
24
use solana_sdk::pubkey::Pubkey;
35

@@ -6,7 +8,7 @@ use crate::compressible::traits::CompressibleState;
68
#[derive(Clone, Debug)]
79
pub struct CTokenAccountState {
810
pub pubkey: Pubkey,
9-
pub account: Token,
11+
pub account: Arc<Token>,
1012
pub lamports: u64,
1113
/// Ready to compress when current_slot > compressible_slot
1214
pub compressible_slot: u64,

forester/src/compressible/mint/compressor.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use crate::{
3030
pub struct MintCompressor<R: Rpc + Indexer> {
3131
rpc_pool: Arc<SolanaRpcPool<R>>,
3232
tracker: Arc<MintAccountTracker>,
33-
payer_keypair: Keypair,
33+
payer_keypair: Arc<Keypair>,
3434
transaction_policy: TransactionPolicy,
3535
}
3636

@@ -39,7 +39,7 @@ impl<R: Rpc + Indexer> Clone for MintCompressor<R> {
3939
Self {
4040
rpc_pool: Arc::clone(&self.rpc_pool),
4141
tracker: Arc::clone(&self.tracker),
42-
payer_keypair: self.payer_keypair.insecure_clone(),
42+
payer_keypair: Arc::clone(&self.payer_keypair),
4343
transaction_policy: self.transaction_policy,
4444
}
4545
}
@@ -55,7 +55,7 @@ impl<R: Rpc + Indexer> MintCompressor<R> {
5555
Self {
5656
rpc_pool,
5757
tracker,
58-
payer_keypair,
58+
payer_keypair: Arc::new(payer_keypair),
5959
transaction_policy,
6060
}
6161
}
@@ -117,7 +117,7 @@ impl<R: Rpc + Indexer> MintCompressor<R> {
117117
send_and_confirm_with_tracking(
118118
&mut *rpc,
119119
&instructions,
120-
&self.payer_keypair,
120+
self.payer_keypair.as_ref(),
121121
self.transaction_policy,
122122
&*self.tracker,
123123
&pubkeys,
@@ -160,26 +160,26 @@ impl<R: Rpc + Indexer> MintCompressor<R> {
160160
self.tracker.mark_pending(&all_pubkeys);
161161

162162
// Create futures for each mint
163-
let compression_futures = mint_states.iter().cloned().map(|mint_state| {
163+
let compression_futures = mint_states.iter().map(|mint_state| {
164164
let compressor = self.clone();
165165
let cancelled = cancelled.clone();
166166
async move {
167167
// Check cancellation before processing
168168
if cancelled.load(Ordering::Relaxed) {
169169
compressor.tracker.unmark_pending(&[mint_state.pubkey]);
170170
return CompressionOutcome::Failed {
171-
state: mint_state,
171+
state: mint_state.clone(),
172172
error: CompressionTaskError::Cancelled,
173173
};
174174
}
175175

176-
match compressor.compress(&mint_state).await {
176+
match compressor.compress(mint_state).await {
177177
Ok(sig) => CompressionOutcome::Compressed {
178178
signature: sig,
179-
state: mint_state,
179+
state: mint_state.clone(),
180180
},
181181
Err(e) => CompressionOutcome::Failed {
182-
state: mint_state,
182+
state: mint_state.clone(),
183183
error: e.into(),
184184
},
185185
}
@@ -259,7 +259,7 @@ impl<R: Rpc + Indexer> MintCompressor<R> {
259259
let signature = send_and_confirm_with_tracking(
260260
&mut *rpc,
261261
&[ix],
262-
&self.payer_keypair,
262+
self.payer_keypair.as_ref(),
263263
self.transaction_policy,
264264
&*self.tracker,
265265
&tracked_pubkeys,

forester/src/compressible/mint/state.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::sync::atomic::AtomicU64;
1+
use std::sync::{atomic::AtomicU64, Arc};
22

33
use borsh::BorshDeserialize;
44
use dashmap::{DashMap, DashSet};
@@ -109,7 +109,7 @@ impl MintAccountTracker {
109109
pubkey,
110110
mint_seed,
111111
compressed_address,
112-
mint,
112+
mint: Arc::new(mint),
113113
lamports,
114114
compressible_slot,
115115
};

forester/src/compressible/mint/types.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::sync::Arc;
2+
13
use light_token_interface::state::Mint;
24
use solana_sdk::pubkey::Pubkey;
35

@@ -8,7 +10,7 @@ pub struct MintAccountState {
810
pub pubkey: Pubkey,
911
pub mint_seed: Pubkey,
1012
pub compressed_address: [u8; 32],
11-
pub mint: Mint,
13+
pub mint: Arc<Mint>,
1214
pub lamports: u64,
1315
/// Ready to compress when current_slot > compressible_slot
1416
pub compressible_slot: u64,

forester/src/compressible/pda/compressor.rs

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ pub struct CachedProgramConfig {
5555
pub struct PdaCompressor<R: Rpc + Indexer> {
5656
rpc_pool: Arc<SolanaRpcPool<R>>,
5757
tracker: Arc<PdaAccountTracker>,
58-
payer_keypair: Keypair,
58+
payer_keypair: Arc<Keypair>,
5959
transaction_policy: TransactionPolicy,
6060
}
6161

@@ -64,7 +64,7 @@ impl<R: Rpc + Indexer> Clone for PdaCompressor<R> {
6464
Self {
6565
rpc_pool: Arc::clone(&self.rpc_pool),
6666
tracker: Arc::clone(&self.tracker),
67-
payer_keypair: self.payer_keypair.insecure_clone(),
67+
payer_keypair: Arc::clone(&self.payer_keypair),
6868
transaction_policy: self.transaction_policy,
6969
}
7070
}
@@ -80,7 +80,7 @@ impl<R: Rpc + Indexer> PdaCompressor<R> {
8080
Self {
8181
rpc_pool,
8282
tracker,
83-
payer_keypair,
83+
payer_keypair: Arc::new(payer_keypair),
8484
transaction_policy,
8585
}
8686
}
@@ -171,10 +171,12 @@ impl<R: Rpc + Indexer> PdaCompressor<R> {
171171
self.tracker.mark_pending(&all_pubkeys);
172172

173173
// Create futures for each account
174-
let compression_futures = account_states.iter().cloned().map(|account_state| {
174+
let program_config = Arc::new(program_config.clone());
175+
let cached_config = Arc::new(cached_config.clone());
176+
let compression_futures = account_states.iter().map(|account_state| {
175177
let compressor = self.clone();
176-
let program_config = program_config.clone();
177-
let cached_config = cached_config.clone();
178+
let program_config = Arc::clone(&program_config);
179+
let cached_config = Arc::clone(&cached_config);
178180
let cancelled = cancelled.clone();
179181

180182
async move {
@@ -183,21 +185,21 @@ impl<R: Rpc + Indexer> PdaCompressor<R> {
183185
// Unmark since we won't process this account
184186
compressor.tracker.unmark_pending(&[account_state.pubkey]);
185187
return CompressionOutcome::Failed {
186-
state: account_state,
188+
state: account_state.clone(),
187189
error: CompressionTaskError::Cancelled,
188190
};
189191
}
190192

191193
match compressor
192-
.compress(&account_state, &program_config, &cached_config)
194+
.compress(account_state, &program_config, &cached_config)
193195
.await
194196
{
195197
Ok(sig) => CompressionOutcome::Compressed {
196198
signature: sig,
197-
state: account_state,
199+
state: account_state.clone(),
198200
},
199201
Err(e) => CompressionOutcome::Failed {
200-
state: account_state,
202+
state: account_state.clone(),
201203
error: e.into(),
202204
},
203205
}
@@ -317,7 +319,7 @@ impl<R: Rpc + Indexer> PdaCompressor<R> {
317319
send_and_confirm_with_tracking(
318320
&mut *rpc,
319321
&[ix],
320-
&self.payer_keypair,
322+
self.payer_keypair.as_ref(),
321323
self.transaction_policy,
322324
&*self.tracker,
323325
&pubkeys,
@@ -396,7 +398,7 @@ impl<R: Rpc + Indexer> PdaCompressor<R> {
396398
);
397399

398400
let payer_pubkey = self.payer_keypair.pubkey();
399-
let signers = [&self.payer_keypair];
401+
let signers = [self.payer_keypair.as_ref()];
400402
let instructions = vec![ix];
401403
let priority_fee_accounts = collect_priority_fee_accounts(payer_pubkey, &instructions);
402404
let signature = send_transaction_with_policy(

forester/src/processor/v2/proof_cache.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,8 +126,9 @@ impl ProofCache {
126126
}
127127
}
128128

129-
self.proofs = self.warming_proofs.values().cloned().collect();
130-
self.warming_proofs.clear();
129+
self.proofs = std::mem::take(&mut self.warming_proofs)
130+
.into_values()
131+
.collect();
131132

132133
info!(
133134
"Cache warm-up complete for tree {}: {} proofs cached with root {:?}",

forester/tests/test_compressible_ctoken.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,8 @@ async fn test_compressible_ctoken_compression() {
320320
use_surfpool: true,
321321
validator_args: vec![],
322322
})
323-
.await;
323+
.await
324+
.unwrap();
324325
let mut rpc = LightClient::new(LightClientConfig::local())
325326
.await
326327
.expect("Failed to create LightClient");
@@ -403,6 +404,7 @@ async fn test_compressible_ctoken_compression() {
403404
account_type: ACCOUNT_TYPE_TOKEN_ACCOUNT,
404405
extensions: Some(vec![ExtensionStruct::Compressible(compressible_ext)]),
405406
}
407+
.into()
406408
);
407409
assert!(account_state.lamports > 0);
408410
let lamports = account_state.lamports;
@@ -525,7 +527,8 @@ async fn test_compressible_ctoken_bootstrap() {
525527
use_surfpool: true,
526528
validator_args: vec![],
527529
})
528-
.await;
530+
.await
531+
.unwrap();
529532

530533
let mut rpc = LightClient::new(LightClientConfig::local())
531534
.await

forester/tests/test_compressible_mint.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,8 @@ async fn test_compressible_mint_bootstrap() {
145145
use_surfpool: true,
146146
validator_args: vec![],
147147
})
148-
.await;
148+
.await
149+
.unwrap();
149150

150151
let mut rpc = LightClient::new(LightClientConfig::local())
151152
.await
@@ -287,7 +288,8 @@ async fn test_compressible_mint_compression() {
287288
use_surfpool: true,
288289
validator_args: vec![],
289290
})
290-
.await;
291+
.await
292+
.unwrap();
291293

292294
let mut rpc = LightClient::new(LightClientConfig::local())
293295
.await
@@ -478,7 +480,8 @@ async fn test_compressible_mint_subscription() {
478480
use_surfpool: true,
479481
validator_args: vec![],
480482
})
481-
.await;
483+
.await
484+
.unwrap();
482485

483486
let mut rpc = LightClient::new(LightClientConfig::local())
484487
.await

0 commit comments

Comments
 (0)