Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions forester/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,16 @@

## [Unreleased]

### Added

- **Graceful shutdown signaling** via `watch::channel`. Shutdown requests are now race-free regardless of when the run loop subscribes.
- **Panic isolation for `process_epoch`.** A panicking epoch no longer kills the run loop; the panic message is logged and processing continues.

### Fixed

- **`bigint_to_u8_32` now rejects negative `BigInt` inputs** (`light-prover-client`). Previously, negative inputs were silently converted to `[u8; 32]` using only the magnitude bytes, producing wrong-sign output that would cause silent proof-input corruption.
- **`pathIndex` widened from `u32` to `u64`** on both the Rust client and the Go prover server. The Gnark circuit already constrains path indices by tree height (up to 40 bits for v2 address trees); only the JSON marshalling and runtime struct types were artificially narrow. This prevents proof generation failures once a v2 address tree exceeds ~4.3 billion entries.

### Breaking Changes

- **Removed `--photon-api-key` CLI arg and `PHOTON_API_KEY` env var.** The API key should now be included in `--indexer-url` as a query parameter:
Expand Down
167 changes: 116 additions & 51 deletions forester/src/epoch_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use forester_utils::{
forester_epoch::{get_epoch_phases, Epoch, ForesterSlot, TreeAccounts, TreeForesterSchedule},
rpc_pool::SolanaRpcPool,
};
use futures::future::join_all;
use light_client::{
indexer::{Indexer, MerkleProof, NewAddressProofWithContext},
rpc::{LightClient, LightClientConfig, RetryConfig, Rpc, RpcError},
Expand All @@ -39,7 +38,7 @@ use solana_sdk::{
transaction::TransactionError,
};
use tokio::{
sync::{mpsc, oneshot, Mutex},
sync::{mpsc, oneshot, watch, Mutex},
task::JoinHandle,
time::{sleep, Instant, MissedTickBehavior},
};
Expand Down Expand Up @@ -94,8 +93,6 @@ type StateBatchProcessorMap<R> =
type AddressBatchProcessorMap<R> =
Arc<DashMap<Pubkey, (u64, Arc<Mutex<QueueProcessor<R, AddressTreeStrategy>>>)>>;
type ProcessorInitLockMap = Arc<DashMap<Pubkey, Arc<Mutex<()>>>>;
type TreeProcessingTask = JoinHandle<Result<()>>;

/// Coordinates re-finalization across parallel `process_queue` tasks when new
/// foresters register mid-epoch. Only one task performs the on-chain
/// `finalize_registration` tx; others wait for it to complete.
Expand Down Expand Up @@ -280,6 +277,10 @@ pub struct EpochManager<R: Rpc + Indexer> {
run_id: Arc<str>,
/// Per-epoch registration trackers to coordinate re-finalization when new foresters register mid-epoch
registration_trackers: Arc<DashMap<u64, Arc<RegistrationTracker>>>,
/// Set to `true` by `request_shutdown`; `run` observes it via a
/// `watch::Receiver` so a request that arrives before the subscriber
/// is still picked up on the next loop iteration.
shutdown_tx: watch::Sender<bool>,
}

impl<R: Rpc + Indexer> Clone for EpochManager<R> {
Expand Down Expand Up @@ -310,6 +311,7 @@ impl<R: Rpc + Indexer> Clone for EpochManager<R> {
heartbeat: self.heartbeat.clone(),
run_id: self.run_id.clone(),
registration_trackers: self.registration_trackers.clone(),
shutdown_tx: self.shutdown_tx.clone(),
}
}
}
Expand Down Expand Up @@ -359,23 +361,28 @@ impl<R: Rpc + Indexer> EpochManager<R> {
heartbeat,
run_id: Arc::<str>::from(run_id),
registration_trackers: Arc::new(DashMap::new()),
shutdown_tx: watch::channel(false).0,
})
}

/// Requests a graceful shutdown of the run loop.
///
/// Sets the `watch` flag to `true`; `run` observes it via
/// `borrow_and_update` at the top of each loop iteration, so a request
/// that arrives before the receiver subscribes is still picked up.
/// The send result is intentionally ignored: an error only means every
/// receiver is gone, i.e. the run loop has already exited.
fn request_shutdown(&self) {
    let _ = self.shutdown_tx.send(true);
}

pub async fn run(self: Arc<Self>) -> Result<()> {
let (tx, mut rx) = mpsc::channel(100);
let tx = Arc::new(tx);

let mut monitor_handle = tokio::spawn({
let self_clone = Arc::clone(&self);
let tx_clone = Arc::clone(&tx);
let self_clone = self.clone();
let tx_clone = tx.clone();
async move { self_clone.monitor_epochs(tx_clone).await }
});

// Process current and previous epochs
let current_previous_handle = tokio::spawn({
let self_clone = Arc::clone(&self);
let tx_clone = Arc::clone(&tx);
let self_clone = self.clone();
let tx_clone = tx.clone();
async move {
self_clone
.process_current_and_previous_epochs(tx_clone)
Expand All @@ -384,35 +391,80 @@ impl<R: Rpc + Indexer> EpochManager<R> {
});

let tree_discovery_handle = tokio::spawn({
let self_clone = Arc::clone(&self);
let self_clone = self.clone();
async move { self_clone.discover_trees_periodically().await }
});

let balance_check_handle = tokio::spawn({
let self_clone = Arc::clone(&self);
let self_clone = self.clone();
async move { self_clone.check_sol_balance_periodically().await }
});

// Abort all background tasks (including monitor) on any exit path,
// not just a clean break out of the loop. `monitor_handle` is still
// polled in the select! below — only its `AbortHandle` lives here.
let _guard = scopeguard::guard(
(
monitor_handle.abort_handle(),
current_previous_handle,
tree_discovery_handle,
balance_check_handle,
),
|(h2, h3, h4)| {
|(monitor, h2, h3, h4)| {
info!(
event = "background_tasks_aborting",
run_id = %self.run_id,
"Aborting EpochManager background tasks"
);
monitor.abort();
h2.abort();
h3.abort();
h4.abort();
},
);

let mut shutdown_rx = self.shutdown_tx.subscribe();
let mut epoch_tasks = tokio::task::JoinSet::new();
let result = loop {
// Check the flag at the top of the loop so a shutdown requested
// before we subscribed (or between iterations) is observed
// without depending on `changed()` firing.
if *shutdown_rx.borrow_and_update() {
info!(
event = "epoch_manager_shutdown_requested",
run_id = %self.run_id,
"Stopping EpochManager after shutdown request"
);
break Ok(());
}

tokio::select! {
_ = shutdown_rx.changed() => {}
Some(join_result) = epoch_tasks.join_next() => {
match join_result {
Ok(Ok(())) => debug!(
event = "epoch_processing_completed",
run_id = %self.run_id,
"Epoch processed successfully"
),
Ok(Err(e)) => error!(
event = "epoch_processing_failed",
run_id = %self.run_id,
error = ?e,
"Error processing epoch"
),
Err(join_error) => {
if join_error.is_panic() {
error!(
event = "epoch_processing_panicked",
run_id = %self.run_id,
error = %join_error,
"Epoch processing panicked"
);
}
}
}
}
epoch_opt = rx.recv() => {
match epoch_opt {
Some(epoch) => {
Expand All @@ -422,17 +474,9 @@ impl<R: Rpc + Indexer> EpochManager<R> {
epoch,
"Received epoch from monitor"
);
let self_clone = Arc::clone(&self);
tokio::spawn(async move {
if let Err(e) = self_clone.process_epoch(epoch).await {
error!(
event = "epoch_processing_failed",
run_id = %self_clone.run_id,
epoch,
error = ?e,
"Error processing epoch"
);
}
let self_clone = self.clone();
epoch_tasks.spawn(async move {
self_clone.process_epoch(epoch).await
});
}
None => {
Expand Down Expand Up @@ -486,8 +530,7 @@ impl<R: Rpc + Indexer> EpochManager<R> {
}
};

// Abort monitor_handle on exit
monitor_handle.abort();
// `_guard` aborts monitor_handle and the other background tasks.
result
}

Expand Down Expand Up @@ -641,7 +684,7 @@ impl<R: Rpc + Indexer> EpochManager<R> {
}
}

async fn add_new_tree(&self, new_tree: TreeAccounts) -> Result<()> {
async fn add_new_tree(self: &Arc<Self>, new_tree: TreeAccounts) -> Result<()> {
info!(
event = "new_tree_add_started",
run_id = %self.run_id,
Expand Down Expand Up @@ -701,7 +744,7 @@ impl<R: Rpc + Indexer> EpochManager<R> {
)?;
epoch_info.trees.push(tree_schedule.clone());

let self_clone = Arc::new(self.clone());
let self_clone = self.clone();
let tracker = self
.registration_trackers
.entry(current_epoch)
Expand Down Expand Up @@ -1037,7 +1080,7 @@ impl<R: Rpc + Indexer> EpochManager<R> {
}

#[instrument(level = "debug", skip(self), fields(forester = %self.config.payer_keypair.pubkey(), epoch = epoch))]
async fn process_epoch(&self, epoch: u64) -> Result<()> {
async fn process_epoch(self: Arc<Self>, epoch: u64) -> Result<()> {
// Clone the Arc immediately to release the DashMap shard lock.
// Without .clone(), the RefMut guard would be held across async operations,
// blocking other epochs from accessing the DashMap if they hash to the same shard.
Expand Down Expand Up @@ -1115,7 +1158,7 @@ impl<R: Rpc + Indexer> EpochManager<R> {

// Perform work
if self.sync_slot().await? < phases.active.end {
self.perform_active_work(&registration_info).await?;
self.clone().perform_active_work(&registration_info).await?;
}
// Wait for report work phase
if self.sync_slot().await? < phases.report_work.start {
Expand Down Expand Up @@ -1589,7 +1632,7 @@ impl<R: Rpc + Indexer> EpochManager<R> {
skip(self, epoch_info),
fields(forester = %self.config.payer_keypair.pubkey(), epoch = epoch_info.epoch.epoch
))]
async fn perform_active_work(&self, epoch_info: &ForesterEpochInfo) -> Result<()> {
async fn perform_active_work(self: Arc<Self>, epoch_info: &ForesterEpochInfo) -> Result<()> {
self.heartbeat.increment_active_cycle();

let current_slot = self.slot_tracker.estimated_current_slot();
Expand All @@ -1615,6 +1658,17 @@ impl<R: Rpc + Indexer> EpochManager<R> {
.cloned()
.collect();

if trees_to_process.is_empty() {
debug!(
event = "active_work_cycle_no_trees",
run_id = %self.run_id,
"No trees to process this cycle"
);
let mut rpc = self.rpc_pool.get_connection().await?;
wait_until_slot_reached(&mut *rpc, &self.slot_tracker, active_phase_end).await?;
return Ok(());
}

info!(
event = "active_work_cycle_started",
run_id = %self.run_id,
Expand All @@ -1624,7 +1678,6 @@ impl<R: Rpc + Indexer> EpochManager<R> {
"Starting active work cycle"
);

let self_arc = Arc::new(self.clone());
let registration_tracker = self
.registration_trackers
.entry(epoch_info.epoch.epoch)
Expand All @@ -1636,7 +1689,7 @@ impl<R: Rpc + Indexer> EpochManager<R> {
.value()
.clone();

let mut handles: Vec<TreeProcessingTask> = Vec::with_capacity(trees_to_process.len());
let mut tree_tasks = tokio::task::JoinSet::new();

for tree in trees_to_process {
debug!(
Expand All @@ -1648,27 +1701,22 @@ impl<R: Rpc + Indexer> EpochManager<R> {
);
self.heartbeat.add_tree_tasks_spawned(1);

let self_clone = self_arc.clone();
let self_clone = self.clone();
let epoch_clone = epoch_info.epoch.clone();
let forester_epoch_pda = epoch_info.forester_epoch_pda.clone();
let tracker = registration_tracker.clone();

let handle = tokio::spawn(async move {
tree_tasks.spawn(async move {
self_clone
.process_queue(&epoch_clone, forester_epoch_pda, tree, tracker)
.await
});

handles.push(handle);
}

debug!("Waiting for {} tree processing tasks", handles.len());
let results = join_all(handles).await;
let mut success_count = 0usize;
let mut error_count = 0usize;
let mut panic_count = 0usize;
for result in results {
match result {
while let Some(join_result) = tree_tasks.join_next().await {
match join_result {
Ok(Ok(())) => success_count += 1,
Ok(Err(e)) => {
error_count += 1;
Expand All @@ -1679,14 +1727,16 @@ impl<R: Rpc + Indexer> EpochManager<R> {
"Error processing queue"
);
}
Err(e) => {
panic_count += 1;
error!(
event = "tree_processing_task_panicked",
run_id = %self.run_id,
error = ?e,
"Tree processing task panicked"
);
Err(join_error) => {
if join_error.is_panic() {
panic_count += 1;
error!(
event = "tree_processing_task_panicked",
run_id = %self.run_id,
error = %join_error,
"Tree processing task panicked"
);
}
}
}
}
Expand Down Expand Up @@ -4509,16 +4559,20 @@ pub async fn run_service<R: Rpc + Indexer>(
retry_count + 1
);

let run_future = epoch_manager.clone().run();
tokio::pin!(run_future);

let result = tokio::select! {
result = epoch_manager.run() => result,
_ = shutdown => {
result = &mut run_future => result,
_ = &mut shutdown => {
info!(
event = "shutdown_received",
run_id = %run_id_for_logs,
phase = "service_run",
"Received shutdown signal. Stopping the service."
);
Ok(())
epoch_manager.request_shutdown();
run_future.await
}
};

Expand Down Expand Up @@ -4810,4 +4864,15 @@ mod tests {
assert_eq!(report.metrics.total_proof_generation().as_secs(), 7);
assert_eq!(report.metrics.total_round_trip().as_secs(), 27);
}

#[tokio::test]
async fn watch_shutdown_observed_after_request() {
    // A shutdown flag flipped *before* a receiver subscribes must still
    // be visible through `borrow_and_update` — the same pattern the run
    // loop relies on to avoid a subscribe/request race.
    let (shutdown_tx, _unused_rx) = watch::channel(false);
    shutdown_tx.send(true).expect("send");

    let mut late_subscriber = shutdown_tx.subscribe();
    let observed = *late_subscriber.borrow_and_update();
    assert!(observed);
}
}
Loading