Skip to content

Commit 4058e37

Browse files
feat: isolate and track forester worker concurrency
1 parent 7ab1c90 commit 4058e37

9 files changed

Lines changed: 580 additions & 304 deletions

forester/src/epoch_manager.rs

Lines changed: 539 additions & 276 deletions
Large diffs are not rendered by default.

forester/src/metrics.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -440,21 +440,19 @@ pub async fn metrics_handler() -> Result<impl warp::Reply> {
440440
if let Err(e) = encoder.encode(&REGISTRY.gather(), &mut buffer) {
441441
error!("could not encode custom metrics: {}", e);
442442
};
443-
let mut res = String::from_utf8(buffer.clone()).unwrap_or_else(|e| {
443+
let mut res = String::from_utf8(buffer).unwrap_or_else(|e| {
444444
error!("custom metrics could not be from_utf8'd: {}", e);
445445
String::new()
446446
});
447-
buffer.clear();
448447

449448
let mut buffer = Vec::new();
450449
if let Err(e) = encoder.encode(&prometheus::gather(), &mut buffer) {
451450
error!("could not encode prometheus metrics: {}", e);
452451
};
453-
let res_prometheus = String::from_utf8(buffer.clone()).unwrap_or_else(|e| {
452+
let res_prometheus = String::from_utf8(buffer).unwrap_or_else(|e| {
454453
error!("prometheus metrics could not be from_utf8'd: {}", e);
455454
String::new()
456455
});
457-
buffer.clear();
458456

459457
res.push_str(&res_prometheus);
460458
Ok(res)

forester/src/priority_fee.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ pub async fn request_priority_fee_estimate(
208208
.map_err(|error| PriorityFeeEstimateError::ClientBuild(error.clone()))?;
209209

210210
let response = http_client
211-
.post(url.clone())
211+
.post(url.as_str())
212212
.header("Content-Type", "application/json")
213213
.json(&rpc_request)
214214
.send()

forester/tests/e2e_test.rs

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ async fn e2e_test() {
277277
validator_args: vec![],
278278
}))
279279
.await;
280-
spawn_prover().await;
280+
spawn_prover().await.unwrap();
281281
}
282282

283283
let mut rpc = setup_rpc_connection(&env.protocol.forester).await;
@@ -799,15 +799,22 @@ async fn setup_forester_pipeline(
799799
let (shutdown_bootstrap_sender, shutdown_bootstrap_receiver) = oneshot::channel();
800800
let (work_report_sender, work_report_receiver) = mpsc::channel(100);
801801

802-
let service_handle = tokio::spawn(run_pipeline::<LightClient>(
803-
Arc::from(config.clone()),
804-
None,
805-
None,
806-
shutdown_receiver,
807-
Some(shutdown_compressible_receiver),
808-
Some(shutdown_bootstrap_receiver),
809-
work_report_sender,
810-
));
802+
let config = Arc::new(config.clone());
803+
let service_handle = tokio::task::spawn_blocking(move || {
804+
let runtime = tokio::runtime::Builder::new_multi_thread()
805+
.worker_threads(2)
806+
.enable_all()
807+
.build()?;
808+
runtime.block_on(run_pipeline::<LightClient>(
809+
config,
810+
None,
811+
None,
812+
shutdown_receiver,
813+
Some(shutdown_compressible_receiver),
814+
Some(shutdown_bootstrap_receiver),
815+
work_report_sender,
816+
))
817+
});
811818

812819
(
813820
service_handle,

forester/tests/legacy/batched_state_async_indexer_test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ async fn test_state_indexer_async_batched() {
8787
validator_args: vec![],
8888
}))
8989
.await;
90-
spawn_prover().await;
90+
spawn_prover().await.unwrap();
9191

9292
let env = TestAccounts::get_local_test_validator_accounts();
9393
let mut config = forester_config();

forester/tests/legacy/test_utils.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ pub async fn init(config: Option<LightValidatorConfig>) {
2626
#[allow(dead_code)]
2727
pub async fn spawn_test_validator(config: Option<LightValidatorConfig>) {
2828
let config = config.unwrap_or_default();
29-
spawn_validator(config).await;
29+
spawn_validator(config).await.unwrap();
3030
}
3131

3232
#[allow(dead_code)]

forester/tests/test_batch_append_spent.rs

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -328,15 +328,22 @@ async fn run_forester(config: &ForesterConfig, duration: Duration) {
328328
tokio::sync::broadcast::channel(1);
329329
let (work_report_sender, _) = mpsc::channel(100);
330330

331-
let service_handle = tokio::spawn(run_pipeline::<LightClient>(
332-
Arc::from(config.clone()),
333-
None,
334-
None,
335-
shutdown_receiver,
336-
Some(shutdown_compressible_receiver),
337-
None, // shutdown_bootstrap
338-
work_report_sender,
339-
));
331+
let config = Arc::new(config.clone());
332+
let service_handle = tokio::task::spawn_blocking(move || {
333+
let runtime = tokio::runtime::Builder::new_multi_thread()
334+
.worker_threads(2)
335+
.enable_all()
336+
.build()?;
337+
runtime.block_on(run_pipeline::<LightClient>(
338+
config,
339+
None,
340+
None,
341+
shutdown_receiver,
342+
Some(shutdown_compressible_receiver),
343+
None, // shutdown_bootstrap
344+
work_report_sender,
345+
))
346+
});
340347

341348
tokio::time::sleep(duration).await;
342349

forester/tests/test_indexer_interface.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,8 @@ async fn test_indexer_interface_scenarios() {
6565
validator_args: vec![],
6666
use_surfpool: true,
6767
})
68-
.await;
68+
.await
69+
.unwrap();
6970

7071
let mut rpc = LightClient::new(LightClientConfig::local())
7172
.await

forester/tests/test_utils.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ pub async fn init(config: Option<LightValidatorConfig>) {
3636
#[allow(dead_code)]
3737
pub async fn spawn_test_validator(config: Option<LightValidatorConfig>) {
3838
let config = config.unwrap_or_default();
39-
spawn_validator(config).await;
39+
spawn_validator(config).await.unwrap();
4040
}
4141

4242
#[allow(dead_code)]

0 commit comments

Comments
 (0)