Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 1 addition & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 0 additions & 19 deletions crates/core/datasets-raw/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,6 @@ impl std::convert::AsRef<dyn std::error::Error + 'static> for BlockStreamError {
/// Error type for [`BlockStreamer::latest_block`].
pub type LatestBlockError = Box<dyn std::error::Error + Sync + Send + 'static>;

/// Error type for [`BlockStreamer::wait_for_cleanup`].
pub type CleanupError = Box<dyn std::error::Error + Sync + Send + 'static>;

/// Trait for extracting raw blockchain data as a stream of rows.
///
/// Implementations fetch blocks from a data source (RPC, archive, etc.) and convert them
Expand Down Expand Up @@ -106,18 +103,6 @@ pub trait BlockStreamer: Clone + 'static {
/// for parallel streaming.
fn bucket_size(&self) -> Option<NonZeroU64>;

/// Waits for any background work and resources associated with this [`BlockStreamer`]
/// to be cleaned up.
///
/// This should be called once the user no longer needs to create new block streams
/// to allow implementations to terminate internal tasks, flush or release network
/// connections, and free any other resources.
///
    /// After requesting cleanup, callers should not call [`BlockStreamer::block_stream`]
/// again on the same instance. Behavior when creating new streams after cleanup is
/// implementation-defined and must not be relied on.
fn wait_for_cleanup(self) -> impl Future<Output = Result<(), CleanupError>> + Send;

fn provider_name(&self) -> &str;
}

Expand Down Expand Up @@ -261,10 +246,6 @@ where
self.0.bucket_size()
}

fn wait_for_cleanup(self) -> impl Future<Output = Result<(), CleanupError>> + Send {
self.0.wait_for_cleanup()
}

fn provider_name(&self) -> &str {
self.0.provider_name()
}
Expand Down
8 changes: 1 addition & 7 deletions crates/core/providers-evm-rpc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@ use datasets_common::{
};
use datasets_raw::{
Timestamp,
client::{
BlockStreamError, BlockStreamResultExt, BlockStreamer, CleanupError, LatestBlockError,
},
client::{BlockStreamError, BlockStreamResultExt, BlockStreamer, LatestBlockError},
evm::{
EvmCurrency,
tables::{
Expand Down Expand Up @@ -499,10 +497,6 @@ impl BlockStreamer for Client {
None
}

async fn wait_for_cleanup(self) -> Result<(), CleanupError> {
Ok(())
}

fn provider_name(&self) -> &str {
&self.provider_name
}
Expand Down
8 changes: 1 addition & 7 deletions crates/core/providers-firehose/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@ use amp_providers_common::{network_id::NetworkId, provider_name::ProviderName};
use async_stream::stream;
use datasets_common::block_num::BlockNum;
use datasets_raw::{
client::{
BlockStreamError, BlockStreamResultExt, BlockStreamer, CleanupError, LatestBlockError,
},
client::{BlockStreamError, BlockStreamResultExt, BlockStreamer, LatestBlockError},
rows::Rows,
};
use futures::{Stream, StreamExt as _, TryStreamExt as _};
Expand Down Expand Up @@ -275,10 +273,6 @@ impl BlockStreamer for Client {
None
}

async fn wait_for_cleanup(self) -> Result<(), CleanupError> {
Ok(())
}

#[instrument(skip(self), err)]
async fn latest_block(
&mut self,
Expand Down
10 changes: 1 addition & 9 deletions crates/core/providers-registry/src/client/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use amp_providers_solana::kind::SolanaProviderKind;
use async_stream::stream;
use datasets_common::{block_num::BlockNum, network_id::NetworkId};
use datasets_raw::{
client::{BlockStreamError, BlockStreamer, CleanupError, LatestBlockError},
client::{BlockStreamError, BlockStreamer, LatestBlockError},
rows::Rows,
};
use futures::Stream;
Expand Down Expand Up @@ -141,14 +141,6 @@ impl BlockStreamer for BlockStreamClient {
}
}

async fn wait_for_cleanup(self) -> Result<(), CleanupError> {
match self {
Self::EvmRpc(client) => client.wait_for_cleanup().await,
Self::Solana(client) => client.wait_for_cleanup().await,
Self::Firehose(client) => client.wait_for_cleanup().await,
}
}

fn provider_name(&self) -> &str {
match self {
Self::EvmRpc(client) => client.provider_name(),
Expand Down
9 changes: 1 addition & 8 deletions crates/core/providers-solana/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{num::NonZeroU32, path::PathBuf};
use std::num::NonZeroU32;

use amp_providers_common::{network_id::NetworkId, redacted::Redacted};
use headers::{HeaderName, HeaderValue};
Expand Down Expand Up @@ -40,13 +40,6 @@ pub struct SolanaProviderConfig {
/// Optional rate limit for RPC calls per second.
pub max_rpc_calls_per_second: Option<NonZeroU32>,

/// Directory for storing Old Faithful ONE CAR files.
pub of1_car_directory: PathBuf,

/// Whether to keep downloaded CAR files after processing.
#[serde(default)]
pub keep_of1_car_files: bool,

/// Controls when to use the Solana archive for historical data.
#[serde(default)]
pub use_archive: UseArchive,
Expand Down
14 changes: 1 addition & 13 deletions crates/core/worker-datasets-raw/src/job_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ use datasets_common::{
table_name::TableName,
};
use datasets_raw::{
client::{BlockStreamer as _, BlockStreamerExt as _, CleanupError, LatestBlockError},
client::{BlockStreamer as _, BlockStreamerExt as _, LatestBlockError},
dataset::Dataset as RawDataset,
};

Expand Down Expand Up @@ -407,7 +407,6 @@ pub async fn execute(
}
}

client.wait_for_cleanup().await.map_err(Error::Cleanup)?;
tracing::info!("materialize completed successfully");

Ok(())
Expand Down Expand Up @@ -525,13 +524,6 @@ pub enum Error {
/// - Partition task panics (assertion failures, unwrap on None/Err, stack overflow)
#[error("Partition task failed")]
PartitionTask(#[source] TryWaitAllError<RunRangeError>),

/// Failure during blockchain client cleanup
///
/// At the end of the materialize process, the blockchain client may need to perform
/// cleanup operations, some of which could fail. This error indicates such a failure.
#[error("Failed to perform blockchain client cleanup: {0}")]
Cleanup(#[source] CleanupError),
}

impl RetryableErrorExt for Error {
Expand Down Expand Up @@ -565,9 +557,6 @@ impl RetryableErrorExt for Error {

// Partition tasks — delegate to TryWaitAllError classification
Self::PartitionTask(err) => err.is_retryable(),

// Cleanup failures — recoverable
Self::Cleanup(_) => true,
}
}
}
Expand Down Expand Up @@ -595,7 +584,6 @@ impl amp_worker_core::retryable::JobErrorExt for Error {
Self::LatestBlock(_) => "LATEST_BLOCK",
Self::MissingRanges(_) => "MISSING_RANGES",
Self::PartitionTask(_) => "PARTITION_TASK",
Self::Cleanup(_) => "CLEANUP",
}
}
}
4 changes: 2 additions & 2 deletions crates/extractors/solana/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@ amp-providers-solana = { path = "../../core/providers-solana" }
anyhow.workspace = true
async-stream.workspace = true
backon.workspace = true
bs58 = "0.5.1"
base64 = "0.22.1"
bs58 = "0.5.1"
bytes.workspace = true
datasets-common = { path = "../../core/datasets-common" }
datasets-raw = { path = "../../core/datasets-raw" }
futures.workspace = true
fs-err.workspace = true
governor.workspace = true
headers = { workspace = true }
monitoring = { path = "../../core/monitoring" }
memmap2 = "0.9.9"
reqwest.workspace = true
schemars = { workspace = true, optional = true }
serde.workspace = true
Expand Down
48 changes: 20 additions & 28 deletions crates/extractors/solana/examples/solana_compare.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
//! The program streams blocks from OF1 for a given epoch, fetches the same blocks via JSON-RPC,
//! and compares the results at the [solana_datasets::tables::NonEmptySlot] level.

#[cfg(debug_assertions)]
use std::{collections::HashSet, sync::Mutex};
use std::{path::PathBuf, sync::Arc, time::Duration};

use anyhow::Context;
Expand Down Expand Up @@ -52,29 +54,20 @@ async fn main() -> anyhow::Result<()> {

tracing::info!( epoch = %cli.epoch, start_slot, end_slot, "running OF1 vs RPC comparison");

let (car_manager_tx, car_manager_rx) = tokio::sync::mpsc::channel(128);

let car_manager_jh = tokio::task::spawn(of1_client::car_file_manager(
car_manager_rx,
provider_cfg.of1_car_directory.clone(),
provider_cfg.keep_of1_car_files,
cli.provider_name.clone(),
provider_cfg.network.clone(),
None,
));

let rpc_connection_info = rpc_client::RpcProviderConnectionInfo {
url: provider_cfg.rpc_provider_info.url,
auth: None,
let reqwest = Arc::new(reqwest::Client::new());
let rpc_client = {
let rpc_connection_info = rpc_client::RpcProviderConnectionInfo {
url: provider_cfg.rpc_provider_info.url,
auth: None,
};
Arc::new(rpc_client::SolanaRpcClient::new(
rpc_connection_info,
provider_cfg.max_rpc_calls_per_second,
cli.provider_name,
provider_cfg.network.clone(),
))
};

let rpc_client = Arc::new(rpc_client::SolanaRpcClient::new(
rpc_connection_info,
provider_cfg.max_rpc_calls_per_second,
cli.provider_name,
provider_cfg.network.clone(),
));

let get_block_config = rpc_client::rpc_config::RpcBlockConfig {
encoding: Some(rpc_client::rpc_config::UiTransactionEncoding::Json),
transaction_details: Some(rpc_client::rpc_config::TransactionDetails::Full),
Expand All @@ -86,11 +79,15 @@ async fn main() -> anyhow::Result<()> {
let of1_stream = of1_client::stream(
start_slot,
end_slot,
provider_cfg.of1_car_directory,
car_manager_tx.clone(),
reqwest,
rpc_client.clone(),
get_block_config,
// Metrics, we don't need to record them.
None,
// In-progress epochs, won't affect the example since it only matters
// when there are multiple concurrent epochs being processed.
#[cfg(debug_assertions)]
Arc::new(Mutex::new(HashSet::new())),
);

let mut expected_slot_num = start_slot;
Expand Down Expand Up @@ -191,11 +188,6 @@ async fn main() -> anyhow::Result<()> {
}
}

drop(car_manager_tx);
if let Err(e) = car_manager_jh.await {
tracing::error!(error = %e, "car file manager task failed");
}

tracing::info!("comparison complete");

Ok(())
Expand Down
Loading