Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions forester/dashboard/src/app/api/[...path]/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { NextResponse } from "next/server";

const BACKEND_URL =
process.env.FORESTER_API_URL || "http://localhost:8080";

export async function GET(
request: Request,
{ params }: { params: Promise<{ path: string[] }> }
) {
const { path } = await params;
const backendPath = path.join("/");
try {
const response = await fetch(`${BACKEND_URL}/${backendPath}`);
const data = await response.json();
return NextResponse.json(data, { status: response.status });
} catch {
return NextResponse.json(
{ error: "Backend unavailable" },
{ status: 502 }
);
}
}
2 changes: 1 addition & 1 deletion forester/dashboard/src/lib/api.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
const API_URL =
process.env.NEXT_PUBLIC_FORESTER_API_URL || "http://localhost:8080";
process.env.NEXT_PUBLIC_FORESTER_API_URL ?? "/api";

export class ApiError extends Error {
constructor(
Expand Down
120 changes: 97 additions & 23 deletions forester/src/api_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,13 @@ pub struct CompressibleDashboardState {

/// Configuration for the HTTP API server.
pub struct ApiServerConfig {
pub run_id: Arc<str>,
pub rpc_url: String,
pub port: u16,
pub allow_public_bind: bool,
pub compressible_state: Option<CompressibleDashboardState>,
pub prometheus_url: Option<String>,
pub helius_rpc: bool,
}

/// Default timeout for status endpoint in seconds
Expand All @@ -120,16 +122,23 @@ pub struct ApiServerHandle {
pub thread_handle: JoinHandle<()>,
/// Sender to trigger graceful shutdown
pub shutdown_tx: oneshot::Sender<()>,
pub run_id: Arc<str>,
}

impl ApiServerHandle {
/// Trigger graceful shutdown and wait for the server to stop
pub fn shutdown(self) {
let run_id = self.run_id.clone();
// Send shutdown signal (ignore error if receiver already dropped)
let _ = self.shutdown_tx.send(());
// Wait for the thread to finish
if let Err(e) = self.thread_handle.join() {
error!("API server thread panicked: {:?}", e);
error!(
event = "api_server_thread_panicked",
run_id = %run_id,
error = ?e,
"API server thread panicked"
);
}
}
}
Expand All @@ -142,6 +151,7 @@ impl ApiServerHandle {
pub(crate) async fn fetch_metrics_snapshot(
client: &reqwest::Client,
prometheus_url: &Option<String>,
run_id: &str,
) -> MetricsSnapshot {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
Expand Down Expand Up @@ -170,7 +180,12 @@ pub(crate) async fn fetch_metrics_snapshot(
};
}
Err(e) => {
warn!("Prometheus query failed: {}", e);
warn!(
event = "api_server_prometheus_query_failed",
run_id = %run_id,
error = %e,
"Prometheus query failed"
);
}
}
}
Expand All @@ -187,6 +202,8 @@ pub(crate) async fn fetch_metrics_snapshot(
pub(crate) async fn fetch_compressible_snapshot(
trackers: &Option<CompressibleTrackers>,
rpc_url: &str,
helius_rpc: bool,
run_id: &str,
) -> CompressibleSnapshot {
use crate::compressible::traits::CompressibleTracker;

Expand All @@ -211,7 +228,7 @@ pub(crate) async fn fetch_compressible_snapshot(
// Standalone mode: RPC with timeout
let fetch_result = tokio::time::timeout(
COMPRESSIBLE_FETCH_TIMEOUT,
crate::compressible::count_compressible_accounts(rpc_url),
crate::compressible::count_compressible_accounts(rpc_url, helius_rpc),
)
.await;

Expand All @@ -227,7 +244,12 @@ pub(crate) async fn fetch_compressible_snapshot(
cached_at: now,
},
Ok(Err(e)) => {
warn!("RPC compressible count failed: {}", e);
warn!(
event = "api_server_compressible_count_failed",
run_id = %run_id,
error = %e,
"RPC compressible count failed"
);
CompressibleSnapshot {
data: CompressibleResponse {
enabled: false,
Expand All @@ -241,8 +263,10 @@ pub(crate) async fn fetch_compressible_snapshot(
}
Err(_) => {
warn!(
"Compressible count timed out after {}s",
COMPRESSIBLE_FETCH_TIMEOUT.as_secs()
event = "api_server_compressible_count_timeout",
run_id = %run_id,
timeout_seconds = COMPRESSIBLE_FETCH_TIMEOUT.as_secs(),
"Compressible count timed out"
);
CompressibleSnapshot {
data: CompressibleResponse {
Expand All @@ -268,9 +292,10 @@ async fn run_metrics_provider(
client: reqwest::Client,
prometheus_url: Option<String>,
mut shutdown: broadcast::Receiver<()>,
run_id: Arc<str>,
) {
loop {
let snapshot = fetch_metrics_snapshot(&client, &prometheus_url).await;
let snapshot = fetch_metrics_snapshot(&client, &prometheus_url, run_id.as_ref()).await;
if tx.send(snapshot).is_err() {
break; // all receivers dropped
}
Expand All @@ -279,7 +304,11 @@ async fn run_metrics_provider(
_ = shutdown.recv() => break,
}
}
info!("Metrics provider stopped");
info!(
event = "api_server_metrics_provider_stopped",
run_id = %run_id,
"Metrics provider stopped"
);
}

/// Periodically fetches compressible counts and publishes via watch channel.
Expand All @@ -288,6 +317,8 @@ async fn run_compressible_provider(
trackers: Option<CompressibleTrackers>,
rpc_url: String,
mut shutdown: broadcast::Receiver<()>,
helius_rpc: bool,
run_id: Arc<str>,
) {
// In-memory trackers are cheap (.len()); RPC is expensive (getProgramAccounts)
let interval = if trackers.is_some() {
Expand All @@ -297,7 +328,8 @@ async fn run_compressible_provider(
};

loop {
let snapshot = fetch_compressible_snapshot(&trackers, &rpc_url).await;
let snapshot =
fetch_compressible_snapshot(&trackers, &rpc_url, helius_rpc, run_id.as_ref()).await;
if tx.send(snapshot).is_err() {
break;
}
Expand All @@ -306,7 +338,11 @@ async fn run_compressible_provider(
_ = shutdown.recv() => break,
}
}
info!("Compressible provider stopped");
info!(
event = "api_server_compressible_provider_stopped",
run_id = %run_id,
"Compressible provider stopped"
);
}

// ---------------------------------------------------------------------------
Expand All @@ -319,26 +355,40 @@ async fn run_compressible_provider(
/// An `ApiServerHandle` that can be used to trigger graceful shutdown
pub fn spawn_api_server(config: ApiServerConfig) -> ApiServerHandle {
let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
let run_id_for_handle = config.run_id.clone();

let thread_handle = std::thread::spawn(move || {
let run_id = config.run_id.clone();
let rt = match tokio::runtime::Runtime::new() {
Ok(rt) => rt,
Err(e) => {
error!("Failed to create tokio runtime for API server: {}", e);
error!(
event = "api_server_runtime_create_failed",
run_id = %run_id,
error = %e,
"Failed to create tokio runtime for API server"
);
return;
}
};
rt.block_on(async move {
let addr = if config.allow_public_bind {
warn!(
"API server binding to 0.0.0.0:{} - endpoints will be publicly accessible",
config.port
event = "api_server_public_bind_enabled",
run_id = %run_id,
port = config.port,
"API server binding to 0.0.0.0; endpoints will be publicly accessible"
);
SocketAddr::from(([0, 0, 0, 0], config.port))
} else {
SocketAddr::from(([127, 0, 0, 1], config.port))
};
info!("Starting HTTP API server on {}", addr);
info!(
event = "api_server_started",
run_id = %run_id,
address = %addr,
"Starting HTTP API server"
);

// Shared HTTP client with timeout for external requests (Prometheus)
let http_client = reqwest::Client::builder()
Expand Down Expand Up @@ -369,13 +419,16 @@ pub fn spawn_api_server(config: ApiServerConfig) -> ApiServerHandle {
http_client.clone(),
config.prometheus_url.clone(),
provider_shutdown_tx.subscribe(),
run_id.clone(),
));

tokio::spawn(run_compressible_provider(
compressible_tx,
trackers,
config.rpc_url.clone(),
provider_shutdown_tx.subscribe(),
config.helius_rpc,
run_id.clone(),
));

let cors = warp::cors()
Expand All @@ -391,8 +444,10 @@ pub fn spawn_api_server(config: ApiServerConfig) -> ApiServerHandle {

// --- Status route (unchanged — per-request RPC call) ---
let rpc_url_for_status = config.rpc_url.clone();
let run_id_for_status = run_id.clone();
let status_route = warp::path("status").and(warp::get()).and_then(move || {
let rpc_url = rpc_url_for_status.clone();
let run_id = run_id_for_status.clone();
async move {
let timeout_duration = Duration::from_secs(STATUS_TIMEOUT_SECS);
match tokio::time::timeout(timeout_duration, get_forester_status(&rpc_url))
Expand All @@ -403,7 +458,12 @@ pub fn spawn_api_server(config: ApiServerConfig) -> ApiServerHandle {
warp::http::StatusCode::OK,
)),
Ok(Err(e)) => {
error!("Failed to get forester status: {:?}", e);
error!(
event = "api_server_status_fetch_failed",
run_id = %run_id,
error = ?e,
"Failed to get forester status"
);
let error_response = ErrorResponse {
error: format!("Failed to get forester status: {}", e),
};
Expand All @@ -414,8 +474,10 @@ pub fn spawn_api_server(config: ApiServerConfig) -> ApiServerHandle {
}
Err(_elapsed) => {
error!(
"Forester status request timed out after {}s",
STATUS_TIMEOUT_SECS
event = "api_server_status_timeout",
run_id = %run_id,
timeout_seconds = STATUS_TIMEOUT_SECS,
"Forester status request timed out"
);
let error_response = ErrorResponse {
error: format!(
Expand Down Expand Up @@ -453,21 +515,33 @@ pub fn spawn_api_server(config: ApiServerConfig) -> ApiServerHandle {
warp::serve(routes)
.bind(addr)
.await
.graceful(async move {
let _ = shutdown_rx.await;
info!("API server received shutdown signal");
// Signal providers to stop
let _ = provider_shutdown_tx.send(());
.graceful({
let run_id_for_shutdown = run_id.clone();
async move {
let _ = shutdown_rx.await;
info!(
event = "api_server_shutdown_signal_received",
run_id = %run_id_for_shutdown,
"API server received shutdown signal"
);
// Signal providers to stop
let _ = provider_shutdown_tx.send(());
}
})
.run()
.await;
info!("API server shut down gracefully");
info!(
event = "api_server_stopped",
run_id = %run_id,
"API server shut down gracefully"
);
});
});

ApiServerHandle {
thread_handle,
shutdown_tx,
run_id: run_id_for_handle,
}
}

Expand Down
14 changes: 13 additions & 1 deletion forester/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub struct StartArgs {
env = "INDEXER_URL",
help = "Photon indexer URL. API key can be included as query param: https://host?api-key=KEY"
)]
pub indexer_url: Option<String>,
pub indexer_url: String,

#[arg(long, env = "PROVER_URL")]
pub prover_url: Option<String>,
Expand Down Expand Up @@ -270,6 +270,14 @@ pub struct StartArgs {
)]
pub api_server_public_bind: bool,

#[arg(
long,
env = "HELIUS_RPC",
help = "Use Helius getProgramAccountsV2 for compressible account queries (default: standard getProgramAccounts)",
default_value = "false"
)]
pub helius_rpc: bool,

#[arg(
long,
env = "GROUP_AUTHORITY",
Expand Down Expand Up @@ -438,6 +446,7 @@ mod tests {
"forester",
"--processor-mode", "v1",
"--rpc-url", "http://test.com",
"--indexer-url", "http://indexer.test.com",
"--payer", "[1,2,3]",
"--derivation", "[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32]"
]).unwrap();
Expand All @@ -448,6 +457,7 @@ mod tests {
"forester",
"--processor-mode", "v2",
"--rpc-url", "http://test.com",
"--indexer-url", "http://indexer.test.com",
"--payer", "[1,2,3]",
"--derivation", "[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32]"
]).unwrap();
Expand All @@ -457,6 +467,7 @@ mod tests {
let args = StartArgs::try_parse_from([
"forester",
"--rpc-url", "http://test.com",
"--indexer-url", "http://indexer.test.com",
"--payer", "[1,2,3]",
"--derivation", "[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32]"
]).unwrap();
Expand All @@ -467,6 +478,7 @@ mod tests {
"forester",
"--processor-mode", "invalid-mode",
"--rpc-url", "http://test.com",
"--indexer-url", "http://indexer.test.com",
"--payer", "[1,2,3]",
"--derivation", "[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32]"
]);
Expand Down
Loading