diff --git a/Cargo.toml b/Cargo.toml index 7b30a7a..630fee8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ members = ["crates/*"] resolver = "2" [workspace.package] -version = "0.6.3" +version = "0.6.4" edition = "2024" rust-version = "1.92" authors = ["init4"] @@ -35,13 +35,13 @@ incremental = false [workspace.dependencies] # internal -signet-hot = { version = "0.6.3", path = "./crates/hot" } -signet-hot-mdbx = { version = "0.6.3", path = "./crates/hot-mdbx" } -signet-cold = { version = "0.6.3", path = "./crates/cold" } -signet-cold-mdbx = { version = "0.6.3", path = "./crates/cold-mdbx" } -signet-cold-sql = { version = "0.6.3", path = "./crates/cold-sql" } -signet-storage = { version = "0.6.3", path = "./crates/storage" } -signet-storage-types = { version = "0.6.3", path = "./crates/types" } +signet-hot = { version = "0.6.4", path = "./crates/hot" } +signet-hot-mdbx = { version = "0.6.4", path = "./crates/hot-mdbx" } +signet-cold = { version = "0.6.4", path = "./crates/cold" } +signet-cold-mdbx = { version = "0.6.4", path = "./crates/cold-mdbx" } +signet-cold-sql = { version = "0.6.4", path = "./crates/cold-sql" } +signet-storage = { version = "0.6.4", path = "./crates/storage" } +signet-storage-types = { version = "0.6.4", path = "./crates/types" } # External, in-house signet-libmdbx = { version = "0.8.0" } @@ -66,6 +66,7 @@ parking_lot = "0.12.5" rand = "0.9.2" rayon = "1.10" serde = { version = "1.0.217", features = ["derive"] } +serial_test = "3.3" tempfile = "3.20.0" thiserror = "2.0.18" tokio = { version = "1.45.0", features = ["full"] } diff --git a/crates/cold-mdbx/src/connector.rs b/crates/cold-mdbx/src/connector.rs new file mode 100644 index 0000000..6b47e04 --- /dev/null +++ b/crates/cold-mdbx/src/connector.rs @@ -0,0 +1,116 @@ +//! MDBX storage connector. +//! +//! Unified connector that can open both hot and cold MDBX databases. 
+
+use crate::{MdbxColdBackend, MdbxColdError};
+use signet_cold::ColdConnect;
+use signet_hot::HotConnect;
+use signet_hot_mdbx::{DatabaseArguments, DatabaseEnv, MdbxError};
+use std::path::PathBuf;
+
+/// Errors that can occur when initializing MDBX connectors.
+#[derive(Debug, thiserror::Error)]
+pub enum MdbxConnectorError {
+    /// Missing environment variable.
+    #[error("missing environment variable: {0}")]
+    MissingEnvVar(&'static str),
+
+    /// Hot storage initialization failed.
+    #[error("hot storage initialization failed: {0}")]
+    HotInit(#[from] MdbxError),
+
+    /// Cold storage initialization failed.
+    #[error("cold storage initialization failed: {0}")]
+    ColdInit(#[from] MdbxColdError),
+}
+
+/// Connector for MDBX storage (both hot and cold).
+///
+/// This unified connector can open MDBX databases for both hot and cold storage.
+/// It holds the path and database arguments, which can include custom geometry,
+/// sync mode, max readers, and other MDBX-specific configuration.
+///
+/// # Example
+///
+/// ```ignore
+/// use signet_hot_mdbx::{MdbxConnector, DatabaseArguments};
+///
+/// // Hot storage with custom args
+/// let hot = MdbxConnector::new("/tmp/hot")
+///     .with_db_args(DatabaseArguments::new().with_max_readers(1000));
+///
+/// // Cold storage with default args
+/// let cold = MdbxConnector::new("/tmp/cold");
+/// ```
+#[derive(Debug, Clone)]
+pub struct MdbxConnector {
+    path: PathBuf,
+    db_args: DatabaseArguments,
+}
+
+impl MdbxConnector {
+    /// Create a new MDBX connector with default database arguments.
+    pub fn new(path: impl Into<PathBuf>) -> Self {
+        Self { path: path.into(), db_args: DatabaseArguments::new() }
+    }
+
+    /// Set custom database arguments.
+    ///
+    /// This allows configuring MDBX-specific settings like geometry, sync mode,
+    /// max readers, and exclusive mode.
+    #[must_use]
+    pub const fn with_db_args(mut self, db_args: DatabaseArguments) -> Self {
+        self.db_args = db_args;
+        self
+    }
+
+    /// Get a reference to the path.
+    pub fn path(&self) -> &std::path::Path {
+        &self.path
+    }
+
+    /// Get a reference to the database arguments.
+    pub const fn db_args(&self) -> &DatabaseArguments {
+        &self.db_args
+    }
+
+    /// Create a connector from environment variables.
+    ///
+    /// Reads the path from the specified environment variable.
+    ///
+    /// # Example
+    ///
+    /// ```ignore
+    /// use signet_cold_mdbx::MdbxConnector;
+    ///
+    /// let hot = MdbxConnector::from_env("SIGNET_HOT_PATH")?;
+    /// let cold = MdbxConnector::from_env("SIGNET_COLD_PATH")?;
+    /// ```
+    pub fn from_env(env_var: &'static str) -> Result<Self, MdbxConnectorError> {
+        let path: PathBuf =
+            std::env::var(env_var).map_err(|_| MdbxConnectorError::MissingEnvVar(env_var))?.into();
+        Ok(Self::new(path))
+    }
+}
+
+impl HotConnect for MdbxConnector {
+    type Hot = DatabaseEnv;
+    type Error = MdbxError;
+
+    fn connect(&self) -> Result<Self::Hot, Self::Error> {
+        self.db_args.clone().open_rw(&self.path)
+    }
+}
+
+impl ColdConnect for MdbxConnector {
+    type Cold = MdbxColdBackend;
+    type Error = MdbxColdError;
+
+    #[allow(clippy::manual_async_fn)]
+    fn connect(&self) -> impl std::future::Future<Output = Result<Self::Cold, Self::Error>> + Send {
+        // MDBX open is sync, but wrapped in async for trait consistency
+        // Opens read-write and creates tables
+        let path = self.path.clone();
+        async move { MdbxColdBackend::open_rw(&path) }
+    }
+}
diff --git a/crates/cold-mdbx/src/lib.rs b/crates/cold-mdbx/src/lib.rs
index 09f1de6..a91fa8b 100644
--- a/crates/cold-mdbx/src/lib.rs
+++ b/crates/cold-mdbx/src/lib.rs
@@ -46,4 +46,7 @@ pub use tables::{
 mod backend;
 pub use backend::MdbxColdBackend;
 
+mod connector;
+pub use connector::{MdbxConnector, MdbxConnectorError};
+
 pub use signet_hot_mdbx::{DatabaseArguments, DatabaseEnvKind};
diff --git a/crates/cold-sql/src/connector.rs b/crates/cold-sql/src/connector.rs
new file mode 100644
index 0000000..08bce0f
--- /dev/null
+++ b/crates/cold-sql/src/connector.rs
@@ -0,0 +1,83 @@
+//! SQL cold storage connector.
+
+use crate::{SqlColdBackend, SqlColdError};
+use signet_cold::ColdConnect;
+
+/// Errors that can occur when initializing SQL connectors.
+#[derive(Debug, thiserror::Error)]
+pub enum SqlConnectorError {
+    /// Missing environment variable.
+    #[error("missing environment variable: {0}")]
+    MissingEnvVar(&'static str),
+
+    /// Cold storage initialization failed.
+    #[error("cold storage initialization failed: {0}")]
+    ColdInit(#[from] SqlColdError),
+}
+
+/// Connector for SQL cold storage (PostgreSQL or SQLite).
+///
+/// Automatically detects the database type from the URL:
+/// - URLs starting with `postgres://` or `postgresql://` use PostgreSQL
+/// - URLs starting with `sqlite:` use SQLite
+///
+/// # Example
+///
+/// ```ignore
+/// use signet_cold_sql::SqlConnector;
+///
+/// // PostgreSQL
+/// let pg = SqlConnector::new("postgres://localhost/signet");
+/// let backend = pg.connect().await?;
+///
+/// // SQLite
+/// let sqlite = SqlConnector::new("sqlite::memory:");
+/// let backend = sqlite.connect().await?;
+/// ```
+#[cfg(any(feature = "sqlite", feature = "postgres"))]
+#[derive(Debug, Clone)]
+pub struct SqlConnector {
+    url: String,
+}
+
+#[cfg(any(feature = "sqlite", feature = "postgres"))]
+impl SqlConnector {
+    /// Create a new SQL connector.
+    ///
+    /// The database type is detected from the URL prefix.
+    pub fn new(url: impl Into<String>) -> Self {
+        Self { url: url.into() }
+    }
+
+    /// Get a reference to the connection URL.
+    pub fn url(&self) -> &str {
+        &self.url
+    }
+
+    /// Create a connector from environment variables.
+    ///
+    /// Reads the SQL URL from the specified environment variable.
+    ///
+    /// # Example
+    ///
+    /// ```ignore
+    /// use signet_cold_sql::SqlConnector;
+    ///
+    /// let cold = SqlConnector::from_env("SIGNET_COLD_SQL_URL")?;
+    /// ```
+    pub fn from_env(env_var: &'static str) -> Result<Self, SqlConnectorError> {
+        let url = std::env::var(env_var).map_err(|_| SqlConnectorError::MissingEnvVar(env_var))?;
+        Ok(Self::new(url))
+    }
+}
+
+#[cfg(any(feature = "sqlite", feature = "postgres"))]
+impl ColdConnect for SqlConnector {
+    type Cold = SqlColdBackend;
+    type Error = SqlColdError;
+
+    fn connect(&self) -> impl std::future::Future<Output = Result<Self::Cold, Self::Error>> + Send {
+        let url = self.url.clone();
+        async move { SqlColdBackend::connect(&url).await }
+    }
+}
diff --git a/crates/cold-sql/src/lib.rs b/crates/cold-sql/src/lib.rs
index 4a2e7cf..f28e2ad 100644
--- a/crates/cold-sql/src/lib.rs
+++ b/crates/cold-sql/src/lib.rs
@@ -47,6 +47,11 @@ mod backend;
 #[cfg(any(feature = "sqlite", feature = "postgres"))]
 pub use backend::SqlColdBackend;
 
+#[cfg(any(feature = "sqlite", feature = "postgres"))]
+mod connector;
+#[cfg(any(feature = "sqlite", feature = "postgres"))]
+pub use connector::{SqlConnector, SqlConnectorError};
+
 /// Backward-compatible alias for [`SqlColdBackend`] when using SQLite.
 #[cfg(feature = "sqlite")]
 pub type SqliteColdBackend = SqlColdBackend;
diff --git a/crates/cold/src/connect.rs b/crates/cold/src/connect.rs
new file mode 100644
index 0000000..7ee2a05
--- /dev/null
+++ b/crates/cold/src/connect.rs
@@ -0,0 +1,21 @@
+//! Connection traits for cold storage backends.
+
+use crate::ColdStorage;
+
+/// Connector trait for cold storage backends.
+///
+/// Abstracts the connection/opening process for cold storage, allowing
+/// different backends to implement their own initialization logic.
+pub trait ColdConnect {
+    /// The cold storage type produced by this connector.
+    type Cold: ColdStorage;
+
+    /// The error type returned by connection attempts.
+    type Error: std::error::Error + Send + Sync + 'static;
+
+    /// Connect to the cold storage backend asynchronously.
+    ///
+    /// Async to support backends that require async initialization
+    /// (like SQL connection pools).
+    fn connect(&self) -> impl std::future::Future<Output = Result<Self::Cold, Self::Error>> + Send;
+}
diff --git a/crates/cold/src/lib.rs b/crates/cold/src/lib.rs
index ec843cd..866293c 100644
--- a/crates/cold/src/lib.rs
+++ b/crates/cold/src/lib.rs
@@ -161,6 +161,9 @@ pub use stream::{StreamParams, produce_log_stream_default};
 mod traits;
 pub use traits::{BlockData, ColdStorage, LogStream};
 
+pub mod connect;
+pub use connect::ColdConnect;
+
 /// Task module containing the storage task runner and handles.
 pub mod task;
 pub use task::{ColdStorageHandle, ColdStorageReadHandle, ColdStorageTask};
diff --git a/crates/hot/src/connect.rs b/crates/hot/src/connect.rs
new file mode 100644
index 0000000..b1575c8
--- /dev/null
+++ b/crates/hot/src/connect.rs
@@ -0,0 +1,20 @@
+//! Connection traits for hot storage backends.
+
+use crate::model::HotKv;
+
+/// Connector trait for hot storage backends.
+///
+/// Abstracts the connection/opening process for hot storage, allowing
+/// different backends to implement their own initialization logic.
+pub trait HotConnect {
+    /// The hot storage type produced by this connector.
+    type Hot: HotKv;
+
+    /// The error type returned by connection attempts.
+    type Error: std::error::Error + Send + Sync + 'static;
+
+    /// Connect to the hot storage backend.
+    ///
+    /// Synchronous since most hot storage backends use sync initialization.
+    fn connect(&self) -> Result<Self::Hot, Self::Error>;
+}
diff --git a/crates/hot/src/lib.rs b/crates/hot/src/lib.rs
index 5bf1ab6..6430967 100644
--- a/crates/hot/src/lib.rs
+++ b/crates/hot/src/lib.rs
@@ -105,6 +105,9 @@
 #[cfg(any(test, feature = "test-utils"))]
 pub mod conformance;
 
+pub mod connect;
+pub use connect::HotConnect;
+
 pub mod db;
 pub use db::{HistoryError, HistoryRead, HistoryWrite};
 
diff --git a/crates/storage/Cargo.toml b/crates/storage/Cargo.toml
index 71293db..8ad9963 100644
--- a/crates/storage/Cargo.toml
+++ b/crates/storage/Cargo.toml
@@ -17,7 +17,10 @@ rustdoc-args = ["--cfg", "docsrs"]
 
 [dependencies]
 signet-cold.workspace = true
+signet-cold-mdbx.workspace = true
+signet-cold-sql = { workspace = true, optional = true }
 signet-hot.workspace = true
+signet-hot-mdbx.workspace = true
 signet-storage-types.workspace = true
 
 alloy.workspace = true
@@ -25,6 +28,7 @@ thiserror.workspace = true
 tokio-util.workspace = true
 
 [dev-dependencies]
+serial_test.workspace = true
 signet-storage = { path = ".", features = ["test-utils"] }
 tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
 tokio-util.workspace = true
@@ -33,3 +37,5 @@ trevm.workspace = true
 [features]
 default = []
 test-utils = ["signet-hot/test-utils", "signet-cold/test-utils"]
+postgres = ["signet-cold-sql/postgres"]
+sqlite = ["signet-cold-sql/sqlite"]
diff --git a/crates/storage/src/builder.rs b/crates/storage/src/builder.rs
new file mode 100644
index 0000000..b13e336
--- /dev/null
+++ b/crates/storage/src/builder.rs
@@ -0,0 +1,217 @@
+//! Storage builder for programmatic and environment-based configuration.
+
+use crate::{
+    StorageError, StorageResult, UnifiedStorage,
+    config::{ConfigError, ENV_COLD_PATH, ENV_COLD_SQL_URL, ENV_HOT_PATH},
+    either::Either,
+};
+use signet_cold::ColdConnect;
+use signet_cold_mdbx::MdbxConnector;
+use signet_hot::HotConnect;
+use std::env;
+use tokio_util::sync::CancellationToken;
+
+#[cfg(any(feature = "postgres", feature = "sqlite"))]
+use signet_cold_sql::SqlConnector;
+
+#[cfg(any(feature = "postgres", feature = "sqlite"))]
+type EnvColdConnector = Either<MdbxConnector, SqlConnector>;
+
+#[cfg(not(any(feature = "postgres", feature = "sqlite")))]
+type EnvColdConnector = Either<MdbxConnector, ()>;
+
+/// Builder for unified storage configuration.
+///
+/// Uses a fluent API with `hot()`, `cold()`, and `build()` methods.
+///
+/// # Example
+///
+/// ```ignore
+/// use signet_storage::StorageBuilder;
+/// use signet_hot_mdbx::MdbxConnector;
+///
+/// let hot = MdbxConnector::new("/tmp/hot");
+/// let cold = MdbxConnector::new("/tmp/cold");
+///
+/// let storage = StorageBuilder::default()
+///     .hot(hot)
+///     .cold(cold)
+///     .build()
+///     .await?;
+/// ```
+#[derive(Default, Debug)]
+pub struct StorageBuilder<H, C> {
+    hot_connector: H,
+    cold_connector: C,
+    cancel_token: Option<CancellationToken>,
+}
+
+impl StorageBuilder<(), ()> {
+    /// Create a new empty storage builder.
+    pub fn new() -> Self {
+        Self::default()
+    }
+}
+
+impl<H, C> StorageBuilder<H, C> {
+    /// Set the hot storage connector.
+    pub fn hot<NewH>(self, hot_connector: NewH) -> StorageBuilder<NewH, C> {
+        StorageBuilder {
+            hot_connector,
+            cold_connector: self.cold_connector,
+            cancel_token: self.cancel_token,
+        }
+    }
+
+    /// Set the cold storage connector.
+    pub fn cold<NewC>(self, cold_connector: NewC) -> StorageBuilder<H, NewC> {
+        StorageBuilder {
+            hot_connector: self.hot_connector,
+            cold_connector,
+            cancel_token: self.cancel_token,
+        }
+    }
+
+    /// Set the cancellation token for the cold storage task.
+    #[must_use]
+    pub fn cancel_token(mut self, token: CancellationToken) -> Self {
+        self.cancel_token = Some(token);
+        self
+    }
+}
+
+impl<H, C> StorageBuilder<H, C>
+where
+    H: HotConnect,
+    C: ColdConnect,
+{
+    /// Build the unified storage instance.
+    ///
+    /// Opens both hot and cold backends and spawns the cold storage task.
+    pub async fn build(self) -> StorageResult<UnifiedStorage<H::Hot>> {
+        // Connect to hot storage (sync)
+        let hot = self
+            .hot_connector
+            .connect()
+            .map_err(|e| StorageError::Config(format!("hot connection failed: {e}")))?;
+
+        // Connect to cold storage (async)
+        let cold = self
+            .cold_connector
+            .connect()
+            .await
+            .map_err(|e| StorageError::Config(format!("cold connection failed: {e}")))?;
+
+        // Use provided cancel token or create new one
+        let cancel_token = self.cancel_token.unwrap_or_default();
+
+        // Spawn unified storage with cold task
+        Ok(UnifiedStorage::spawn(hot, cold, cancel_token))
+    }
+}
+
+impl StorageBuilder<MdbxConnector, EnvColdConnector> {
+    /// Create a builder from environment variables.
+    ///
+    /// Reads configuration from:
+    /// - `SIGNET_HOT_PATH`: Hot storage path (required)
+    /// - `SIGNET_COLD_PATH`: Cold MDBX path (optional)
+    /// - `SIGNET_COLD_SQL_URL`: Cold SQL URL (optional, requires postgres or sqlite feature)
+    ///
+    /// Checks for `SIGNET_COLD_PATH` first. If present, uses MDBX cold backend.
+    /// Otherwise checks for `SIGNET_COLD_SQL_URL` for SQL backend.
+    /// Exactly one cold backend must be specified.
+    pub fn from_env() -> Result<Self, ConfigError> {
+        // Hot connector from environment (always MDBX)
+        let hot_connector = MdbxConnector::from_env(ENV_HOT_PATH)?;
+
+        // Determine cold backend from environment
+        let has_mdbx = env::var(ENV_COLD_PATH).is_ok();
+        let has_sql = env::var(ENV_COLD_SQL_URL).is_ok();
+
+        let cold_connector = match (has_mdbx, has_sql) {
+            (true, false) => {
+                let mdbx = MdbxConnector::from_env(ENV_COLD_PATH)?;
+                Either::left(mdbx)
+            }
+            (false, true) => {
+                #[cfg(any(feature = "postgres", feature = "sqlite"))]
+                {
+                    let sql = SqlConnector::from_env(ENV_COLD_SQL_URL)?;
+                    Either::right(sql)
+                }
+                #[cfg(not(any(feature = "postgres", feature = "sqlite")))]
+                {
+                    return Err(ConfigError::FeatureNotEnabled {
+                        feature: "postgres or sqlite",
+                        env_var: ENV_COLD_SQL_URL,
+                    });
+                }
+            }
+            (true, true) => {
+                return Err(ConfigError::AmbiguousColdBackend);
+            }
+            (false, false) => {
+                return Err(ConfigError::MissingColdBackend);
+            }
+        };
+
+        Ok(Self { hot_connector, cold_connector, cancel_token: None })
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use serial_test::serial;
+
+    #[test]
+    #[serial]
+    fn from_env_missing_hot_path() {
+        // SAFETY: Test environment
+        unsafe {
+            env::remove_var(ENV_HOT_PATH);
+        }
+        assert!(StorageBuilder::from_env().is_err());
+    }
+
+    #[test]
+    #[serial]
+    fn from_env_missing_cold_backend() {
+        // SAFETY: Test environment
+        unsafe {
+            env::set_var(ENV_HOT_PATH, "/tmp/hot");
+            env::remove_var(ENV_COLD_PATH);
+            env::remove_var(ENV_COLD_SQL_URL);
+        }
+        let result = StorageBuilder::from_env();
+        assert!(matches!(result, Err(ConfigError::MissingColdBackend)));
+    }
+
+    #[test]
+    #[serial]
+    fn from_env_ambiguous_cold_backend() {
+        // SAFETY: Test environment
+        unsafe {
+            env::set_var(ENV_HOT_PATH, "/tmp/hot");
+            env::set_var(ENV_COLD_PATH, "/tmp/cold");
+            env::set_var(ENV_COLD_SQL_URL, "postgres://localhost/db");
+        }
+        let result = StorageBuilder::from_env();
+        assert!(matches!(result, Err(ConfigError::AmbiguousColdBackend)));
+    }
+
+    #[test]
+ #[serial] + fn from_env_mdbx_cold() { + // SAFETY: Test environment + unsafe { + env::set_var(ENV_HOT_PATH, "/tmp/hot"); + env::set_var(ENV_COLD_PATH, "/tmp/cold"); + env::remove_var(ENV_COLD_SQL_URL); + } + let builder = StorageBuilder::from_env().unwrap(); + // Verify it's an Either::Left + assert!(matches!(builder.cold_connector, Either::Left(_))); + } +} diff --git a/crates/storage/src/config.rs b/crates/storage/src/config.rs new file mode 100644 index 0000000..53bb029 --- /dev/null +++ b/crates/storage/src/config.rs @@ -0,0 +1,63 @@ +//! Storage configuration types and environment parsing. +//! +//! This module provides environment variable constants and error types +//! for storage configuration. +//! +//! # Environment Variables +//! +//! | Variable | Description | Required When | +//! |----------|-------------|---------------| +//! | `SIGNET_HOT_PATH` | Path to hot MDBX database | Always | +//! | `SIGNET_COLD_PATH` | Path to cold MDBX database | Cold backend is MDBX | +//! | `SIGNET_COLD_SQL_URL` | SQL connection string | Cold backend is SQL | +//! +//! Exactly one of `SIGNET_COLD_PATH` or `SIGNET_COLD_SQL_URL` must be set. + +use signet_cold_mdbx::MdbxConnectorError; +use thiserror::Error; + +#[cfg(any(feature = "postgres", feature = "sqlite"))] +use signet_cold_sql::SqlConnectorError; + +/// Environment variable name for hot storage path. +pub const ENV_HOT_PATH: &str = "SIGNET_HOT_PATH"; + +/// Environment variable name for cold MDBX storage path. +pub const ENV_COLD_PATH: &str = "SIGNET_COLD_PATH"; + +/// Environment variable name for cold SQL connection URL. +pub const ENV_COLD_SQL_URL: &str = "SIGNET_COLD_SQL_URL"; + +/// Configuration errors. +#[derive(Debug, Error)] +pub enum ConfigError { + /// Required environment variable is missing. + #[error("missing environment variable: {0}")] + MissingEnvVar(&'static str), + + /// Cold backend not specified. 
+ #[error("no cold backend specified: set either {ENV_COLD_PATH} or {ENV_COLD_SQL_URL}")] + MissingColdBackend, + + /// Multiple cold backends specified. + #[error("ambiguous cold backend: both {ENV_COLD_PATH} and {ENV_COLD_SQL_URL} are set")] + AmbiguousColdBackend, + + /// Required feature not enabled. + #[error("feature '{feature}' required for {env_var} but not enabled")] + FeatureNotEnabled { + /// The feature name that is required. + feature: &'static str, + /// The environment variable that requires the feature. + env_var: &'static str, + }, + + /// MDBX connector error. + #[error("MDBX connector error: {0}")] + MdbxConnector(#[from] MdbxConnectorError), + + /// SQL connector error. + #[cfg(any(feature = "postgres", feature = "sqlite"))] + #[error("SQL connector error: {0}")] + SqlConnector(#[from] SqlConnectorError), +} diff --git a/crates/storage/src/either.rs b/crates/storage/src/either.rs new file mode 100644 index 0000000..4aa90c6 --- /dev/null +++ b/crates/storage/src/either.rs @@ -0,0 +1,217 @@ +//! Either type for holding one of two connector types. +//! +//! The `Either` type enables runtime backend selection while maintaining compile-time +//! type safety and zero-cost abstraction. The `dispatch_async!` macro reduces +//! boilerplate for the `EitherCold` implementation by generating the repetitive +//! match-and-forward pattern for all ColdStorage trait methods. 
+
+use alloy::primitives::BlockNumber;
+use signet_cold::{
+    BlockData, ColdConnect, ColdReceipt, ColdResult, ColdStorage, Confirmed, Filter,
+    HeaderSpecifier, ReceiptSpecifier, SignetEventsSpecifier, StreamParams, TransactionSpecifier,
+    ZenithHeaderSpecifier,
+};
+use signet_cold_mdbx::{MdbxColdBackend, MdbxConnector};
+use signet_storage_types::{DbSignetEvent, DbZenithHeader, RecoveredTx, SealedHeader};
+use std::future::Future;
+
+#[cfg(any(feature = "postgres", feature = "sqlite"))]
+use signet_cold_sql::{SqlColdBackend, SqlConnector};
+
+type RpcLog = alloy::rpc::types::Log;
+
+/// Either type that holds one of two cold connectors.
+///
+/// Used by `from_env()` to support both MDBX and SQL cold backends.
+#[derive(Debug, Clone)]
+pub enum Either<L, R> {
+    /// Left variant.
+    Left(L),
+    /// Right variant.
+    Right(R),
+}
+
+impl<L, R> Either<L, R> {
+    /// Create a left variant.
+    pub const fn left(value: L) -> Self {
+        Self::Left(value)
+    }
+
+    /// Create a right variant.
+    pub const fn right(value: R) -> Self {
+        Self::Right(value)
+    }
+}
+
+/// Enum to hold either cold backend type.
+#[derive(Debug)]
+pub enum EitherCold {
+    /// MDBX cold backend.
+    Mdbx(MdbxColdBackend),
+    /// SQL cold backend (PostgreSQL or SQLite).
+    #[cfg(any(feature = "postgres", feature = "sqlite"))]
+    Sql(SqlColdBackend),
+}
+
+/// Dispatches an async method call to the inner cold storage backend.
+///
+/// This macro reduces boilerplate for EitherCold by generating the match-and-forward
+/// pattern. It preserves the method signatures for clarity while eliminating the
+/// repetitive async match blocks.
+macro_rules! dispatch_async {
+    ($self:expr, $method:ident($($param:expr),*)) => {
+        async move {
+            match $self {
+                Self::Mdbx(backend) => backend.$method($($param),*).await,
+                #[cfg(any(feature = "postgres", feature = "sqlite"))]
+                Self::Sql(backend) => backend.$method($($param),*).await,
+            }
+        }
+    };
+}
+
+// Implement ColdStorage for EitherCold by dispatching to inner type
+#[allow(clippy::manual_async_fn)]
+impl ColdStorage for EitherCold {
+    fn get_header(
+        &self,
+        spec: HeaderSpecifier,
+    ) -> impl Future<Output = ColdResult<Option<SealedHeader>>> + Send {
+        dispatch_async!(self, get_header(spec))
+    }
+
+    fn get_headers(
+        &self,
+        specs: Vec<HeaderSpecifier>,
+    ) -> impl Future<Output = ColdResult<Vec<Option<SealedHeader>>>> + Send {
+        dispatch_async!(self, get_headers(specs))
+    }
+
+    fn get_transaction(
+        &self,
+        spec: TransactionSpecifier,
+    ) -> impl Future<Output = ColdResult<Option<Confirmed<RecoveredTx>>>> + Send {
+        dispatch_async!(self, get_transaction(spec))
+    }
+
+    fn get_transactions_in_block(
+        &self,
+        block: BlockNumber,
+    ) -> impl Future<Output = ColdResult<Vec<RecoveredTx>>> + Send {
+        dispatch_async!(self, get_transactions_in_block(block))
+    }
+
+    fn get_transaction_count(
+        &self,
+        block: BlockNumber,
+    ) -> impl Future<Output = ColdResult<u64>> + Send {
+        dispatch_async!(self, get_transaction_count(block))
+    }
+
+    fn get_receipt(
+        &self,
+        spec: ReceiptSpecifier,
+    ) -> impl Future<Output = ColdResult<Option<ColdReceipt>>> + Send {
+        dispatch_async!(self, get_receipt(spec))
+    }
+
+    fn get_receipts_in_block(
+        &self,
+        block: BlockNumber,
+    ) -> impl Future<Output = ColdResult<Vec<ColdReceipt>>> + Send {
+        dispatch_async!(self, get_receipts_in_block(block))
+    }
+
+    fn get_signet_events(
+        &self,
+        spec: SignetEventsSpecifier,
+    ) -> impl Future<Output = ColdResult<Vec<DbSignetEvent>>> + Send {
+        dispatch_async!(self, get_signet_events(spec))
+    }
+
+    fn get_zenith_header(
+        &self,
+        spec: ZenithHeaderSpecifier,
+    ) -> impl Future<Output = ColdResult<Option<DbZenithHeader>>> + Send {
+        dispatch_async!(self, get_zenith_header(spec))
+    }
+
+    fn get_zenith_headers(
+        &self,
+        spec: ZenithHeaderSpecifier,
+    ) -> impl Future<Output = ColdResult<Vec<DbZenithHeader>>> + Send {
+        dispatch_async!(self, get_zenith_headers(spec))
+    }
+
+    fn get_latest_block(&self) -> impl Future<Output = ColdResult<Option<BlockNumber>>> + Send {
+        dispatch_async!(self, get_latest_block())
+    }
+
+    fn get_logs(
+        &self,
+        filter: &Filter,
+        max_logs: usize,
+    ) -> impl Future<Output = ColdResult<Vec<RpcLog>>> + Send {
+        dispatch_async!(self, get_logs(filter, max_logs))
+    }
+
+    fn produce_log_stream(
+        &self,
+        filter: &Filter,
+        params: StreamParams,
+    ) -> impl Future<Output = signet_cold::LogStream> + Send {
+        dispatch_async!(self, produce_log_stream(filter, params))
+    }
+
+    fn append_block(&self, data: BlockData) -> impl Future<Output = ColdResult<()>> + Send {
+        dispatch_async!(self, append_block(data))
+    }
+
+    fn append_blocks(&self, data: Vec<BlockData>) -> impl Future<Output = ColdResult<()>> + Send {
+        dispatch_async!(self, append_blocks(data))
+    }
+
+    fn truncate_above(&self, block: BlockNumber) -> impl Future<Output = ColdResult<()>> + Send {
+        dispatch_async!(self, truncate_above(block))
+    }
+}
+
+// When SQL features are enabled
+#[cfg(any(feature = "postgres", feature = "sqlite"))]
+impl ColdConnect for Either<MdbxConnector, SqlConnector> {
+    type Cold = EitherCold;
+    type Error = crate::StorageError;
+
+    fn connect(&self) -> impl std::future::Future<Output = Result<Self::Cold, Self::Error>> + Send {
+        let self_clone = self.clone();
+        async move {
+            match self_clone {
+                Either::Left(mdbx) => {
+                    let backend = mdbx.connect().await.map_err(crate::StorageError::MdbxCold)?;
+                    Ok(EitherCold::Mdbx(backend))
+                }
+                Either::Right(sql) => {
+                    let backend = sql.connect().await.map_err(crate::StorageError::SqlCold)?;
+                    Ok(EitherCold::Sql(backend))
+                }
+            }
+        }
+    }
+}
+
+// Fallback for when no SQL features are enabled
+#[cfg(not(any(feature = "postgres", feature = "sqlite")))]
+impl ColdConnect for Either<MdbxConnector, ()> {
+    type Cold = MdbxColdBackend;
+    type Error = crate::StorageError;
+
+    fn connect(&self) -> impl std::future::Future<Output = Result<Self::Cold, Self::Error>> + Send {
+        let self_clone = self.clone();
+        async move {
+            match self_clone {
+                Either::Left(mdbx) => mdbx.connect().await.map_err(crate::StorageError::MdbxCold),
+                Either::Right(()) => unreachable!("SQL not enabled"),
+            }
+        }
+    }
+}
diff --git a/crates/storage/src/error.rs b/crates/storage/src/error.rs
index a9a7d84..19a02eb 100644
--- a/crates/storage/src/error.rs
+++ b/crates/storage/src/error.rs
@@ -1,7 +1,10 @@
 //! Error types for unified storage operations.
+use crate::config::ConfigError;
 use signet_cold::ColdStorageError;
+use signet_cold_mdbx::MdbxColdError;
 use signet_hot::{HistoryError, model::HotKvError};
+use signet_hot_mdbx::MdbxError;
 
 /// Error type for unified storage operations.
 #[derive(Debug, thiserror::Error)]
 pub enum StorageError {
@@ -12,6 +15,19 @@ pub enum StorageError {
     /// Error from cold storage operations.
     #[error("cold storage error: {0}")]
     Cold(#[source] ColdStorageError),
+    /// Configuration error.
+    #[error("configuration error: {0}")]
+    Config(String),
+    /// MDBX hot storage error.
+    #[error("MDBX hot storage error: {0}")]
+    MdbxHot(#[from] MdbxError),
+    /// MDBX cold storage error.
+    #[error("MDBX cold storage error: {0}")]
+    MdbxCold(#[from] MdbxColdError),
+    /// SQL cold storage error.
+    #[cfg(any(feature = "postgres", feature = "sqlite"))]
+    #[error("SQL cold storage error: {0}")]
+    SqlCold(#[from] signet_cold_sql::SqlColdError),
 }
 
 impl From<HistoryError<HotKvError>> for StorageError {
@@ -32,5 +48,11 @@ impl From for StorageError {
     }
 }
 
+impl From<ConfigError> for StorageError {
+    fn from(err: ConfigError) -> Self {
+        Self::Config(err.to_string())
+    }
+}
+
 /// Result type alias for unified storage operations.
 pub type StorageResult<T> = Result<T, StorageError>;
diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs
index 5fd5bb4..a7b1116 100644
--- a/crates/storage/src/lib.rs
+++ b/crates/storage/src/lib.rs
@@ -60,14 +60,36 @@
 mod error;
 pub use error::{StorageError, StorageResult};
 
+pub mod config;
+
+pub mod builder;
+
+pub mod either;
+pub use either::Either;
+
 mod unified;
 pub use unified::UnifiedStorage;
 
+// Re-export connector traits
+pub use signet_cold::ColdConnect;
+pub use signet_hot::HotConnect;
+
+// Re-export unified connectors
+pub use signet_cold_mdbx::MdbxConnector;
+
+#[cfg(any(feature = "postgres", feature = "sqlite"))]
+pub use signet_cold_sql::SqlConnector;
+
 // Re-export key types for convenience
 pub use signet_cold::{ColdStorage, ColdStorageError, ColdStorageHandle, ColdStorageTask};
+pub use signet_cold_mdbx::MdbxColdBackend;
 pub use signet_hot::{
     HistoryError, HistoryRead, HistoryWrite, HotKv,
     model::{HotKvRead, RevmRead, RevmWrite},
 };
+pub use signet_hot_mdbx::{DatabaseArguments, DatabaseEnv};
 pub use signet_storage_types::{ExecutedBlock, ExecutedBlockBuilder};
 pub use tokio_util::sync::CancellationToken;
+
+#[cfg(any(feature = "postgres", feature = "sqlite"))]
+pub use signet_cold_sql::SqlColdBackend;
diff --git a/crates/storage/src/unified.rs b/crates/storage/src/unified.rs
index f6bc1cc..7312df3 100644
--- a/crates/storage/src/unified.rs
+++ b/crates/storage/src/unified.rs
@@ -105,6 +105,11 @@ impl UnifiedStorage {
         &self.hot
     }
 
+    /// Consume self and return the hot storage backend.
+    pub fn into_hot(self) -> H {
+        self.hot
+    }
+
     /// Get a reference to the cold storage handle.
     pub const fn cold(&self) -> &ColdStorageHandle {
         &self.cold