diff --git a/forester/dashboard/src/app/api/[...path]/route.ts b/forester/dashboard/src/app/api/[...path]/route.ts new file mode 100644 index 0000000000..3c53696752 --- /dev/null +++ b/forester/dashboard/src/app/api/[...path]/route.ts @@ -0,0 +1,22 @@ +import { NextResponse } from "next/server"; + +const BACKEND_URL = + process.env.FORESTER_API_URL || "http://localhost:8080"; + +export async function GET( + request: Request, + { params }: { params: Promise<{ path: string[] }> } +) { + const { path } = await params; + const backendPath = path.join("/"); + try { + const response = await fetch(`${BACKEND_URL}/${backendPath}`); + const data = await response.json(); + return NextResponse.json(data, { status: response.status }); + } catch { + return NextResponse.json( + { error: "Backend unavailable" }, + { status: 502 } + ); + } +} diff --git a/forester/dashboard/src/lib/api.ts b/forester/dashboard/src/lib/api.ts index 186359c71f..7e1ee6e8e9 100644 --- a/forester/dashboard/src/lib/api.ts +++ b/forester/dashboard/src/lib/api.ts @@ -1,5 +1,5 @@ const API_URL = - process.env.NEXT_PUBLIC_FORESTER_API_URL || "http://localhost:8080"; + process.env.NEXT_PUBLIC_FORESTER_API_URL ?? "/api"; export class ApiError extends Error { constructor( diff --git a/forester/src/api_server.rs b/forester/src/api_server.rs index e83b0f04e4..d08e3b5cfa 100644 --- a/forester/src/api_server.rs +++ b/forester/src/api_server.rs @@ -98,11 +98,13 @@ pub struct CompressibleDashboardState { /// Configuration for the HTTP API server. pub struct ApiServerConfig { + pub run_id: Arc, pub rpc_url: String, pub port: u16, pub allow_public_bind: bool, pub compressible_state: Option, pub prometheus_url: Option, + pub helius_rpc: bool, } /// Default timeout for status endpoint in seconds @@ -120,16 +122,23 @@ pub struct ApiServerHandle { pub thread_handle: JoinHandle<()>, /// Sender to trigger graceful shutdown pub shutdown_tx: oneshot::Sender<()>, + pub run_id: Arc, } impl ApiServerHandle { /// Trigger graceful shutdown and wait for the server to stop pub fn shutdown(self) { + let run_id = self.run_id.clone(); // Send shutdown signal (ignore error if receiver already dropped) let _ = self.shutdown_tx.send(()); // Wait for the thread to finish if let Err(e) = self.thread_handle.join() { - error!("API server thread panicked: {:?}", e); + error!( + event = "api_server_thread_panicked", + run_id = %run_id, + error = ?e, + "API server thread panicked" + ); } } } @@ -142,6 +151,7 @@ impl ApiServerHandle { pub(crate) async fn fetch_metrics_snapshot( client: &reqwest::Client, prometheus_url: &Option, + run_id: &str, ) -> MetricsSnapshot { let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) @@ -170,7 +180,12 @@ pub(crate) async fn fetch_metrics_snapshot( }; } Err(e) => { - warn!("Prometheus query failed: {}", e); + warn!( + event = "api_server_prometheus_query_failed", + run_id = %run_id, + error = %e, + "Prometheus query failed" + ); } } } @@ -187,6 +202,8 @@ pub(crate) async fn fetch_metrics_snapshot( pub(crate) async fn fetch_compressible_snapshot( trackers: &Option, rpc_url: &str, + helius_rpc: bool, + run_id: &str, ) -> CompressibleSnapshot { use crate::compressible::traits::CompressibleTracker; @@ -211,7 +228,7 @@ pub(crate) async fn fetch_compressible_snapshot( // Standalone mode: RPC with timeout let fetch_result = tokio::time::timeout( COMPRESSIBLE_FETCH_TIMEOUT, - crate::compressible::count_compressible_accounts(rpc_url), + crate::compressible::count_compressible_accounts(rpc_url, helius_rpc), ) .await; @@ -227,7 +244,12 @@ pub(crate) async fn fetch_compressible_snapshot( cached_at: now, }, Ok(Err(e)) => { - warn!("RPC compressible count failed: {}", e); + warn!( + event = "api_server_compressible_count_failed", + run_id = %run_id, + error = %e, + "RPC compressible count failed" + ); CompressibleSnapshot { data: CompressibleResponse { enabled: false, @@ -241,8 +263,10 @@ pub(crate) async fn fetch_compressible_snapshot( } Err(_) => { warn!( - "Compressible count timed out after {}s", - COMPRESSIBLE_FETCH_TIMEOUT.as_secs() + event = "api_server_compressible_count_timeout", + run_id = %run_id, + timeout_seconds = COMPRESSIBLE_FETCH_TIMEOUT.as_secs(), + "Compressible count timed out" ); CompressibleSnapshot { data: CompressibleResponse { @@ -268,9 +292,10 @@ async fn run_metrics_provider( client: reqwest::Client, prometheus_url: Option, mut shutdown: broadcast::Receiver<()>, + run_id: Arc, ) { loop { - let snapshot = fetch_metrics_snapshot(&client, &prometheus_url).await; + let snapshot = fetch_metrics_snapshot(&client, &prometheus_url, run_id.as_ref()).await; if tx.send(snapshot).is_err() { break; // all receivers dropped } @@ -279,7 +304,11 @@ async fn run_metrics_provider( _ = shutdown.recv() => break, } } - info!("Metrics provider stopped"); + info!( + event = "api_server_metrics_provider_stopped", + run_id = %run_id, + "Metrics provider stopped" + ); } /// Periodically fetches compressible counts and publishes via watch channel. @@ -288,6 +317,8 @@ async fn run_compressible_provider( trackers: Option, rpc_url: String, mut shutdown: broadcast::Receiver<()>, + helius_rpc: bool, + run_id: Arc, ) { // In-memory trackers are cheap (.len()); RPC is expensive (getProgramAccounts) let interval = if trackers.is_some() { @@ -297,7 +328,8 @@ async fn run_compressible_provider( }; loop { - let snapshot = fetch_compressible_snapshot(&trackers, &rpc_url).await; + let snapshot = + fetch_compressible_snapshot(&trackers, &rpc_url, helius_rpc, run_id.as_ref()).await; if tx.send(snapshot).is_err() { break; } @@ -306,7 +338,11 @@ async fn run_compressible_provider( _ = shutdown.recv() => break, } } - info!("Compressible provider stopped"); + info!( + event = "api_server_compressible_provider_stopped", + run_id = %run_id, + "Compressible provider stopped" + ); } // --------------------------------------------------------------------------- @@ -319,26 +355,40 @@ async fn run_compressible_provider( /// An `ApiServerHandle` that can be used to trigger graceful shutdown pub fn spawn_api_server(config: ApiServerConfig) -> ApiServerHandle { let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); + let run_id_for_handle = config.run_id.clone(); let thread_handle = std::thread::spawn(move || { + let run_id = config.run_id.clone(); let rt = match tokio::runtime::Runtime::new() { Ok(rt) => rt, Err(e) => { - error!("Failed to create tokio runtime for API server: {}", e); + error!( + event = "api_server_runtime_create_failed", + run_id = %run_id, + error = %e, + "Failed to create tokio runtime for API server" + ); return; } }; rt.block_on(async move { let addr = if config.allow_public_bind { warn!( - "API server binding to 0.0.0.0:{} - endpoints will be publicly accessible", - config.port + event = "api_server_public_bind_enabled", + run_id = %run_id, + port = config.port, + "API server binding to 0.0.0.0; endpoints will be publicly accessible" ); SocketAddr::from(([0, 0, 0, 0], config.port)) } else { SocketAddr::from(([127, 0, 0, 1], config.port)) }; - info!("Starting HTTP API server on {}", addr); + info!( + event = "api_server_started", + run_id = %run_id, + address = %addr, + "Starting HTTP API server" + ); // Shared HTTP client with timeout for external requests (Prometheus) let http_client = reqwest::Client::builder() @@ -369,6 +419,7 @@ pub fn spawn_api_server(config: ApiServerConfig) -> ApiServerHandle { http_client.clone(), config.prometheus_url.clone(), provider_shutdown_tx.subscribe(), + run_id.clone(), )); tokio::spawn(run_compressible_provider( @@ -376,6 +427,8 @@ pub fn spawn_api_server(config: ApiServerConfig) -> ApiServerHandle { trackers, config.rpc_url.clone(), provider_shutdown_tx.subscribe(), + config.helius_rpc, + run_id.clone(), )); let cors = warp::cors() @@ -391,8 +444,10 @@ pub fn spawn_api_server(config: ApiServerConfig) -> ApiServerHandle { // --- Status route (unchanged — per-request RPC call) --- let rpc_url_for_status = config.rpc_url.clone(); + let run_id_for_status = run_id.clone(); let status_route = warp::path("status").and(warp::get()).and_then(move || { let rpc_url = rpc_url_for_status.clone(); + let run_id = run_id_for_status.clone(); async move { let timeout_duration = Duration::from_secs(STATUS_TIMEOUT_SECS); match tokio::time::timeout(timeout_duration, get_forester_status(&rpc_url)) @@ -403,7 +458,12 @@ pub fn spawn_api_server(config: ApiServerConfig) -> ApiServerHandle { warp::http::StatusCode::OK, )), Ok(Err(e)) => { - error!("Failed to get forester status: {:?}", e); + error!( + event = "api_server_status_fetch_failed", + run_id = %run_id, + error = ?e, + "Failed to get forester status" + ); let error_response = ErrorResponse { error: format!("Failed to get forester status: {}", e), }; @@ -414,8 +474,10 @@ pub fn spawn_api_server(config: ApiServerConfig) -> ApiServerHandle { } Err(_elapsed) => { error!( - "Forester status request timed out after {}s", - STATUS_TIMEOUT_SECS + event = "api_server_status_timeout", + run_id = %run_id, + timeout_seconds = STATUS_TIMEOUT_SECS, + "Forester status request timed out" ); let error_response = ErrorResponse { error: format!( @@ -453,21 +515,33 @@ pub fn spawn_api_server(config: ApiServerConfig) -> ApiServerHandle { warp::serve(routes) .bind(addr) .await - .graceful(async move { - let _ = shutdown_rx.await; - info!("API server received shutdown signal"); - // Signal providers to stop - let _ = provider_shutdown_tx.send(()); + .graceful({ + let run_id_for_shutdown = run_id.clone(); + async move { + let _ = shutdown_rx.await; + info!( + event = "api_server_shutdown_signal_received", + run_id = %run_id_for_shutdown, + "API server received shutdown signal" + ); + // Signal providers to stop + let _ = provider_shutdown_tx.send(()); + } }) .run() .await; - info!("API server shut down gracefully"); + info!( + event = "api_server_stopped", + run_id = %run_id, + "API server shut down gracefully" + ); }); }); ApiServerHandle { thread_handle, shutdown_tx, + run_id: run_id_for_handle, } } diff --git a/forester/src/cli.rs b/forester/src/cli.rs index 8da994cc17..05aef7fc2c 100644 --- a/forester/src/cli.rs +++ b/forester/src/cli.rs @@ -36,7 +36,7 @@ pub struct StartArgs { env = "INDEXER_URL", help = "Photon indexer URL. API key can be included as query param: https://host?api-key=KEY" )] - pub indexer_url: Option, + pub indexer_url: String, #[arg(long, env = "PROVER_URL")] pub prover_url: Option, @@ -270,6 +270,14 @@ pub struct StartArgs { )] pub api_server_public_bind: bool, + #[arg( + long, + env = "HELIUS_RPC", + help = "Use Helius getProgramAccountsV2 for compressible account queries (default: standard getProgramAccounts)", + default_value = "false" + )] + pub helius_rpc: bool, + #[arg( long, env = "GROUP_AUTHORITY", @@ -438,6 +446,7 @@ mod tests { "forester", "--processor-mode", "v1", "--rpc-url", "http://test.com", + "--indexer-url", "http://indexer.test.com", "--payer", "[1,2,3]", "--derivation", "[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32]" ]).unwrap(); @@ -448,6 +457,7 @@ mod tests { "forester", "--processor-mode", "v2", "--rpc-url", "http://test.com", + "--indexer-url", "http://indexer.test.com", "--payer", "[1,2,3]", "--derivation", "[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32]" ]).unwrap(); @@ -457,6 +467,7 @@ mod tests { let args = StartArgs::try_parse_from([ "forester", "--rpc-url", "http://test.com", + "--indexer-url", "http://indexer.test.com", "--payer", "[1,2,3]", "--derivation", "[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32]" ]).unwrap(); @@ -467,6 +478,7 @@ mod tests { "forester", "--processor-mode", "invalid-mode", "--rpc-url", "http://test.com", + "--indexer-url", "http://indexer.test.com", "--payer", "[1,2,3]", "--derivation", "[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32]" ]); diff --git a/forester/src/compressible/bootstrap_helpers.rs b/forester/src/compressible/bootstrap_helpers.rs index 625aa97bf7..e6f622f915 100644 --- a/forester/src/compressible/bootstrap_helpers.rs +++ b/forester/src/compressible/bootstrap_helpers.rs @@ -227,6 +227,13 @@ pub fn is_localhost(rpc_url: &str) -> bool { rpc_url.contains("localhost") || rpc_url.contains("127.0.0.1") } +/// Whether to use Helius `getProgramAccountsV2` instead of standard +/// `getProgramAccounts`. Returns `true` only when `--helius-rpc` CLI flag +/// is set **and** the URL is not localhost. +pub fn use_helius_rpc(rpc_url: &str, helius_rpc_flag: bool) -> bool { + helius_rpc_flag && !is_localhost(rpc_url) +} + /// Generic bootstrap using standard getProgramAccounts API /// /// Calls `process_fn` for each account that passes initial extraction. @@ -390,6 +397,7 @@ pub async fn run_bootstrap( shutdown_rx: Option>, process_fn: F, label: &str, + helius_rpc: bool, ) -> Result where F: FnMut(RawAccountData) -> bool, @@ -415,8 +423,8 @@ where label, program_id ); - let result = if is_localhost(rpc_url) { - debug!("Detected localhost, using standard getProgramAccounts"); + let result = if !use_helius_rpc(rpc_url, helius_rpc) { + debug!("Using standard getProgramAccounts"); let api_result = bootstrap_standard_api( &client, rpc_url, @@ -489,13 +497,14 @@ pub async fn count_program_accounts( rpc_url: &str, program_id: &Pubkey, filters: Option>, + helius_rpc: bool, ) -> Result { let client = reqwest::Client::builder() .timeout(Duration::from_secs(30)) .build() .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {}", e))?; - if is_localhost(rpc_url) { + if !use_helius_rpc(rpc_url, helius_rpc) { let mut params = json!({ "encoding": "base64", "commitment": "confirmed", @@ -563,7 +572,10 @@ pub async fn count_program_accounts( /// Uses `count_program_accounts` with `dataSlice` to minimize /// bandwidth. Both queries run concurrently. Errors are propagated so callers /// can distinguish "0 accounts" from "RPC failure". -pub async fn count_compressible_accounts(rpc_url: &str) -> Result<(usize, usize)> { +pub async fn count_compressible_accounts( + rpc_url: &str, + helius_rpc: bool, +) -> Result<(usize, usize)> { let program_id = Pubkey::new_from_array(light_token_interface::LIGHT_TOKEN_PROGRAM_ID); let ctoken_filters = vec![json!({"memcmp": { @@ -579,8 +591,8 @@ pub async fn count_compressible_accounts(rpc_url: &str) -> Result<(usize, usize) }})]; let (ctoken_result, mint_result) = tokio::join!( - count_program_accounts(rpc_url, &program_id, Some(ctoken_filters)), - count_program_accounts(rpc_url, &program_id, Some(mint_filters)), + count_program_accounts(rpc_url, &program_id, Some(ctoken_filters), helius_rpc), + count_program_accounts(rpc_url, &program_id, Some(mint_filters), helius_rpc), ); Ok((ctoken_result?, mint_result?)) diff --git a/forester/src/compressible/ctoken/bootstrap.rs b/forester/src/compressible/ctoken/bootstrap.rs index caf03b79d9..a85e3e28b2 100644 --- a/forester/src/compressible/ctoken/bootstrap.rs +++ b/forester/src/compressible/ctoken/bootstrap.rs @@ -11,7 +11,7 @@ use super::state::CTokenAccountTracker; use crate::{ compressible::{ bootstrap_helpers::{ - bootstrap_standard_api, bootstrap_v2_api, is_localhost, RawAccountData, + bootstrap_standard_api, bootstrap_v2_api, use_helius_rpc, RawAccountData, }, config::{ACCOUNT_TYPE_OFFSET, CTOKEN_ACCOUNT_TYPE_FILTER}, }, @@ -24,6 +24,7 @@ pub async fn bootstrap_ctoken_accounts( rpc_url: String, tracker: Arc, shutdown_rx: Option>, + helius_rpc: bool, ) -> Result<()> { info!("Starting bootstrap of CToken accounts"); @@ -88,8 +89,8 @@ pub async fn bootstrap_ctoken_accounts( true }; - if is_localhost(&rpc_url) { - info!("Detected localhost, using standard getProgramAccounts"); + if !use_helius_rpc(&rpc_url, helius_rpc) { + info!("Using standard getProgramAccounts"); let (total_fetched, total_inserted) = bootstrap_standard_api( &client, &rpc_url, diff --git a/forester/src/compressible/mint/bootstrap.rs b/forester/src/compressible/mint/bootstrap.rs index 104c8dd00c..27a55b059e 100644 --- a/forester/src/compressible/mint/bootstrap.rs +++ b/forester/src/compressible/mint/bootstrap.rs @@ -18,6 +18,7 @@ pub async fn bootstrap_mint_accounts( rpc_url: String, tracker: Arc, shutdown_rx: Option>, + helius_rpc: bool, ) -> Result<()> { // Light Token Program ID let program_id = @@ -50,6 +51,7 @@ pub async fn bootstrap_mint_accounts( shutdown_rx, process_account, "Mint", + helius_rpc, ) .await?; diff --git a/forester/src/compressible/pda/bootstrap.rs b/forester/src/compressible/pda/bootstrap.rs index 0142b56570..c2000070b8 100644 --- a/forester/src/compressible/pda/bootstrap.rs +++ b/forester/src/compressible/pda/bootstrap.rs @@ -7,7 +7,7 @@ use super::state::PdaAccountTracker; use crate::{ compressible::{ bootstrap_helpers::{ - bootstrap_standard_api, bootstrap_v2_api, is_localhost, RawAccountData, + bootstrap_standard_api, bootstrap_v2_api, use_helius_rpc, RawAccountData, }, config::PdaProgramConfig, traits::CompressibleTracker, @@ -20,6 +20,7 @@ pub async fn bootstrap_pda_accounts( rpc_url: String, tracker: Arc, shutdown_rx: Option>, + helius_rpc: bool, ) -> Result<()> { info!("Starting bootstrap of compressible PDA accounts"); @@ -54,8 +55,15 @@ pub async fn bootstrap_pda_accounts( program_config.program_id ); - let result = - bootstrap_program(&client, &rpc_url, &tracker, &program_config, &shutdown_flag).await; + let result = bootstrap_program( + &client, + &rpc_url, + &tracker, + &program_config, + &shutdown_flag, + helius_rpc, + ) + .await; if let Err(e) = result { error!( @@ -81,6 +89,7 @@ async fn bootstrap_program( tracker: &PdaAccountTracker, program_config: &PdaProgramConfig, shutdown_flag: &std::sync::atomic::AtomicBool, + helius_rpc: bool, ) -> Result<()> { let program_id = &program_config.program_id; @@ -108,7 +117,7 @@ async fn bootstrap_program( } })]); - if is_localhost(rpc_url) { + if !use_helius_rpc(rpc_url, helius_rpc) { let (total_fetched, total_inserted) = bootstrap_standard_api( client, rpc_url, diff --git a/forester/src/config.rs b/forester/src/config.rs index 0dc22b9227..42eee8d2ee 100644 --- a/forester/src/config.rs +++ b/forester/src/config.rs @@ -97,6 +97,8 @@ pub struct GeneralConfig { pub sleep_when_idle_ms: u64, pub queue_polling_mode: QueuePollingMode, pub group_authority: Option, + /// Use Helius getProgramAccountsV2 instead of standard getProgramAccounts + pub helius_rpc: bool, } impl Default for GeneralConfig { @@ -114,6 +116,7 @@ impl Default for GeneralConfig { sleep_when_idle_ms: 45_000, queue_polling_mode: QueuePollingMode::Indexer, group_authority: None, + helius_rpc: false, } } } @@ -121,35 +124,23 @@ impl Default for GeneralConfig { impl GeneralConfig { pub fn test_address_v2() -> Self { GeneralConfig { - slot_update_interval_seconds: 10, - tree_discovery_interval_seconds: 1, - enable_metrics: true, skip_v1_state_trees: true, skip_v1_address_trees: true, skip_v2_state_trees: true, - skip_v2_address_trees: false, - tree_ids: vec![], sleep_after_processing_ms: 50, sleep_when_idle_ms: 100, - queue_polling_mode: QueuePollingMode::Indexer, - group_authority: None, + ..Default::default() } } pub fn test_state_v2() -> Self { GeneralConfig { - slot_update_interval_seconds: 10, - tree_discovery_interval_seconds: 1, - enable_metrics: true, skip_v1_state_trees: true, skip_v1_address_trees: true, - skip_v2_state_trees: false, skip_v2_address_trees: true, - tree_ids: vec![], sleep_after_processing_ms: 50, sleep_when_idle_ms: 100, - queue_polling_mode: QueuePollingMode::Indexer, - group_authority: None, + ..Default::default() } } } @@ -245,12 +236,13 @@ impl ForesterConfig { .rpc_url .clone() .ok_or(ConfigError::MissingField { field: "rpc_url" })?; + let indexer_url = args.indexer_url.clone(); Ok(Self { external_services: ExternalServicesConfig { rpc_url, ws_rpc_url: args.ws_rpc_url.clone(), - indexer_url: args.indexer_url.clone(), + indexer_url: Some(indexer_url), prover_url: args.prover_url.clone(), prover_append_url: args .prover_append_url @@ -341,6 +333,7 @@ impl ForesterConfig { }) }) .transpose()?, + helius_rpc: args.helius_rpc, }, rpc_pool_config: RpcPoolConfig { max_size: args.rpc_pool_size, @@ -436,18 +429,10 @@ impl ForesterConfig { indexer_config: IndexerConfig::default(), transaction_config: TransactionConfig::default(), general_config: GeneralConfig { - slot_update_interval_seconds: 10, tree_discovery_interval_seconds: 60, enable_metrics: args.enable_metrics(), - skip_v1_state_trees: false, - skip_v2_state_trees: false, - skip_v1_address_trees: false, - skip_v2_address_trees: false, - tree_ids: vec![], - sleep_after_processing_ms: 10_000, - sleep_when_idle_ms: 45_000, queue_polling_mode: QueuePollingMode::OnChain, // Status uses on-chain reads - group_authority: None, + ..Default::default() }, rpc_pool_config: RpcPoolConfig { max_size: 10, diff --git a/forester/src/epoch_manager.rs b/forester/src/epoch_manager.rs index 49da980118..b58cf20223 100644 --- a/forester/src/epoch_manager.rs +++ b/forester/src/epoch_manager.rs @@ -4,12 +4,12 @@ use std::{ atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}, Arc, }, - time::Duration, + time::{Duration, SystemTime, UNIX_EPOCH}, }; use anyhow::{anyhow, Context}; use borsh::BorshSerialize; -use dashmap::{mapref::entry::Entry, DashMap}; +use dashmap::DashMap; use forester_utils::{ forester_epoch::{get_epoch_phases, Epoch, ForesterSlot, TreeAccounts, TreeForesterSchedule}, rpc_pool::SolanaRpcPool, @@ -41,7 +41,7 @@ use solana_sdk::{ use tokio::{ sync::{broadcast, broadcast::error::RecvError, mpsc, oneshot, Mutex}, task::JoinHandle, - time::{sleep, Instant}, + time::{sleep, Instant, MissedTickBehavior}, }; use tracing::{debug, error, info, info_span, instrument, trace, warn}; @@ -50,6 +50,7 @@ use crate::{ errors::{ ChannelError, ForesterError, InitializationError, RegistrationError, WorkReportError, }, + logging::{should_emit_rate_limited_warning, ServiceHeartbeat}, metrics::{push_metrics, queue_metric_update, update_forester_sol_balance}, pagerduty::send_pagerduty_alert, processor::{ @@ -84,6 +85,7 @@ type StateBatchProcessorMap = Arc>>)>>; type AddressBatchProcessorMap = Arc>>)>>; +type ProcessorInitLockMap = Arc>>>; /// Timing for a single circuit type (circuit inputs + proof generation) #[derive(Copy, Clone, Debug, Default)] @@ -207,12 +209,16 @@ pub struct EpochManager { proof_caches: Arc>>, state_processors: StateBatchProcessorMap, address_processors: AddressBatchProcessorMap, + state_processor_init_locks: ProcessorInitLockMap, + address_processor_init_locks: ProcessorInitLockMap, compressible_tracker: Option>, pda_tracker: Option>, mint_tracker: Option>, /// Cached zkp_batch_size per tree to filter queue updates below threshold zkp_batch_sizes: Arc>, address_lookup_tables: Arc>, + heartbeat: Arc, + run_id: Arc, } impl Clone for EpochManager { @@ -234,11 +240,15 @@ impl Clone for EpochManager { proof_caches: self.proof_caches.clone(), state_processors: self.state_processors.clone(), address_processors: self.address_processors.clone(), + state_processor_init_locks: self.state_processor_init_locks.clone(), + address_processor_init_locks: self.address_processor_init_locks.clone(), compressible_tracker: self.compressible_tracker.clone(), pda_tracker: self.pda_tracker.clone(), mint_tracker: self.mint_tracker.clone(), zkp_batch_sizes: self.zkp_batch_sizes.clone(), address_lookup_tables: self.address_lookup_tables.clone(), + heartbeat: self.heartbeat.clone(), + run_id: self.run_id.clone(), } } } @@ -259,6 +269,8 @@ impl EpochManager { pda_tracker: Option>, mint_tracker: Option>, address_lookup_tables: Arc>, + heartbeat: Arc, + run_id: String, ) -> Result { let authority = Arc::new(config.payer_keypair.insecure_clone()); Ok(Self { @@ -278,11 +290,15 @@ impl EpochManager { proof_caches: Arc::new(DashMap::new()), state_processors: Arc::new(DashMap::new()), address_processors: Arc::new(DashMap::new()), + state_processor_init_locks: Arc::new(DashMap::new()), + address_processor_init_locks: Arc::new(DashMap::new()), compressible_tracker, pda_tracker, mint_tracker, zkp_batch_sizes: Arc::new(DashMap::new()), address_lookup_tables, + heartbeat, + run_id: Arc::::from(run_id), }) } @@ -324,7 +340,11 @@ impl EpochManager { balance_check_handle, ), |(h2, h3, h4)| { - info!("Aborting EpochManager background tasks"); + info!( + event = "background_tasks_aborting", + run_id = %self.run_id, + "Aborting EpochManager background tasks" + ); h2.abort(); h3.abort(); h4.abort(); @@ -336,16 +356,49 @@ impl EpochManager { epoch_opt = rx.recv() => { match epoch_opt { Some(epoch) => { - debug!("Received new epoch: {}", epoch); + debug!( + event = "epoch_queued_for_processing", + run_id = %self.run_id, + epoch, + "Received epoch from monitor" + ); let self_clone = Arc::clone(&self); tokio::spawn(async move { if let Err(e) = self_clone.process_epoch(epoch).await { - error!("Error processing epoch {}: {:?}", epoch, e); + if let Some(ForesterError::Registration( + RegistrationError::FinalizeRegistrationPhaseEnded { + epoch, + current_slot, + active_phase_end_slot, + }, + )) = e.downcast_ref::() + { + debug!( + event = "epoch_processing_skipped_finalize_registration_phase_ended", + run_id = %self_clone.run_id, + epoch = *epoch, + current_slot = *current_slot, + active_phase_end_slot = *active_phase_end_slot, + "Skipping epoch processing because FinalizeRegistration is no longer possible" + ); + } else { + error!( + event = "epoch_processing_failed", + run_id = %self_clone.run_id, + epoch, + error = ?e, + "Error processing epoch" + ); + } } }); } None => { - error!("Epoch monitor channel closed unexpectedly!"); + error!( + event = "epoch_monitor_channel_closed", + run_id = %self.run_id, + "Epoch monitor channel closed unexpectedly" + ); break Err(anyhow!( "Epoch monitor channel closed - forester cannot function without it" )); @@ -355,13 +408,27 @@ impl EpochManager { result = &mut monitor_handle => { match result { Ok(Ok(())) => { - error!("Epoch monitor exited unexpectedly with Ok(())"); + error!( + event = "epoch_monitor_exited_unexpected_ok", + run_id = %self.run_id, + "Epoch monitor exited unexpectedly with Ok(())" + ); } Ok(Err(e)) => { - error!("Epoch monitor exited with error: {:?}", e); + error!( + event = "epoch_monitor_exited_with_error", + run_id = %self.run_id, + error = ?e, + "Epoch monitor exited with error" + ); } Err(e) => { - error!("Epoch monitor task panicked or was cancelled: {:?}", e); + error!( + event = "epoch_monitor_task_failed", + run_id = %self.run_id, + error = ?e, + "Epoch monitor task panicked or was cancelled" + ); } } if let Some(pagerduty_key) = &self.config.external_services.pagerduty_routing_key { @@ -396,11 +463,26 @@ impl EpochManager { &self.config.payer_keypair.pubkey().to_string(), balance_in_sol, ); - debug!("Current SOL balance: {} SOL", balance_in_sol); + debug!( + event = "forester_balance_updated", + run_id = %self.run_id, + balance_sol = balance_in_sol, + "Current SOL balance updated" + ); } - Err(e) => error!("Failed to get balance: {:?}", e), + Err(e) => error!( + event = "forester_balance_fetch_failed", + run_id = %self.run_id, + error = ?e, + "Failed to get balance" + ), }, - Err(e) => error!("Failed to get RPC connection for balance check: {:?}", e), + Err(e) => error!( + event = "forester_balance_rpc_connection_failed", + run_id = %self.run_id, + error = ?e, + "Failed to get RPC connection for balance check" + ), } } } @@ -410,18 +492,37 @@ impl EpochManager { loop { match receiver.recv().await { Ok(new_tree) => { - info!("Received new tree: {:?}", new_tree); + info!( + event = "new_tree_received", + run_id = %self.run_id, + tree = %new_tree.merkle_tree, + tree_type = ?new_tree.tree_type, + "Received new tree" + ); if let Err(e) = self.add_new_tree(new_tree).await { - error!("Failed to add new tree: {:?}", e); + error!( + event = "new_tree_add_failed", + run_id = %self.run_id, + error = ?e, + "Failed to add new tree" + ); // Continue processing other trees instead of crashing } } Err(e) => match e { RecvError::Lagged(lag) => { - warn!("Lagged in receiving new trees: {:?}", lag); + warn!( + event = "new_tree_receiver_lagged", + run_id = %self.run_id, + lag, "Lagged while receiving new trees" + ); } RecvError::Closed => { - info!("New tree receiver closed"); + info!( + event = "new_tree_receiver_closed", + run_id = %self.run_id, + "New tree receiver closed" + ); break; } }, @@ -431,23 +532,54 @@ impl EpochManager { } async fn add_new_tree(&self, new_tree: TreeAccounts) -> Result<()> { - info!("Adding new tree: {:?}", new_tree); + info!( + event = "new_tree_add_started", + run_id = %self.run_id, + tree = %new_tree.merkle_tree, + tree_type = ?new_tree.tree_type, + "Adding new tree" + ); let mut trees = self.trees.lock().await; trees.push(new_tree); drop(trees); - info!("New tree added to the list of trees"); + info!( + event = "new_tree_added", + run_id = %self.run_id, + tree = %new_tree.merkle_tree, + "New tree added to tracked list" + ); let (current_slot, current_epoch) = self.get_current_slot_and_epoch().await?; let phases = get_epoch_phases(&self.protocol_config, current_epoch); // Check if we're currently in the active phase if current_slot >= phases.active.start && current_slot < phases.active.end { - info!("Currently in active phase. Attempting to process the new tree immediately."); - info!("Recovering registration info..."); + info!( + event = "new_tree_active_phase_injection", + run_id = %self.run_id, + tree = %new_tree.merkle_tree, + current_slot, + active_phase_start_slot = phases.active.start, + active_phase_end_slot = phases.active.end, + "In active phase; attempting immediate processing for new tree" + ); + info!( + event = "new_tree_recover_registration_started", + run_id = %self.run_id, + tree = %new_tree.merkle_tree, + epoch = current_epoch, + "Recovering registration info for new tree" + ); match self.recover_registration_info(current_epoch).await { Ok(mut epoch_info) => { - info!("Recovered registration info for current epoch"); + info!( + event = "new_tree_recover_registration_succeeded", + run_id = %self.run_id, + tree = %new_tree.merkle_tree, + epoch = current_epoch, + "Recovered registration info for current epoch" + ); let tree_schedule = TreeForesterSchedule::new_with_schedule( &new_tree, current_slot, @@ -458,8 +590,15 @@ impl EpochManager { let self_clone = Arc::new(self.clone()); - info!("Spawning task to process new tree in current epoch"); + info!( + event = "new_tree_processing_task_spawned", + run_id = %self.run_id, + tree = %new_tree.merkle_tree, + epoch = current_epoch, + "Spawning task to process new tree in current epoch" + ); tokio::spawn(async move { + let tree_pubkey = tree_schedule.tree_accounts.merkle_tree; if let Err(e) = self_clone .process_queue( &epoch_info.epoch, @@ -468,9 +607,20 @@ impl EpochManager { ) .await { - error!("Error processing queue for new tree: {:?}", e); + error!( + event = "new_tree_process_queue_failed", + run_id = %self_clone.run_id, + tree = %tree_pubkey, + error = ?e, + "Error processing queue for new tree" + ); } else { - info!("Successfully processed new tree in current epoch"); + info!( + event = "new_tree_process_queue_succeeded", + run_id = %self_clone.run_id, + tree = %tree_pubkey, + "Successfully processed new tree in current epoch" + ); } }); } @@ -482,19 +632,33 @@ impl EpochManager { ) { debug!("Not registered for current epoch yet, new tree will be picked up during next registration"); } else { - warn!("Failed to recover registration info for new tree: {:?}", e); + warn!( + event = "new_tree_recover_registration_failed", + run_id = %self.run_id, + tree = %new_tree.merkle_tree, + epoch = current_epoch, + error = ?e, + "Failed to recover registration info for new tree" + ); } } } info!( - "Injected new tree into current epoch {}: {:?}", - current_epoch, new_tree + event = "new_tree_injected_into_current_epoch", + run_id = %self.run_id, + tree = %new_tree.merkle_tree, + epoch = current_epoch, + "Injected new tree into current epoch" ); } else { info!( - "Not in active phase (current slot: {}, active start: {}). Tree will be picked up in next registration.", - current_slot, phases.active.start + event = "new_tree_queued_for_next_registration", + run_id = %self.run_id, + tree = %new_tree.merkle_tree, + current_slot, + active_phase_start_slot = phases.active.start, + "Not in active phase; new tree will be picked up in next registration" ); } @@ -507,15 +671,20 @@ impl EpochManager { let mut consecutive_failures = 0u32; const MAX_BACKOFF_SECS: u64 = 60; - info!("Starting epoch monitor"); + info!( + event = "epoch_monitor_started", + run_id = %self.run_id, + "Starting epoch monitor" + ); loop { let (slot, current_epoch) = match self.get_current_slot_and_epoch().await { Ok(result) => { if consecutive_failures > 0 { info!( - "Epoch monitor recovered after {} consecutive failures", - consecutive_failures + event = "epoch_monitor_recovered", + run_id = %self.run_id, + consecutive_failures, "Epoch monitor recovered after failures" ); } consecutive_failures = 0; @@ -528,13 +697,21 @@ impl EpochManager { if consecutive_failures == 1 { warn!( - "Epoch monitor: failed to get slot/epoch: {:?}. Retrying in {:?}", - e, backoff + event = "epoch_monitor_slot_epoch_failed", + run_id = %self.run_id, + consecutive_failures, + error = ?e, + backoff_ms = backoff.as_millis() as u64, + "Epoch monitor failed to get slot/epoch; retrying" ); } else if consecutive_failures.is_multiple_of(10) { error!( - "Epoch monitor: {} consecutive failures, last error: {:?}. Still retrying every {:?}", - consecutive_failures, e, backoff + event = "epoch_monitor_slot_epoch_failed_repeated", + run_id = %self.run_id, + consecutive_failures, + error = ?e, + backoff_ms = backoff.as_millis() as u64, + "Epoch monitor still failing repeatedly" ); } @@ -544,19 +721,36 @@ impl EpochManager { }; debug!( - "last_epoch: {:?}, current_epoch: {:?}, slot: {:?}", - last_epoch, current_epoch, slot + event = "epoch_monitor_tick", + run_id = %self.run_id, + last_epoch = ?last_epoch, + current_epoch, + slot, + "Epoch monitor tick" ); if last_epoch.is_none_or(|last| current_epoch > last) { - debug!("New epoch detected: {}", current_epoch); + debug!( + event = "epoch_monitor_new_epoch_detected", + run_id = %self.run_id, + epoch = current_epoch, + "New epoch detected" + ); let phases = get_epoch_phases(&self.protocol_config, current_epoch); if slot < phases.registration.end { - debug!("Sending current epoch {} for processing", current_epoch); + debug!( + event = "epoch_monitor_send_current_epoch", + run_id = %self.run_id, + epoch = current_epoch, + "Sending current epoch for processing" + ); if let Err(e) = tx.send(current_epoch).await { error!( - "Failed to send current epoch {} for processing: {:?}. Channel closed, exiting.", - current_epoch, e + event = "epoch_monitor_send_current_epoch_failed", + run_id = %self.run_id, + epoch = current_epoch, + error = ?e, + "Failed to send current epoch for processing; channel closed" ); return Err(anyhow!("Epoch channel closed: {}", e)); } @@ -577,7 +771,13 @@ impl EpochManager { let mut rpc = match self.rpc_pool.get_connection().await { Ok(rpc) => rpc, Err(e) => { - warn!("Failed to get RPC connection for slot waiting: {:?}", e); + warn!( + event = "epoch_monitor_wait_rpc_connection_failed", + run_id = %self.run_id, + target_epoch, + error = ?e, + "Failed to get RPC connection while waiting for registration slot" + ); tokio::time::sleep(Duration::from_secs(1)).await; break; } @@ -591,36 +791,59 @@ impl EpochManager { let slots_to_wait = wait_target.saturating_sub(slot); debug!( - "Waiting for epoch {} registration phase. Current slot: {}, Wait target: {} (registration starts at {}), Slots to wait: {}", - target_epoch, slot, wait_target, target_phases.registration.start, slots_to_wait + event = "epoch_monitor_wait_for_registration", + run_id = %self.run_id, + target_epoch, + current_slot = slot, + wait_target_slot = wait_target, + registration_start_slot = target_phases.registration.start, + slots_to_wait, + "Waiting for target epoch registration phase" ); if let Err(e) = wait_until_slot_reached(&mut *rpc, &self.slot_tracker, wait_target) .await { - error!("Error waiting for registration phase: {:?}", e); + error!( + event = "epoch_monitor_wait_for_registration_failed", + run_id = %self.run_id, + target_epoch, + error = ?e, + "Error waiting for registration phase" + ); break; } let current_slot = self.slot_tracker.estimated_current_slot(); if current_slot >= target_phases.registration.end { debug!( - "Epoch {} registration ended while waiting (current slot {} >= end {}), trying next epoch", - target_epoch, current_slot, target_phases.registration.end + event = "epoch_monitor_registration_ended_while_waiting", + run_id = %self.run_id, + target_epoch, + current_slot, + registration_end_slot = target_phases.registration.end, + "Target epoch registration ended while waiting; trying next epoch" ); target_epoch += 1; continue; } debug!( - "Epoch {} registration phase ready, sending for processing (current slot: {}, registration end: {})", - target_epoch, current_slot, target_phases.registration.end + event = "epoch_monitor_send_target_epoch_after_wait", + run_id = %self.run_id, + target_epoch, + current_slot, + registration_end_slot = target_phases.registration.end, + "Target epoch registration phase ready; sending for processing" ); if let Err(e) = tx.send(target_epoch).await { error!( - "Failed to send epoch {} for processing: {:?}", - target_epoch, e + event = "epoch_monitor_send_target_epoch_failed", + run_id = %self.run_id, + target_epoch, + error = ?e, + "Failed to send target epoch for processing" ); break; } @@ -631,13 +854,20 @@ impl EpochManager { // If we're within the registration window, send it if slot < target_phases.registration.end { debug!( - "Epoch {} registration phase is open (slot {} < end {}), sending for processing", - target_epoch, slot, target_phases.registration.end + event = "epoch_monitor_send_target_epoch_window_open", + run_id = %self.run_id, + target_epoch, + slot, + registration_end_slot = target_phases.registration.end, + "Target epoch registration window is open; sending for processing" ); if let Err(e) = tx.send(target_epoch).await { error!( - "Failed to send epoch {} for processing: {:?}", - target_epoch, e + event = "epoch_monitor_send_target_epoch_failed", + run_id = %self.run_id, + target_epoch, + error = ?e, + "Failed to send target epoch for processing" ); break; } @@ -647,8 +877,12 @@ impl EpochManager { // Registration already ended, try next epoch debug!( - "Epoch {} registration already ended (slot {} >= end {}), checking next epoch", - target_epoch, slot, target_phases.registration.end + event = "epoch_monitor_target_epoch_registration_closed", + run_id = %self.run_id, + target_epoch, + slot, + registration_end_slot = target_phases.registration.end, + "Target epoch registration already ended; checking next epoch" ); target_epoch += 1; } @@ -718,7 +952,13 @@ impl EpochManager { if slot > current_phases.registration.start { debug!("Processing previous epoch: {}", previous_epoch); if let Err(e) = tx.send(previous_epoch).await { - error!("Failed to send previous epoch for processing: {:?}", e); + error!( + event = "initial_epoch_send_previous_failed", + run_id = %self.run_id, + epoch = previous_epoch, + error = ?e, + "Failed to send previous epoch for processing" + ); return Ok(()); } } @@ -731,7 +971,13 @@ impl EpochManager { current_epoch ); if let Err(e) = tx.send(current_epoch).await { - error!("Failed to send current epoch for processing: {:?}", e); + error!( + event = "initial_epoch_send_current_failed", + run_id = %self.run_id, + epoch = current_epoch, + error = ?e, + "Failed to send current epoch for processing" + ); return Ok(()); // Channel closed, exit gracefully } } else { @@ -752,20 +998,32 @@ impl EpochManager { current_epoch ); if let Err(e) = tx.send(current_epoch).await { - error!("Failed to send current epoch for processing: {:?}", e); + error!( + event = "initial_epoch_send_current_registered_failed", + run_id = %self.run_id, + epoch = current_epoch, + error = ?e, + "Failed to send current epoch for processing" + ); return Ok(()); // Channel closed, exit gracefully } } else { - warn!( - "Skipping current epoch {} - registration ended at slot {} (current slot: {})", - current_epoch, current_phases.registration.end, slot + info!( + event = "skip_current_epoch_registration_closed", + run_id = %self.run_id, + epoch = current_epoch, + registration_end_slot = current_phases.registration.end, + current_slot = slot, + "Skipping current epoch because registration has ended" ); } } Err(e) => { warn!( - "Failed to get RPC connection to check registration, skipping: {:?}", - e + event = "registration_check_rpc_failed", + run_id = %self.run_id, + error = ?e, + "Failed to get RPC connection to check registration, skipping" ); } } @@ -818,7 +1076,13 @@ impl EpochManager { epoch ); } else { - warn!("Failed to recover registration info: {:?}", e); + warn!( + event = "recover_registration_info_failed", + run_id = %self.run_id, + epoch, + error = ?e, + "Failed to recover registration info" + ); } // Attempt to register match self @@ -839,9 +1103,16 @@ impl EpochManager { next_phases.registration.start.saturating_sub(current_slot); info!( - "Too late to register for epoch {} (registration ended at slot {}, current slot: {}). Next available epoch: {}. Registration opens at slot {} ({} slots to wait).", - failed_epoch, registration_end, current_slot, next_epoch, next_phases.registration.start, slots_to_wait - ); + event = "registration_window_missed", + run_id = %self.run_id, + failed_epoch, + registration_end_slot = registration_end, + current_slot, + next_epoch, + next_registration_start_slot = next_phases.registration.start, + slots_to_wait, + "Too late to register for requested epoch; next epoch will be used" + ); return Ok(()); } Err(e) => return Err(e.into()), @@ -870,16 +1141,25 @@ impl EpochManager { if self.sync_slot().await? < phases.report_work.end { self.report_work_onchain(®istration_info).await?; } else { + let current_slot = self.slot_tracker.estimated_current_slot(); info!( - "Skipping on-chain work report for epoch {} (report_work phase ended)", - registration_info.epoch.epoch + event = "skip_onchain_work_report_phase_ended", + run_id = %self.run_id, + epoch = registration_info.epoch.epoch, + current_slot, + report_work_end_slot = phases.report_work.end, + "Skipping on-chain work report because report_work phase has ended" ); } // TODO: implement // self.claim(®istration_info).await?; - info!("Exiting process_epoch"); + info!( + event = "process_epoch_completed", + run_id = %self.run_id, + epoch, "Exiting process_epoch" + ); Ok(()) } @@ -920,8 +1200,13 @@ impl EpochManager { if slot < phases.registration.start { let slots_to_wait = phases.registration.start.saturating_sub(slot); info!( - "Registration for epoch {} hasn't started yet (current slot: {}, starts at: {}). Waiting {} slots...", - epoch, slot, phases.registration.start, slots_to_wait + event = "registration_wait_for_window", + run_id = %self.run_id, + epoch, + current_slot = slot, + registration_start_slot = phases.registration.start, + slots_to_wait, + "Registration window not open yet; waiting" ); let wait_duration = slot_duration() * slots_to_wait as u32; sleep(wait_duration).await; @@ -932,10 +1217,13 @@ impl EpochManager { Ok(registration_info) => return Ok(registration_info), Err(e) => { warn!( - "Failed to register for epoch {} (attempt {}): {:?}", + event = "registration_attempt_failed", + run_id = %self.run_id, epoch, - attempt + 1, - e + attempt = attempt + 1, + max_attempts = max_retries, + error = ?e, + "Failed to register for epoch; retrying" ); if attempt < max_retries - 1 { sleep(retry_delay).await; @@ -954,7 +1242,13 @@ impl EpochManager { ) .await { - error!("Failed to send PagerDuty alert: {:?}", alert_err); + error!( + event = "pagerduty_alert_failed", + run_id = %self.run_id, + epoch, + error = ?alert_err, + "Failed to send PagerDuty alert" + ); } } return Err(ForesterError::Other(e)); @@ -972,7 +1266,11 @@ impl EpochManager { #[instrument(level = "debug", skip(self), fields(forester = %self.config.payer_keypair.pubkey(), epoch = epoch ))] async fn register_for_epoch(&self, epoch: u64) -> Result { - info!("Registering for epoch: {}", epoch); + info!( + event = "registration_attempt_started", + run_id = %self.run_id, + epoch, "Registering for epoch" + ); let mut rpc = LightClient::new(LightClientConfig { url: self.config.external_services.rpc_url.to_string(), photon_url: self.config.external_services.indexer_url.clone(), @@ -992,8 +1290,9 @@ impl EpochManager { if let Some(existing_pda) = existing_registration { info!( - "Already registered for epoch {}. Recovering registration info.", - epoch + event = "registration_already_exists", + run_id = %self.run_id, + epoch, "Already registered for epoch; recovering registration info" ); let registration_info = self .recover_registration_info_internal( @@ -1067,8 +1366,12 @@ impl EpochManager { Ok(registration_info) } else if slot < phases.registration.start { warn!( - "Too early to register for epoch {}. Current slot: {}, Registration starts: {}", - epoch, slot, phases.registration.start + event = "registration_too_early", + run_id = %self.run_id, + epoch, + current_slot = slot, + registration_start_slot = phases.registration.start, + "Too early to register for epoch" ); Err(RegistrationError::RegistrationPhaseNotStarted { epoch, @@ -1078,8 +1381,12 @@ impl EpochManager { .into()) } else { warn!( - "Too late to register for epoch {}. Current slot: {}, Registration end: {}", - epoch, slot, phases.registration.end + event = "registration_too_late", + run_id = %self.run_id, + epoch, + current_slot = slot, + registration_end_slot = phases.registration.end, + "Too late to register for epoch" ); Err(RegistrationError::RegistrationPhaseEnded { epoch, @@ -1144,16 +1451,26 @@ impl EpochManager { if current_slot >= active_phase_start_slot { info!( - "Active phase has already started. Current slot: {}. Active phase start slot: {}. Slots left: {}.", - current_slot, active_phase_start_slot, active_phase_end_slot.saturating_sub(current_slot) + event = "active_phase_already_started", + run_id = %self.run_id, + current_slot, + active_phase_start_slot, + active_phase_end_slot, + slots_left = active_phase_end_slot.saturating_sub(current_slot), + "Active phase has already started" ); } else { let waiting_slots = active_phase_start_slot - current_slot; let waiting_secs = waiting_slots / 2; - info!("Waiting for active phase to start. Current slot: {}. Active phase start slot: {}. Waiting time: ~ {} seconds", + info!( + event = "wait_for_active_phase", + run_id = %self.run_id, current_slot, active_phase_start_slot, - waiting_secs); + waiting_slots, + approx_wait_seconds = waiting_secs, + "Waiting for active phase to start" + ); } self.prewarm_all_trees_during_wait(epoch_info, active_phase_start_slot) @@ -1175,13 +1492,19 @@ impl EpochManager { let current_slot = rpc.get_slot().await?; if current_slot > epoch_info.epoch.phases.active.end { info!( - "Skipping FinalizeRegistration for epoch {}: active phase ended (current slot: {}, end: {})", - epoch_info.epoch.epoch, current_slot, epoch_info.epoch.phases.active.end + event = "skip_finalize_registration_phase_ended", + run_id = %self.run_id, + epoch = epoch_info.epoch.epoch, + current_slot, + active_phase_end_slot = epoch_info.epoch.phases.active.end, + "Skipping FinalizeRegistration because active phase ended" ); - return Err(anyhow::anyhow!( - "Epoch {} active phase has ended, cannot finalize registration", - epoch_info.epoch.epoch - )); + return Err(RegistrationError::FinalizeRegistrationPhaseEnded { + epoch: epoch_info.epoch.epoch, + current_slot, + active_phase_end_slot: epoch_info.epoch.phases.active.end, + } + .into()); } // TODO: we can put this ix into every tx of the first batch of the current active phase @@ -1237,7 +1560,12 @@ impl EpochManager { debug!("Added compression tree to epoch {}", epoch_info.epoch.epoch); } - info!("Finished waiting for active phase"); + info!( + event = "active_phase_ready", + run_id = %self.run_id, + epoch = epoch_info.epoch.epoch, + "Finished waiting for active phase" + ); Ok(epoch_info) } @@ -1249,13 +1577,19 @@ impl EpochManager { fields(forester = %self.config.payer_keypair.pubkey(), epoch = epoch_info.epoch.epoch ))] async fn perform_active_work(&self, epoch_info: &ForesterEpochInfo) -> Result<()> { - info!("Performing active work"); + self.heartbeat.increment_active_cycle(); let current_slot = self.slot_tracker.estimated_current_slot(); let active_phase_end = epoch_info.epoch.phases.active.end; if !self.is_in_active_phase(current_slot, epoch_info)? { - info!("No longer in active phase. Skipping work."); + info!( + event = "active_work_skipped_not_in_phase", + run_id = %self.run_id, + current_slot, + active_phase_end, + "No longer in active phase. Skipping work." + ); return Ok(()); } @@ -1268,16 +1602,29 @@ impl EpochManager { .cloned() .collect(); + info!( + event = "active_work_cycle_started", + run_id = %self.run_id, + current_slot, + active_phase_end, + tree_count = trees_to_process.len(), + "Starting active work cycle" + ); + let self_arc = Arc::new(self.clone()); let epoch_info_arc = Arc::new(epoch_info.clone()); let mut handles: Vec>> = Vec::with_capacity(trees_to_process.len()); for tree in trees_to_process { - info!( - "Creating thread for tree {} (type: {:?})", - tree.tree_accounts.merkle_tree, tree.tree_accounts.tree_type + debug!( + event = "tree_processing_task_spawned", + run_id = %self.run_id, + tree = %tree.tree_accounts.merkle_tree, + tree_type = ?tree.tree_accounts.tree_type, + "Spawning tree processing task" ); + self.heartbeat.add_tree_tasks_spawned(1); let self_clone = self_arc.clone(); let epoch_info_clone = epoch_info_arc.clone(); @@ -1295,17 +1642,43 @@ impl EpochManager { handles.push(handle); } - info!("Waiting for {} tree processing tasks", handles.len()); + debug!("Waiting for {} tree processing tasks", handles.len()); let results = join_all(handles).await; + let mut success_count = 0usize; + let mut error_count = 0usize; + let mut panic_count = 0usize; for result in results { match result { - Ok(Ok(())) => { - debug!("Queue processed successfully"); + Ok(Ok(())) => success_count += 1, + Ok(Err(e)) => { + error_count += 1; + error!( + event = "tree_processing_task_failed", + run_id = %self.run_id, + error = ?e, + "Error processing queue" + ); + } + Err(e) => { + panic_count += 1; + error!( + event = "tree_processing_task_panicked", + run_id = %self.run_id, + error = ?e, + "Tree processing task panicked" + ); } - Ok(Err(e)) => error!("Error processing queue: {:?}", e), - Err(e) => error!("Task panicked: {:?}", e), } } + info!( + event = "active_work_cycle_completed", + run_id = %self.run_id, + tree_tasks = success_count + error_count + panic_count, + succeeded = success_count, + failed = error_count, + panicked = panic_count, + "Active work cycle completed" + ); debug!("Waiting for active phase to end"); let mut rpc = self.rpc_pool.get_connection().await?; @@ -1332,19 +1705,23 @@ impl EpochManager { epoch_pda: &ForesterEpochPda, mut tree_schedule: TreeForesterSchedule, ) -> Result<()> { + self.heartbeat.increment_queue_started(); let mut current_slot = self.slot_tracker.estimated_current_slot(); let total_slots = tree_schedule.slots.len(); let eligible_slots = tree_schedule.slots.iter().filter(|s| s.is_some()).count(); let tree_type = tree_schedule.tree_accounts.tree_type; - info!( - "process_queue tree={}, total_slots={}, eligible_slots={}, current_slot={}, active_phase_end={}", - tree_schedule.tree_accounts.merkle_tree, + debug!( + event = "process_queue_started", + run_id = %self.run_id, + tree = %tree_schedule.tree_accounts.merkle_tree, + tree_type = ?tree_type, total_slots, eligible_slots, current_slot, - epoch_info.phases.active.end + active_phase_end = epoch_info.phases.active.end, + "Processing queue for tree" ); 'outer_slot_loop: while current_slot < epoch_info.phases.active.end { @@ -1389,16 +1766,21 @@ impl EpochManager { } Err(e) => { error!( - "Error processing light slot {:?}: {:?}", - light_slot_details.slot, e + event = "light_slot_processing_error", + run_id = %self.run_id, + light_slot = light_slot_details.slot, + error = ?e, + "Error processing light slot" ); } } tree_schedule.slots[slot_idx] = None; } else { - info!( - "No further eligible slots in schedule for tree {}", - tree_schedule.tree_accounts.merkle_tree + debug!( + event = "process_queue_no_eligible_slots", + run_id = %self.run_id, + tree = %tree_schedule.tree_accounts.merkle_tree, + "No further eligible slots in schedule" ); break 'outer_slot_loop; } @@ -1406,9 +1788,12 @@ impl EpochManager { current_slot = self.slot_tracker.estimated_current_slot(); } - info!( - "Exiting process_queue for tree {}", - tree_schedule.tree_accounts.merkle_tree + self.heartbeat.increment_queue_finished(); + debug!( + event = "process_queue_finished", + run_id = %self.run_id, + tree = %tree_schedule.tree_accounts.merkle_tree, + "Exiting process_queue" ); Ok(()) } @@ -1426,12 +1811,15 @@ impl EpochManager { tree_accounts: &TreeAccounts, forester_slot_details: &ForesterSlot, ) -> Result<()> { - info!( - "Processing slot {} ({}-{}) epoch {}", - forester_slot_details.slot, - forester_slot_details.start_solana_slot, - forester_slot_details.end_solana_slot, - epoch_info.epoch + debug!( + event = "light_slot_processing_started", + run_id = %self.run_id, + tree = %tree_accounts.merkle_tree, + epoch = epoch_info.epoch, + light_slot = forester_slot_details.slot, + slot_start = forester_slot_details.start_solana_slot, + slot_end = forester_slot_details.end_solana_slot, + "Processing light slot" ); let mut rpc = self.rpc_pool.get_connection().await?; wait_until_slot_reached( @@ -1454,7 +1842,15 @@ impl EpochManager { let current_light_slot = (estimated_slot - epoch_info.phases.active.start) / epoch_pda.protocol_config.slot_length; if current_light_slot != forester_slot_details.slot { - warn!("Light slot mismatch. Exiting processing for this slot."); + warn!( + event = "light_slot_mismatch", + run_id = %self.run_id, + tree = %tree_accounts.merkle_tree, + expected_light_slot = forester_slot_details.slot, + actual_light_slot = current_light_slot, + estimated_slot, + "Light slot mismatch; exiting processing for this slot" + ); break 'inner_processing_loop; } @@ -1486,16 +1882,23 @@ impl EpochManager { Ok(count) => count, Err(e) => { error!( - "Failed processing in slot {:?}: {:?}", - forester_slot_details.slot, e + event = "light_slot_processing_failed", + run_id = %self.run_id, + tree = %tree_accounts.merkle_tree, + light_slot = forester_slot_details.slot, + error = ?e, + "Failed processing in light slot" ); break 'inner_processing_loop; } }; if items_processed_this_iteration > 0 { debug!( - "Processed {} items in slot {:?}", - items_processed_this_iteration, forester_slot_details.slot + event = "light_slot_items_processed", + run_id = %self.run_id, + light_slot = forester_slot_details.slot, + items = items_processed_this_iteration, + "Processed items in light slot" ); } @@ -1507,7 +1910,21 @@ impl EpochManager { .await; if let Err(e) = push_metrics(&self.config.external_services.pushgateway_url).await { - warn!("Failed to push metrics: {:?}", e); + if should_emit_rate_limited_warning("push_metrics_v1", Duration::from_secs(30)) { + warn!( + event = "metrics_push_failed", + run_id = %self.run_id, + error = ?e, + "Failed to push metrics" + ); + } else { + debug!( + event = "metrics_push_failed_suppressed", + run_id = %self.run_id, + error = ?e, + "Suppressing repeated metrics push failure" + ); + } } estimated_slot = self.slot_tracker.estimated_current_slot(); @@ -1535,12 +1952,15 @@ impl EpochManager { forester_slot_details: &ForesterSlot, consecutive_eligibility_end: u64, ) -> Result<()> { - info!( - "Processing V2 light slot {} ({}-{}, consecutive_end={})", - forester_slot_details.slot, - forester_slot_details.start_solana_slot, - forester_slot_details.end_solana_slot, - consecutive_eligibility_end + debug!( + event = "v2_light_slot_processing_started", + run_id = %self.run_id, + tree = %tree_accounts.merkle_tree, + light_slot = forester_slot_details.slot, + slot_start = forester_slot_details.start_solana_slot, + slot_end = forester_slot_details.end_solana_slot, + consecutive_eligibility_end_slot = consecutive_eligibility_end, + "Processing V2 light slot" ); let tree_pubkey = tree_accounts.merkle_tree; @@ -1562,8 +1982,12 @@ impl EpochManager { if items_sent > 0 { let cached_send_duration = cached_send_start.elapsed(); info!( - "Sent {} items from cache for tree {} in {:?}", - items_sent, tree_pubkey, cached_send_duration + event = "cached_proofs_sent", + run_id = %self.run_id, + tree = %tree_pubkey, + items = items_sent, + duration_ms = cached_send_duration.as_millis() as u64, + "Sent items from proof cache" ); self.update_metrics_and_counts(epoch_info.epoch, items_sent, cached_send_duration) .await; @@ -1587,7 +2011,15 @@ impl EpochManager { let current_light_slot = (estimated_slot - epoch_info.phases.active.start) / epoch_pda.protocol_config.slot_length; if current_light_slot != forester_slot_details.slot { - warn!("V2 slot mismatch. Exiting processing."); + warn!( + event = "v2_light_slot_mismatch", + run_id = %self.run_id, + tree = %tree_pubkey, + expected_light_slot = forester_slot_details.slot, + actual_light_slot = current_light_slot, + estimated_slot, + "V2 slot mismatch; exiting processing" + ); break 'inner_processing_loop; } @@ -1619,7 +2051,14 @@ impl EpochManager { { Ok(count) => { if count > 0 { - info!("V2 processed {} items for tree {}", count, tree_pubkey); + info!( + event = "v2_tree_processed_items", + run_id = %self.run_id, + tree = %tree_pubkey, + items = count, + epoch = epoch_info.epoch, + "V2 processed items for tree" + ); self.update_metrics_and_counts( epoch_info.epoch, count, @@ -1632,13 +2071,33 @@ impl EpochManager { } } Err(e) => { - error!("V2 processing failed for tree {}: {:?}", tree_pubkey, e); + error!( + event = "v2_tree_processing_failed", + run_id = %self.run_id, + tree = %tree_pubkey, + error = ?e, + "V2 processing failed for tree" + ); tokio::time::sleep(POLL_INTERVAL).await; } } if let Err(e) = push_metrics(&self.config.external_services.pushgateway_url).await { - warn!("Failed to push metrics: {:?}", e); + if should_emit_rate_limited_warning("push_metrics_v2", Duration::from_secs(30)) { + warn!( + event = "metrics_push_failed", + run_id = %self.run_id, + error = ?e, + "Failed to push metrics" + ); + } else { + debug!( + event = "metrics_push_failed_suppressed", + run_id = %self.run_id, + error = ?e, + "Suppressing repeated metrics push failure" + ); + } } estimated_slot = self.slot_tracker.estimated_current_slot(); } @@ -1680,16 +2139,26 @@ impl EpochManager { current_epoch_num, ) .map_err(|e| { - error!("Failed to calculate eligible forester index: {:?}", e); + error!( + event = "eligibility_index_calculation_failed", + run_id = %self.run_id, + queue = %queue_pubkey, + epoch = current_epoch_num, + light_slot = current_light_slot, + error = ?e, + "Failed to calculate eligible forester index" + ); anyhow::anyhow!("Eligibility calculation failed: {}", e) })?; if !epoch_pda.is_eligible(eligible_forester_slot_index) { warn!( - "Forester {} is no longer eligible to process tree {} in light slot {}.", - self.config.payer_keypair.pubkey(), - queue_pubkey, - current_light_slot + event = "forester_not_eligible_for_slot", + run_id = %self.run_id, + forester = %self.config.payer_keypair.pubkey(), + queue = %queue_pubkey, + light_slot = current_light_slot, + "Forester is no longer eligible to process this queue in current light slot" ); return Ok(false); } @@ -1781,10 +2250,12 @@ impl EpochManager { let num_batches = accounts.len().div_ceil(config.batch_size); info!( - "Processing {} compressible accounts in {} batches (batch_size={})", - accounts.len(), - num_batches, - config.batch_size + event = "compression_ctoken_started", + run_id = %self.run_id, + accounts = accounts.len(), + batches = num_batches, + batch_size = config.batch_size, + "Starting ctoken compression batches" ); let compressor = CTokenCompressor::new( @@ -1835,11 +2306,17 @@ impl EpochManager { // Signal cancellation to all other futures cancelled.store(true, Ordering::Relaxed); warn!( - "Cancelling compression: forester no longer eligible (current_slot={}, eligibility_end={})", + event = "compression_ctoken_cancelled_not_eligible", + run_id = %self.run_id, current_slot, - consecutive_eligibility_end + eligibility_end_slot = consecutive_eligibility_end, + "Cancelling compression because forester is no longer eligible" ); - return Err((batch_idx, batch.len(), anyhow!("Forester no longer eligible"))); + return Err(( + batch_idx, + batch.len(), + anyhow!("Forester no longer eligible"), + )); } debug!( @@ -1864,10 +2341,12 @@ impl EpochManager { } Err(e) => { error!( - "Compression batch {}/{} failed: {:?}", - batch_idx + 1, - num_batches, - e + event = "compression_ctoken_batch_failed", + run_id = %self.run_id, + batch = batch_idx + 1, + total_batches = num_batches, + error = ?e, + "Compression batch failed" ); Err((batch_idx, batch.len(), e)) } @@ -1887,29 +2366,36 @@ impl EpochManager { match result { Ok((batch_idx, count, sig)) => { info!( - "Successfully compressed {} accounts in batch {}/{}: {}", - count, - batch_idx + 1, - num_batches, - sig + event = "compression_ctoken_batch_succeeded", + run_id = %self.run_id, + batch = batch_idx + 1, + total_batches = num_batches, + accounts = count, + signature = %sig, + "Compression batch succeeded" ); total_compressed += count; } Err((batch_idx, count, e)) => { error!( - "Compression batch {}/{} ({} accounts) failed: {:?}", - batch_idx + 1, - num_batches, - count, - e + event = "compression_ctoken_batch_failed_final", + run_id = %self.run_id, + batch = batch_idx + 1, + total_batches = num_batches, + accounts = count, + error = ?e, + "Compression batch failed" ); } } } info!( - "Completed ctoken compression for epoch {}: compressed {} accounts", - epoch_info.epoch, total_compressed + event = "compression_ctoken_completed", + run_id = %self.run_id, + epoch = epoch_info.epoch, + compressed_accounts = total_compressed, + "Completed ctoken compression" ); // Process PDA compression if configured @@ -1917,7 +2403,12 @@ impl EpochManager { .dispatch_pda_compression(consecutive_eligibility_end) .await .unwrap_or_else(|e| { - error!("PDA compression failed: {:?}", e); + error!( + event = "compression_pda_dispatch_failed", + run_id = %self.run_id, + error = ?e, + "PDA compression failed" + ); 0 }); @@ -1926,14 +2417,25 @@ impl EpochManager { .dispatch_mint_compression(consecutive_eligibility_end) .await .unwrap_or_else(|e| { - error!("Mint compression failed: {:?}", e); + error!( + event = "compression_mint_dispatch_failed", + run_id = %self.run_id, + error = ?e, + "Mint compression failed" + ); 0 }); let total = total_compressed + pda_compressed + mint_compressed; info!( - "Completed all compression for epoch {}: {} ctoken + {} PDA + {} Mint = {} total", - epoch_info.epoch, total_compressed, pda_compressed, mint_compressed, total + event = "compression_all_completed", + run_id = %self.run_id, + epoch = epoch_info.epoch, + ctoken_compressed = total_compressed, + pda_compressed, + mint_compressed, + total_compressed = total, + "Completed all compression" ); Ok(total) } @@ -1986,9 +2488,11 @@ impl EpochManager { } info!( - "Processing {} compressible PDA accounts for program {}", - accounts.len(), - program_config.program_id + event = "compression_pda_program_started", + run_id = %self.run_id, + program = %program_config.program_id, + accounts = accounts.len(), + "Processing compressible PDA accounts for program" ); let pda_compressor = crate::compressible::pda::PdaCompressor::new( @@ -2002,8 +2506,11 @@ impl EpochManager { Ok(cfg) => cfg, Err(e) => { error!( - "Failed to fetch config for program {}: {:?}", - program_config.program_id, e + event = "compression_pda_program_config_failed", + run_id = %self.run_id, + program = %program_config.program_id, + error = ?e, + "Failed to fetch config for PDA program" ); continue; } @@ -2014,8 +2521,11 @@ impl EpochManager { if current_slot >= consecutive_eligibility_end { cancelled.store(true, Ordering::Relaxed); warn!( - "Stopping PDA compression: forester no longer eligible (current_slot={}, eligibility_end={})", - current_slot, consecutive_eligibility_end + event = "compression_pda_cancelled_not_eligible", + run_id = %self.run_id, + current_slot, + eligibility_end_slot = consecutive_eligibility_end, + "Stopping PDA compression because forester is no longer eligible" ); break; } @@ -2044,8 +2554,12 @@ impl EpochManager { Err((account_state, e)) => { if e.to_string() != "Cancelled" { error!( - "Failed to compress PDA {} for program {}: {:?}", - account_state.pubkey, program_config.program_id, e + event = "compression_pda_account_failed", + run_id = %self.run_id, + account = %account_state.pubkey, + program = %program_config.program_id, + error = ?e, + "Failed to compress PDA account" ); } } @@ -2053,7 +2567,12 @@ impl EpochManager { } } - info!("Completed PDA compression: {} accounts", total_compressed); + info!( + event = "compression_pda_completed", + run_id = %self.run_id, + compressed_accounts = total_compressed, + "Completed PDA compression" + ); Ok(total_compressed) } @@ -2085,9 +2604,11 @@ impl EpochManager { } info!( - "Processing {} compressible Mint accounts concurrently (max_concurrent={})", - accounts.len(), - config.max_concurrent_batches + event = "compression_mint_started", + run_id = %self.run_id, + accounts = accounts.len(), + max_concurrent = config.max_concurrent_batches, + "Processing compressible Mint accounts" ); let mint_compressor = crate::compressible::mint::MintCompressor::new( @@ -2114,13 +2635,24 @@ impl EpochManager { } Err((mint_state, e)) => { if e.to_string() != "Cancelled" { - error!("Failed to compress Mint {}: {:?}", mint_state.pubkey, e); + error!( + event = "compression_mint_account_failed", + run_id = %self.run_id, + mint = %mint_state.pubkey, + error = ?e, + "Failed to compress mint account" + ); } } } } - info!("Completed Mint compression: {} accounts", total_compressed); + info!( + event = "compression_mint_completed", + run_id = %self.run_id, + compressed_accounts = total_compressed, + "Completed Mint compression" + ); Ok(total_compressed) } @@ -2174,15 +2706,25 @@ impl EpochManager { if num_sent > 0 { debug!( - "processed {} items v1 tree {}", - num_sent, tree_accounts.merkle_tree + event = "v1_tree_items_processed", + run_id = %self.run_id, + tree = %tree_accounts.merkle_tree, + items = num_sent, + "Processed items for V1 tree" ); } match self.rollover_if_needed(tree_accounts).await { Ok(_) => Ok(num_sent), Err(e) => { - error!("Failed to rollover tree: {:?}", e); + error!( + event = "tree_rollover_failed", + run_id = %self.run_id, + tree = %tree_accounts.merkle_tree, + tree_type = ?tree_accounts.tree_type, + error = ?e, + "Failed to rollover tree" + ); Err(e) } } @@ -2201,6 +2743,7 @@ impl EpochManager { BatchContext { rpc_pool: self.rpc_pool.clone(), authority: self.authority.clone(), + run_id: self.run_id.clone(), derivation: self.config.derivation_pubkey, epoch: epoch_info.epoch, merkle_tree: tree_accounts.merkle_tree, @@ -2257,6 +2800,14 @@ impl EpochManager { epoch_info: &Epoch, tree_accounts: &TreeAccounts, ) -> Result>>> { + // Serialize initialization per tree to avoid duplicate expensive processor construction. + let init_lock = self + .state_processor_init_locks + .entry(tree_accounts.merkle_tree) + .or_insert_with(|| Arc::new(Mutex::new(()))) + .clone(); + let _init_guard = init_lock.lock().await; + // First check if we already have a processor for this tree // We REUSE processors across epochs to preserve cached state for optimistic processing if let Some(entry) = self.state_processors.get(&tree_accounts.merkle_tree) { @@ -2295,17 +2846,11 @@ impl EpochManager { self.zkp_batch_sizes .insert(tree_accounts.merkle_tree, batch_size); - // Insert the new processor (or get existing if another task beat us to it) - match self.state_processors.entry(tree_accounts.merkle_tree) { - Entry::Occupied(occupied) => { - // Another task already inserted - use theirs (they may have cached state) - Ok(occupied.get().1.clone()) - } - Entry::Vacant(vacant) => { - vacant.insert((epoch_info.epoch, processor.clone())); - Ok(processor) - } - } + self.state_processors.insert( + tree_accounts.merkle_tree, + (epoch_info.epoch, processor.clone()), + ); + Ok(processor) } async fn get_or_create_address_processor( @@ -2313,6 +2858,14 @@ impl EpochManager { epoch_info: &Epoch, tree_accounts: &TreeAccounts, ) -> Result>>> { + // Serialize initialization per tree to avoid duplicate expensive processor construction. + let init_lock = self + .address_processor_init_locks + .entry(tree_accounts.merkle_tree) + .or_insert_with(|| Arc::new(Mutex::new(()))) + .clone(); + let _init_guard = init_lock.lock().await; + if let Some(entry) = self.address_processors.get(&tree_accounts.merkle_tree) { let (stored_epoch, processor_ref) = entry.value(); let processor_clone = processor_ref.clone(); @@ -2347,14 +2900,11 @@ impl EpochManager { self.zkp_batch_sizes .insert(tree_accounts.merkle_tree, batch_size); - // Insert the new processor (or get existing if another task beat us to it) - match self.address_processors.entry(tree_accounts.merkle_tree) { - Entry::Occupied(occupied) => Ok(occupied.get().1.clone()), - Entry::Vacant(vacant) => { - vacant.insert((epoch_info.epoch, processor.clone())); - Ok(processor) - } - } + self.address_processors.insert( + tree_accounts.merkle_tree, + (epoch_info.epoch, processor.clone()), + ); + Ok(processor) } async fn process_v2( @@ -2387,28 +2937,52 @@ impl EpochManager { Err(e) => { if is_v2_error(&e, V2Error::is_constraint) { warn!( - "State processing hit constraint error for tree {}: {}. Dropping processor to flush cache.", - tree_accounts.merkle_tree, - e + event = "v2_state_constraint_error", + run_id = %self.run_id, + tree = %tree_accounts.merkle_tree, + error = %e, + "State processing hit constraint error. Dropping processor to flush cache." ); drop(proc); // Release lock before removing self.state_processors.remove(&tree_accounts.merkle_tree); self.proof_caches.remove(&tree_accounts.merkle_tree); Err(e) } else if is_v2_error(&e, V2Error::is_hashchain_mismatch) { - warn!( - "State processing hit hashchain mismatch for tree {}: {}. Clearing cache and retrying.", - tree_accounts.merkle_tree, - e + let warning_key = format!( + "v2_state_hashchain_mismatch:{}", + tree_accounts.merkle_tree ); + if should_emit_rate_limited_warning( + warning_key, + Duration::from_secs(15), + ) { + warn!( + event = "v2_state_hashchain_mismatch", + run_id = %self.run_id, + tree = %tree_accounts.merkle_tree, + error = %e, + "State processing hit hashchain mismatch. Clearing cache and retrying." + ); + } + self.heartbeat.increment_v2_recoverable_error(); proc.clear_cache().await; Ok(ProcessingResult::default()) } else { - warn!( - "Failed to process state queue for tree {}: {}. Will retry next tick without dropping processor.", - tree_accounts.merkle_tree, - e - ); + let warning_key = + format!("v2_state_process_failed:{}", tree_accounts.merkle_tree); + if should_emit_rate_limited_warning( + warning_key, + Duration::from_secs(10), + ) { + warn!( + event = "v2_state_process_failed_retrying", + run_id = %self.run_id, + tree = %tree_accounts.merkle_tree, + error = %e, + "Failed to process state queue. Will retry next tick without dropping processor." + ); + } + self.heartbeat.increment_v2_recoverable_error(); Ok(ProcessingResult::default()) } } @@ -2437,28 +3011,52 @@ impl EpochManager { Err(e) => { if is_v2_error(&e, V2Error::is_constraint) { warn!( - "Address processing hit constraint error for tree {}: {}. Dropping processor to flush cache.", - tree_accounts.merkle_tree, - e + event = "v2_address_constraint_error", + run_id = %self.run_id, + tree = %tree_accounts.merkle_tree, + error = %e, + "Address processing hit constraint error. Dropping processor to flush cache." ); drop(proc); self.address_processors.remove(&tree_accounts.merkle_tree); self.proof_caches.remove(&tree_accounts.merkle_tree); Err(e) } else if is_v2_error(&e, V2Error::is_hashchain_mismatch) { - warn!( - "Address processing hit hashchain mismatch for tree {}: {}. Clearing cache and retrying.", - tree_accounts.merkle_tree, - e + let warning_key = format!( + "v2_address_hashchain_mismatch:{}", + tree_accounts.merkle_tree ); + if should_emit_rate_limited_warning( + warning_key, + Duration::from_secs(15), + ) { + warn!( + event = "v2_address_hashchain_mismatch", + run_id = %self.run_id, + tree = %tree_accounts.merkle_tree, + error = %e, + "Address processing hit hashchain mismatch. Clearing cache and retrying." + ); + } + self.heartbeat.increment_v2_recoverable_error(); proc.clear_cache().await; Ok(ProcessingResult::default()) } else { - warn!( - "Failed to process address queue for tree {}: {}. Will retry next tick without dropping processor.", - tree_accounts.merkle_tree, - e - ); + let warning_key = + format!("v2_address_process_failed:{}", tree_accounts.merkle_tree); + if should_emit_rate_limited_warning( + warning_key, + Duration::from_secs(10), + ) { + warn!( + event = "v2_address_process_failed_retrying", + run_id = %self.run_id, + tree = %tree_accounts.merkle_tree, + error = %e, + "Failed to process address queue. Will retry next tick without dropping processor." + ); + } + self.heartbeat.increment_v2_recoverable_error(); Ok(ProcessingResult::default()) } } @@ -2466,8 +3064,10 @@ impl EpochManager { } _ => { warn!( - "Unsupported tree type for V2 processing: {:?}", - tree_accounts.tree_type + event = "v2_unsupported_tree_type", + run_id = %self.run_id, + tree_type = ?tree_accounts.tree_type, + "Unsupported tree type for V2 processing" ); Ok(ProcessingResult::default()) } @@ -2489,6 +3089,7 @@ impl EpochManager { queue_metric_update(epoch_num, items_processed, duration); self.increment_processed_items_count(epoch_num, items_processed) .await; + self.heartbeat.add_items_processed(items_processed); } } @@ -2519,8 +3120,10 @@ impl EpochManager { if v2_state_trees.is_empty() { if skipped_count > 0 { info!( - "No trees to pre-warm: {} StateV2 trees skipped by config", - skipped_count + event = "prewarm_skipped_all_trees_filtered", + run_id = %self.run_id, + skipped_trees = skipped_count, + "No trees to pre-warm; all StateV2 trees skipped by config" ); } return; @@ -2528,8 +3131,11 @@ impl EpochManager { if slots_until_active < 15 { info!( - "Skipping pre-warming: only {} slots until active phase, not enough time", - slots_until_active + event = "prewarm_skipped_not_enough_time", + run_id = %self.run_id, + slots_until_active, + min_required_slots = 15, + "Skipping pre-warming; not enough slots until active phase" ); return; } @@ -2554,7 +3160,13 @@ impl EpochManager { let mut rpc = match self_clone.rpc_pool.get_connection().await { Ok(r) => r, Err(e) => { - warn!("Failed to get RPC for cache validation: {:?}", e); + warn!( + event = "prewarm_cache_validation_rpc_failed", + run_id = %self_clone.run_id, + tree = %tree_pubkey, + error = ?e, + "Failed to get RPC for cache validation" + ); return; } }; @@ -2562,8 +3174,12 @@ impl EpochManager { self_clone.fetch_current_root(&mut *rpc, &tree_accounts).await { info!( - "Tree {} has {} cached proofs from previous epoch (root: {:?}), skipping pre-warm", - tree_pubkey, cache_len, ¤t_root[..4] + event = "prewarm_skipped_cache_already_warm", + run_id = %self_clone.run_id, + tree = %tree_pubkey, + cached_proofs = cache_len, + root_prefix = ?¤t_root[..4], + "Tree already has cached proofs from previous epoch; skipping pre-warm" ); return; } @@ -2576,8 +3192,11 @@ impl EpochManager { Ok(p) => p, Err(e) => { warn!( - "Failed to create processor for pre-warming tree {}: {:?}", - tree_pubkey, e + event = "prewarm_processor_create_failed", + run_id = %self_clone.run_id, + tree = %tree_pubkey, + error = ?e, + "Failed to create processor for pre-warming tree" ); return; } @@ -2596,8 +3215,11 @@ impl EpochManager { Ok(result) => { if result.items_processed > 0 { info!( - "Pre-warmed {} items for tree {} during wait (metrics: {:?})", - result.items_processed, tree_pubkey, result.metrics + event = "prewarm_tree_completed", + run_id = %self_clone.run_id, + tree = %tree_pubkey, + items = result.items_processed, + "Pre-warmed items for tree during wait" ); self_clone .add_processing_metrics(epoch_info.epoch.epoch, result.metrics) @@ -2621,20 +3243,32 @@ impl EpochManager { (slot_duration() * timeout_slots as u32).min(Duration::from_secs(30)); info!( - "Starting pre-warming for {} trees ({} skipped by config) with {}ms timeout", - v2_state_trees.len(), - skipped_count, - timeout_duration.as_millis() + event = "prewarm_started", + run_id = %self.run_id, + trees = v2_state_trees.len(), + skipped_trees = skipped_count, + timeout_ms = timeout_duration.as_millis() as u64, + "Starting pre-warming" ); match tokio::time::timeout(timeout_duration, futures::future::join_all(prewarm_futures)) .await { Ok(_) => { - info!("Completed pre-warming for all trees"); + info!( + event = "prewarm_completed", + run_id = %self.run_id, + trees = v2_state_trees.len(), + "Completed pre-warming for all trees" + ); } Err(_) => { - info!("Pre-warming timed out after {:?}", timeout_duration); + info!( + event = "prewarm_timed_out", + run_id = %self.run_id, + timeout_ms = timeout_duration.as_millis() as u64, + "Pre-warming timed out" + ); } } } @@ -2651,8 +3285,12 @@ impl EpochManager { let current_slot = self.slot_tracker.estimated_current_slot(); if current_slot >= consecutive_eligibility_end { debug!( - "Skipping cached proof send for tree {}: past eligibility window (slot {} >= {})", - tree_pubkey, current_slot, consecutive_eligibility_end + event = "cached_proofs_skipped_outside_eligibility", + run_id = %self.run_id, + tree = %tree_pubkey, + current_slot, + eligibility_end_slot = consecutive_eligibility_end, + "Skipping cached proof send because eligibility window has ended" ); return Ok(None); } @@ -2663,7 +3301,12 @@ impl EpochManager { }; if cache.is_warming().await { - debug!("Cache still warming for tree {}, skipping", tree_pubkey); + debug!( + event = "cached_proofs_skipped_cache_warming", + run_id = %self.run_id, + tree = %tree_pubkey, + "Skipping cached proofs because cache is still warming" + ); return Ok(None); } @@ -2672,8 +3315,11 @@ impl EpochManager { Ok(root) => root, Err(e) => { warn!( - "Failed to fetch current root for tree {}: {:?}", - tree_pubkey, e + event = "cached_proofs_root_fetch_failed", + run_id = %self.run_id, + tree = %tree_pubkey, + error = ?e, + "Failed to fetch current root for tree" ); return Ok(None); } @@ -2683,9 +3329,11 @@ impl EpochManager { Some(proofs) => proofs, None => { debug!( - "No valid cached proofs for tree {} (root: {:?})", - tree_pubkey, - ¤t_root[..4] + event = "cached_proofs_not_available", + run_id = %self.run_id, + tree = %tree_pubkey, + root_prefix = ?¤t_root[..4], + "No valid cached proofs for tree" ); return Ok(None); } @@ -2696,10 +3344,12 @@ impl EpochManager { } info!( - "Sending {} cached proofs for tree {} (root: {:?})", - cached_proofs.len(), - tree_pubkey, - ¤t_root[..4] + event = "cached_proofs_send_started", + run_id = %self.run_id, + tree = %tree_pubkey, + proofs = cached_proofs.len(), + root_prefix = ?¤t_root[..4], + "Sending cached proofs for tree" ); let items_sent = self @@ -2813,14 +3463,21 @@ impl EpochManager { { Ok(sig) => { info!( - "Sent cached proofs tx: {} ({} instructions)", - sig, - instructions.len() + event = "cached_proofs_tx_sent", + run_id = %self.run_id, + signature = %sig, + instruction_count = instructions.len(), + "Sent cached proofs transaction" ); total_items += chunk_items; } Err(e) => { - warn!("Failed to send cached proofs tx: {:?}", e); + warn!( + event = "cached_proofs_tx_send_failed", + run_id = %self.run_id, + error = ?e, + "Failed to send cached proofs transaction" + ); } } } @@ -2834,7 +3491,13 @@ impl EpochManager { if is_tree_ready_for_rollover(&mut *rpc, tree_account.merkle_tree, tree_account.tree_type) .await? { - info!("Starting {} rollover.", tree_account.merkle_tree); + info!( + event = "tree_rollover_started", + run_id = %self.run_id, + tree = %tree_account.merkle_tree, + tree_type = ?tree_account.tree_type, + "Starting tree rollover" + ); self.perform_rollover(tree_account).await?; } Ok(()) @@ -2855,12 +3518,23 @@ impl EpochManager { #[instrument(level = "debug", skip(self, epoch_info), fields(forester = %self.config.payer_keypair.pubkey(), epoch = epoch_info.epoch.epoch ))] async fn wait_for_report_work_phase(&self, epoch_info: &ForesterEpochInfo) -> Result<()> { - info!("Waiting for report work phase"); + info!( + event = "wait_for_report_work_phase", + run_id = %self.run_id, + epoch = epoch_info.epoch.epoch, + report_work_start_slot = epoch_info.epoch.phases.report_work.start, + "Waiting for report work phase" + ); let mut rpc = self.rpc_pool.get_connection().await?; let report_work_start_slot = epoch_info.epoch.phases.report_work.start; wait_until_slot_reached(&mut *rpc, &self.slot_tracker, report_work_start_slot).await?; - info!("Finished waiting for report work phase"); + info!( + event = "report_work_phase_ready", + run_id = %self.run_id, + epoch = epoch_info.epoch.epoch, + "Finished waiting for report work phase" + ); Ok(()) } @@ -2874,8 +3548,15 @@ impl EpochManager { }; info!( - "Sending work report: epoch={} items={} metrics={:?}", - report.epoch, report.processed_items, report.metrics + event = "work_report_sent_to_channel", + run_id = %self.run_id, + epoch = report.epoch, + items = report.processed_items, + total_circuit_inputs_ms = report.metrics.total_circuit_inputs().as_millis() as u64, + total_proof_generation_ms = report.metrics.total_proof_generation().as_millis() as u64, + total_round_trip_ms = report.metrics.total_round_trip().as_millis() as u64, + tx_sending_ms = report.metrics.tx_sending_duration.as_millis() as u64, + "Sending work report to channel" ); self.work_report_sender @@ -2885,6 +3566,7 @@ impl EpochManager { epoch: report.epoch, error: e.to_string(), })?; + self.heartbeat.increment_work_report_sent(); Ok(()) } @@ -2892,7 +3574,12 @@ impl EpochManager { #[instrument(level = "debug", skip(self, epoch_info), fields(forester = %self.config.payer_keypair.pubkey(), epoch = epoch_info.epoch.epoch ))] async fn report_work_onchain(&self, epoch_info: &ForesterEpochInfo) -> Result<()> { - info!("Reporting work on-chain"); + info!( + event = "work_report_onchain_started", + run_id = %self.run_id, + epoch = epoch_info.epoch.epoch, + "Reporting work on-chain" + ); let mut rpc = LightClient::new(LightClientConfig { url: self.config.external_services.rpc_url.to_string(), photon_url: self.config.external_services.indexer_url.clone(), @@ -2935,11 +3622,21 @@ impl EpochManager { .await { Ok(_) => { - info!("Work reported on-chain"); + info!( + event = "work_report_onchain_succeeded", + run_id = %self.run_id, + epoch = epoch_info.epoch.epoch, + "Work reported on-chain" + ); } Err(e) => { if e.to_string().contains("already been processed") { - info!("Work already reported for epoch {}", epoch_info.epoch.epoch); + info!( + event = "work_report_onchain_already_reported", + run_id = %self.run_id, + epoch = epoch_info.epoch.epoch, + "Work already reported on-chain for epoch" + ); return Ok(()); } if let RpcError::ClientError(client_error) = &e { @@ -2985,7 +3682,13 @@ impl EpochManager { ) .await?; - info!("Address rollover signature: {:?}", rollover_signature); + info!( + event = "address_tree_rollover_succeeded", + run_id = %self.run_id, + tree = %tree_account.merkle_tree, + signature = %rollover_signature, + "Address tree rollover succeeded" + ); Ok(()) } TreeType::StateV1 => { @@ -3007,7 +3710,13 @@ impl EpochManager { ) .await?; - info!("State rollover signature: {:?}", rollover_signature); + info!( + event = "state_tree_rollover_succeeded", + run_id = %self.run_id, + tree = %tree_account.merkle_tree, + signature = %rollover_signature, + "State tree rollover succeeded" + ); Ok(()) } @@ -3052,9 +3761,73 @@ fn should_skip_tree(config: &ForesterConfig, tree_type: &TreeType) -> bool { } } +pub fn generate_run_id() -> String { + let epoch_ms = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis(); + format!("{}-{}", std::process::id(), epoch_ms) +} + +fn spawn_heartbeat_task( + heartbeat: Arc, + slot_tracker: Arc, + protocol_config: Arc, + run_id: String, +) -> JoinHandle<()> { + tokio::spawn(async move { + let mut interval = tokio::time::interval(Duration::from_secs(20)); + interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + let mut previous = heartbeat.snapshot(); + + loop { + interval.tick().await; + + let slot = slot_tracker.estimated_current_slot(); + let epoch = protocol_config.get_current_active_epoch(slot).ok(); + let epoch_known = epoch.is_some(); + let epoch_value = epoch.unwrap_or_default(); + let current = heartbeat.snapshot(); + let delta = current.delta_since(&previous); + previous = current; + + info!( + event = "service_heartbeat", + run_id = %run_id, + slot, + epoch = epoch_value, + epoch_known, + cycle_delta = delta.active_cycles, + tree_tasks_delta = delta.tree_tasks_spawned, + queues_started_delta = delta.queues_started, + queues_finished_delta = delta.queues_finished, + items_processed_delta = delta.items_processed, + work_reports_delta = delta.work_reports_sent, + recoverable_v2_errors_delta = delta.v2_recoverable_errors, + cycle_total = current.active_cycles, + items_processed_total = current.items_processed, + "Forester heartbeat" + ); + } + }) +} + #[instrument( level = "info", - skip(config, protocol_config, rpc_pool, shutdown, work_report_sender, slot_tracker), + skip( + config, + protocol_config, + rpc_pool, + shutdown, + work_report_sender, + slot_tracker, + tx_cache, + ops_cache, + compressible_tracker, + pda_tracker, + mint_tracker, + run_id + ), fields(forester = %config.payer_keypair.pubkey()) )] #[allow(clippy::too_many_arguments)] @@ -3070,9 +3843,22 @@ pub async fn run_service( compressible_tracker: Option>, pda_tracker: Option>, mint_tracker: Option>, + run_id: String, ) -> Result<()> { - info_span!("run_service", forester = %config.payer_keypair.pubkey()) - .in_scope(|| async { + let heartbeat = Arc::new(ServiceHeartbeat::default()); + let heartbeat_handle = spawn_heartbeat_task( + heartbeat.clone(), + slot_tracker.clone(), + protocol_config.clone(), + run_id.clone(), + ); + + let run_id_for_logs = run_id.clone(); + let result = info_span!( + "run_service", + forester = %config.payer_keypair.pubkey() + ) + .in_scope(|| async move { let processor_mode_str = match ( config.general_config.skip_v1_state_trees && config.general_config.skip_v1_address_trees, @@ -3084,7 +3870,12 @@ pub async fn run_service( (false, false) => "all", _ => "unknown", }; - info!("Starting forester in {} mode", processor_mode_str); + info!( + event = "forester_starting", + run_id = %run_id_for_logs, + processor_mode = processor_mode_str, + "Starting forester" + ); const INITIAL_RETRY_DELAY: Duration = Duration::from_secs(1); const MAX_RETRY_DELAY: Duration = Duration::from_secs(30); @@ -3102,7 +3893,12 @@ pub async fn run_service( tokio::select! { biased; _ = &mut shutdown => { - info!("Received shutdown signal during tree fetch. Stopping."); + info!( + event = "shutdown_received", + run_id = %run_id_for_logs, + phase = "tree_fetch", + "Received shutdown signal during tree fetch. Stopping." + ); return Ok(()); } result = rpc_pool.get_connection() => { @@ -3111,7 +3907,12 @@ pub async fn run_service( tokio::select! { biased; _ = &mut shutdown => { - info!("Received shutdown signal during tree fetch. Stopping."); + info!( + event = "shutdown_received", + run_id = %run_id_for_logs, + phase = "tree_fetch", + "Received shutdown signal during tree fetch. Stopping." + ); return Ok(()); } fetch_result = fetch_trees(&*rpc) => { @@ -3120,15 +3921,22 @@ pub async fn run_service( let group_authority = match config.general_config.group_authority { Some(ga) => Some(ga), None => { - match fetch_protocol_group_authority(&*rpc).await { + match fetch_protocol_group_authority(&*rpc, run_id_for_logs.as_str()).await { Ok(ga) => { - info!("Using protocol default group authority: {}", ga); + info!( + event = "group_authority_default_fetched", + run_id = %run_id_for_logs, + group_authority = %ga, + "Using protocol default group authority" + ); Some(ga) } Err(e) => { warn!( - "Failed to fetch protocol group authority, processing all trees: {:?}", - e + event = "group_authority_fetch_failed", + run_id = %run_id_for_logs, + error = ?e, + "Failed to fetch protocol group authority; processing all trees" ); None } @@ -3140,10 +3948,12 @@ pub async fn run_service( let before_count = fetched_trees.len(); fetched_trees.retain(|tree| tree.owner == group_authority); info!( - "Filtered trees by group authority {}: {} -> {} trees", - group_authority, - before_count, - fetched_trees.len() + event = "trees_filtered_by_group_authority", + run_id = %run_id_for_logs, + group_authority = %group_authority, + trees_before = before_count, + trees_after = fetched_trees.len(), + "Filtered trees by group authority" ); } @@ -3151,13 +3961,24 @@ pub async fn run_service( let tree_ids = &config.general_config.tree_ids; fetched_trees.retain(|tree| tree_ids.contains(&tree.merkle_tree)); if fetched_trees.is_empty() { - error!("None of the specified trees found: {:?}", tree_ids); + error!( + event = "trees_filter_explicit_ids_empty", + run_id = %run_id_for_logs, + requested_tree_count = tree_ids.len(), + requested_trees = ?tree_ids, + "None of the specified trees were found" + ); return Err(anyhow::anyhow!( "None of the specified trees found: {:?}", tree_ids )); } - info!("Processing only trees: {:?}", tree_ids); + info!( + event = "trees_filter_explicit_ids", + run_id = %run_id_for_logs, + tree_count = tree_ids.len(), + "Processing only explicitly requested trees" + ); } break fetched_trees; } @@ -3171,8 +3992,13 @@ pub async fn run_service( )); } warn!( - "Failed to fetch trees (attempt {}/{}), retrying in {:?}: {:?}", - attempts, max_attempts, delay, e + event = "fetch_trees_failed_retrying", + run_id = %run_id_for_logs, + attempt = attempts, + max_attempts, + retry_delay_ms = delay.as_millis() as u64, + error = ?e, + "Failed to fetch trees; retrying" ); } } @@ -3189,8 +4015,13 @@ pub async fn run_service( )); } warn!( - "Failed to get RPC connection (attempt {}/{}), retrying in {:?}: {:?}", - attempts, max_attempts, delay, e + event = "rpc_connection_failed_retrying", + run_id = %run_id_for_logs, + attempt = attempts, + max_attempts, + retry_delay_ms = delay.as_millis() as u64, + error = ?e, + "Failed to get RPC connection; retrying" ); } } @@ -3200,7 +4031,12 @@ pub async fn run_service( tokio::select! { biased; _ = &mut shutdown => { - info!("Received shutdown signal during retry wait. Stopping."); + info!( + event = "shutdown_received", + run_id = %run_id_for_logs, + phase = "tree_fetch_retry_wait", + "Received shutdown signal during retry wait. Stopping." + ); return Ok(()); } _ = sleep(delay) => { @@ -3214,7 +4050,12 @@ pub async fn run_service( let (new_tree_sender, _) = broadcast::channel(100); if !config.general_config.tree_ids.is_empty() { - info!("Processing specific trees, tree discovery will be limited"); + info!( + event = "tree_discovery_limited_to_explicit_ids", + run_id = %run_id_for_logs, + tree_count = config.general_config.tree_ids.len(), + "Processing specific trees; tree discovery will be limited" + ); } while retry_count < config.retry_config.max_retries { @@ -3226,9 +4067,11 @@ pub async fn run_service( match load_lookup_table_async(&*rpc, lut_address).await { Ok(lut) => { info!( - "Loaded lookup table {} with {} addresses", - lut_address, - lut.addresses.len() + event = "lookup_table_loaded", + run_id = %run_id_for_logs, + lookup_table = %lut_address, + address_count = lut.addresses.len(), + "Loaded lookup table" ); Arc::new(vec![lut]) } @@ -3260,6 +4103,8 @@ pub async fn run_service( pda_tracker.clone(), mint_tracker.clone(), address_lookup_tables, + heartbeat.clone(), + run_id.clone(), ) .await { @@ -3273,7 +4118,12 @@ pub async fn run_service( let result = tokio::select! { result = epoch_manager.run() => result, _ = shutdown => { - info!("Received shutdown signal. Stopping the service."); + info!( + event = "shutdown_received", + run_id = %run_id_for_logs, + phase = "service_run", + "Received shutdown signal. Stopping the service." + ); Ok(()) } }; @@ -3282,9 +4132,11 @@ pub async fn run_service( } Err(e) => { warn!( - "Failed to create EpochManager (attempt {}): {:?}", - retry_count + 1, - e + event = "epoch_manager_create_failed", + run_id = %run_id_for_logs, + attempt = retry_count + 1, + error = ?e, + "Failed to create EpochManager" ); retry_count += 1; if retry_count < config.retry_config.max_retries { @@ -3293,9 +4145,12 @@ pub async fn run_service( retry_delay = std::cmp::min(retry_delay * 2, MAX_RETRY_DELAY); } else { error!( - "Failed to start forester after {} attempts over {:?}", - config.retry_config.max_retries, - start_time.elapsed() + event = "forester_start_failed_max_retries", + run_id = %run_id_for_logs, + attempts = config.retry_config.max_retries, + elapsed_ms = start_time.elapsed().as_millis() as u64, + error = ?e, + "Failed to start forester after max retries" ); return Err(InitializationError::MaxRetriesExceeded { attempts: config.retry_config.max_retries, @@ -3312,7 +4167,10 @@ pub async fn run_service( .into(), ) }) - .await + .await; + + heartbeat_handle.abort(); + result } /// Async version of load_lookup_table that works with the Rpc trait @@ -3379,18 +4237,14 @@ mod tests { indexer_config: Default::default(), transaction_config: Default::default(), general_config: GeneralConfig { - slot_update_interval_seconds: 10, - tree_discovery_interval_seconds: 1, enable_metrics: false, skip_v1_state_trees: skip_v1_state, skip_v1_address_trees: skip_v1_address, skip_v2_state_trees: skip_v2_state, skip_v2_address_trees: skip_v2_address, - tree_ids: vec![], sleep_after_processing_ms: 50, sleep_when_idle_ms: 100, - queue_polling_mode: crate::cli::QueuePollingMode::Indexer, - group_authority: None, + ..Default::default() }, rpc_pool_config: Default::default(), registry_pubkey: Pubkey::default(), diff --git a/forester/src/errors.rs b/forester/src/errors.rs index 250c1f684c..f88c6e1f21 100644 --- a/forester/src/errors.rs +++ b/forester/src/errors.rs @@ -81,6 +81,13 @@ pub enum RegistrationError { registration_end: u64, }, + #[error("Cannot finalize registration for epoch {epoch}. Current slot: {current_slot}, active phase ended: {active_phase_end_slot}")] + FinalizeRegistrationPhaseEnded { + epoch: u64, + current_slot: u64, + active_phase_end_slot: u64, + }, + #[error("Epoch registration returned no result")] EmptyRegistration, diff --git a/forester/src/forester_status.rs b/forester/src/forester_status.rs index 62812710f5..801d6a1799 100644 --- a/forester/src/forester_status.rs +++ b/forester/src/forester_status.rs @@ -250,7 +250,7 @@ pub async fn get_forester_status_with_options( // Filter trees by protocol group authority if enabled if filter_by_group_authority { - match fetch_protocol_group_authority(&rpc).await { + match fetch_protocol_group_authority(&rpc, "status").await { Ok(group_authority) => { let before_count = trees.len(); trees.retain(|tree| tree.owner == group_authority); diff --git a/forester/src/lib.rs b/forester/src/lib.rs index 203015ddbd..eb224a1e4e 100644 --- a/forester/src/lib.rs +++ b/forester/src/lib.rs @@ -9,6 +9,7 @@ pub mod errors; pub mod forester_status; pub mod health_check; pub mod helius_priority_fee_types; +pub mod logging; pub mod metrics; pub mod pagerduty; pub mod processor; @@ -38,7 +39,7 @@ use tokio::sync::{mpsc, oneshot, Mutex}; use tracing::debug; use crate::{ - epoch_manager::{run_service, WorkReport}, + epoch_manager::{generate_run_id, run_service, WorkReport}, metrics::QUEUE_LENGTH, processor::tx_cache::ProcessedHashCache, queue_helpers::{ @@ -148,6 +149,30 @@ pub async fn run_pipeline( shutdown_compressible: Option>, shutdown_bootstrap: Option>, work_report_sender: mpsc::Sender, +) -> Result<()> { + run_pipeline_with_run_id::( + config, + rpc_rate_limiter, + send_tx_rate_limiter, + shutdown_service, + shutdown_compressible, + shutdown_bootstrap, + work_report_sender, + generate_run_id(), + ) + .await +} + +#[allow(clippy::too_many_arguments)] +pub async fn run_pipeline_with_run_id( + config: Arc, + rpc_rate_limiter: Option, + send_tx_rate_limiter: Option, + shutdown_service: oneshot::Receiver<()>, + shutdown_compressible: Option>, + shutdown_bootstrap: Option>, + work_report_sender: mpsc::Sender, + run_id: String, ) -> Result<()> { let mut builder = SolanaRpcPoolBuilder::::default() .url(config.external_services.rpc_url.to_string()) @@ -240,6 +265,9 @@ pub async fn run_pipeline( } }); + // Extract flag before async closures to avoid moving config Arc + let helius_rpc = config.general_config.helius_rpc; + // Spawn bootstrap task for ctokens with shutdown support if let Some(mut shutdown_bootstrap_rx) = shutdown_bootstrap { let tracker_clone = ctoken_tracker.clone(); @@ -254,7 +282,10 @@ pub async fn run_pipeline( let rpc_url = rpc_url.clone(); let tracker = tracker_clone.clone(); async move { - compressible::bootstrap_ctoken_accounts(rpc_url, tracker, None).await + compressible::bootstrap_ctoken_accounts( + rpc_url, tracker, None, helius_rpc, + ) + .await } }); @@ -321,7 +352,10 @@ pub async fn run_pipeline( let rpc_url = rpc_url.clone(); let tracker = pda_tracker_clone.clone(); async move { - compressible::pda::bootstrap_pda_accounts(rpc_url, tracker, None).await + compressible::pda::bootstrap_pda_accounts( + rpc_url, tracker, None, helius_rpc, + ) + .await } }); @@ -377,8 +411,10 @@ pub async fn run_pipeline( let rpc_url = rpc_url.clone(); let tracker = mint_tracker_clone.clone(); async move { - compressible::mint::bootstrap_mint_accounts(rpc_url, tracker, None) - .await + compressible::mint::bootstrap_mint_accounts( + rpc_url, tracker, None, helius_rpc, + ) + .await } }); @@ -420,6 +456,7 @@ pub async fn run_pipeline( compressible_tracker, pda_tracker, mint_tracker, + run_id, ) .await; diff --git a/forester/src/logging.rs b/forester/src/logging.rs new file mode 100644 index 0000000000..22ef24f049 --- /dev/null +++ b/forester/src/logging.rs @@ -0,0 +1,121 @@ +use std::{ + collections::HashMap, + sync::{ + atomic::{AtomicU64, Ordering}, + Mutex, OnceLock, + }, + time::{Duration, Instant}, +}; + +#[derive(Debug, Default)] +pub struct ServiceHeartbeat { + active_cycles: AtomicU64, + tree_tasks_spawned: AtomicU64, + queues_started: AtomicU64, + queues_finished: AtomicU64, + items_processed: AtomicU64, + work_reports_sent: AtomicU64, + v2_recoverable_errors: AtomicU64, +} + +#[derive(Debug, Clone, Copy, Default)] +pub struct HeartbeatSnapshot { + pub active_cycles: u64, + pub tree_tasks_spawned: u64, + pub queues_started: u64, + pub queues_finished: u64, + pub items_processed: u64, + pub work_reports_sent: u64, + pub v2_recoverable_errors: u64, +} + +impl HeartbeatSnapshot { + pub fn delta_since(&self, previous: &Self) -> Self { + Self { + active_cycles: self.active_cycles.saturating_sub(previous.active_cycles), + tree_tasks_spawned: self + .tree_tasks_spawned + .saturating_sub(previous.tree_tasks_spawned), + queues_started: self.queues_started.saturating_sub(previous.queues_started), + queues_finished: self + .queues_finished + .saturating_sub(previous.queues_finished), + items_processed: self + .items_processed + .saturating_sub(previous.items_processed), + work_reports_sent: self + .work_reports_sent + .saturating_sub(previous.work_reports_sent), + v2_recoverable_errors: self + .v2_recoverable_errors + .saturating_sub(previous.v2_recoverable_errors), + } + } +} + +impl ServiceHeartbeat { + pub fn increment_active_cycle(&self) { + self.active_cycles.fetch_add(1, Ordering::Relaxed); + } + + pub fn add_tree_tasks_spawned(&self, count: usize) { + self.tree_tasks_spawned + .fetch_add(count as u64, Ordering::Relaxed); + } + + pub fn increment_queue_started(&self) { + self.queues_started.fetch_add(1, Ordering::Relaxed); + } + + pub fn increment_queue_finished(&self) { + self.queues_finished.fetch_add(1, Ordering::Relaxed); + } + + pub fn add_items_processed(&self, count: usize) { + self.items_processed + .fetch_add(count as u64, Ordering::Relaxed); + } + + pub fn increment_work_report_sent(&self) { + self.work_reports_sent.fetch_add(1, Ordering::Relaxed); + } + + pub fn increment_v2_recoverable_error(&self) { + self.v2_recoverable_errors.fetch_add(1, Ordering::Relaxed); + } + + pub fn snapshot(&self) -> HeartbeatSnapshot { + HeartbeatSnapshot { + active_cycles: self.active_cycles.load(Ordering::Relaxed), + tree_tasks_spawned: self.tree_tasks_spawned.load(Ordering::Relaxed), + queues_started: self.queues_started.load(Ordering::Relaxed), + queues_finished: self.queues_finished.load(Ordering::Relaxed), + items_processed: self.items_processed.load(Ordering::Relaxed), + work_reports_sent: self.work_reports_sent.load(Ordering::Relaxed), + v2_recoverable_errors: self.v2_recoverable_errors.load(Ordering::Relaxed), + } + } +} + +pub fn should_emit_rate_limited_warning(key: impl Into, interval: Duration) -> bool { + static LAST_EMIT_AT: OnceLock>> = OnceLock::new(); + const MAX_KEYS: usize = 2_048; + + let key = key.into(); + let now = Instant::now(); + let map = LAST_EMIT_AT.get_or_init(|| Mutex::new(HashMap::new())); + let mut map = map.lock().expect("rate limiter mutex poisoned"); + + if map.len() > MAX_KEYS { + let stale_after = interval.saturating_mul(4); + map.retain(|_, ts| now.duration_since(*ts) < stale_after); + } + + match map.get(&key) { + Some(last_emit) if now.duration_since(*last_emit) < interval => false, + _ => { + map.insert(key, now); + true + } + } +} diff --git a/forester/src/main.rs b/forester/src/main.rs index 91cdb9d506..fe99ac90e0 100644 --- a/forester/src/main.rs +++ b/forester/src/main.rs @@ -4,11 +4,12 @@ use clap::Parser; use forester::{ api_server::{spawn_api_server, ApiServerConfig}, cli::{Cli, Commands}, + epoch_manager::generate_run_id, errors::ForesterError, forester_status, health_check::run_health_check, metrics::register_metrics, - run_pipeline, + run_pipeline_with_run_id, telemetry::setup_telemetry, ForesterConfig, }; @@ -87,12 +88,15 @@ async fn main() -> Result<(), ForesterError> { .map(RateLimiter::new); let rpc_url_for_api: String = config.external_services.rpc_url.to_string(); + let run_id = generate_run_id(); let api_server_handle = spawn_api_server(ApiServerConfig { + run_id: Arc::::from(run_id.clone()), rpc_url: rpc_url_for_api, port: args.api_server_port, allow_public_bind: args.api_server_public_bind, compressible_state: None, prometheus_url: args.prometheus_url.clone(), + helius_rpc: args.helius_rpc, }); // Create compressible shutdown channels if compressible is enabled @@ -124,7 +128,7 @@ async fn main() -> Result<(), ForesterError> { (None, None) }; - run_pipeline::( + run_pipeline_with_run_id::( config, rpc_rate_limiter, send_tx_limiter, @@ -132,6 +136,7 @@ async fn main() -> Result<(), ForesterError> { shutdown_receiver_compressible, shutdown_receiver_bootstrap, work_report_sender, + run_id, ) .await? } @@ -146,22 +151,33 @@ async fn main() -> Result<(), ForesterError> { } Commands::Dashboard(args) => { tracing::info!( - "Starting standalone dashboard API server on port {}", - args.port + event = "dashboard_server_starting", + port = args.port, + "Starting standalone dashboard API server" ); + let run_id = generate_run_id(); let api_server_handle = spawn_api_server(ApiServerConfig { + run_id: Arc::::from(run_id), rpc_url: args.rpc_url.clone(), port: args.port, allow_public_bind: args.public_bind, compressible_state: None, prometheus_url: args.prometheus_url.clone(), + helius_rpc: false, }); // Block until Ctrl+C if let Err(e) = ctrl_c().await { - tracing::error!("Failed to listen for Ctrl+C: {}", e); + tracing::error!( + event = "dashboard_ctrlc_listener_failed", + error = %e, + "Failed to listen for Ctrl+C" + ); } - tracing::info!("Received Ctrl+C, shutting down dashboard API server..."); + tracing::info!( + event = "dashboard_shutdown_signal_received", + "Received Ctrl+C, shutting down dashboard API server" + ); api_server_handle.shutdown(); } } diff --git a/forester/src/processor/v1/helpers.rs b/forester/src/processor/v1/helpers.rs index 0d00d82c4b..5999fc0dc5 100644 --- a/forester/src/processor/v1/helpers.rs +++ b/forester/src/processor/v1/helpers.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use account_compression::{ processor::initialize_address_merkle_tree::Pubkey, @@ -19,7 +19,10 @@ use solana_program::instruction::Instruction; use tokio::time::Instant; use tracing::{info, warn}; -use crate::metrics::{update_indexer_proof_count, update_indexer_response_time}; +use crate::{ + logging::should_emit_rate_limited_warning, + metrics::{update_indexer_proof_count, update_indexer_response_time}, +}; const ADDRESS_PROOF_BATCH_SIZE: usize = 100; const ADDRESS_PROOF_MAX_RETRIES: u32 = 3; @@ -53,8 +56,9 @@ pub async fn fetch_proofs_and_create_instructions( for item in state_items.iter() { if item.tree_account.tree_type != TreeType::StateV1 { warn!( - "State item has unexpected tree type: {:?}", - item.tree_account.tree_type + event = "v1_state_item_unexpected_tree_type", + tree_type = ?item.tree_account.tree_type, + "State item has unexpected tree type" ); } } @@ -93,14 +97,22 @@ pub async fn fetch_proofs_and_create_instructions( let rpc = pool.get_connection().await?; if let Err(e) = wait_for_indexer(&*rpc).await { - warn!("Indexer not fully caught up, but proceeding anyway: {}", e); + if should_emit_rate_limited_warning("v1_wait_for_indexer", Duration::from_secs(30)) { + warn!( + event = "v1_wait_for_indexer_error", + error = %e, + "Indexer not fully caught up, but proceeding anyway" + ); + } } let address_proofs = if let Some((merkle_tree, addresses)) = address_data { let total_addresses = addresses.len(); info!( - "Fetching {} address proofs in batches of {}", - total_addresses, ADDRESS_PROOF_BATCH_SIZE + event = "v1_address_proofs_fetch_started", + requested = total_addresses, + batch_size = ADDRESS_PROOF_BATCH_SIZE, + "Fetching address proofs in batches" ); let start_time = Instant::now(); @@ -119,11 +131,12 @@ pub async fn fetch_proofs_and_create_instructions( // Exponential backoff: 500ms, 1000ms, 2000ms let delay_ms = ADDRESS_PROOF_RETRY_BASE_DELAY_MS * (1 << (attempt - 1)); warn!( - "Retrying address proof batch {} (attempt {}/{}), waiting {}ms", - batch_idx, - attempt + 1, - ADDRESS_PROOF_MAX_RETRIES + 1, - delay_ms + event = "v1_address_proof_batch_retrying", + batch_index = batch_idx, + attempt = attempt + 1, + max_attempts = ADDRESS_PROOF_MAX_RETRIES + 1, + delay_ms, + "Retrying address proof batch" ); tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await; } @@ -138,22 +151,22 @@ pub async fn fetch_proofs_and_create_instructions( let proofs_received = response.value.items.len(); info!( - "Address proof batch {}: requested={}, received={}, duration={:.3}s{}", - batch_idx, - batch_size, - proofs_received, - batch_duration.as_secs_f64(), - if attempt > 0 { - format!(" (after {} retries)", attempt) - } else { - String::new() - } + event = "v1_address_proof_batch_completed", + batch_index = batch_idx, + requested = batch_size, + received = proofs_received, + duration_s = batch_duration.as_secs_f64(), + retries = attempt, + "Address proof batch completed" ); if proofs_received != batch_size { warn!( - "Address proof count mismatch in batch {}: requested={}, received={}", - batch_idx, batch_size, proofs_received + event = "v1_address_proof_batch_count_mismatch", + batch_index = batch_idx, + requested = batch_size, + received = proofs_received, + "Address proof count mismatch in batch" ); } @@ -171,11 +184,12 @@ pub async fn fetch_proofs_and_create_instructions( if let Some(e) = last_error { let batch_duration = batch_start.elapsed(); warn!( - "Failed to get address proofs for batch {} after {} attempts ({:.3}s): {}", - batch_idx, - ADDRESS_PROOF_MAX_RETRIES + 1, - batch_duration.as_secs_f64(), - e + event = "v1_address_proof_batch_failed", + batch_index = batch_idx, + attempts = ADDRESS_PROOF_MAX_RETRIES + 1, + duration_s = batch_duration.as_secs_f64(), + error = %e, + "Failed to get address proofs for batch" ); return Err(anyhow::anyhow!( "Failed to get address proofs for batch {} after {} retries: {}", @@ -188,10 +202,11 @@ pub async fn fetch_proofs_and_create_instructions( let total_duration = start_time.elapsed(); info!( - "Address proofs complete: requested={}, received={}, total_duration={:.3}s", - total_addresses, - all_proofs.len(), - total_duration.as_secs_f64() + event = "v1_address_proofs_fetch_completed", + requested = total_addresses, + received = all_proofs.len(), + duration_s = total_duration.as_secs_f64(), + "Address proofs fetch completed" ); update_indexer_response_time( @@ -208,7 +223,11 @@ pub async fn fetch_proofs_and_create_instructions( let state_proofs = if let Some(states) = state_data { let total_states = states.len(); - info!("Fetching {} state proofs", total_states); + info!( + event = "v1_state_proofs_fetch_started", + requested = total_states, + "Fetching state proofs" + ); let start_time = Instant::now(); @@ -221,10 +240,11 @@ pub async fn fetch_proofs_and_create_instructions( // Exponential backoff: 500ms, 1000ms, 2000ms let delay_ms = ADDRESS_PROOF_RETRY_BASE_DELAY_MS * (1 << (attempt - 1)); warn!( - "Retrying state proofs (attempt {}/{}), waiting {}ms", - attempt + 1, - ADDRESS_PROOF_MAX_RETRIES + 1, - delay_ms + event = "v1_state_proofs_retrying", + attempt = attempt + 1, + max_attempts = ADDRESS_PROOF_MAX_RETRIES + 1, + delay_ms, + "Retrying state proofs" ); tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await; } @@ -239,21 +259,20 @@ pub async fn fetch_proofs_and_create_instructions( let proofs_received = response.value.items.len(); info!( - "State proofs complete: requested={}, received={}, duration={:.3}s{}", - total_states, - proofs_received, - duration.as_secs_f64(), - if attempt > 0 { - format!(" (after {} retries)", attempt) - } else { - String::new() - } + event = "v1_state_proofs_fetch_completed", + requested = total_states, + received = proofs_received, + duration_s = duration.as_secs_f64(), + retries = attempt, + "State proofs fetch completed" ); if proofs_received != total_states { warn!( - "State proof count mismatch: requested={}, received={}", - total_states, proofs_received + event = "v1_state_proof_count_mismatch", + requested = total_states, + received = proofs_received, + "State proof count mismatch" ); } @@ -282,10 +301,11 @@ pub async fn fetch_proofs_and_create_instructions( if let Some(e) = last_error { let duration = start_time.elapsed(); warn!( - "Failed to get state proofs after {} attempts ({:.3}s): {}", - ADDRESS_PROOF_MAX_RETRIES + 1, - duration.as_secs_f64(), - e + event = "v1_state_proofs_fetch_failed", + attempts = ADDRESS_PROOF_MAX_RETRIES + 1, + duration_s = duration.as_secs_f64(), + error = %e, + "Failed to get state proofs" ); return Err(anyhow::anyhow!( "Failed to get state proofs after {} retries: {}", @@ -361,9 +381,10 @@ pub async fn fetch_proofs_and_create_instructions( Ok(tree) => tree, Err(e) => { tracing::error!( - "Failed to deserialize onchain tree {}: {}", - item.tree_account.merkle_tree, - e + event = "v1_onchain_tree_deserialize_failed", + tree = %item.tree_account.merkle_tree, + error = %e, + "Failed to deserialize onchain tree" ); return Err(anyhow::anyhow!("Failed to deserialize onchain tree: {}", e)); } @@ -374,21 +395,23 @@ pub async fn fetch_proofs_and_create_instructions( let onchain_changelog_index = onchain_tree.changelog_index(); tracing::info!( - "Creating nullify instruction for tree {}: hash={}, leaf_index={}, root_seq={}, changelog_index={}, indexer_root={}", - item.tree_account.merkle_tree, - bs58::encode(&item.queue_item_data.hash).into_string(), - proof.leaf_index, - proof.root_seq, - proof.root_seq % STATE_MERKLE_TREE_CHANGELOG, - bs58::encode(&proof.root).into_string() + event = "v1_debug_nullify_instruction", + tree = %item.tree_account.merkle_tree, + hash = %bs58::encode(&item.queue_item_data.hash).into_string(), + leaf_index = proof.leaf_index, + root_seq = proof.root_seq, + changelog_index = proof.root_seq % STATE_MERKLE_TREE_CHANGELOG, + indexer_root = %bs58::encode(&proof.root).into_string(), + "Creating nullify instruction" ); tracing::info!( - "Onchain tree {} state: current_root={}, root_index={}, changelog_index={}", - item.tree_account.merkle_tree, - bs58::encode(&onchain_root).into_string(), - onchain_root_index, - onchain_changelog_index + event = "v1_debug_onchain_tree_state", + tree = %item.tree_account.merkle_tree, + current_root = %bs58::encode(&onchain_root).into_string(), + root_index = onchain_root_index, + changelog_index = onchain_changelog_index, + "Onchain tree state" ); let capacity = onchain_tree.roots.capacity(); @@ -405,10 +428,11 @@ pub async fn fetch_proofs_and_create_instructions( .collect(); tracing::info!( - "Onchain root history (len={}, capacity={}): {:?}", - onchain_tree.roots.len(), + event = "v1_debug_onchain_root_history", + history_len = onchain_tree.roots.len(), capacity, - root_history, + root_history = ?root_history, + "Onchain root history" ); let indexer_root_position = @@ -421,9 +445,10 @@ pub async fn fetch_proofs_and_create_instructions( }); tracing::info!( - "Indexer root {} present_at_buffer_index={:?}", - bs58::encode(&proof.root).into_string(), - indexer_root_position, + event = "v1_debug_indexer_root_position", + indexer_root = %bs58::encode(&proof.root).into_string(), + present_at_buffer_index = ?indexer_root_position, + "Indexer root position in onchain buffer" ); if indexer_root_position.is_none() { @@ -527,8 +552,10 @@ pub fn calculate_compute_unit_price(target_lamports: u64, compute_units: u64) -> pub fn get_capped_priority_fee(cap_config: CapConfig) -> u64 { if cap_config.max_fee_lamports < cap_config.min_fee_lamports { warn!( - "Invalid priority fee cap config: max_fee_lamports ({}) < min_fee_lamports ({}); clamping max to min", - cap_config.max_fee_lamports, cap_config.min_fee_lamports + event = "v1_priority_fee_cap_invalid", + max_fee_lamports = cap_config.max_fee_lamports, + min_fee_lamports = cap_config.min_fee_lamports, + "Invalid priority fee cap config; clamping max to min" ); } let max_fee_lamports = cap_config.max_fee_lamports.max(cap_config.min_fee_lamports); diff --git a/forester/src/processor/v2/common.rs b/forester/src/processor/v2/common.rs index 21703db081..9917e5a9eb 100644 --- a/forester/src/processor/v2/common.rs +++ b/forester/src/processor/v2/common.rs @@ -78,6 +78,7 @@ pub struct ProverConfig { pub struct BatchContext { pub rpc_pool: Arc>, pub authority: Arc, + pub run_id: Arc, pub derivation: Pubkey, pub epoch: u64, pub merkle_tree: Pubkey, @@ -104,6 +105,7 @@ impl Clone for BatchContext { Self { rpc_pool: self.rpc_pool.clone(), authority: self.authority.clone(), + run_id: self.run_id.clone(), derivation: self.derivation, epoch: self.epoch, merkle_tree: self.merkle_tree, diff --git a/forester/src/processor/v2/helpers.rs b/forester/src/processor/v2/helpers.rs index 22dd157aad..ed135cb6a4 100644 --- a/forester/src/processor/v2/helpers.rs +++ b/forester/src/processor/v2/helpers.rs @@ -134,7 +134,7 @@ pub async fn fetch_paginated_batches( let page_size_elements = PAGE_SIZE_BATCHES * zkp_batch_size; if total_elements <= page_size_elements { - tracing::info!( + tracing::debug!( "fetch_paginated_batches: single page fetch with start_index=None, total_elements={}, page_size={}", total_elements, page_size_elements ); @@ -302,7 +302,7 @@ pub async fn fetch_batches( fetch_len: u64, zkp_batch_size: u64, ) -> crate::Result> { - tracing::info!( + tracing::debug!( "fetch_batches: tree={}, output_start={:?}, input_start={:?}, fetch_len={}, zkp_batch_size={}", context.merkle_tree, output_start_index, input_start_index, fetch_len, zkp_batch_size ); @@ -574,7 +574,7 @@ pub async fn fetch_streaming_address_batches( let page_size_elements = ADDRESS_PAGE_SIZE_BATCHES * zkp_batch_size; let num_pages = total_elements.div_ceil(page_size_elements) as usize; - tracing::info!( + tracing::debug!( "address fetch: {} elements ({} batches) in {} pages of {} batches each", total_elements, total_elements / zkp_batch_size, diff --git a/forester/src/processor/v2/processor.rs b/forester/src/processor/v2/processor.rs index 4df6ed4eae..1d166efa35 100644 --- a/forester/src/processor/v2/processor.rs +++ b/forester/src/processor/v2/processor.rs @@ -13,6 +13,7 @@ use tracing::{debug, info, warn}; use crate::{ epoch_manager::{CircuitMetrics, ProcessingMetrics}, + logging::should_emit_rate_limited_warning, processor::v2::{ batch_job_builder::BatchJobBuilder, common::WorkerPool, @@ -90,10 +91,13 @@ where let zkp_batch_size = strategy.fetch_zkp_batch_size(&context).await?; let current_root = strategy.fetch_onchain_root(&context).await?; info!( - "Initializing {} processor for tree {} with on-chain root {:?}[..4]", - strategy.name(), - context.merkle_tree, - ¤t_root[..4] + event = "v2_processor_initialized", + run_id = %context.run_id, + processor = strategy.name(), + tree = %context.merkle_tree, + zkp_batch_size, + root_prefix = ?¤t_root[..4], + "Initializing V2 processor" ); Ok(Self { context, @@ -142,9 +146,13 @@ where let remaining = total_batches.saturating_sub(cached.batches_processed); if remaining > 0 { info!( - "Using cached state: {} remaining batches (processed {}/{}, actual available: {})", - remaining, cached.batches_processed, total_batches, - if actual_available == usize::MAX { "max".to_string() } else { actual_available.to_string() } + event = "v2_cached_state_reused", + tree = %self.context.merkle_tree, + remaining_batches = remaining, + processed_batches = cached.batches_processed, + total_batches, + actual_available = if actual_available == usize::MAX { "max".to_string() } else { actual_available.to_string() }, + "Using cached queue state" ); let batches_to_process = remaining.min(self.context.max_batches_per_tree); @@ -180,7 +188,20 @@ where { let rpc = self.context.rpc_pool.get_connection().await?; if let Err(e) = wait_for_indexer(&*rpc).await { - warn!("wait_for_indexer error (proceeding anyway): {}", e); + if should_emit_rate_limited_warning("v2_wait_for_indexer", Duration::from_secs(30)) + { + warn!( + event = "wait_for_indexer_error", + error = %e, + "wait_for_indexer error (proceeding anyway)" + ); + } else { + debug!( + event = "wait_for_indexer_error_suppressed", + error = %e, + "Suppressing repeated wait_for_indexer warning" + ); + } } } @@ -232,10 +253,12 @@ where } RootReconcileDecision::ResetToOnchainAndStop(root) => { warn!( - "Root divergence: expected {:?}[..4], indexer {:?}[..4], on-chain {:?}[..4]. Resetting.", - &self.current_root[..4], - &queue_data.initial_root[..4], - &root[..4] + event = "v2_root_divergence_reset", + tree = %self.context.merkle_tree, + expected_root_prefix = ?&self.current_root[..4], + indexer_root_prefix = ?&queue_data.initial_root[..4], + onchain_root_prefix = ?&root[..4], + "Root divergence detected; resetting to on-chain root" ); self.current_root = root; self.cached_state = None; @@ -337,8 +360,10 @@ where if let Some(v2) = e.downcast_ref::() { if v2.is_constraint() { warn!( - "Tx sender constraint error for tree {}: {}", - self.context.merkle_tree, e + event = "v2_tx_sender_constraint_error", + tree = %self.context.merkle_tree, + error = %e, + "Tx sender constraint error" ); return Err(tx_result.unwrap_err()); } @@ -353,8 +378,10 @@ where ), Err(e) => { warn!( - "Tx sender error for tree {}: {}", - self.context.merkle_tree, e + event = "v2_tx_sender_error", + tree = %self.context.merkle_tree, + error = %e, + "Tx sender error" ); (0, Default::default(), Duration::ZERO) } @@ -395,8 +422,10 @@ where if let Err(e) = tx_result { warn!( - "Returning partial metrics despite error for tree {}: {}", - self.context.merkle_tree, e + event = "v2_partial_metrics_after_error", + tree = %self.context.merkle_tree, + error = %e, + "Returning partial metrics despite processing error" ); } @@ -572,10 +601,11 @@ where .clone(); info!( - "Pre-warming {} proofs for tree {} with root {:?}", - num_batches, - self.context.merkle_tree, - &initial_root[..4] + event = "v2_prewarm_started", + tree = %self.context.merkle_tree, + proofs = num_batches, + root_prefix = ?&initial_root[..4], + "Pre-warming proofs for tree" ); let (jobs_sent, timings, _staging_tree) = self @@ -612,8 +642,11 @@ where } Err(e) => { warn!( - "Proof generation failed during pre-warm for seq={}: {}", - result.seq, e + event = "v2_prewarm_proof_generation_failed", + tree = %self.context.merkle_tree, + seq = result.seq, + error = %e, + "Proof generation failed during pre-warm" ); } } @@ -623,16 +656,20 @@ where if proofs_cached < jobs_sent { warn!( - "Pre-warmed {} proofs but expected {} for tree {}", - proofs_cached, jobs_sent, self.context.merkle_tree + event = "v2_prewarm_partial", + tree = %self.context.merkle_tree, + proofs_cached, + expected_proofs = jobs_sent, + "Pre-warm completed with fewer proofs than expected" ); } else { info!( - "Pre-warmed {} proofs for tree {} (zkp_batch_size={}, items={})", + event = "v2_prewarm_completed", + tree = %self.context.merkle_tree, proofs_cached, - self.context.merkle_tree, - self.zkp_batch_size, - proofs_cached * self.zkp_batch_size as usize + zkp_batch_size = self.zkp_batch_size, + items = proofs_cached * self.zkp_batch_size as usize, + "Pre-warm completed" ); } diff --git a/forester/src/telemetry.rs b/forester/src/telemetry.rs index 2d881406b2..ae006d6c03 100644 --- a/forester/src/telemetry.rs +++ b/forester/src/telemetry.rs @@ -31,15 +31,22 @@ pub fn setup_telemetry() { let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")); - let stdout_layer = fmt::Layer::new() + let stdout_layer = fmt::layer() .with_writer(std::io::stdout) + .compact() .with_ansi(true); if let Some(file_appender) = file_appender { let (non_blocking, guard) = tracing_appender::non_blocking(file_appender); let _ = LOG_GUARD.set(guard); - let file_layer = fmt::Layer::new().with_writer(non_blocking); + let file_layer = fmt::layer() + .json() + .with_current_span(true) + .with_span_list(true) + .flatten_event(true) + .with_ansi(false) + .with_writer(non_blocking); tracing_subscriber::registry() .with(stdout_layer) diff --git a/forester/src/tree_data_sync.rs b/forester/src/tree_data_sync.rs index 7d452e4da9..f21187d5e8 100644 --- a/forester/src/tree_data_sync.rs +++ b/forester/src/tree_data_sync.rs @@ -35,8 +35,9 @@ pub async fn fetch_trees(rpc: &R) -> Result> { } Err(e) => { warn!( - "Filtered tree fetch failed, falling back to unfiltered: {:?}", - e + event = "filtered_tree_fetch_failed_fallback_unfiltered", + error = ?e, + "Filtered tree fetch failed; falling back to unfiltered fetch" ); fetch_trees_unfiltered(rpc).await } @@ -92,7 +93,11 @@ pub async fn fetch_trees_filtered(rpc_url: &str) -> Result> { } } Err(e) => { - warn!("Failed to fetch batched trees: {:?}", e); + warn!( + event = "fetch_batched_trees_failed", + error = ?e, + "Failed to fetch batched trees" + ); errors.push(format!("batched: {}", e)); } } @@ -108,7 +113,11 @@ pub async fn fetch_trees_filtered(rpc_url: &str) -> Result> { } } Err(e) => { - warn!("Failed to fetch state V1 trees: {:?}", e); + warn!( + event = "fetch_state_v1_trees_failed", + error = ?e, + "Failed to fetch StateV1 trees" + ); errors.push(format!("state_v1: {}", e)); } } @@ -124,7 +133,11 @@ pub async fn fetch_trees_filtered(rpc_url: &str) -> Result> { } } Err(e) => { - warn!("Failed to fetch address V1 trees: {:?}", e); + warn!( + event = "fetch_address_v1_trees_failed", + error = ?e, + "Failed to fetch AddressV1 trees" + ); errors.push(format!("address_v1: {}", e)); } } @@ -316,7 +329,7 @@ fn create_tree_accounts( tree_accounts } -pub async fn fetch_protocol_group_authority(rpc: &R) -> Result { +pub async fn fetch_protocol_group_authority(rpc: &R, run_id: &str) -> Result { let registered_program_pda = light_registry::account_compression_cpi::sdk::get_registered_program_pda( &light_registry::ID, @@ -336,8 +349,10 @@ pub async fn fetch_protocol_group_authority(rpc: &R) -> Result { .map_err(|e| anyhow::anyhow!("Failed to deserialize RegisteredProgram: {}", e))?; info!( - "Fetched protocol group authority: {}", - registered_program.group_authority_pda + event = "protocol_group_authority_fetched", + run_id = %run_id, + group_authority = %registered_program.group_authority_pda, + "Fetched protocol group authority" ); Ok(registered_program.group_authority_pda) diff --git a/forester/tests/e2e_test.rs b/forester/tests/e2e_test.rs index afca1dc78f..855b765908 100644 --- a/forester/tests/e2e_test.rs +++ b/forester/tests/e2e_test.rs @@ -234,6 +234,7 @@ async fn e2e_test() { sleep_when_idle_ms: 100, queue_polling_mode: Default::default(), group_authority: None, + helius_rpc: false, }, rpc_pool_config: RpcPoolConfig { max_size: 50, diff --git a/forester/tests/legacy/priority_fee_test.rs b/forester/tests/legacy/priority_fee_test.rs index f7c805db30..875d20c5fc 100644 --- a/forester/tests/legacy/priority_fee_test.rs +++ b/forester/tests/legacy/priority_fee_test.rs @@ -30,10 +30,8 @@ async fn test_priority_fee_request() { std::env::var("FORESTER_WS_RPC_URL") .expect("FORESTER_WS_RPC_URL must be set in environment"), ), - indexer_url: Some( - std::env::var("FORESTER_INDEXER_URL") - .expect("FORESTER_INDEXER_URL must be set in environment"), - ), + indexer_url: std::env::var("FORESTER_INDEXER_URL") + .expect("FORESTER_INDEXER_URL must be set in environment"), prover_url: Some( std::env::var("FORESTER_PROVER_URL") .expect("FORESTER_PROVER_URL must be set in environment"), diff --git a/forester/tests/priority_fee_test.rs b/forester/tests/priority_fee_test.rs index 3772539302..dddb76a737 100644 --- a/forester/tests/priority_fee_test.rs +++ b/forester/tests/priority_fee_test.rs @@ -27,9 +27,7 @@ async fn test_priority_fee_request() { ws_rpc_url: Some( std::env::var("WS_RPC_URL").expect("WS_RPC_URL must be set in environment"), ), - indexer_url: Some( - std::env::var("INDEXER_URL").expect("INDEXER_URL must be set in environment"), - ), + indexer_url: std::env::var("INDEXER_URL").expect("INDEXER_URL must be set in environment"), prover_url: Some( std::env::var("PROVER_URL").expect("PROVER_URL must be set in environment"), ), @@ -84,6 +82,7 @@ async fn test_priority_fee_request() { api_server_port: 8080, group_authority: None, light_pda_programs: vec![], + helius_rpc: false, prometheus_url: None, }; diff --git a/forester/tests/test_compressible_ctoken.rs b/forester/tests/test_compressible_ctoken.rs index 3342fb5cb8..080e1c93a4 100644 --- a/forester/tests/test_compressible_ctoken.rs +++ b/forester/tests/test_compressible_ctoken.rs @@ -539,6 +539,7 @@ async fn run_bootstrap_test( rpc_url_clone, tracker_clone, Some(shutdown_rx), + false, ) .await { diff --git a/forester/tests/test_compressible_mint.rs b/forester/tests/test_compressible_mint.rs index 688f6c3c47..0e73f773e7 100644 --- a/forester/tests/test_compressible_mint.rs +++ b/forester/tests/test_compressible_mint.rs @@ -214,6 +214,7 @@ async fn test_compressible_mint_bootstrap() { rpc_url, tracker_clone, Some(shutdown_rx), + false, ) .await { diff --git a/forester/tests/test_compressible_pda.rs b/forester/tests/test_compressible_pda.rs index e04f630432..f3a447fe23 100644 --- a/forester/tests/test_compressible_pda.rs +++ b/forester/tests/test_compressible_pda.rs @@ -398,6 +398,7 @@ async fn test_compressible_pda_bootstrap() { rpc_url, tracker_clone, Some(shutdown_rx), + false, ) .await { diff --git a/forester/tests/test_utils.rs b/forester/tests/test_utils.rs index d9aaa2f0c2..1a1d3f18f6 100644 --- a/forester/tests/test_utils.rs +++ b/forester/tests/test_utils.rs @@ -115,6 +115,7 @@ pub fn forester_config() -> ForesterConfig { sleep_when_idle_ms: 100, queue_polling_mode: QueuePollingMode::OnChain, group_authority: None, + helius_rpc: false, }, rpc_pool_config: RpcPoolConfig { max_size: 50, diff --git a/js/compressed-token/src/v3/actions/create-mint-interface.ts b/js/compressed-token/src/v3/actions/create-mint-interface.ts index 757c38a6b7..f57f7bbd40 100644 --- a/js/compressed-token/src/v3/actions/create-mint-interface.ts +++ b/js/compressed-token/src/v3/actions/create-mint-interface.ts @@ -81,9 +81,7 @@ export async function createMintInterface( // Default: light-token mint creation if (!('secretKey' in mintAuthority)) { - throw new Error( - 'mintAuthority must be a Signer for light-token mints', - ); + throw new Error('mintAuthority must be a Signer for light-token mints'); } if ( addressTreeInfo && diff --git a/js/compressed-token/src/v3/get-mint-interface.ts b/js/compressed-token/src/v3/get-mint-interface.ts index aeffe71611..3d5bf10033 100644 --- a/js/compressed-token/src/v3/get-mint-interface.ts +++ b/js/compressed-token/src/v3/get-mint-interface.ts @@ -103,9 +103,7 @@ export async function getMintInterface( ); if (!compressedAccount?.data?.data) { - throw new Error( - `Light mint not found for ${address.toString()}`, - ); + throw new Error(`Light mint not found for ${address.toString()}`); } const compressedData = Buffer.from(compressedAccount.data.data);