From d187f5ec315ea028be397b7e3a52b6b96cfe0805 Mon Sep 17 00:00:00 2001 From: sistemd Date: Tue, 10 Mar 2026 17:30:24 +0100 Subject: [PATCH 1/5] refactor(extractors/solana): in-memory CAR file parsing --- Cargo.lock | 11 +- crates/core/datasets-raw/src/client.rs | 19 - crates/core/providers-evm-rpc/src/client.rs | 8 +- crates/core/providers-firehose/src/client.rs | 8 +- .../src/client/block_stream.rs | 10 +- crates/core/providers-solana/src/config.rs | 9 +- .../core/worker-datasets-raw/src/job_impl.rs | 14 +- crates/extractors/solana/Cargo.toml | 4 +- .../solana/examples/solana_compare.rs | 48 +- crates/extractors/solana/src/client.rs | 160 +--- crates/extractors/solana/src/error.rs | 48 +- crates/extractors/solana/src/lib.rs | 4 +- crates/extractors/solana/src/of1_client.rs | 863 ++++++++---------- docs/code/extractors.md | 3 - docs/providers/solana.spec.json | 12 +- tests/config/providers/solana_mainnet.toml | 2 - ...olana_historical_to_json_rpc_transition.rs | 4 +- 17 files changed, 503 insertions(+), 724 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ff1de4c43..c0e1bd940 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6969,15 +6969,6 @@ version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" -[[package]] -name = "memmap2" -version = "0.9.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "714098028fe011992e1c3962653c96b2d578c4b4bce9036e15ff220319b1e0e3" -dependencies = [ - "libc", -] - [[package]] name = "memmem" version = "0.1.1" @@ -10615,6 +10606,7 @@ dependencies = [ "base64 0.22.1", "bincode 1.3.3", "bs58", + "bytes", "clap", "datasets-common", "datasets-raw", @@ -10622,7 +10614,6 @@ dependencies = [ "futures", "governor 0.10.4", "headers", - "memmap2", "monitoring", "pretty_assertions", "prost", diff --git a/crates/core/datasets-raw/src/client.rs b/crates/core/datasets-raw/src/client.rs index e8b246b00..304b2f316 
100644 --- a/crates/core/datasets-raw/src/client.rs +++ b/crates/core/datasets-raw/src/client.rs @@ -76,9 +76,6 @@ impl std::convert::AsRef for BlockStreamError { /// Error type for [`BlockStreamer::latest_block`]. pub type LatestBlockError = Box; -/// Error type for [`BlockStreamer::wait_for_cleanup`]. -pub type CleanupError = Box; - /// Trait for extracting raw blockchain data as a stream of rows. /// /// Implementations fetch blocks from a data source (RPC, archive, etc.) and convert them @@ -106,18 +103,6 @@ pub trait BlockStreamer: Clone + 'static { /// for parallel streaming. fn bucket_size(&self) -> Option; - /// Waits for any background work and resources associated with this [`BlockStreamer`] - /// to be cleaned up. - /// - /// This should be called once the user no longer needs to create new block streams - /// to allow implementations to terminate internal tasks, flush or release network - /// connections, and free any other resources. - /// - /// After requesting cleanup, callers should not call [BlockStreamer::block_stream] - /// again on the same instance. Behavior when creating new streams after cleanup is - /// implementation-defined and must not be relied on. 
- fn wait_for_cleanup(self) -> impl Future> + Send; - fn provider_name(&self) -> &str; } @@ -261,10 +246,6 @@ where self.0.bucket_size() } - fn wait_for_cleanup(self) -> impl Future> + Send { - self.0.wait_for_cleanup() - } - fn provider_name(&self) -> &str { self.0.provider_name() } diff --git a/crates/core/providers-evm-rpc/src/client.rs b/crates/core/providers-evm-rpc/src/client.rs index 707106c1f..37c4a9427 100644 --- a/crates/core/providers-evm-rpc/src/client.rs +++ b/crates/core/providers-evm-rpc/src/client.rs @@ -29,9 +29,7 @@ use datasets_common::{ }; use datasets_raw::{ Timestamp, - client::{ - BlockStreamError, BlockStreamResultExt, BlockStreamer, CleanupError, LatestBlockError, - }, + client::{BlockStreamError, BlockStreamResultExt, BlockStreamer, LatestBlockError}, evm::{ EvmCurrency, tables::{ @@ -499,10 +497,6 @@ impl BlockStreamer for Client { None } - async fn wait_for_cleanup(self) -> Result<(), CleanupError> { - Ok(()) - } - fn provider_name(&self) -> &str { &self.provider_name } diff --git a/crates/core/providers-firehose/src/client.rs b/crates/core/providers-firehose/src/client.rs index 5cca4f16e..545af5895 100644 --- a/crates/core/providers-firehose/src/client.rs +++ b/crates/core/providers-firehose/src/client.rs @@ -8,9 +8,7 @@ use amp_providers_common::{network_id::NetworkId, provider_name::ProviderName}; use async_stream::stream; use datasets_common::block_num::BlockNum; use datasets_raw::{ - client::{ - BlockStreamError, BlockStreamResultExt, BlockStreamer, CleanupError, LatestBlockError, - }, + client::{BlockStreamError, BlockStreamResultExt, BlockStreamer, LatestBlockError}, rows::Rows, }; use futures::{Stream, StreamExt as _, TryStreamExt as _}; @@ -275,10 +273,6 @@ impl BlockStreamer for Client { None } - async fn wait_for_cleanup(self) -> Result<(), CleanupError> { - Ok(()) - } - #[instrument(skip(self), err)] async fn latest_block( &mut self, diff --git a/crates/core/providers-registry/src/client/block_stream.rs 
b/crates/core/providers-registry/src/client/block_stream.rs index 647a008a6..6162334c3 100644 --- a/crates/core/providers-registry/src/client/block_stream.rs +++ b/crates/core/providers-registry/src/client/block_stream.rs @@ -10,7 +10,7 @@ use amp_providers_solana::kind::SolanaProviderKind; use async_stream::stream; use datasets_common::{block_num::BlockNum, network_id::NetworkId}; use datasets_raw::{ - client::{BlockStreamError, BlockStreamer, CleanupError, LatestBlockError}, + client::{BlockStreamError, BlockStreamer, LatestBlockError}, rows::Rows, }; use futures::Stream; @@ -141,14 +141,6 @@ impl BlockStreamer for BlockStreamClient { } } - async fn wait_for_cleanup(self) -> Result<(), CleanupError> { - match self { - Self::EvmRpc(client) => client.wait_for_cleanup().await, - Self::Solana(client) => client.wait_for_cleanup().await, - Self::Firehose(client) => client.wait_for_cleanup().await, - } - } - fn provider_name(&self) -> &str { match self { Self::EvmRpc(client) => client.provider_name(), diff --git a/crates/core/providers-solana/src/config.rs b/crates/core/providers-solana/src/config.rs index b349c2e95..b905102de 100644 --- a/crates/core/providers-solana/src/config.rs +++ b/crates/core/providers-solana/src/config.rs @@ -1,4 +1,4 @@ -use std::{num::NonZeroU32, path::PathBuf}; +use std::num::NonZeroU32; use amp_providers_common::{network_id::NetworkId, redacted::Redacted}; use headers::{HeaderName, HeaderValue}; @@ -40,13 +40,6 @@ pub struct SolanaProviderConfig { /// Optional rate limit for RPC calls per second. pub max_rpc_calls_per_second: Option, - /// Directory for storing Old Faithful ONE CAR files. - pub of1_car_directory: PathBuf, - - /// Whether to keep downloaded CAR files after processing. - #[serde(default)] - pub keep_of1_car_files: bool, - /// Controls when to use the Solana archive for historical data. 
#[serde(default)] pub use_archive: UseArchive, diff --git a/crates/core/worker-datasets-raw/src/job_impl.rs b/crates/core/worker-datasets-raw/src/job_impl.rs index ee9cfc3bb..bd4b15786 100644 --- a/crates/core/worker-datasets-raw/src/job_impl.rs +++ b/crates/core/worker-datasets-raw/src/job_impl.rs @@ -102,7 +102,7 @@ use datasets_common::{ table_name::TableName, }; use datasets_raw::{ - client::{BlockStreamer as _, BlockStreamerExt as _, CleanupError, LatestBlockError}, + client::{BlockStreamer as _, BlockStreamerExt as _, LatestBlockError}, dataset::Dataset as RawDataset, }; @@ -407,7 +407,6 @@ pub async fn execute( } } - client.wait_for_cleanup().await.map_err(Error::Cleanup)?; tracing::info!("materialize completed successfully"); Ok(()) @@ -525,13 +524,6 @@ pub enum Error { /// - Partition task panics (assertion failures, unwrap on None/Err, stack overflow) #[error("Partition task failed")] PartitionTask(#[source] TryWaitAllError), - - /// Failure during blockchain client cleanup - /// - /// At the end of the materialize process, the blockchain client may need to perform - /// cleanup operations, some of which could fail. This error indicates such a failure. 
- #[error("Failed to perform blockchain client cleanup: {0}")] - Cleanup(#[source] CleanupError), } impl RetryableErrorExt for Error { @@ -565,9 +557,6 @@ impl RetryableErrorExt for Error { // Partition tasks — delegate to TryWaitAllError classification Self::PartitionTask(err) => err.is_retryable(), - - // Cleanup failures — recoverable - Self::Cleanup(_) => true, } } } @@ -595,7 +584,6 @@ impl amp_worker_core::retryable::JobErrorExt for Error { Self::LatestBlock(_) => "LATEST_BLOCK", Self::MissingRanges(_) => "MISSING_RANGES", Self::PartitionTask(_) => "PARTITION_TASK", - Self::Cleanup(_) => "CLEANUP", } } } diff --git a/crates/extractors/solana/Cargo.toml b/crates/extractors/solana/Cargo.toml index 149f93cfe..0ead9c99f 100644 --- a/crates/extractors/solana/Cargo.toml +++ b/crates/extractors/solana/Cargo.toml @@ -13,8 +13,9 @@ amp-providers-solana = { path = "../../core/providers-solana" } anyhow.workspace = true async-stream.workspace = true backon.workspace = true -bs58 = "0.5.1" base64 = "0.22.1" +bs58 = "0.5.1" +bytes.workspace = true datasets-common = { path = "../../core/datasets-common" } datasets-raw = { path = "../../core/datasets-raw" } futures.workspace = true @@ -22,7 +23,6 @@ fs-err.workspace = true governor.workspace = true headers = { workspace = true } monitoring = { path = "../../core/monitoring" } -memmap2 = "0.9.9" reqwest.workspace = true schemars = { workspace = true, optional = true } serde.workspace = true diff --git a/crates/extractors/solana/examples/solana_compare.rs b/crates/extractors/solana/examples/solana_compare.rs index 2cbb9c2ad..f2858ba6f 100644 --- a/crates/extractors/solana/examples/solana_compare.rs +++ b/crates/extractors/solana/examples/solana_compare.rs @@ -1,6 +1,8 @@ //! The program streams blocks from OF1 for a given epoch, fetches the same blocks via JSON-RPC, //! and compares the results at the [solana_datasets::tables::NonEmptySlot] level. 
+#[cfg(debug_assertions)] +use std::{collections::HashSet, sync::Mutex}; use std::{path::PathBuf, sync::Arc, time::Duration}; use anyhow::Context; @@ -52,29 +54,20 @@ async fn main() -> anyhow::Result<()> { tracing::info!( epoch = %cli.epoch, start_slot, end_slot, "running OF1 vs RPC comparison"); - let (car_manager_tx, car_manager_rx) = tokio::sync::mpsc::channel(128); - - let car_manager_jh = tokio::task::spawn(of1_client::car_file_manager( - car_manager_rx, - provider_cfg.of1_car_directory.clone(), - provider_cfg.keep_of1_car_files, - cli.provider_name.clone(), - provider_cfg.network.clone(), - None, - )); - - let rpc_connection_info = rpc_client::RpcProviderConnectionInfo { - url: provider_cfg.rpc_provider_info.url, - auth: None, + let reqwest = Arc::new(reqwest::Client::new()); + let rpc_client = { + let rpc_connection_info = rpc_client::RpcProviderConnectionInfo { + url: provider_cfg.rpc_provider_info.url, + auth: None, + }; + Arc::new(rpc_client::SolanaRpcClient::new( + rpc_connection_info, + provider_cfg.max_rpc_calls_per_second, + cli.provider_name, + provider_cfg.network.clone(), + )) }; - let rpc_client = Arc::new(rpc_client::SolanaRpcClient::new( - rpc_connection_info, - provider_cfg.max_rpc_calls_per_second, - cli.provider_name, - provider_cfg.network.clone(), - )); - let get_block_config = rpc_client::rpc_config::RpcBlockConfig { encoding: Some(rpc_client::rpc_config::UiTransactionEncoding::Json), transaction_details: Some(rpc_client::rpc_config::TransactionDetails::Full), @@ -86,11 +79,15 @@ async fn main() -> anyhow::Result<()> { let of1_stream = of1_client::stream( start_slot, end_slot, - provider_cfg.of1_car_directory, - car_manager_tx.clone(), + reqwest, rpc_client.clone(), get_block_config, + // Metrics, we don't need to record them. None, + // In-progress epochs, won't affect the example since it only matters + // when there are multiple concurrent epochs being processed. 
+ #[cfg(debug_assertions)] + Arc::new(Mutex::new(HashSet::new())), ); let mut expected_slot_num = start_slot; @@ -191,11 +188,6 @@ async fn main() -> anyhow::Result<()> { } } - drop(car_manager_tx); - if let Err(e) = car_manager_jh.await { - tracing::error!(error = %e, "car file manager task failed"); - } - tracing::info!("comparison complete"); Ok(()) diff --git a/crates/extractors/solana/src/client.rs b/crates/extractors/solana/src/client.rs index 853784f57..c08d8922c 100644 --- a/crates/extractors/solana/src/client.rs +++ b/crates/extractors/solana/src/client.rs @@ -6,11 +6,12 @@ //! //! Learn more about the Old Faithful archive here: . +#[cfg(debug_assertions)] +use std::{collections::HashSet, sync::Mutex}; use std::{ num::{NonZeroU32, NonZeroU64}, - path::PathBuf, str::FromStr, - sync::{Arc, Mutex}, + sync::Arc, }; use amp_providers_common::{network_id::NetworkId, provider_name::ProviderName}; @@ -18,9 +19,7 @@ use amp_providers_solana::config::UseArchive; use anyhow::Context; use datasets_common::block_num::BlockNum; use datasets_raw::{ - client::{ - BlockStreamError, BlockStreamResultExt, BlockStreamer, CleanupError, LatestBlockError, - }, + client::{BlockStreamError, BlockStreamResultExt, BlockStreamer, LatestBlockError}, rows::Rows, }; use futures::{Stream, StreamExt, TryStreamExt}; @@ -33,23 +32,22 @@ use crate::{metrics, of1_client, rpc_client, tables}; /// have been truncated. pub const TRUNCATED_LOG_MESSAGES_MARKER: &str = "Log truncated"; -/// Handles related to the OF1 CAR manager task, stored in the client for cleanup. -struct Of1CarManagerHandles { - tx: tokio::sync::mpsc::Sender, - jh: tokio::task::JoinHandle<()>, -} - /// A JSON-RPC based Solana client that implements the [`BlockStreamer`] trait. 
+#[derive(Clone)] pub struct Client { - of1_car_manager_handles: Arc>>, + reqwest: Arc, main_rpc_client: Arc, fallback_rpc_client: Option>, metrics: Option>, network: NetworkId, provider_name: ProviderName, - of1_car_directory: PathBuf, use_archive: UseArchive, commitment: CommitmentConfig, + /// Monitor which epochs are currently being streamed to prevent overlapping + /// streams. This is only used in debug builds as an additional safety check + /// with respect to the partitioning logic. + #[cfg(debug_assertions)] + epochs_in_progress: Arc>>, } impl Client { @@ -60,16 +58,15 @@ impl Client { max_rpc_calls_per_second: Option, network: NetworkId, provider_name: ProviderName, - of1_car_directory: PathBuf, - keep_of1_car_files: bool, use_archive: UseArchive, - meter: Option<&monitoring::telemetry::metrics::Meter>, commitment: CommitmentConfig, + meter: Option<&monitoring::telemetry::metrics::Meter>, ) -> Self { assert_eq!(network, "mainnet", "only mainnet is supported"); let metrics = meter.map(metrics::MetricsRegistry::new).map(Arc::new); + let reqwest = reqwest::Client::new(); let main_rpc_client = Arc::new(rpc_client::SolanaRpcClient::new( main_rpc_connection_info, max_rpc_calls_per_second, @@ -87,30 +84,17 @@ impl Client { )) }); - let (of1_car_manager_tx, of1_car_manager_rx) = tokio::sync::mpsc::channel(128); - let of1_car_manager_jh = tokio::task::spawn(of1_client::car_file_manager( - of1_car_manager_rx, - of1_car_directory.clone(), - keep_of1_car_files, - provider_name.to_string(), - network.clone(), - metrics.clone(), - )); - let handles = Of1CarManagerHandles { - tx: of1_car_manager_tx, - jh: of1_car_manager_jh, - }; - Self { - of1_car_manager_handles: Arc::new(Mutex::new(Some(handles))), + reqwest: Arc::new(reqwest), main_rpc_client, fallback_rpc_client, metrics, network, provider_name, - of1_car_directory, use_archive, commitment, + #[cfg(debug_assertions)] + epochs_in_progress: Arc::new(Mutex::new(HashSet::new())), } } @@ -231,27 +215,6 @@ impl Client { 
} } -impl Clone for Client { - fn clone(&self) -> Self { - assert!( - self.of1_car_manager_handles.lock().unwrap().is_some(), - "cannot clone Client after cleanup" - ); - - Self { - of1_car_manager_handles: self.of1_car_manager_handles.clone(), - main_rpc_client: self.main_rpc_client.clone(), - fallback_rpc_client: self.fallback_rpc_client.clone(), - metrics: self.metrics.clone(), - network: self.network.clone(), - provider_name: self.provider_name.clone(), - of1_car_directory: self.of1_car_directory.clone(), - use_archive: self.use_archive, - commitment: self.commitment, - } - } -} - impl BlockStreamer for Client { async fn block_stream( self, @@ -316,27 +279,27 @@ impl BlockStreamer for Client { } }; - let of1_car_manager_tx = { - let guard = self.of1_car_manager_handles.lock().unwrap(); - guard - .as_ref() - .expect("new block streams should not start after cleanup") - .tx - .clone() - }; - let historical_block_stream = if use_rpc_only { // Return empty stream to skip Old Faithful entirely futures::stream::empty().boxed() } else { + let metrics = self + .metrics + .clone() + .map(|registry| of1_client::MetricsContext { + registry, + provider: self.provider_name.clone(), + network: self.network.clone(), + }); of1_client::stream( start, end, - self.of1_car_directory.clone(), - of1_car_manager_tx, + self.reqwest.clone(), self.main_rpc_client.clone(), get_block_config, - self.metrics.clone(), + metrics, + #[cfg(debug_assertions)] + self.epochs_in_progress.clone(), ) .map_err(Into::into) .boxed() @@ -369,30 +332,6 @@ impl BlockStreamer for Client { NonZeroU64::new(solana_clock::DEFAULT_SLOTS_PER_EPOCH) } - async fn wait_for_cleanup(self) -> Result<(), CleanupError> { - let Self { - of1_car_manager_handles, - .. - } = self; - - let Of1CarManagerHandles { tx, jh } = { - let mut guard = of1_car_manager_handles.lock().unwrap(); - if let Some(handles) = guard.take() { - handles - } else { - // Cleanup already done. 
- return Ok(()); - } - }; - - // Drop the extra sender so that the CAR manager task can exit -- assuming all block - // streams, which hold clones of the sender, complete before cleanup. - drop(tx); - let _ = jh.await; - - Ok(()) - } - fn provider_name(&self) -> &str { &self.provider_name } @@ -565,21 +504,24 @@ async fn fill_truncated_logs( .collect(); for (tx_id, truncated_log) in truncated_logs { - let id = solana_transaction::Signature::from_str(tx_id) - .with_context(|| format!("parsing transaction signature {tx_id}"))?; - let fallback_log_messages = fallback_rpc_client - .get_transaction(id, get_transaction_config, metrics.clone()) - .await - .with_context(|| { - format!( - "fetching transaction details for tx {id} in slot {}", - slot.slot - ) - })? - .transaction - .meta - .and_then(|meta| -> Option> { meta.log_messages.into() }) - .unwrap_or_default(); + let fallback_log_messages = { + let tx_id = solana_transaction::Signature::from_str(tx_id) + .with_context(|| format!("parsing transaction signature {tx_id}"))?; + + fallback_rpc_client + .get_transaction(tx_id, get_transaction_config, metrics.clone()) + .await + .with_context(|| { + format!( + "fetching transaction details for tx {tx_id} in slot {}", + slot.slot + ) + })? 
+ .transaction + .meta + .and_then(|meta| -> Option> { meta.log_messages.into() }) + .unwrap_or_default() + }; if fallback_log_messages .iter() @@ -628,8 +570,6 @@ fn bs58_decode_blockhash(blockhash_str: &str) -> anyhow::Result<[u8; 32]> { #[cfg(test)] mod tests { - use std::path::PathBuf; - use amp_providers_common::redacted::Redacted; use amp_providers_solana::config::UseArchive; use futures::StreamExt; @@ -654,15 +594,13 @@ mod tests { let client = Client::new( rpc_connection_info, - None, - None, + None, // Fallback RPC + None, // RPC rate limit network, provider_name, - PathBuf::new(), - false, UseArchive::Auto, - None, CommitmentConfig::finalized(), + None, // Meter ); let start = 0; diff --git a/crates/extractors/solana/src/error.rs b/crates/extractors/solana/src/error.rs index f11a006f4..9edfb5be1 100644 --- a/crates/extractors/solana/src/error.rs +++ b/crates/extractors/solana/src/error.rs @@ -1,7 +1,8 @@ use amp_providers_common::network_id::NetworkId; use datasets_raw::{client::BlockStreamError, rows::TableRowError}; -pub use yellowstone_faithful_car_parser as car_parser; -use yellowstone_faithful_car_parser::node::{NodeError, ReassableError}; +use yellowstone_faithful_car_parser as car_parser; + +use crate::of1_client; /// Errors that occur when converting Solana block data to table rows. /// @@ -57,29 +58,13 @@ pub enum Of1StreamError { #[error("RPC client error")] RpcClient(#[source] solana_client::client_error::ClientError), - /// The CAR manager communication channel was closed unexpectedly. - /// - /// The CAR manager runs as a separate task handling file downloads. This error - /// occurs when the channel for communicating with the manager is closed before - /// a response is received, indicating the manager task has terminated. - #[error("CAR manager channel closed")] - ChannelClosed(#[source] tokio::sync::oneshot::error::RecvError), - - /// Failed to open a CAR file from disk. 
- /// - /// This occurs when the downloaded CAR file cannot be opened for reading, - /// due to permission issues, file corruption, or the file being deleted - /// between download and read. - #[error("failed to open CAR file")] - FileOpen(#[source] std::io::Error), - - /// Failed to memory-map a CAR file. + /// Failed to stream a CAR file through the OF1 client. /// - /// CAR files are memory-mapped for efficient reading. This error occurs when - /// the memory-mapping operation fails, typically due to insufficient virtual - /// memory or file access issues. - #[error("failed to memory-map CAR file")] - Mmap(#[source] std::io::Error), + /// This occurs when the OF1 client encounters issues while reading or streaming + /// CAR files, which may include HTTP errors, unsupported range requests, or + /// other file access problems. + #[error("failed to stream CAR file")] + FileStream(#[source] of1_client::CarReaderError), /// Encountered an unexpected node type while reading a block from CAR. /// @@ -148,7 +133,7 @@ pub enum Of1StreamError { /// This occurs during low-level parsing of CAR node structures, indicating /// malformed or corrupted node data that cannot be interpreted. #[error("CAR node parsing error")] - NodeParse(#[source] NodeError), + NodeParse(#[source] car_parser::node::NodeError), /// Failed to reassemble a dataframe from CAR nodes. /// @@ -156,18 +141,21 @@ pub enum Of1StreamError { /// This error occurs when the reassembly of these fragmented structures fails, /// typically due to missing or corrupted fragment nodes. 
#[error("CAR dataframe reassembly error")] - DataframeReassembly(#[source] ReassableError), + DataframeReassembly(#[source] car_parser::node::ReassableError), } impl From for BlockStreamError { fn from(value: Of1StreamError) -> Self { match value { - Of1StreamError::RpcClient(_) => BlockStreamError::Recoverable(value.into()), + Of1StreamError::RpcClient(_) + | Of1StreamError::FileStream(of1_client::CarReaderError::Http(_)) + | Of1StreamError::FileStream(of1_client::CarReaderError::Reqwest(_)) => { + BlockStreamError::Recoverable(value.into()) + } // This is intentionally not a catch-all, to force consideration of // each error type when mapping to recoverable vs fatal. - Of1StreamError::ChannelClosed(_) - | Of1StreamError::FileOpen(_) - | Of1StreamError::Mmap(_) + Of1StreamError::FileStream(of1_client::CarReaderError::Io(_)) + | Of1StreamError::FileStream(of1_client::CarReaderError::RangeRequestUnsupported) | Of1StreamError::UnexpectedNode { .. } | Of1StreamError::MissingNode { .. } | Of1StreamError::RewardSlotMismatch { .. 
} diff --git a/crates/extractors/solana/src/lib.rs b/crates/extractors/solana/src/lib.rs index 42cd6a330..21fdc2977 100644 --- a/crates/extractors/solana/src/lib.rs +++ b/crates/extractors/solana/src/lib.rs @@ -140,11 +140,9 @@ pub fn client( config.max_rpc_calls_per_second, config.network, name, - config.of1_car_directory, - config.keep_of1_car_files, config.use_archive, - meter, commitment, + meter, ); Ok(client) diff --git a/crates/extractors/solana/src/of1_client.rs b/crates/extractors/solana/src/of1_client.rs index 44c67825e..b26c72c45 100644 --- a/crates/extractors/solana/src/of1_client.rs +++ b/crates/extractors/solana/src/of1_client.rs @@ -1,44 +1,14 @@ -use std::{ - collections::HashMap, - path::{Path, PathBuf}, - sync::{Arc, Mutex}, - time::{Duration, Instant}, -}; - -use amp_providers_common::network_id::NetworkId; -use backon::{ExponentialBuilder, Retryable}; -use futures::{Stream, StreamExt}; -use solana_clock::{Epoch, Slot}; -use tokio::io::AsyncWriteExt; -pub use yellowstone_faithful_car_parser as car_parser; +#[cfg(debug_assertions)] +use std::{collections::HashSet, sync::Mutex}; +use std::{future::Future, pin::Pin, sync::Arc, time::Duration}; -use crate::{error::Of1StreamError, metrics, rpc_client}; - -const OLD_FAITHFUL_ARCHIVE_URL: &str = "https://files.old-faithful.net"; - -const CAR_DOWNLOAD_PROGRESS_REPORT_INTERVAL: Duration = Duration::from_secs(10); -const BYTES_DOWNLOADED_METRICS_REPORT_THRESHOLD: usize = 10 * 1024 * 1024; // 10 MiB +use amp_providers_common::{network_id::NetworkId, provider_name::ProviderName}; +use futures::{FutureExt, Stream, StreamExt}; +use yellowstone_faithful_car_parser as car_parser; -/// Maps epoch to a list of oneshot senders to notify when the download is complete. -type PendingMessageMap = HashMap>>; +use crate::{error::Of1StreamError, metrics, rpc_client}; -/// Maps epoch to the number of interested block streams. 
Each block stream will request -/// a file it wants to work with (CarManagerMessage::DownloadFile) and notify this task -/// when done (CarManagerMessage::FileProcessed). When the interest count reaches zero, -/// the file can be deleted. -/// -/// NOTE: The interest data type should match max dump parallelism data type. -type FileInterestMap = HashMap; - -pub enum CarManagerMessage { - /// Request to download the CAR file for the given epoch. The oneshot sender will be - /// notified when the download is complete. The boolean indicates whether the file - /// was successfully downloaded (true) or if no file is available (false). - DownloadFile((Epoch, tokio::sync::oneshot::Sender)), - /// Notification that the CAR file for the given epoch has been processed and can - /// be deleted if no other streams are interested in it. - FileProcessed(Epoch), -} +const BYTES_DOWNLOADED_METRICS_REPORT_THRESHOLD: u64 = 10 * 1024 * 1024; // 10 MiB pub type DecodedTransactionStatusMeta = DecodedField< solana_storage_proto::confirmed_block::TransactionStatusMeta, @@ -57,8 +27,8 @@ pub enum DecodedField { #[derive(Default)] pub struct DecodedSlot { - pub slot: Slot, - pub parent_slot: Slot, + pub slot: solana_clock::Slot, + pub parent_slot: solana_clock::Slot, pub blockhash: [u8; 32], pub prev_blockhash: [u8; 32], pub block_height: Option, @@ -76,7 +46,7 @@ impl DecodedSlot { /// NOTE: The reason this is marked as `pub` is because it is used in integration tests /// in the `tests` crate. 
#[doc(hidden)] - pub fn dummy(slot: Slot) -> Self { + pub fn dummy(slot: solana_clock::Slot) -> Self { Self { slot, parent_slot: slot.saturating_sub(1), @@ -85,172 +55,27 @@ impl DecodedSlot { } } -pub async fn car_file_manager( - mut car_manager_rx: tokio::sync::mpsc::Receiver, - car_directory: PathBuf, - keep_car_files: bool, - provider: String, - network: NetworkId, - metrics: Option>, -) { - let mut downloaders = futures::stream::FuturesUnordered::new(); - let reqwest = Arc::new(reqwest::Client::new()); - let pending_msgs = Arc::new(Mutex::new(PendingMessageMap::new())); - let file_interests = Arc::new(Mutex::new(FileInterestMap::new())); - - if let Err(e) = fs_err::create_dir_all(&car_directory) { - tracing::error!( - path = %car_directory.display(), - error = %e, - "failed to create CAR file directory, shutting down CAR file manager" - ); - return; - } - - loop { - tokio::select! { - msg = car_manager_rx.recv() => { - let Some(msg) = msg else { - tracing::debug!("CAR file manager channel closed, shutting down"); - return; - }; - match msg { - CarManagerMessage::DownloadFile((epoch, done_tx)) => { - tracing::debug!(%epoch, "received CAR file download message"); - file_interests - .lock() - .unwrap() - .entry(epoch) - .and_modify(|count| *count += 1) - .or_insert(1); - - { - let mut guard = pending_msgs.lock().unwrap(); - let entry = guard.entry(epoch).or_default(); - entry.push(done_tx); - - if entry.len() > 1 { - // A download is already in progress for this epoch. 
- continue; - } - } - - let reqwest = Arc::clone(&reqwest); - let pending_msgs = Arc::clone(&pending_msgs); - let car_directory = car_directory.clone(); - let provider = provider.clone(); - let network = network.clone(); - let metrics = metrics.clone(); - - downloaders.push(tokio::spawn(async move { - let dest = car_directory.join(local_car_filename(epoch)); - - let result = (|| async { - ensure_car_file_exists( - epoch, - &reqwest, - &dest, - &provider, - &network, - metrics.clone() - ).await - }) - .retry(ExponentialBuilder::default().without_max_times()) - .sleep(tokio::time::sleep) - // Only retry on errors that are not HTTP 404, since that indicates - // that the CAR file for the gives epoch does not exist. - .when(|e| !matches!(e, FileDownloadError::Http(code) if *code == 404)) - .notify(|error: &FileDownloadError, delay: Duration| { - if let Some(m) = metrics.as_ref() { - m.record_of1_car_download_error(epoch, &provider, &network); - } - tracing::debug!( - %epoch, - %error, - "CAR file download failed, retrying in {delay:?}" - ); - }).await; - - match result { - Ok(_) => { - // CAR file is ready for use. - tracing::debug!(%epoch, "CAR file is ready"); - let mut guard = pending_msgs.lock().unwrap(); - let pending = guard.remove(&epoch).expect("epoch previously inserted"); - for tx in pending { - tx.send(true).ok(); - } - }, - Err(FileDownloadError::Http(404)) => { - // No more CAR files available. 
- tracing::debug!(%epoch, "no CAR file available (404)"); - let mut guard = pending_msgs.lock().unwrap(); - let pending = guard.remove(&epoch).expect("epoch previously inserted"); - for tx in pending { - tx.send(false).ok(); - } - }, - _ => { - unreachable!("all other errors should be retried indefinitely"); - }, - } - })); - } - CarManagerMessage::FileProcessed(epoch) => { - tracing::debug!(%epoch, "received CAR file processed message"); - let should_delete = { - let mut guard = file_interests.lock().unwrap(); - let count = guard.get_mut(&epoch).expect("epoch previously inserted"); - *count -= 1; - - if *count == 0 { - guard.remove(&epoch); - !keep_car_files - } else { - false - } - }; - - if should_delete { - // No more interested streams, delete the file. - let dest = car_directory.join(local_car_filename(epoch)); - match tokio::fs::remove_file(&dest).await { - Ok(_) => { - tracing::debug!(%epoch, "deleted processed CAR file"); - } - // `ErrorKind::NotFound` is expected to occur for epochs that did - // not have a CAR file to begin with, since we still track interest - // in them. - Err(error) if error.kind() != std::io::ErrorKind::NotFound => { - tracing::debug!(%epoch, %error, "failed to delete CAR file"); - } - _ => {} - } - } - } - } - } - // Drive the download tasks to completion. - Some(_) = downloaders.next() => {} - } - } +/// Context for OF1 streaming that can be passed to functions that need to report metrics. +#[derive(Debug, Clone)] +pub struct MetricsContext { + pub provider: ProviderName, + pub network: NetworkId, + pub registry: Arc, } +/// Create a stream of decoded slots covering `start..=end` by reading, epoch by epoch, +/// the corresponding CAR files from the Old Faithful archive. #[allow(clippy::too_many_arguments)] pub fn stream( start: solana_clock::Slot, end: solana_clock::Slot, - car_directory: PathBuf, - // The receiver part should never be dropped as the manager task ends only after all - // streams are done. 
- car_manager_tx: tokio::sync::mpsc::Sender, + reqwest: Arc, solana_rpc_client: Arc, get_block_config: rpc_client::rpc_config::RpcBlockConfig, - metrics: Option>, + metrics: Option, + #[cfg(debug_assertions)] epochs_in_progress: Arc>>, ) -> impl Stream> { async_stream::stream! { - let mut epoch = start / solana_clock::DEFAULT_SLOTS_PER_EPOCH; - // Pre-fetch the initial previous block hash via JSON-RPC so that we don't have to // (potentially) read multiple CAR files to find it. let mut prev_blockhash = if start == 0 { @@ -263,8 +88,9 @@ pub fn stream( } else { let mut slot = start; loop { + let metrics = metrics.clone().map(|m| m.registry); let resp = solana_rpc_client - .get_block(slot, get_block_config, metrics.clone()) + .get_block(slot, get_block_config, metrics) .await; match resp { @@ -274,7 +100,7 @@ pub fn stream( .map(TryInto::try_into) .expect("invalid base-58 string") .expect("blockhash is 32 bytes"); - }, + } Err(e) if rpc_client::is_block_missing_err(&e) => slot += 1, Err(e) => { yield Err(Of1StreamError::RpcClient(e)); @@ -284,67 +110,59 @@ pub fn stream( } }; - // Download historical data via CAR files. - loop { - tracing::debug!(epoch, "processing historical epoch"); - let (done_tx, done_rx) = tokio::sync::oneshot::channel(); - let msg = CarManagerMessage::DownloadFile((epoch, done_tx)); - car_manager_tx.send(msg).await.expect("receiver not dropped"); - match done_rx.await { - Ok(downloaded) if !downloaded => { - // No more CAR files available. 
- car_manager_tx - .send(CarManagerMessage::FileProcessed(epoch)) - .await - .expect("receiver not dropped"); - return; - }, - Err(e) => { - car_manager_tx - .send(CarManagerMessage::FileProcessed(epoch)) - .await - .expect("receiver not dropped"); - yield Err(Of1StreamError::ChannelClosed(e)); - return; - } - _ => {} - } + let start_epoch = start / solana_clock::DEFAULT_SLOTS_PER_EPOCH; + let end_epoch = end / solana_clock::DEFAULT_SLOTS_PER_EPOCH; - let dest = car_directory.join(local_car_filename(epoch)); - let file = match fs_err::File::open(dest) { - Ok(f) => f, - Err(e) => { - car_manager_tx - .send(CarManagerMessage::FileProcessed(epoch)) - .await - .expect("receiver not dropped"); - yield Err(Of1StreamError::FileOpen(e)); - return; - } - }; - // SAFETY: The file is not modified/deleted while the mmap is in use. - let mmap = match unsafe { memmap2::Mmap::map(&file) } { - Ok(mmap) => mmap, - Err(e) => { - car_manager_tx - .send(CarManagerMessage::FileProcessed(epoch)) - .await - .expect("receiver not dropped"); - yield Err(Of1StreamError::Mmap(e)); - return; - } - }; + for epoch in start_epoch..=end_epoch { + #[cfg(debug_assertions)] + let _guard = epoch_supervision::Guard::new(epochs_in_progress.as_ref(), epoch); - let mut node_reader = car_parser::node::NodeReader::new(&mmap[..]); + let reader = CarReader::new( + epoch, + reqwest.clone(), + metrics.clone() + ); + let mut node_reader = car_parser::node::NodeReader::new(reader); - while let Some(slot) = read_next_slot(&mut node_reader, prev_blockhash).await.transpose() { + while let Some(slot) = read_next_slot(&mut node_reader, prev_blockhash) + .await + .transpose() + { let slot = match slot { Ok(slot) => slot, + // IO errors from the node reader could come from the underlying `CarReader`. + // Try to downcast to `CarReaderError` to determine how to map into `Of1StreamError`. 
+ // + // NOTE: There should be no retry logic here because the `CarReader` should + // handle all retry logic internally and only return an error when a non-recoverable + // error occurs. + Err(Of1StreamError::NodeParse(car_parser::node::NodeError::Io(io_err))) + if io_err.kind() == std::io::ErrorKind::Other => + { + let downcast = io_err.get_ref().and_then(|e| e.downcast_ref::()); + let car_reader_err = match downcast { + None => { + CarReaderError::Io(io_err) + } + Some(CarReaderError::RangeRequestUnsupported) => { + CarReaderError::RangeRequestUnsupported + } + // No more CAR files available, end the stream. + Some(CarReaderError::Http(404)) => { + return; + } + Some(e) => { + let error = e.to_string(); + unreachable!( + "unexpected CAR reader error (should be retried indefinetly); {error}" + ); + } + }; + + yield Err(Of1StreamError::FileStream(car_reader_err)); + return; + } Err(e) => { - car_manager_tx - .send(CarManagerMessage::FileProcessed(epoch)) - .await - .expect("receiver not dropped"); yield Err(e); return; } @@ -359,222 +177,49 @@ pub fn stream( match slot.slot.cmp(&end) { std::cmp::Ordering::Less => { yield Ok(slot); - }, + } std::cmp::Ordering::Equal => { - car_manager_tx - .send(CarManagerMessage::FileProcessed(epoch)) - .await - .expect("receiver not dropped"); yield Ok(slot); return; - }, + } std::cmp::Ordering::Greater => { - car_manager_tx - .send(CarManagerMessage::FileProcessed(epoch)) - .await - .expect("receiver not dropped"); return; - }, + } } } - - car_manager_tx - .send(CarManagerMessage::FileProcessed(epoch)) - .await - .expect("receiver not dropped"); - - epoch += 1; - } - } -} - -/// Ensures that the entire CAR file for the given epoch exists at the specified destination path. -/// -/// If the file was partially downloaded before, the download will resume from where it left off. 
-async fn ensure_car_file_exists( - epoch: solana_clock::Epoch, - reqwest: &reqwest::Client, - dest: &Path, - provider: &str, - network: &NetworkId, - metrics: Option>, -) -> Result<(), FileDownloadError> { - enum DownloadAction { - Download, - Resume(u64), - Restart, - Skip, - } - - let download_url = car_download_url(epoch); - - // Get the actual file size from the server to determine if we need to resume, as well - // as for download progress reports. - let remote_file_size = { - let head_response = reqwest.head(&download_url).send().await?; - if head_response.status() != reqwest::StatusCode::OK { - return Err(FileDownloadError::Http(head_response.status().as_u16())); - } - let Some(content_length) = head_response.headers().get(reqwest::header::CONTENT_LENGTH) - else { - return Err(FileDownloadError::MissingContentLengthHeader); - }; - content_length - .to_str() - .map_err(|_| FileDownloadError::ContentLengthParsing)? - .parse() - .map_err(|_| FileDownloadError::ContentLengthParsing)? - }; - - let action = match fs_err::metadata(dest).map(|meta| meta.len()) { - Ok(0) => DownloadAction::Download, - Ok(local_file_size) => { - match local_file_size.cmp(&remote_file_size) { - // Local file is partially downloaded, need to resume. - std::cmp::Ordering::Less => DownloadAction::Resume(local_file_size), - // Local file is larger than remote file, need to restart download. - std::cmp::Ordering::Greater => DownloadAction::Restart, - // File already fully downloaded. - std::cmp::Ordering::Equal => DownloadAction::Skip, - } - } - Err(e) if e.kind() == std::io::ErrorKind::NotFound => DownloadAction::Download, - Err(e) => return Err(FileDownloadError::Io(e)), - }; - - // Set up HTTP headers for range requests if the file already exists. 
- let mut headers = reqwest::header::HeaderMap::new(); - - match action { - DownloadAction::Download => { - tracing::debug!(%download_url, "downloading CAR file"); - } - DownloadAction::Resume(download_offset) => { - tracing::debug!( - %download_url, - %download_offset, - "resuming CAR file download" - ); - let range_header = format!("bytes={download_offset}-"); - let range_header_value = - reqwest::header::HeaderValue::from_str(&range_header).expect("valid range header"); - headers.insert(reqwest::header::RANGE, range_header_value); - } - DownloadAction::Restart => { - tracing::debug!( - %download_url, - "local CAR file is larger than remote file, restarting download" - ); - tokio::fs::remove_file(&dest).await?; - } - DownloadAction::Skip => { - tracing::debug!( - %download_url, - "local CAR file already fully downloaded, skipping download" - ); - return Ok(()); - } - } - - let mut file = tokio::fs::File::options() - .create(true) // Create the file if it doesn't exist. - .append(true) // Append to the file to support resuming. - .open(&dest) - .await?; - - let download_start = Instant::now(); - - let download_response = reqwest.get(download_url).headers(headers).send().await?; - let status = download_response.status(); - if !status.is_success() { - return Err(FileDownloadError::Http(status.as_u16())); - } - - let mut bytes_downloaded: u64 = if let DownloadAction::Resume(offset) = action { - // Expecting a 206 Partial Content response when resuming. - if status != reqwest::StatusCode::PARTIAL_CONTENT { - return Err(FileDownloadError::PartialDownloadNotSupported); - } - offset - } else { - 0 - }; - - log_download_progress(epoch, bytes_downloaded, remote_file_size); - let mut last_progress_report = download_start; - - // Stream the file content since these files can be extremely large. - let mut stream = download_response.bytes_stream(); - // Recording metrics for each stream output would be inefficient, - // so we report once for every 10 MiB downloaded. 
- let mut current_chunk = 0; - while let Some(chunk) = stream.next().await { - let chunk = chunk?; - file.write_all(&chunk).await?; - - // Report progress to the user. - bytes_downloaded += chunk.len() as u64; - if last_progress_report.elapsed() >= CAR_DOWNLOAD_PROGRESS_REPORT_INTERVAL { - log_download_progress(epoch, bytes_downloaded, remote_file_size); - last_progress_report = Instant::now(); - } - - // Record metrics. - current_chunk += chunk.len(); - match metrics.as_ref() { - Some(m) if current_chunk >= BYTES_DOWNLOADED_METRICS_REPORT_THRESHOLD => { - m.record_of1_car_download_bytes(current_chunk as u64, epoch, provider, network); - current_chunk = 0; - } - _ => {} } } - - if current_chunk > 0 { - // Record any remaining bytes that were not reported in the loop. - if let Some(m) = metrics.as_ref() { - m.record_of1_car_download_bytes(current_chunk as u64, epoch, provider, network); - } - } - - let download_duration = download_start.elapsed().as_secs_f64(); - let download_duration_str = format!("{download_duration:.2}"); - tracing::info!(%epoch, %bytes_downloaded, duration_secs = %download_duration_str, "downloaded CAR file"); - if let Some(m) = metrics.as_ref() { - m.record_of1_car_download(download_duration, epoch, provider, network); - } - - Ok(()) -} - -fn log_download_progress(epoch: Epoch, bytes_downloaded: u64, bytes_total: u64) { - let percent_done = { - let p = (bytes_downloaded as f64 / bytes_total as f64) * 100.0; - format!("{p:.2}") - }; - tracing::info!( - %epoch, - %bytes_downloaded, - %bytes_total, - %percent_done, - "downloading CAR file" - ); } +/// Errors that can occur during streaming of Solana blocks from Old Faithful +/// v1 (OF1) CAR files. #[derive(Debug, thiserror::Error)] -enum FileDownloadError { - #[error("I/O error: {0}")] - Io(#[from] std::io::Error), +pub enum CarReaderError { + /// IO error when reading the CAR file. 
+ /// + /// This can occur due to network issues, file corruption, or other problems when + /// accessing the CAR file. + #[error("IO error: {0}")] + Io(#[source] std::io::Error), + /// HTTP error when connecting to or reading from the CAR file. + /// + /// This can occur when the CAR file for a given epoch is not found (404) or if + /// there are other HTTP errors (e.g., 500) when trying to access the CAR file. #[error("HTTP error with status code: {0}")] Http(u16), + /// Reqwest error when connecting to or reading from the CAR file. + /// + /// This can occur due to network issues, timeouts, or other problems when making + /// HTTP requests to access the CAR file. #[error("Reqwest error: {0}")] - Reqwest(#[from] reqwest::Error), - #[error("missing Content-Length header in HTTP response")] - MissingContentLengthHeader, - #[error("error parsing Content-Length header")] - ContentLengthParsing, - #[error("partial downloads are not supported by the server")] - PartialDownloadNotSupported, + Reqwest(#[source] reqwest::Error), + /// The server does not support HTTP range requests. + /// + /// This is a non-recoverable error because the [`CarReader`] relies on range + /// requests to be able to resume interrupted downloads without re-downloading + /// the entire CAR. 
+ #[error("server does not support range requests")] + RangeRequestUnsupported, } /// Read an entire block worth of nodes from the given node reader and decode them into @@ -781,13 +426,315 @@ async fn read_next_slot( Ok(Some(block)) } +type ConnectFuture = Pin> + Send>>; +type ByteStream = Pin> + Send>>; +type BackoffFuture = Pin>; + +struct ByteStreamMonitor { + epoch: solana_clock::Epoch, + bytes_read_chunk: u64, + started_at: std::time::Instant, + provider: ProviderName, + network: NetworkId, + registry: Arc, +} + +struct MonitoredByteStream { + stream: ByteStream, + monitor: Option, +} + +impl MonitoredByteStream { + fn new( + stream: impl Stream> + Send + 'static, + epoch: solana_clock::Epoch, + metrics: Option, + ) -> Self { + let stream = Box::pin(stream); + let monitor = metrics.map(|metrics| ByteStreamMonitor { + epoch, + bytes_read_chunk: 0, + started_at: std::time::Instant::now(), + provider: metrics.provider, + network: metrics.network, + registry: metrics.registry, + }); + Self { stream, monitor } + } +} + +impl Stream for MonitoredByteStream { + type Item = ::Item; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let this = self.get_mut(); + let poll = this.stream.poll_next_unpin(cx); + + if let Some(m) = this.monitor.as_mut() { + match &poll { + std::task::Poll::Ready(Some(Ok(bytes))) => { + m.bytes_read_chunk += bytes.len() as u64; + + if m.bytes_read_chunk >= BYTES_DOWNLOADED_METRICS_REPORT_THRESHOLD { + m.registry.record_of1_car_download_bytes( + m.bytes_read_chunk, + m.epoch, + &m.provider, + &m.network, + ); + m.bytes_read_chunk = 0; + } + } + std::task::Poll::Ready(Some(Err(_))) => { + m.registry + .record_of1_car_download_error(m.epoch, &m.provider, &m.network); + } + std::task::Poll::Ready(None) => { + // Record any remaining bytes read that didn't reach the reporting threshold. 
+ if m.bytes_read_chunk > 0 { + m.registry.record_of1_car_download_bytes( + m.bytes_read_chunk, + m.epoch, + &m.provider, + &m.network, + ); + } + let elapsed = m.started_at.elapsed().as_secs_f64(); + m.registry + .record_of1_car_download(elapsed, m.epoch, &m.provider, &m.network); + } + _ => {} + } + } + + poll + } +} + +enum ReaderState { + /// A single in-flight HTTP request to (re)connect. + Connect(ConnectFuture), + /// We have an active byte stream. + Stream(MonitoredByteStream), + /// We are waiting until a backoff deadline before attempting reconnect. + Backoff(BackoffFuture), +} + +struct CarReader { + url: String, + epoch: solana_clock::Epoch, + reqwest: Arc, + state: ReaderState, + overflow: Vec, + bytes_read_total: u64, + + // Backoff control + reconnect_attempt: u32, + max_backoff: Duration, + base_backoff: Duration, + + metrics: Option, +} + +impl CarReader { + fn new( + epoch: solana_clock::Epoch, + reqwest: Arc, + metrics: Option, + ) -> Self { + let url = car_download_url(epoch); + let connect_fut = get_with_range_header(reqwest.clone(), url.clone(), 0); + + Self { + url, + epoch, + reqwest, + state: ReaderState::Connect(Box::pin(connect_fut)), + overflow: Vec::new(), + bytes_read_total: 0, + reconnect_attempt: 0, + base_backoff: Duration::from_millis(100), + max_backoff: Duration::from_secs(30), + metrics, + } + } + + fn schedule_backoff(&mut self, reason: impl std::fmt::Display) { + self.reconnect_attempt = self.reconnect_attempt.saturating_add(1); + let backoff = compute_backoff(self.base_backoff, self.max_backoff, self.reconnect_attempt); + + let backoff_str = format!("{:.1}s", backoff.as_secs_f32()); + tracing::warn!( + epoch = self.epoch, + bytes_read = self.bytes_read_total, + attempt = self.reconnect_attempt, + reason = %reason, + backoff = %backoff_str, + "CAR reader failed; scheduled retry" + ); + + let fut = tokio::time::sleep(backoff); + self.state = ReaderState::Backoff(Box::pin(fut)); + } +} + +impl tokio::io::AsyncRead for CarReader 
{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + let this = self.get_mut(); + + // Drain overflow first. + if !this.overflow.is_empty() { + let to_copy = this.overflow.len().min(buf.remaining()); + buf.put_slice(&this.overflow[..to_copy]); + this.overflow.drain(..to_copy); + return std::task::Poll::Ready(Ok(())); + } + + // Retry loop, return on successful read, EOF, or non-recoverable error (RangeRequestUnsupported). + loop { + match &mut this.state { + ReaderState::Connect(fut) => match fut.as_mut().poll(cx) { + std::task::Poll::Ready(Ok(resp)) => { + let status = resp.status(); + if !status.is_success() { + this.schedule_backoff(format!("HTTP error: {status}")); + continue; + } + + if this.bytes_read_total > 0 + && status != reqwest::StatusCode::PARTIAL_CONTENT + { + let e = std::io::Error::other(CarReaderError::RangeRequestUnsupported); + return std::task::Poll::Ready(Err(e)); + } + + // Initial connection succeeded, start reading the byte stream. + this.reconnect_attempt = 0; + let stream = MonitoredByteStream::new( + resp.bytes_stream(), + this.epoch, + this.metrics.clone(), + ); + this.state = ReaderState::Stream(stream); + } + std::task::Poll::Ready(Err(e)) => { + this.schedule_backoff(format!("request error: {e}")); + } + std::task::Poll::Pending => return std::task::Poll::Pending, + }, + ReaderState::Stream(stream) => match stream.poll_next_unpin(cx) { + // Reached EOF. + std::task::Poll::Ready(None) => { + return std::task::Poll::Ready(Ok(())); + } + // Read some bytes, account for possible overflow. 
+ std::task::Poll::Ready(Some(Ok(bytes))) => { + let n_read = bytes.len(); + let to_copy = n_read.min(buf.remaining()); + + buf.put_slice(&bytes[..to_copy]); + this.overflow.extend_from_slice(&bytes[to_copy..]); + this.bytes_read_total += n_read as u64; + + return std::task::Poll::Ready(Ok(())); + } + std::task::Poll::Ready(Some(Err(e))) => { + this.schedule_backoff(format!("stream error: {e}")); + } + std::task::Poll::Pending => return std::task::Poll::Pending, + }, + ReaderState::Backoff(fut) => match fut.poll_unpin(cx) { + std::task::Poll::Ready(()) => { + let fut = get_with_range_header( + this.reqwest.clone(), + this.url.clone(), + this.bytes_read_total, + ); + this.state = ReaderState::Connect(Box::pin(fut)); + } + std::task::Poll::Pending => return std::task::Poll::Pending, + }, + } + } + } +} + +async fn get_with_range_header( + reqwest: Arc, + url: String, + offset: u64, +) -> Result { + let mut req = reqwest.get(&url); + if offset > 0 { + req = req.header(reqwest::header::RANGE, format!("bytes={offset}-")); + } + + req.send().await +} + +fn compute_backoff(base: Duration, cap: Duration, attempt: u32) -> Duration { + // attempt=1 => base, attempt=2 => 2*base, attempt=3 => 4*base, ... + let factor = 1u64 << attempt.saturating_sub(1).min(30); + let backoff = base.saturating_mul(factor as u32); + backoff.min(cap) +} + /// Generates the Old Faithful CAR download URL for the given epoch. /// /// Reference: . fn car_download_url(epoch: solana_clock::Epoch) -> String { - format!("{OLD_FAITHFUL_ARCHIVE_URL}/{epoch}/epoch-{epoch}.car") + format!("https://files.old-faithful.net/{epoch}/epoch-{epoch}.car") } -fn local_car_filename(epoch: Epoch) -> String { - format!("epoch-{epoch}.car") +#[cfg(debug_assertions)] +mod epoch_supervision { + use super::{HashSet, Mutex}; + + /// Guard that tracks in-progress epochs to detect overlapping Solana streams in debug builds. 
+ /// + /// # Panics + /// + /// Panics if an attempt is made to [create](Guard::new) a guard for an epoch that is already + /// in progress, or if a guard is dropped for an epoch that is not currently in progress. + pub struct Guard<'a> { + epoch: solana_clock::Epoch, + in_progress_epochs: &'a Mutex>, + } + + impl<'a> Guard<'a> { + pub fn new( + in_progress_epochs: &'a Mutex>, + epoch: solana_clock::Epoch, + ) -> Self { + let mut epochs_in_progress = in_progress_epochs.lock().unwrap(); + let is_new = epochs_in_progress.insert(epoch); + assert!( + is_new, + "epoch {epoch} already in progress, overlapping Solana streams are not allowed" + ); + Self { + epoch, + in_progress_epochs, + } + } + } + + impl<'a> Drop for Guard<'a> { + fn drop(&mut self) { + let mut epochs_in_progress = self.in_progress_epochs.lock().unwrap(); + let removed = epochs_in_progress.remove(&self.epoch); + assert!( + removed, + "epoch {} was not in progress during drop, this should never happen", + self.epoch + ); + } + } } diff --git a/docs/code/extractors.md b/docs/code/extractors.md index 690b6d4c7..f376f4f21 100644 --- a/docs/code/extractors.md +++ b/docs/code/extractors.md @@ -48,8 +48,6 @@ pub trait BlockStreamer: Clone + 'static { fn bucket_size(&self) -> Option; - fn wait_for_cleanup(self) -> impl Future> + Send; - fn provider_name(&self) -> &str; } ``` @@ -61,7 +59,6 @@ pub trait BlockStreamer: Clone + 'static { - `bucket_size()` — returns the bucket size if the streamer fetches blocks in fixed-size buckets, or `None` if blocks are produced individually. This matters for determining how to split block ranges for parallel streaming. -- `wait_for_cleanup()` — releases background tasks, connections, and resources - `provider_name()` — returns the provider instance name **Error types** are all `Box`. 
diff --git a/docs/providers/solana.spec.json b/docs/providers/solana.spec.json index f888a9259..e519e3a3a 100644 --- a/docs/providers/solana.spec.json +++ b/docs/providers/solana.spec.json @@ -19,11 +19,6 @@ } ] }, - "keep_of1_car_files": { - "description": "Whether to keep downloaded CAR files after processing.", - "type": "boolean", - "default": false - }, "kind": { "description": "The provider kind, must be `\"solana\"`.", "const": "solana" @@ -41,10 +36,6 @@ "description": "The network this provider serves.", "$ref": "#/$defs/NetworkId" }, - "of1_car_directory": { - "description": "Directory for storing Old Faithful ONE CAR files.", - "type": "string" - }, "rpc_provider_info": { "description": "Connection information for the primary Solana RPC endpoint.\n\nUsed for fetching recent blocks and all non-historical data.", "$ref": "#/$defs/RpcProviderConnectionConfig" @@ -57,8 +48,7 @@ "required": [ "kind", "network", - "rpc_provider_info", - "of1_car_directory" + "rpc_provider_info" ], "$defs": { "CommitmentLevel": { diff --git a/tests/config/providers/solana_mainnet.toml b/tests/config/providers/solana_mainnet.toml index fc00eb1c6..9dbdc0d2a 100644 --- a/tests/config/providers/solana_mainnet.toml +++ b/tests/config/providers/solana_mainnet.toml @@ -4,8 +4,6 @@ network = "mainnet" # Optionally, rate limit RPC calls # max_rpc_calls_per_second = 50 -of1_car_directory = "${SOLANA_OF1_CAR_DIRECTORY}" - [rpc_provider_info] url = "${SOLANA_MAINNET_RPC_URL}" # auth_header = "${SOLANA_MAINNET_RPC_AUTH_HEADER}" diff --git a/tests/src/tests/it_solana_historical_to_json_rpc_transition.rs b/tests/src/tests/it_solana_historical_to_json_rpc_transition.rs index 8bf7e0e3d..922771223 100644 --- a/tests/src/tests/it_solana_historical_to_json_rpc_transition.rs +++ b/tests/src/tests/it_solana_historical_to_json_rpc_transition.rs @@ -24,11 +24,9 @@ async fn historical_to_json_rpc_transition() { None, // Rate limit network, provider_name, - std::path::PathBuf::new(), // 
of1_car_directory - false, // keep_of1_car_files UseArchive::Auto, - None, // Metrics CommitmentConfig::finalized(), + None, // Metrics ); let start = 0; From afb441a29486bb248fce8378d4df22fde7bfe62f Mon Sep 17 00:00:00 2001 From: sistemd Date: Tue, 10 Mar 2026 17:30:24 +0100 Subject: [PATCH 2/5] fix(extractors/solana): surface 404 error from CAR reader --- crates/extractors/solana/src/error.rs | 2 +- crates/extractors/solana/src/of1_client.rs | 29 ++++++++++++++-------- 2 files changed, 20 insertions(+), 11 deletions(-) diff --git a/crates/extractors/solana/src/error.rs b/crates/extractors/solana/src/error.rs index 9edfb5be1..dad8b41b8 100644 --- a/crates/extractors/solana/src/error.rs +++ b/crates/extractors/solana/src/error.rs @@ -148,13 +148,13 @@ impl From for BlockStreamError { fn from(value: Of1StreamError) -> Self { match value { Of1StreamError::RpcClient(_) - | Of1StreamError::FileStream(of1_client::CarReaderError::Http(_)) | Of1StreamError::FileStream(of1_client::CarReaderError::Reqwest(_)) => { BlockStreamError::Recoverable(value.into()) } // This is intentionally not a catch-all, to force consideration of // each error type when mapping to recoverable vs fatal. Of1StreamError::FileStream(of1_client::CarReaderError::Io(_)) + | Of1StreamError::FileStream(of1_client::CarReaderError::FileNotFound) | Of1StreamError::FileStream(of1_client::CarReaderError::RangeRequestUnsupported) | Of1StreamError::UnexpectedNode { .. } | Of1StreamError::MissingNode { .. } diff --git a/crates/extractors/solana/src/of1_client.rs b/crates/extractors/solana/src/of1_client.rs index b26c72c45..1189b5bf3 100644 --- a/crates/extractors/solana/src/of1_client.rs +++ b/crates/extractors/solana/src/of1_client.rs @@ -148,7 +148,7 @@ pub fn stream( CarReaderError::RangeRequestUnsupported } // No more CAR files available, end the stream. 
- Some(CarReaderError::Http(404)) => { + Some(CarReaderError::FileNotFound) => { return; } Some(e) => { @@ -201,18 +201,18 @@ pub enum CarReaderError { /// accessing the CAR file. #[error("IO error: {0}")] Io(#[source] std::io::Error), - /// HTTP error when connecting to or reading from the CAR file. - /// - /// This can occur when the CAR file for a given epoch is not found (404) or if - /// there are other HTTP errors (e.g., 500) when trying to access the CAR file. - #[error("HTTP error with status code: {0}")] - Http(u16), /// Reqwest error when connecting to or reading from the CAR file. /// /// This can occur due to network issues, timeouts, or other problems when making /// HTTP requests to access the CAR file. #[error("Reqwest error: {0}")] Reqwest(#[source] reqwest::Error), + /// The CAR file for the requested epoch was not found (HTTP 404). + /// + /// This is a non-recoverable error because it indicates that the expected data + /// is not available and retrying will not resolve the issue. + #[error("CAR file not found (HTTP 404)")] + FileNotFound, /// The server does not support HTTP range requests. /// /// This is a non-recoverable error because the [`CarReader`] relies on range @@ -603,11 +603,20 @@ impl tokio::io::AsyncRead for CarReader { ReaderState::Connect(fut) => match fut.as_mut().poll(cx) { std::task::Poll::Ready(Ok(resp)) => { let status = resp.status(); - if !status.is_success() { - this.schedule_backoff(format!("HTTP error: {status}")); - continue; + // Handle error codes. + match status { + reqwest::StatusCode::NOT_FOUND => { + let e = std::io::Error::other(CarReaderError::FileNotFound); + return std::task::Poll::Ready(Err(e)); + } + status if !status.is_success() => { + this.schedule_backoff(format!("HTTP error: {status}")); + continue; + } + _ => {} } + // Handle partial content. 
if this.bytes_read_total > 0 && status != reqwest::StatusCode::PARTIAL_CONTENT { From 1deaa6dcd28823a9adf895d0d7e77005f61ad0dc Mon Sep 17 00:00:00 2001 From: sistemd Date: Tue, 10 Mar 2026 17:30:24 +0100 Subject: [PATCH 3/5] fix(extractors/solana): correctly handle non recoverable errors --- crates/extractors/solana/src/of1_client.rs | 32 ++++++++-------------- 1 file changed, 12 insertions(+), 20 deletions(-) diff --git a/crates/extractors/solana/src/of1_client.rs b/crates/extractors/solana/src/of1_client.rs index 1189b5bf3..f43b30006 100644 --- a/crates/extractors/solana/src/of1_client.rs +++ b/crates/extractors/solana/src/of1_client.rs @@ -136,30 +136,22 @@ pub fn stream( // NOTE: There should be no retry logic here because the `CarReader` should // handle all retry logic internally and only return an error when a non-recoverable // error occurs. - Err(Of1StreamError::NodeParse(car_parser::node::NodeError::Io(io_err))) - if io_err.kind() == std::io::ErrorKind::Other => - { - let downcast = io_err.get_ref().and_then(|e| e.downcast_ref::()); - let car_reader_err = match downcast { - None => { - CarReaderError::Io(io_err) + Err(Of1StreamError::NodeParse(car_parser::node::NodeError::Io(io_err))) => { + match io_err.downcast::() { + // No more CAR files available, not an error. + Ok(CarReaderError::FileNotFound) => {}, + // Non-recoverable error from the `CarReader`. + Ok(car_err) => { + yield Err(Of1StreamError::FileStream(car_err)); } - Some(CarReaderError::RangeRequestUnsupported) => { - CarReaderError::RangeRequestUnsupported - } - // No more CAR files available, end the stream. - Some(CarReaderError::FileNotFound) => { - return; - } - Some(e) => { - let error = e.to_string(); - unreachable!( - "unexpected CAR reader error (should be retried indefinetly); {error}" + // Non-recoverable IO error from the `NodeParser`. 
+ Err(io_err) => { + let original_err = Of1StreamError::NodeParse( + car_parser::node::NodeError::Io(io_err) ); + yield Err(original_err); } }; - - yield Err(Of1StreamError::FileStream(car_reader_err)); return; } Err(e) => { From 060ade216f853d63353dafcabdca57a8bc1413c3 Mon Sep 17 00:00:00 2001 From: sistemd Date: Tue, 10 Mar 2026 17:30:24 +0100 Subject: [PATCH 4/5] refactor(extractors/solana): cleaner CAR reader metrics --- crates/extractors/solana/src/of1_client.rs | 80 ++++++++++++++-------- 1 file changed, 53 insertions(+), 27 deletions(-) diff --git a/crates/extractors/solana/src/of1_client.rs b/crates/extractors/solana/src/of1_client.rs index f43b30006..362c2dc0b 100644 --- a/crates/extractors/solana/src/of1_client.rs +++ b/crates/extractors/solana/src/of1_client.rs @@ -8,8 +8,6 @@ use yellowstone_faithful_car_parser as car_parser; use crate::{error::Of1StreamError, metrics, rpc_client}; -const BYTES_DOWNLOADED_METRICS_REPORT_THRESHOLD: u64 = 10 * 1024 * 1024; // 10 MiB - pub type DecodedTransactionStatusMeta = DecodedField< solana_storage_proto::confirmed_block::TransactionStatusMeta, solana_storage_proto::StoredTransactionStatusMeta, @@ -431,6 +429,56 @@ struct ByteStreamMonitor { registry: Arc, } +impl ByteStreamMonitor { + const BYTES_READ_RECORD_THRESHOLD: u64 = 10 * 1024 * 1024; // 10 MiB + + /// Record the number of bytes read and report to metrics if the reporting threshold is reached. + fn record_bytes_read(&mut self, n: u64) { + self.bytes_read_chunk += n; + + if self.bytes_read_chunk >= Self::BYTES_READ_RECORD_THRESHOLD { + self.registry.record_of1_car_download_bytes( + self.bytes_read_chunk, + self.epoch, + &self.provider, + &self.network, + ); + self.bytes_read_chunk = 0; + } + } + + /// Record any remaining bytes read that didn't reach the reporting threshold. 
+ fn flush_bytes_read(&mut self) { + if self.bytes_read_chunk > 0 { + self.registry.record_of1_car_download_bytes( + self.bytes_read_chunk, + self.epoch, + &self.provider, + &self.network, + ); + self.bytes_read_chunk = 0; + } + } + + fn record_car_download(&mut self) { + let elapsed = self.started_at.elapsed().as_secs_f64(); + self.registry + .record_of1_car_download(elapsed, self.epoch, &self.provider, &self.network); + self.flush_bytes_read(); + } + + fn record_car_download_error(&mut self) { + self.registry + .record_of1_car_download_error(self.epoch, &self.provider, &self.network); + } +} + +impl Drop for ByteStreamMonitor { + fn drop(&mut self) { + self.flush_bytes_read(); + } +} + struct MonitoredByteStream { stream: ByteStream, monitor: Option, @@ -468,35 +516,13 @@ impl Stream for MonitoredByteStream { if let Some(m) = this.monitor.as_mut() { match &poll { std::task::Poll::Ready(Some(Ok(bytes))) => { - m.bytes_read_chunk += bytes.len() as u64; - - if m.bytes_read_chunk >= BYTES_DOWNLOADED_METRICS_REPORT_THRESHOLD { - m.registry.record_of1_car_download_bytes( - m.bytes_read_chunk, - m.epoch, - &m.provider, - &m.network, - ); - m.bytes_read_chunk = 0; - } + m.record_bytes_read(bytes.len() as u64); } std::task::Poll::Ready(Some(Err(_))) => { - m.registry - .record_of1_car_download_error(m.epoch, &m.provider, &m.network); + m.record_car_download_error(); } std::task::Poll::Ready(None) => { - // Record any remaining bytes read that didn't reach the reporting threshold. 
- if m.bytes_read_chunk > 0 { - m.registry.record_of1_car_download_bytes( - m.bytes_read_chunk, - m.epoch, - &m.provider, - &m.network, - ); - } - let elapsed = m.started_at.elapsed().as_secs_f64(); - m.registry - .record_of1_car_download(elapsed, m.epoch, &m.provider, &m.network); + m.record_car_download(); } _ => {} } From bd0008406b4fc0eb48d04ac59549375cca0146ec Mon Sep 17 00:00:00 2001 From: sistemd Date: Tue, 10 Mar 2026 17:30:24 +0100 Subject: [PATCH 5/5] refactor(extractors/solana): guideline compliant error logging --- crates/extractors/solana/src/error.rs | 18 +++++++++------ crates/extractors/solana/src/of1_client.rs | 26 +++++++++++----------- 2 files changed, 24 insertions(+), 20 deletions(-) diff --git a/crates/extractors/solana/src/error.rs b/crates/extractors/solana/src/error.rs index dad8b41b8..7a982e7d5 100644 --- a/crates/extractors/solana/src/error.rs +++ b/crates/extractors/solana/src/error.rs @@ -146,15 +146,13 @@ pub enum Of1StreamError { impl From for BlockStreamError { fn from(value: Of1StreamError) -> Self { + // There is no catch-all here on purpose, to force consideration of + // each error type when mapping to recoverable vs fatal. match value { - Of1StreamError::RpcClient(_) - | Of1StreamError::FileStream(of1_client::CarReaderError::Reqwest(_)) => { - BlockStreamError::Recoverable(value.into()) - } - // This is intentionally not a catch-all, to force consideration of - // each error type when mapping to recoverable vs fatal. Of1StreamError::FileStream(of1_client::CarReaderError::Io(_)) - | Of1StreamError::FileStream(of1_client::CarReaderError::FileNotFound) + | Of1StreamError::FileStream(of1_client::CarReaderError::Http( + reqwest::StatusCode::NOT_FOUND, + )) | Of1StreamError::FileStream(of1_client::CarReaderError::RangeRequestUnsupported) | Of1StreamError::UnexpectedNode { .. } | Of1StreamError::MissingNode { .. } @@ -164,6 +162,12 @@ impl From for BlockStreamError { | Of1StreamError::DecodeField { .. 
} | Of1StreamError::NodeParse(_) | Of1StreamError::DataframeReassembly(_) => BlockStreamError::Fatal(value.into()), + + Of1StreamError::RpcClient(_) + | Of1StreamError::FileStream(of1_client::CarReaderError::Http(_)) + | Of1StreamError::FileStream(of1_client::CarReaderError::Reqwest(_)) => { + BlockStreamError::Recoverable(value.into()) + } } } } diff --git a/crates/extractors/solana/src/of1_client.rs b/crates/extractors/solana/src/of1_client.rs index 362c2dc0b..65848f9a4 100644 --- a/crates/extractors/solana/src/of1_client.rs +++ b/crates/extractors/solana/src/of1_client.rs @@ -137,7 +137,7 @@ pub fn stream( Err(Of1StreamError::NodeParse(car_parser::node::NodeError::Io(io_err))) => { match io_err.downcast::() { // No more CAR files available, not an error. - Ok(CarReaderError::FileNotFound) => {}, + Ok(CarReaderError::Http(reqwest::StatusCode::NOT_FOUND)) => {}, // Non-recoverable error from the `CarReader`. Ok(car_err) => { yield Err(Of1StreamError::FileStream(car_err)); @@ -197,12 +197,11 @@ pub enum CarReaderError { /// HTTP requests to access the CAR file. #[error("Reqwest error: {0}")] Reqwest(#[source] reqwest::Error), - /// The CAR file for the requested epoch was not found (HTTP 404). + /// The server responded with an HTTP error status code when trying to access the CAR file. /// - /// This is a non-recoverable error because it indicates that the expected data - /// is not available and retrying will not resolve the issue. - #[error("CAR file not found (HTTP 404)")] - FileNotFound, + /// This can occur due to network issues, server problems, or if the CAR file is not available. + #[error("HTTP error: {0}")] + Http(reqwest::StatusCode), /// The server does not support HTTP range requests. 
/// /// This is a non-recoverable error because the [`CarReader`] relies on range @@ -580,7 +579,7 @@ impl CarReader { } } - fn schedule_backoff(&mut self, reason: impl std::fmt::Display) { + fn schedule_backoff(&mut self, err: CarReaderError) { self.reconnect_attempt = self.reconnect_attempt.saturating_add(1); let backoff = compute_backoff(self.base_backoff, self.max_backoff, self.reconnect_attempt); @@ -589,7 +588,8 @@ impl CarReader { epoch = self.epoch, bytes_read = self.bytes_read_total, attempt = self.reconnect_attempt, - reason = %reason, + error = ?err, + error_source = monitoring::logging::error_source(&err), backoff = %backoff_str, "CAR reader failed; scheduled retry" ); @@ -624,11 +624,11 @@ impl tokio::io::AsyncRead for CarReader { // Handle error codes. match status { reqwest::StatusCode::NOT_FOUND => { - let e = std::io::Error::other(CarReaderError::FileNotFound); - return std::task::Poll::Ready(Err(e)); + let err = std::io::Error::other(CarReaderError::Http(status)); + return std::task::Poll::Ready(Err(err)); } status if !status.is_success() => { - this.schedule_backoff(format!("HTTP error: {status}")); + this.schedule_backoff(CarReaderError::Http(status)); continue; } _ => {} @@ -652,7 +652,7 @@ impl tokio::io::AsyncRead for CarReader { this.state = ReaderState::Stream(stream); } std::task::Poll::Ready(Err(e)) => { - this.schedule_backoff(format!("request error: {e}")); + this.schedule_backoff(CarReaderError::Reqwest(e)); } std::task::Poll::Pending => return std::task::Poll::Pending, }, @@ -673,7 +673,7 @@ impl tokio::io::AsyncRead for CarReader { return std::task::Poll::Ready(Ok(())); } std::task::Poll::Ready(Some(Err(e))) => { - this.schedule_backoff(format!("stream error: {e}")); + this.schedule_backoff(CarReaderError::Reqwest(e)); } std::task::Poll::Pending => return std::task::Poll::Pending, },