diff --git a/Cargo.lock b/Cargo.lock index 8db63769d..ee0af9ad2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -42,14 +42,12 @@ dependencies = [ "datafusion", "datasets-common", "datasets-derived", - "evm-rpc-datasets", - "firehose-datasets", + "datasets-raw", "futures", "metadata-db", "monitoring", "serde", "serde_json", - "solana-datasets", "thiserror 2.0.18", "tokio", "toml", diff --git a/crates/clients/admin/src/datasets.rs b/crates/clients/admin/src/datasets.rs index fcb3f54d1..762461f87 100644 --- a/crates/clients/admin/src/datasets.rs +++ b/crates/clients/admin/src/datasets.rs @@ -234,9 +234,6 @@ impl<'a> DatasetsClient<'a> { "MANIFEST_VALIDATION_ERROR" => Err(RegisterError::ManifestValidationError( error_response.into(), )), - "UNSUPPORTED_DATASET_KIND" => { - Err(RegisterError::UnsupportedDatasetKind(error_response.into())) - } "MANIFEST_REGISTRATION_ERROR" => Err(RegisterError::ManifestRegistrationError( error_response.into(), )), @@ -1526,13 +1523,6 @@ pub enum RegisterError { #[error("dependency validation error: {0}")] ManifestValidationError(#[source] ApiError), - /// Unsupported dataset kind (400, UNSUPPORTED_DATASET_KIND) - /// - /// This occurs when: - /// - Dataset kind is not one of the supported types (manifest, evm-rpc, firehose) - #[error("unsupported dataset kind")] - UnsupportedDatasetKind(#[source] ApiError), - /// Failed to register manifest in the system (500, MANIFEST_REGISTRATION_ERROR) /// /// This occurs when: diff --git a/crates/clients/admin/src/manifests.rs b/crates/clients/admin/src/manifests.rs index ef3433c5b..7419c0d21 100644 --- a/crates/clients/admin/src/manifests.rs +++ b/crates/clients/admin/src/manifests.rs @@ -138,9 +138,6 @@ impl<'a> ManifestsClient<'a> { "MANIFEST_VALIDATION_ERROR" => Err(RegisterError::ManifestValidationError( error_response.into(), )), - "UNSUPPORTED_DATASET_KIND" => { - Err(RegisterError::UnsupportedDatasetKind(error_response.into())) - } "MANIFEST_STORAGE_ERROR" => { Err(RegisterError::ManifestStorageError(error_response.into())) } @@ -575,14 +572,6 @@ pub enum RegisterError { #[error("dependency validation error")] ManifestValidationError(#[source] ApiError), - /// Unsupported dataset kind (400, UNSUPPORTED_DATASET_KIND) - /// - /// This occurs when: - /// - Dataset kind is not one of the supported types (manifest, evm-rpc, firehose) - /// - The 'kind' field in the manifest contains an unrecognized value - #[error("unsupported dataset kind")] - UnsupportedDatasetKind(#[source] ApiError), - /// Failed to write manifest to object store (500, MANIFEST_STORAGE_ERROR) /// /// This occurs when: diff --git a/crates/core/datasets-common/src/dataset_kind_str.rs b/crates/core/datasets-common/src/dataset_kind_str.rs index 836a05c01..24f458004 100644 --- a/crates/core/datasets-common/src/dataset_kind_str.rs +++ b/crates/core/datasets-common/src/dataset_kind_str.rs @@ -11,8 +11,12 @@ pub struct DatasetKindStr(String); impl DatasetKindStr { - /// Creates a new [`DatasetKindStr`] from a string identifier. - pub fn new(kind: String) -> Self { + /// Creates a new [`DatasetKindStr`] from a string identifier without validation. + /// + /// # Safety + /// The caller must ensure the provided string is a valid dataset kind identifier + /// (e.g., originates from a strongly-typed ZST kind or a trusted database value). + pub fn new_unchecked(kind: String) -> Self { Self(kind) } @@ -20,6 +24,11 @@ impl DatasetKindStr { pub fn as_str(&self) -> &str { &self.0 } + + /// Consumes the [`DatasetKindStr`] and returns the inner [`String`]. + pub fn into_inner(self) -> String { + self.0 + } } impl AsRef for DatasetKindStr { @@ -53,3 +62,58 @@ impl PartialEq for &str { *self == other.0 } } + +#[cfg(feature = "metadata-db")] +impl From for DatasetKindStr { + fn from(value: metadata_db::manifests::ManifestKindOwned) -> Self { + // SAFETY: ManifestKindOwned values originate from the database, which only stores + // validated kind strings inserted at system boundaries. + DatasetKindStr::new_unchecked(value.into_inner()) + } +} + +#[cfg(feature = "metadata-db")] +impl From for metadata_db::manifests::ManifestKindOwned { + fn from(value: DatasetKindStr) -> Self { + // SAFETY: DatasetKindStr values originate from validated domain types (ZST kind types), + // so invariants are upheld. + metadata_db::manifests::ManifestKind::from_owned_unchecked(value.0) + } +} + +#[cfg(feature = "metadata-db")] +impl<'a> From<&'a DatasetKindStr> for metadata_db::manifests::ManifestKind<'a> { + fn from(value: &'a DatasetKindStr) -> Self { + // SAFETY: DatasetKindStr values originate from validated domain types (ZST kind types), + // so invariants are upheld. + metadata_db::manifests::ManifestKind::from_ref_unchecked(value.as_str()) + } +} + +#[cfg(feature = "metadata-db")] +impl<'a> PartialEq> for DatasetKindStr { + fn eq(&self, other: &metadata_db::manifests::ManifestKind<'a>) -> bool { + self.as_str() == other.as_str() + } +} + +#[cfg(feature = "metadata-db")] +impl<'a> PartialEq for metadata_db::manifests::ManifestKind<'a> { + fn eq(&self, other: &DatasetKindStr) -> bool { + self.as_str() == other.as_str() + } +} + +#[cfg(feature = "metadata-db")] +impl<'a> PartialEq> for &DatasetKindStr { + fn eq(&self, other: &metadata_db::manifests::ManifestKind<'a>) -> bool { + self.as_str() == other.as_str() + } +} + +#[cfg(feature = "metadata-db")] +impl<'a> PartialEq<&DatasetKindStr> for metadata_db::manifests::ManifestKind<'a> { + fn eq(&self, other: &&DatasetKindStr) -> bool { + self.as_str() == other.as_str() + } +} diff --git a/crates/core/datasets-derived/src/dataset_kind.rs b/crates/core/datasets-derived/src/dataset_kind.rs index 75835f0c4..6731df1b1 100644 --- a/crates/core/datasets-derived/src/dataset_kind.rs +++ b/crates/core/datasets-derived/src/dataset_kind.rs @@ -35,7 +35,9 @@ impl DerivedDatasetKind { impl From for DatasetKindStr { fn from(value: DerivedDatasetKind) -> Self { - DatasetKindStr::new(value.to_string()) + // SAFETY: DerivedDatasetKind is a strongly-typed ZST whose Display impl produces + // a valid dataset kind string. + DatasetKindStr::new_unchecked(value.to_string()) } } diff --git a/crates/core/datasets-derived/src/lib.rs b/crates/core/datasets-derived/src/lib.rs index bf45ed97a..0d611501f 100644 --- a/crates/core/datasets-derived/src/lib.rs +++ b/crates/core/datasets-derived/src/lib.rs @@ -24,7 +24,7 @@ pub mod sql_str; pub use self::{ dataset::Dataset, - dataset_kind::{DerivedDatasetKind, DerivedDatasetKindError}, + dataset_kind::DerivedDatasetKind, func_name::FuncName, function::{Function, FunctionSource}, manifest::Manifest, diff --git a/crates/core/datasets-raw/src/dataset_kind.rs b/crates/core/datasets-raw/src/dataset_kind.rs index 620ad812f..e84aa5a3f 100644 --- a/crates/core/datasets-raw/src/dataset_kind.rs +++ b/crates/core/datasets-raw/src/dataset_kind.rs @@ -36,7 +36,9 @@ macro_rules! define_dataset_kind { impl From<$Name> for DatasetKindStr { fn from(value: $Name) -> Self { - DatasetKindStr::new(value.to_string()) + // SAFETY: $Name is a strongly-typed ZST whose Display impl produces + // a valid dataset kind string. + DatasetKindStr::new_unchecked(value.to_string()) } } diff --git a/crates/core/datasets-registry/src/lib.rs b/crates/core/datasets-registry/src/lib.rs index d354e16c6..456067b61 100644 --- a/crates/core/datasets-registry/src/lib.rs +++ b/crates/core/datasets-registry/src/lib.rs @@ -1,8 +1,8 @@ use std::collections::BTreeSet; use datasets_common::{ - hash::Hash, hash_reference::HashReference, name::Name, namespace::Namespace, - reference::Reference, revision::Revision, version::Version, + dataset_kind_str::DatasetKindStr, hash::Hash, hash_reference::HashReference, name::Name, + namespace::Namespace, reference::Reference, revision::Revision, version::Version, }; use metadata_db::MetadataDb; @@ -63,6 +63,7 @@ impl DatasetsRegistry { pub async fn register_manifest( &self, hash: &Hash, + kind: &DatasetKindStr, content: String, ) -> Result<(), RegisterManifestError> { let path = self @@ -70,7 +71,7 @@ impl DatasetsRegistry { .store(hash, content) .await .map_err(RegisterManifestError::ManifestStorage)?; - metadata_db::manifests::register(&self.metadata_db, hash, path) + metadata_db::manifests::register(&self.metadata_db, hash, kind, path) .await .map_err(RegisterManifestError::MetadataRegistration)?; Ok(()) diff --git a/crates/core/datasets-static/src/dataset_kind.rs b/crates/core/datasets-static/src/dataset_kind.rs index 9df99d3f7..7f537bc10 100644 --- a/crates/core/datasets-static/src/dataset_kind.rs +++ b/crates/core/datasets-static/src/dataset_kind.rs @@ -31,7 +31,9 @@ impl StaticDatasetKind { impl From for DatasetKindStr { fn from(value: StaticDatasetKind) -> Self { - DatasetKindStr::new(value.to_string()) + // SAFETY: StaticDatasetKind is a strongly-typed ZST whose Display impl produces + // a valid dataset kind string. + DatasetKindStr::new_unchecked(value.to_string()) } } diff --git a/crates/core/metadata-db/migrations/20260312000000_add_kind_to_manifest_files.sql b/crates/core/metadata-db/migrations/20260312000000_add_kind_to_manifest_files.sql new file mode 100644 index 000000000..3c8d6467d --- /dev/null +++ b/crates/core/metadata-db/migrations/20260312000000_add_kind_to_manifest_files.sql @@ -0,0 +1,9 @@ +-- ============================================================= +-- Migration: Add kind column to manifest_files +-- ============================================================= +-- Adds a TEXT kind column to manifest_files to store the dataset +-- kind (e.g., evm-rpc, derived, static) for each manifest file. +-- Defaults to 'unknown' for existing rows. +-- ============================================================= + +ALTER TABLE manifest_files ADD COLUMN kind TEXT NOT NULL DEFAULT 'unknown'; diff --git a/crates/core/metadata-db/src/manifests.rs b/crates/core/metadata-db/src/manifests.rs index 68b75e51b..7b865f92f 100644 --- a/crates/core/metadata-db/src/manifests.rs +++ b/crates/core/metadata-db/src/manifests.rs @@ -2,18 +2,20 @@ //! //! This module provides operations for managing manifest files: //! - **manifest_files**: Content-addressable manifest storage indexed by SHA256 hash -//! - **Type definitions**: ManifestHash, ManifestPath +//! - **Type definitions**: ManifestHash, ManifestPath, ManifestKind //! //! ## Database Tables //! -//! - **manifest_files**: Content-addressable manifest storage with hash → path mapping +//! - **manifest_files**: Content-addressable manifest storage with hash → path → kind mapping mod hash; +mod kind; mod path; pub(crate) mod sql; pub use self::{ hash::{Hash as ManifestHash, HashOwned as ManifestHashOwned}, + kind::{Kind as ManifestKind, KindOwned as ManifestKindOwned}, path::{Path as ManifestPath, PathOwned as ManifestPathOwned}, sql::ManifestSummary, }; @@ -21,19 +23,20 @@ use crate::{db::Executor, error::Error}; /// Register manifest file in content-addressable storage /// -/// Inserts manifest hash and path into `manifest_files` table with ON +/// Inserts manifest hash, path, and kind into `manifest_files` table with ON /// CONFLICT DO NOTHING. This operation is idempotent - duplicate /// registrations are silently ignored. #[tracing::instrument(skip(exe), err)] pub async fn register<'c, E>( exe: E, hash: impl Into> + std::fmt::Debug, + kind: impl Into> + std::fmt::Debug, path: impl Into> + std::fmt::Debug, ) -> Result<(), Error> where E: Executor<'c>, { - sql::insert(exe, hash.into(), path.into()) + sql::insert(exe, hash.into(), kind.into(), path.into()) .await .map_err(Error::Database) } @@ -74,6 +77,7 @@ where /// /// Queries for all manifests in the `manifest_files` table, returning: /// - Hash (content-addressable identifier) +/// - Kind (dataset kind, e.g., "evm-rpc", "solana", "firehose", "manifest") /// - Dataset count (number of datasets using this manifest) /// /// Results are ordered by hash. diff --git a/crates/core/metadata-db/src/manifests/kind.rs b/crates/core/metadata-db/src/manifests/kind.rs new file mode 100644 index 000000000..5c9844a17 --- /dev/null +++ b/crates/core/metadata-db/src/manifests/kind.rs @@ -0,0 +1,166 @@ +//! Manifest kind new-type wrapper for database values +//! +//! This module provides a [`Kind`] new-type wrapper around [`Cow`] that maintains +//! manifest kind invariants for database operations. The type provides efficient handling +//! with support for both borrowed and owned strings. +//! +//! ## Validation Strategy +//! +//! This type **maintains invariants but does not validate** input data. Validation occurs +//! at system boundaries through manifest registration operations, which ensure kinds are +//! valid before converting into this database-layer type. Database values are trusted as +//! already valid, following the principle of "validate at boundaries, trust database data." +//! +//! Types that convert into [`Kind`] are responsible for ensuring invariants are met: +//! - Manifest kind must be a non-empty string identifying the dataset type +//! - Examples: `"evm-rpc"`, `"solana"`, `"firehose"`, `"manifest"`, `"unknown"` + +use std::borrow::Cow; + +/// An owned manifest kind type for database return values and owned storage scenarios. +/// +/// This is a type alias for `Kind<'static>`, specifically intended for use as a return type from +/// database queries or in any context where a manifest kind with owned storage is required. +/// Prefer this alias when working with kinds that need to be stored or returned from the database, +/// rather than just representing a manifest kind with owned storage in general. +pub type KindOwned = Kind<'static>; + +/// A manifest kind wrapper for database values. +/// +/// This new-type wrapper around `Cow` maintains manifest kind invariants for database +/// operations. It supports both borrowed and owned strings through copy-on-write semantics, +/// enabling efficient handling without unnecessary allocations. +/// +/// The type trusts that values are already validated. Validation must occur at system +/// boundaries before conversion into this type. +#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize)] +pub struct Kind<'a>(Cow<'a, str>); + +impl<'a> Kind<'a> { + /// Create a new Kind wrapper from a reference to str (borrowed) + /// + /// # Safety + /// The caller must ensure the provided kind upholds the manifest kind invariants. + /// This method does not perform validation. Failure to uphold the invariants may + /// cause undefined behavior. + pub fn from_ref_unchecked(kind: &'a str) -> Self { + Self(Cow::Borrowed(kind)) + } + + /// Create a new Kind wrapper from an owned String + /// + /// # Safety + /// The caller must ensure the provided kind upholds the manifest kind invariants. + /// This method does not perform validation. Failure to uphold the invariants may + /// cause undefined behavior. + pub fn from_owned_unchecked(kind: String) -> Kind<'static> { + Kind(Cow::Owned(kind)) + } + + /// Consume and return the inner String (owned) + pub fn into_inner(self) -> String { + match self { + Kind(Cow::Owned(kind)) => kind, + Kind(Cow::Borrowed(kind)) => kind.to_owned(), + } + } + + /// Get a reference to the inner str + pub fn as_str(&self) -> &str { + &self.0 + } +} + +impl<'a> From<&'a Kind<'a>> for Kind<'a> { + fn from(value: &'a Kind<'a>) -> Self { + // Create a borrowed Cow variant pointing to the data inside the input Kind. + // This works for both Cow::Borrowed and Cow::Owned without cloning the underlying data. + // SAFETY: The input Kind already upholds invariants, so the referenced data is valid. + Kind::from_ref_unchecked(value.as_ref()) + } +} + +impl<'a> std::ops::Deref for Kind<'a> { + type Target = str; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl<'a> AsRef for Kind<'a> { + fn as_ref(&self) -> &str { + &self.0 + } +} + +impl<'a> PartialEq<&str> for Kind<'a> { + fn eq(&self, other: &&str) -> bool { + self.as_str() == *other + } +} + +impl<'a> PartialEq> for &str { + fn eq(&self, other: &Kind<'a>) -> bool { + *self == other.as_str() + } +} + +impl<'a> PartialEq for Kind<'a> { + fn eq(&self, other: &str) -> bool { + self.as_str() == other + } +} + +impl<'a> PartialEq> for str { + fn eq(&self, other: &Kind<'a>) -> bool { + self == other.as_str() + } +} + +impl<'a> PartialEq for Kind<'a> { + fn eq(&self, other: &String) -> bool { + self.as_str() == other + } +} + +impl<'a> PartialEq> for String { + fn eq(&self, other: &Kind<'a>) -> bool { + self == other.as_str() + } +} + +impl<'a> std::fmt::Display for Kind<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.0.fmt(f) + } +} + +impl<'a> std::fmt::Debug for Kind<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.0.fmt(f) + } +} + +impl sqlx::Type for Kind<'_> { + fn type_info() -> sqlx::postgres::PgTypeInfo { + >::type_info() + } +} + +impl<'a> sqlx::Encode<'_, sqlx::Postgres> for Kind<'a> { + fn encode_by_ref( + &self, + buf: &mut ::ArgumentBuffer<'_>, + ) -> Result { + <&str as sqlx::Encode<'_, sqlx::Postgres>>::encode_by_ref(&self.as_str(), buf) + } +} + +impl<'r> sqlx::Decode<'r, sqlx::Postgres> for KindOwned { + fn decode(value: sqlx::postgres::PgValueRef<'r>) -> Result { + let s = >::decode(value)?; + // SAFETY: Database values are trusted to uphold invariants; validation occurs at boundaries before insertion. + Ok(Kind::from_owned_unchecked(s)) + } +} diff --git a/crates/core/metadata-db/src/manifests/sql.rs b/crates/core/metadata-db/src/manifests/sql.rs index f6d5ff3bf..ccb664101 100644 --- a/crates/core/metadata-db/src/manifests/sql.rs +++ b/crates/core/metadata-db/src/manifests/sql.rs @@ -4,24 +4,31 @@ use sqlx::{Executor, Postgres}; use super::{ hash::{Hash, HashOwned}, + kind::{Kind, KindOwned}, path::{Path, PathOwned}, }; /// Insert a new manifest record /// /// Idempotent (`ON CONFLICT DO NOTHING`). -pub(crate) async fn insert<'c, E>(exe: E, hash: Hash<'_>, path: Path<'_>) -> Result<(), sqlx::Error> +pub(crate) async fn insert<'c, E>( + exe: E, + hash: Hash<'_>, + kind: Kind<'_>, + path: Path<'_>, +) -> Result<(), sqlx::Error> where E: Executor<'c, Database = Postgres>, { let query = indoc::indoc! {r#" - INSERT INTO manifest_files (hash, path) - VALUES ($1, $2) + INSERT INTO manifest_files (hash, kind, path) + VALUES ($1, $2, $3) ON CONFLICT (hash) DO NOTHING "#}; sqlx::query(query) .bind(hash) + .bind(kind) .bind(path) .execute(exe) .await?; @@ -103,6 +110,7 @@ where #[derive(Debug, Clone)] pub struct ManifestSummary { pub hash: HashOwned, + pub kind: KindOwned, pub dataset_count: i64, } @@ -111,6 +119,7 @@ impl<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow> for ManifestSummary { use sqlx::Row; Ok(Self { hash: row.try_get("hash")?, + kind: row.try_get("kind")?, dataset_count: row.try_get("dataset_count")?, }) } @@ -118,7 +127,7 @@ impl<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow> for ManifestSummary { /// List all manifests with metadata /// -/// Returns a vector of manifest summaries including hash and dataset link count, +/// Returns a vector of manifest summaries including hash, kind, and dataset link count, /// ordered by hash. pub(crate) async fn list_all<'c, E>(exe: E) -> Result, sqlx::Error> where @@ -127,10 +136,11 @@ where let query = indoc::indoc! {r#" SELECT mf.hash, + mf.kind, COALESCE(COUNT(dm.hash), 0) AS dataset_count FROM manifest_files mf LEFT JOIN dataset_manifests dm ON mf.hash = dm.hash - GROUP BY mf.hash + GROUP BY mf.hash, mf.kind ORDER BY mf.hash "#}; diff --git a/crates/core/metadata-db/src/tests/it_txn.rs b/crates/core/metadata-db/src/tests/it_txn.rs index 0a90338ed..fb590de06 100644 --- a/crates/core/metadata-db/src/tests/it_txn.rs +++ b/crates/core/metadata-db/src/tests/it_txn.rs @@ -2,7 +2,7 @@ use crate::{ datasets::{self, DatasetName, DatasetNamespace}, - manifests::{self, ManifestHash, ManifestPath}, + manifests::{self, ManifestHash, ManifestKind, ManifestPath}, tests::helpers::setup_test_db, }; @@ -16,13 +16,14 @@ async fn commit_persists_changes() { let manifest_hash = ManifestHash::from_ref_unchecked( "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef", ); + let manifest_kind = ManifestKind::from_ref_unchecked("evm-rpc"); let manifest_path = ManifestPath::from_ref_unchecked("path/to/manifest-commit.json"); // Begin transaction let mut tx = conn.begin_txn().await.expect("Failed to begin transaction"); // Make changes within transaction - manifests::register(&mut tx, &manifest_hash, &manifest_path) + manifests::register(&mut tx, &manifest_hash, &manifest_kind, &manifest_path) .await .expect("Failed to register manifest in transaction"); datasets::link_manifest(&mut tx, &namespace, &name, &manifest_hash) @@ -56,12 +57,13 @@ async fn explicit_rollback_discards_changes() { let manifest_hash = ManifestHash::from_ref_unchecked( "1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef", ); + let manifest_kind = ManifestKind::from_ref_unchecked("evm-rpc"); let manifest_path = ManifestPath::from_ref_unchecked("path/to/manifest-rollback.json"); let mut tx = conn.begin_txn().await.expect("Failed to begin transaction"); // Make changes within transaction - manifests::register(&mut tx, &manifest_hash, manifest_path) + manifests::register(&mut tx, &manifest_hash, &manifest_kind, &manifest_path) .await .expect("Failed to register manifest in transaction"); datasets::link_manifest(&mut tx, &namespace, &name, &manifest_hash) @@ -97,12 +99,13 @@ async fn rollback_on_drop_discards_changes() { let manifest_hash = ManifestHash::from_ref_unchecked( "abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789", ); + let manifest_kind = ManifestKind::from_ref_unchecked("evm-rpc"); let manifest_path = ManifestPath::from_ref_unchecked("path/to/manifest-drop.json"); let mut tx = conn.begin_txn().await.expect("Failed to begin transaction"); // Make changes within transaction - manifests::register(&mut tx, &manifest_hash, manifest_path) + manifests::register(&mut tx, &manifest_hash, &manifest_kind, &manifest_path) .await .expect("Failed to register manifest in transaction"); datasets::link_manifest(&mut tx, &namespace, &name, &manifest_hash) diff --git a/crates/services/admin-api/Cargo.toml b/crates/services/admin-api/Cargo.toml index 35c6db301..dd87f638e 100644 --- a/crates/services/admin-api/Cargo.toml +++ b/crates/services/admin-api/Cargo.toml @@ -6,6 +6,7 @@ edition.workspace = true [dependencies] amp-data-store = { path = "../../core/data-store" } +amp-datasets-raw = { path = "../../core/datasets-raw", package = "datasets-raw" } amp-datasets-registry = { path = "../../core/datasets-registry" } amp-parquet = { path = "../../core/parquet" } amp-providers-common = { path = "../../core/providers-common" } @@ -19,14 +20,11 @@ common = { path = "../../core/common" } datafusion.workspace = true datasets-common = { path = "../../core/datasets-common" } datasets-derived = { path = "../../core/datasets-derived" } -evm-rpc-datasets = { path = "../../extractors/evm-rpc" } -firehose-datasets = { path = "../../extractors/firehose" } futures.workspace = true metadata-db = { path = "../../core/metadata-db" } monitoring = { path = "../../core/monitoring" } serde.workspace = true serde_json = { workspace = true, features = ["raw_value"] } -solana-datasets = { path = "../../extractors/solana" } thiserror.workspace = true tokio.workspace = true toml.workspace = true diff --git a/crates/services/admin-api/src/handlers/common.rs b/crates/services/admin-api/src/handlers/common.rs index c4f48c223..22ed33d0c 100644 --- a/crates/services/admin-api/src/handlers/common.rs +++ b/crates/services/admin-api/src/handlers/common.rs @@ -6,6 +6,7 @@ use std::{ }; use amp_data_store::{DataStore, PhyTableRevision}; +use amp_datasets_raw::dataset_kind::{EvmRpcDatasetKind, FirehoseDatasetKind, SolanaDatasetKind}; use amp_datasets_registry::error::ResolveRevisionError; use amp_parquet::footer::{AmpMetadataFromParquetError, amp_metadata_from_parquet_file}; use common::{ @@ -21,9 +22,11 @@ use common::{ udfs::eth_call::EthCallUdfsCache, }; use datafusion::sql::parser::Statement; -use datasets_common::{hash_reference::HashReference, table_name::TableName}; +use datasets_common::{ + dataset_kind_str::DatasetKindStr, hash_reference::HashReference, table_name::TableName, +}; use datasets_derived::{ - Manifest as DerivedDatasetManifest, + DerivedDatasetKind, Manifest as DerivedDatasetManifest, deps::{DepAlias, DepAliasOrSelfRef, DepAliasOrSelfRefError}, manifest::{TableInput, View}, sorting::{self, CyclicDepError}, @@ -124,105 +127,95 @@ impl InterTableDepError { } } -/// A string wrapper that ensures the value is not empty or whitespace-only -/// -/// This invariant-holding _new-type_ validates that strings contain at least one non-whitespace character. -/// Validation occurs during: -/// - JSON/serde deserialization -/// - Parsing from `&str` via `FromStr` +/// Manifest header extracted from a manifest JSON string. /// -/// ## Behavior -/// - Input strings are validated by checking if they contain non-whitespace characters after trimming -/// - Empty strings or whitespace-only strings are rejected with [`EmptyStringError`] -/// - The **original string is preserved** including any leading/trailing whitespace -/// - Once created, the string is guaranteed to contain at least one non-whitespace character -#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] -#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))] -#[cfg_attr(feature = "utoipa", schema(value_type = String))] -pub struct NonEmptyString(String); - -impl NonEmptyString { - /// Creates a new NonEmptyString without validation - /// - /// ## Safety - /// The caller must ensure that the string contains at least one non-whitespace character. - /// Passing an empty string or whitespace-only string violates the type's invariant and - /// may lead to undefined behavior in code that relies on this guarantee. - pub unsafe fn new_unchecked(value: String) -> Self { - Self(value) - } - - /// Returns a reference to the inner string value - pub fn as_str(&self) -> &str { - &self.0 - } - - /// Consumes the NonEmptyString and returns the inner String - pub fn into_inner(self) -> String { - self.0 - } +/// Deserializes only the `kind` field, ignoring all other fields. +/// Validates the kind against supported dataset kinds during deserialization. +#[derive(serde::Deserialize)] +pub struct ManifestHeader { + /// The dataset kind, validated during deserialization against supported kinds. + pub kind: DatasetKind, } -impl AsRef for NonEmptyString { - fn as_ref(&self) -> &str { - &self.0 - } +/// Enum of supported dataset kinds for handler dispatch. +/// +/// Centralizes the supported kinds so both registration handlers can use the same +/// dispatch logic without duplicating if-else chains against the ZST kind constants. +#[derive(Debug, Clone, Copy)] +pub enum DatasetKind { + /// Derived datasets that transform data from other datasets using SQL queries. + Derived, + /// Raw datasets that extract blockchain data from Ethereum-compatible JSON-RPC endpoints. + EvmRpc, + /// Raw datasets that extract blockchain data from Solana RPC endpoints. + Solana, + /// Raw datasets that stream blockchain data from StreamingFast Firehose protocol. + Firehose, } -impl AsRef<[u8]> for NonEmptyString { - fn as_ref(&self) -> &[u8] { - self.0.as_bytes() +impl DatasetKind { + /// Returns the canonical string representation of this kind. + pub fn as_str(&self) -> &'static str { + match self { + Self::Derived => DerivedDatasetKind.as_str(), + Self::EvmRpc => EvmRpcDatasetKind.as_str(), + Self::Solana => SolanaDatasetKind.as_str(), + Self::Firehose => FirehoseDatasetKind.as_str(), + } } } -impl std::ops::Deref for NonEmptyString { - type Target = str; - - fn deref(&self) -> &Self::Target { - &self.0 +impl From for DatasetKindStr { + fn from(kind: DatasetKind) -> Self { + // SAFETY: DatasetKind variants map to validated ZST kind types; as_str() returns + // a valid dataset kind string. + Self::new_unchecked(kind.to_string()) } } -impl std::fmt::Display for NonEmptyString { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - std::fmt::Display::fmt(&self.0, f) +impl TryFrom<&DatasetKindStr> for DatasetKind { + type Error = UnsupportedDatasetKindError; + + fn try_from(kind: &DatasetKindStr) -> Result { + if *kind == DerivedDatasetKind { + Ok(Self::Derived) + } else if *kind == EvmRpcDatasetKind { + Ok(Self::EvmRpc) + } else if *kind == SolanaDatasetKind { + Ok(Self::Solana) + } else if *kind == FirehoseDatasetKind { + Ok(Self::Firehose) + } else { + Err(UnsupportedDatasetKindError(kind.to_string())) + } } } -impl std::str::FromStr for NonEmptyString { - type Err = EmptyStringError; - - fn from_str(s: &str) -> Result { - if s.trim().is_empty() { - return Err(EmptyStringError); - } - Ok(NonEmptyString(s.to_string())) +impl std::fmt::Display for DatasetKind { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(self.as_str()) } } -impl<'de> serde::Deserialize<'de> for NonEmptyString { +impl<'de> serde::Deserialize<'de> for DatasetKind { fn deserialize(deserializer: D) -> Result where D: serde::Deserializer<'de>, { - let value = String::deserialize(deserializer)?; - value.parse().map_err(serde::de::Error::custom) + let s = String::deserialize(deserializer)?; + let kind_str = DatasetKindStr::new_unchecked(s); + DatasetKind::try_from(&kind_str).map_err(serde::de::Error::custom) } } -impl serde::Serialize for NonEmptyString { - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - self.0.serialize(serializer) - } -} - -/// Error type for NonEmptyString parsing failures -#[derive(Debug, Clone, Copy, thiserror::Error)] -#[error("string cannot be empty or whitespace-only")] -pub struct EmptyStringError; +/// Error returned when a dataset kind string does not match any supported kind. +/// +/// This error occurs when `TryFrom<&DatasetKindStr>` encounters a kind string +/// that is not one of the recognized dataset kinds (derived, evm-rpc, solana, firehose). +/// During deserialization, this surfaces as a serde error via `Deserialize` for `DatasetKind`. +#[derive(Debug, thiserror::Error)] +#[error("unsupported dataset kind: '{0}'")] +pub struct UnsupportedDatasetKindError(String); /// Parse, validate, and re-serialize a derived dataset manifest to canonical JSON format /// @@ -231,9 +224,9 @@ pub struct EmptyStringError; /// 2. Validate manifest using dataset store (SQL, dependencies, tables, functions) /// 3. Re-serialize to canonical JSON pub async fn parse_and_canonicalize_derived_dataset_manifest( - manifest_str: impl AsRef, datasets_cache: &DatasetsCache, ethcall_udfs_cache: &EthCallUdfsCache, + manifest_str: impl AsRef, ) -> Result { let manifest: DerivedDatasetManifest = serde_json::from_str(manifest_str.as_ref()) .map_err(ParseDerivedManifestError::Deserialization)?; diff --git a/crates/services/admin-api/src/handlers/datasets/register.rs b/crates/services/admin-api/src/handlers/datasets/register.rs index bc4edeecb..34aa43849 100644 --- a/crates/services/admin-api/src/handlers/datasets/register.rs +++ b/crates/services/admin-api/src/handlers/datasets/register.rs @@ -1,3 +1,4 @@ +use amp_datasets_raw::manifest::{EvmRpcManifest, FirehoseManifest, SolanaManifest}; use amp_datasets_registry::error::{LinkManifestError, RegisterManifestError, SetVersionTagError}; use axum::{ Json, @@ -9,25 +10,20 @@ use datasets_common::{ dataset_kind_str::DatasetKindStr, hash::{Hash, hash}, hash_reference::HashReference, - manifest::Manifest as CommonManifest, name::Name, namespace::Namespace, table_name::TableName, version::Version, }; -use datasets_derived::DerivedDatasetKind; -use evm_rpc_datasets::{EvmRpcDatasetKind, Manifest as EvmRpcManifest}; -use firehose_datasets::{FirehoseDatasetKind, Manifest as FirehoseManifest}; use monitoring::logging; use serde_json::value::RawValue; -use solana_datasets::{Manifest as SolanaManifest, SolanaDatasetKind}; use crate::{ ctx::Ctx, handlers::{ common::{ - ManifestValidationError, ParseDerivedManifestError, ParseRawManifestError, - parse_and_canonicalize_derived_dataset_manifest, + DatasetKind, ManifestHeader, ManifestValidationError, ParseDerivedManifestError, + ParseRawManifestError, parse_and_canonicalize_derived_dataset_manifest, parse_and_canonicalize_raw_dataset_manifest, }, error::{ErrorResponse, IntoErrorResponse}, @@ -63,7 +59,6 @@ use crate::{ /// - `MANIFEST_LINKING_ERROR`: Failed to link manifest to dataset /// - `MANIFEST_NOT_FOUND`: Manifest hash provided but manifest doesn't exist /// - `VERSION_TAGGING_ERROR`: Failed to tag the manifest with the version -/// - `UNSUPPORTED_DATASET_KIND`: Dataset kind is not supported /// /// ## Behavior /// This handler supports multiple dataset kinds for registration: @@ -164,59 +159,57 @@ pub async fn handler( "Received manifest content, validating and storing" ); - let manifest = - serde_json::from_str::(manifest_content.get()).map_err(|err| { + let header = + serde_json::from_str::(manifest_content.get()).map_err(|err| { tracing::error!( namespace = %namespace, name = %name, version = ?version, error = %err, error_source = logging::error_source(&err), - "Failed to parse common manifest JSON" + "Failed to parse manifest JSON" ); Error::InvalidManifest(err) })?; + let kind_str = DatasetKindStr::from(header.kind); + // Validate and serialize manifest based on dataset kind - let manifest_canonical = if manifest.kind == DerivedDatasetKind { - parse_and_canonicalize_derived_dataset_manifest( - manifest_content.get(), - &ctx.datasets_cache, - &ctx.ethcall_udfs_cache, - ) - .await - .map_err(Error::from)? - } else if manifest.kind == EvmRpcDatasetKind { - parse_and_canonicalize_raw_dataset_manifest::( - manifest_content.get(), - ) - .map_err(Error::from)? - } else if manifest.kind == FirehoseDatasetKind { - parse_and_canonicalize_raw_dataset_manifest::( - manifest_content.get(), - ) - .map_err(Error::from)? - } else if manifest.kind == SolanaDatasetKind { - parse_and_canonicalize_raw_dataset_manifest::( - manifest_content.get(), - ) - .map_err(Error::from)? - } else { - return Err(Error::UnsupportedDatasetKind(manifest.kind.to_string()).into()); - }; + let manifest_canonical = + match header.kind { + DatasetKind::Derived => parse_and_canonicalize_derived_dataset_manifest( + &ctx.datasets_cache, + &ctx.ethcall_udfs_cache, + manifest_content.get(), + ) + .await + .map_err(Error::from)?, + DatasetKind::EvmRpc => parse_and_canonicalize_raw_dataset_manifest::< + EvmRpcManifest, + >(manifest_content.get()) + .map_err(Error::from)?, + DatasetKind::Firehose => parse_and_canonicalize_raw_dataset_manifest::< + FirehoseManifest, + >(manifest_content.get()) + .map_err(Error::from)?, + DatasetKind::Solana => parse_and_canonicalize_raw_dataset_manifest::< + SolanaManifest, + >(manifest_content.get()) + .map_err(Error::from)?, + }; // Compute manifest hash from canonical serialization let manifest_hash = hash(&manifest_canonical); // Register manifest (store in object store + metadata DB) ctx.datasets_registry - .register_manifest(&manifest_hash, manifest_canonical) + .register_manifest(&manifest_hash, &kind_str, manifest_canonical) .await .map_err(|err| { tracing::error!( namespace = %namespace, name = %name, manifest_hash = %manifest_hash, - kind = %manifest.kind, + kind = %kind_str, error = %err, error_source = logging::error_source(&err), "Failed to register manifest" ); @@ -227,7 +220,7 @@ pub async fn handler( namespace = %namespace, name = %name, manifest_hash = %manifest_hash, - kind = %manifest.kind, + kind = %kind_str, "Manifest registered, will link to dataset" ); @@ -506,15 +499,6 @@ pub enum Error { #[error("Failed to set version tag")] VersionTaggingError(#[source] SetVersionTagError), - /// Unsupported dataset kind - /// - /// This occurs when: - /// - Dataset kind is not one of the supported types (manifest, evm-rpc, firehose) - #[error( - "unsupported kind '{0}' - supported kinds: 'manifest' (derived), 'evm-rpc', 'firehose'" - )] - UnsupportedDatasetKind(String), - /// Manifest not found /// /// This occurs when: @@ -537,7 +521,6 @@ impl IntoErrorResponse for Error { Error::ManifestRegistrationError(_) => "MANIFEST_REGISTRATION_ERROR", Error::ManifestLinkingError(_) => "MANIFEST_LINKING_ERROR", Error::VersionTaggingError(_) => "VERSION_TAGGING_ERROR", - Error::UnsupportedDatasetKind(_) => "UNSUPPORTED_DATASET_KIND", Error::ManifestNotFound(_) => "MANIFEST_NOT_FOUND", Error::GetDataset(_) => "GET_DATASET_ERROR", } @@ -551,7 +534,6 @@ impl IntoErrorResponse for Error { Error::ManifestRegistrationError(_) => StatusCode::INTERNAL_SERVER_ERROR, Error::ManifestLinkingError(_) => StatusCode::INTERNAL_SERVER_ERROR, Error::VersionTaggingError(_) => StatusCode::INTERNAL_SERVER_ERROR, - Error::UnsupportedDatasetKind(_) => StatusCode::BAD_REQUEST, Error::ManifestNotFound(_) => StatusCode::NOT_FOUND, Error::GetDataset(_) => StatusCode::INTERNAL_SERVER_ERROR, } diff --git a/crates/services/admin-api/src/handlers/manifests/list_all.rs b/crates/services/admin-api/src/handlers/manifests/list_all.rs index 4e8b41fea..4be197ae0 100644 --- a/crates/services/admin-api/src/handlers/manifests/list_all.rs +++ b/crates/services/admin-api/src/handlers/manifests/list_all.rs @@ -21,7 +21,8 @@ use crate::{ /// This handler returns a comprehensive list of all manifests registered in the system. /// For each manifest, it includes: /// - The content-addressable hash (SHA-256) -/// - The object store path where the manifest is stored +/// - The dataset kind (e.g., "evm-rpc", "solana", "firehose", "manifest") +/// - The number of datasets using this manifest /// /// Results are ordered by hash (lexicographical). #[tracing::instrument(skip_all, err)] @@ -51,6 +52,7 @@ pub async fn handler(State(ctx): State) -> Result, .into_iter() .map(|summary| ManifestInfo { hash: Hash::from(summary.hash), + kind: summary.kind.into_inner(), dataset_count: summary.dataset_count as u64, }) .collect(); @@ -73,6 +75,8 @@ pub struct ManifestInfo { /// Content-addressable hash (SHA-256) #[cfg_attr(feature = "utoipa", schema(value_type = String))] pub hash: Hash, + /// Dataset kind (e.g. "evm-rpc", "solana", "firehose", "manifest") + pub kind: String, /// Number of datasets using this manifest pub dataset_count: u64, } diff --git a/crates/services/admin-api/src/handlers/manifests/register.rs b/crates/services/admin-api/src/handlers/manifests/register.rs index 38ac85c45..c92cef457 100644 --- a/crates/services/admin-api/src/handlers/manifests/register.rs +++ b/crates/services/admin-api/src/handlers/manifests/register.rs @@ -1,5 +1,6 @@ //! Manifests register handler +use amp_datasets_raw::manifest::{EvmRpcManifest, FirehoseManifest, SolanaManifest}; use amp_datasets_registry::{error::RegisterManifestError, manifests::StoreError}; use axum::{ Json, @@ -7,21 +8,17 @@ use axum::{ http::StatusCode, }; use datasets_common::{ + dataset_kind_str::DatasetKindStr, hash::{Hash, hash}, - manifest::Manifest as CommonManifest, }; -use datasets_derived::DerivedDatasetKind; -use evm_rpc_datasets::{EvmRpcDatasetKind, Manifest as EvmRpcManifest}; -use firehose_datasets::{FirehoseDatasetKind, Manifest as FirehoseManifest}; use monitoring::logging; -use solana_datasets::{Manifest as SolanaManifest, SolanaDatasetKind}; use crate::{ ctx::Ctx, handlers::{ common::{ - ManifestValidationError, ParseDerivedManifestError, ParseRawManifestError, - parse_and_canonicalize_derived_dataset_manifest, + DatasetKind, ManifestHeader, ManifestValidationError, ParseDerivedManifestError, + ParseRawManifestError, parse_and_canonicalize_derived_dataset_manifest, parse_and_canonicalize_raw_dataset_manifest, }, error::{ErrorResponse, IntoErrorResponse}, @@ -48,7 +45,6 @@ use crate::{ /// - `INVALID_PAYLOAD_FORMAT`: Request JSON is malformed or invalid /// - `INVALID_MANIFEST`: Manifest JSON parsing or structure error /// - `MANIFEST_VALIDATION_ERROR`: Manifest validation failed (derived datasets only) -/// - `UNSUPPORTED_DATASET_KIND`: Dataset kind is not supported /// - `MANIFEST_STORAGE_ERROR`: Failed to store manifest in object store /// - `MANIFEST_REGISTRATION_ERROR`: Failed to register manifest in metadata database /// @@ -98,32 +94,35 @@ pub async fn handler( Error::InvalidManifest(err) })?; - // Parse as CommonManifest to extract kind - let manifest = serde_json::from_str::(&manifest_str).map_err(|err| { + // Parse ManifestHeader to extract validated kind + let header = serde_json::from_str::(&manifest_str).map_err(|err| { tracing::error!(error = %err, error_source = logging::error_source(&err), "failed to parse manifest"); Error::InvalidManifest(err) })?; + let kind_str = DatasetKindStr::from(header.kind); + // Validate and serialize manifest based on dataset kind - let canonical_manifest_str = if manifest.kind == DerivedDatasetKind { - parse_and_canonicalize_derived_dataset_manifest( - &manifest_str, + let canonical_manifest_str = match header.kind { + DatasetKind::Derived => parse_and_canonicalize_derived_dataset_manifest( &ctx.datasets_cache, &ctx.ethcall_udfs_cache, + &manifest_str, ) .await - .map_err(Error::from)? - } else if manifest.kind == EvmRpcDatasetKind { - parse_and_canonicalize_raw_dataset_manifest::(&manifest_str) - .map_err(Error::from)? - } else if manifest.kind == SolanaDatasetKind { - parse_and_canonicalize_raw_dataset_manifest::(&manifest_str) - .map_err(Error::from)? - } else if manifest.kind == FirehoseDatasetKind { - parse_and_canonicalize_raw_dataset_manifest::(&manifest_str) - .map_err(Error::from)? - } else { - return Err(Error::UnsupportedDatasetKind(manifest.kind.to_string()).into()); + .map_err(Error::from)?, + DatasetKind::EvmRpc => { + parse_and_canonicalize_raw_dataset_manifest::(&manifest_str) + .map_err(Error::from)? + } + DatasetKind::Solana => { + parse_and_canonicalize_raw_dataset_manifest::(&manifest_str) + .map_err(Error::from)? + } + DatasetKind::Firehose => { + parse_and_canonicalize_raw_dataset_manifest::(&manifest_str) + .map_err(Error::from)? + } }; // Compute manifest hash from canonical serialization @@ -133,12 +132,12 @@ pub async fn handler( // This does NOT link to any dataset or create version tags if let Err(err) = ctx .datasets_registry - .register_manifest(&hash, canonical_manifest_str) + .register_manifest(&hash, &kind_str, canonical_manifest_str) .await { tracing::error!( manifest_hash = %hash, - kind = %manifest.kind, + kind = %kind_str, error = %err, error_source = logging::error_source(&err), "failed to register manifest" ); @@ -151,8 +150,8 @@ pub async fn handler( tracing::info!( manifest_hash = %hash, - kind = %manifest.kind, - "manifest registered successfully" + kind = %kind_str, + "manifest registered" ); Ok((StatusCode::CREATED, Json(RegisterManifestResponse { hash }))) @@ -198,16 +197,6 @@ pub enum Error { #[error("manifest validation error")] ManifestValidationError(#[source] ManifestValidationError), - /// Unsupported dataset kind - /// - /// This occurs when: - /// - Dataset kind is not one of the supported types (manifest, evm-rpc, firehose) - /// - The 'kind' field in the manifest contains an unrecognized value - #[error( - "unsupported kind '{0}' - supported kinds: 'manifest' (derived), 'evm-rpc', 'firehose'" - )] - UnsupportedDatasetKind(String), - /// Failed to write manifest to object store /// /// This occurs when: @@ -235,7 +224,6 @@ impl IntoErrorResponse for Error { Error::InvalidPayloadFormat => "INVALID_PAYLOAD_FORMAT", Error::InvalidManifest(_) => "INVALID_MANIFEST", Error::ManifestValidationError(_) => "MANIFEST_VALIDATION_ERROR", - Error::UnsupportedDatasetKind(_) => "UNSUPPORTED_DATASET_KIND", Error::ObjectStoreWriteError(_) => "MANIFEST_STORAGE_ERROR", Error::MetadataDbError(_) => "MANIFEST_REGISTRATION_ERROR", } @@ -246,7 +234,6 @@ impl IntoErrorResponse for Error { Error::InvalidPayloadFormat => StatusCode::BAD_REQUEST, Error::InvalidManifest(_) => StatusCode::BAD_REQUEST, Error::ManifestValidationError(_) => StatusCode::BAD_REQUEST, - Error::UnsupportedDatasetKind(_) => StatusCode::BAD_REQUEST, Error::ObjectStoreWriteError(_) => StatusCode::INTERNAL_SERVER_ERROR, Error::MetadataDbError(_) => StatusCode::INTERNAL_SERVER_ERROR, } diff --git a/docs/schemas/openapi/admin.spec.json b/docs/schemas/openapi/admin.spec.json index a23de4cf2..e8a0a3de1 100644 --- a/docs/schemas/openapi/admin.spec.json +++ b/docs/schemas/openapi/admin.spec.json @@ -45,7 +45,7 @@ "datasets" ], "summary": "Handler for the `POST /datasets` endpoint", - "description": "Registers a new dataset configuration in the server's local registry. Accepts a JSON payload\ncontaining the dataset registration configuration.\n\n**Note**: This endpoint only registers datasets and does NOT schedule data extraction.\nTo extract data after registration, make a separate call to:\n- `POST /datasets/{namespace}/{name}/versions/dev/deploy` - for dev tag\n- `POST /datasets/{namespace}/{name}/versions/latest/deploy` - for latest tag\n- `POST /datasets/{namespace}/{name}/versions/{version}/deploy` - for specific version\n\n## Request Body\n- `dataset_name`: Name of the dataset to be registered (must be valid dataset name)\n- `version`: Optional version of the dataset to register. If omitted, only the \"dev\" tag is updated.\n- `manifest`: JSON string representation of the dataset manifest\n\n## Response\n- **201 Created**: Dataset successfully registered (or updated if version tag already exists)\n- **400 Bad Request**: Invalid dataset name, version, or manifest format\n- **500 Internal Server Error**: Database or object store error\n\n## Error Codes\n- `INVALID_PAYLOAD_FORMAT`: Request JSON is malformed or invalid\n- `INVALID_MANIFEST`: Manifest JSON parsing or structure error\n- `MANIFEST_VALIDATION_ERROR`: Manifest validation failed (SQL queries invalid, undeclared dependencies, etc.)\n- `MANIFEST_REGISTRATION_ERROR`: Failed to register manifest in system\n- `MANIFEST_LINKING_ERROR`: Failed to link manifest to dataset\n- `MANIFEST_NOT_FOUND`: Manifest hash provided but manifest doesn't exist\n- `VERSION_TAGGING_ERROR`: Failed to tag the manifest with the version\n- `UNSUPPORTED_DATASET_KIND`: Dataset kind is not supported\n\n## Behavior\nThis handler supports multiple dataset kinds for registration:\n- **Derived dataset** (kind=\"manifest\"): Registers a derived dataset manifest that transforms data from other datasets using SQL queries\n- **EVM-RPC dataset** (kind=\"evm-rpc\"): Registers a raw dataset that extracts blockchain data directly from Ethereum-compatible JSON-RPC endpoints\n- **Firehose dataset** (kind=\"firehose\"): Registers a raw dataset that streams blockchain data from StreamingFast Firehose protocol\n- **Legacy SQL datasets** are **not supported** and will return an error\n\n## Registration Process\nThe registration process involves two or three steps depending on whether a version is provided:\n1. **Register or validate manifest**: Either stores a new manifest in hash-based storage and creates\n a metadata database entry, or validates that a provided manifest hash exists in the system\n2. **Link manifest to dataset**: Links the manifest to the dataset namespace/name and automatically\n updates the \"dev\" tag to point to this manifest (performed in a transaction for atomicity)\n3. **Tag version** (optional): If a version is provided, associates the version identifier with the\n manifest hash, and updates the \"latest\" tag if this version is higher than the current latest\n\nThis approach enables:\n- Content-addressable storage by manifest hash\n- Deduplication of identical manifests\n- Separation of manifest storage, dataset linking, and version management\n- Development workflow: register without version to only update \"dev\" tag via linking\n- Release workflow: register with version to create semantic version tags and update \"latest\"\n- Reuse workflow: provide manifest hash to link existing manifest without re-registering it\n\nAll operations are idempotent:\n- **Manifest registration**: If the manifest already exists (same hash), the operation succeeds without changes\n- **Manifest linking**: If the manifest is already linked to the dataset, the operation succeeds without changes\n- **Dev tag update**: The dev tag is always updated to point to the linked manifest (last-write-wins)\n- **Version tag**: If the version tag doesn't exist, it is created; if it exists with the same hash, no changes;\n if it exists with a different hash, it is updated to point to the new hash\n- **Latest tag**: Automatically updated only if the new version is higher than the current latest version\n\nThe handler:\n- Validates dataset name and version format\n- Checks that dataset kind is supported\n- Registers/validates the manifest, links it to the dataset, and optionally tags it with a version\n- Returns appropriate status codes and error messages\n\n## Typical Workflow\nFor users wanting both registration and data extraction:\n1. `POST /datasets` - Register the dataset (this endpoint)\n2. `POST /datasets/{namespace}/{name}/versions/{version}/deploy` - Schedule data extraction", + "description": "Registers a new dataset configuration in the server's local registry. Accepts a JSON payload\ncontaining the dataset registration configuration.\n\n**Note**: This endpoint only registers datasets and does NOT schedule data extraction.\nTo extract data after registration, make a separate call to:\n- `POST /datasets/{namespace}/{name}/versions/dev/deploy` - for dev tag\n- `POST /datasets/{namespace}/{name}/versions/latest/deploy` - for latest tag\n- `POST /datasets/{namespace}/{name}/versions/{version}/deploy` - for specific version\n\n## Request Body\n- `dataset_name`: Name of the dataset to be registered (must be valid dataset name)\n- `version`: Optional version of the dataset to register. If omitted, only the \"dev\" tag is updated.\n- `manifest`: JSON string representation of the dataset manifest\n\n## Response\n- **201 Created**: Dataset successfully registered (or updated if version tag already exists)\n- **400 Bad Request**: Invalid dataset name, version, or manifest format\n- **500 Internal Server Error**: Database or object store error\n\n## Error Codes\n- `INVALID_PAYLOAD_FORMAT`: Request JSON is malformed or invalid\n- `INVALID_MANIFEST`: Manifest JSON parsing or structure error\n- `MANIFEST_VALIDATION_ERROR`: Manifest validation failed (SQL queries invalid, undeclared dependencies, etc.)\n- `MANIFEST_REGISTRATION_ERROR`: Failed to register manifest in system\n- `MANIFEST_LINKING_ERROR`: Failed to link manifest to dataset\n- `MANIFEST_NOT_FOUND`: Manifest hash provided but manifest doesn't exist\n- `VERSION_TAGGING_ERROR`: Failed to tag the manifest with the version\n\n## Behavior\nThis handler supports multiple dataset kinds for registration:\n- **Derived dataset** (kind=\"manifest\"): Registers a derived dataset manifest that transforms data from other datasets using SQL queries\n- **EVM-RPC dataset** (kind=\"evm-rpc\"): Registers a raw dataset that extracts blockchain data directly from Ethereum-compatible JSON-RPC endpoints\n- **Firehose dataset** (kind=\"firehose\"): Registers a raw dataset that streams blockchain data from StreamingFast Firehose protocol\n- **Legacy SQL datasets** are **not supported** and will return an error\n\n## Registration Process\nThe registration process involves two or three steps depending on whether a version is provided:\n1. **Register or validate manifest**: Either stores a new manifest in hash-based storage and creates\n a metadata database entry, or validates that a provided manifest hash exists in the system\n2. **Link manifest to dataset**: Links the manifest to the dataset namespace/name and automatically\n updates the \"dev\" tag to point to this manifest (performed in a transaction for atomicity)\n3. **Tag version** (optional): If a version is provided, associates the version identifier with the\n manifest hash, and updates the \"latest\" tag if this version is higher than the current latest\n\nThis approach enables:\n- Content-addressable storage by manifest hash\n- Deduplication of identical manifests\n- Separation of manifest storage, dataset linking, and version management\n- Development workflow: register without version to only update \"dev\" tag via linking\n- Release workflow: register with version to create semantic version tags and update \"latest\"\n- Reuse workflow: provide manifest hash to link existing manifest without re-registering it\n\nAll operations are idempotent:\n- **Manifest registration**: If the manifest already exists (same hash), the operation succeeds without changes\n- **Manifest linking**: If the manifest is already linked to the dataset, the operation succeeds without changes\n- **Dev tag update**: The dev tag is always updated to point to the linked manifest (last-write-wins)\n- **Version tag**: If the version tag doesn't exist, it is created; if it exists with the same hash, no changes;\n if it exists with a different hash, it is updated to point to the new hash\n- **Latest tag**: Automatically updated only if the new version is higher than the current latest version\n\nThe handler:\n- Validates dataset name and version format\n- Checks that dataset kind is supported\n- Registers/validates the manifest, links it to the dataset, and optionally tags it with a version\n- Returns appropriate status codes and error messages\n\n## Typical Workflow\nFor users wanting both registration and data extraction:\n1. `POST /datasets` - Register the dataset (this endpoint)\n2. `POST /datasets/{namespace}/{name}/versions/{version}/deploy` - Schedule data extraction", "operationId": "datasets_register", "requestBody": { "content": { @@ -1200,7 +1200,7 @@ "manifests" ], "summary": "Handler for the `GET /manifests` endpoint", - "description": "Returns all registered manifests in the system.\n\n## Response\n- **200 OK**: Successfully retrieved all manifests\n- **500 Internal Server Error**: Database query error\n\n## Error Codes\n- `LIST_ALL_MANIFESTS_ERROR`: Failed to list all manifests from metadata database\n\n## Behavior\nThis handler returns a comprehensive list of all manifests registered in the system.\nFor each manifest, it includes:\n- The content-addressable hash (SHA-256)\n- The object store path where the manifest is stored\n\nResults are ordered by hash (lexicographical).", + "description": "Returns all registered manifests in the system.\n\n## Response\n- **200 OK**: Successfully retrieved all manifests\n- **500 Internal Server Error**: Database query error\n\n## Error Codes\n- `LIST_ALL_MANIFESTS_ERROR`: Failed to list all manifests from metadata database\n\n## Behavior\nThis handler returns a comprehensive list of all manifests registered in the system.\nFor each manifest, it includes:\n- The content-addressable hash (SHA-256)\n- The dataset kind (e.g., \"evm-rpc\", \"solana\", \"firehose\", \"manifest\")\n- The number of datasets using this manifest\n\nResults are ordered by hash (lexicographical).", "operationId": "list_all_manifests", "responses": { "200": { @@ -1230,7 +1230,7 @@ "manifests" ], "summary": "Handler for the `POST /manifests` endpoint", - "description": "Registers a new manifest in content-addressable storage without linking to any dataset or creating version tags.\nThis endpoint is useful for pre-registering manifests before associating them with specific datasets.\n\n## Request Body\nThe request body should contain a complete manifest JSON object. The manifest kind determines\nthe validation rules:\n- `kind=\"manifest\"` (Derived): Validates SQL dependencies\n- `kind=\"evm-rpc\"`, `kind=\"firehose\"`, `kind=\"solana\"` (Raw): Validates structure only\n\n## Response\n- **201 Created**: Manifest successfully registered, returns the computed hash\n- **400 Bad Request**: Invalid JSON format, unsupported kind, or validation failure\n- **500 Internal Server Error**: Manifest store error\n\n## Error Codes\n- `INVALID_PAYLOAD_FORMAT`: Request JSON is malformed or invalid\n- `INVALID_MANIFEST`: Manifest JSON parsing or structure error\n- `MANIFEST_VALIDATION_ERROR`: Manifest validation failed (derived datasets only)\n- `UNSUPPORTED_DATASET_KIND`: Dataset kind is not supported\n- `MANIFEST_STORAGE_ERROR`: Failed to store manifest in object store\n- `MANIFEST_REGISTRATION_ERROR`: Failed to register manifest in metadata database\n\n## Registration Process\nUnlike `POST /datasets`, this endpoint performs minimal registration:\n1. **Parse and validate**: Validates manifest structure and dependencies (for derived datasets)\n2. **Canonicalize**: Re-serializes manifest to canonical JSON format\n3. **Compute hash**: Generates content hash from canonical JSON\n4. **Store manifest**: Writes to object store and registers in metadata database\n\nThis handler:\n- Validates and extracts the manifest JSON from the request body\n- Parses and validates based on dataset kind\n- Stores the manifest in content-addressable storage\n- Returns the computed manifest hash", + "description": "Registers a new manifest in content-addressable storage without linking to any dataset or creating version tags.\nThis endpoint is useful for pre-registering manifests before associating them with specific datasets.\n\n## Request Body\nThe request body should contain a complete manifest JSON object. The manifest kind determines\nthe validation rules:\n- `kind=\"manifest\"` (Derived): Validates SQL dependencies\n- `kind=\"evm-rpc\"`, `kind=\"firehose\"`, `kind=\"solana\"` (Raw): Validates structure only\n\n## Response\n- **201 Created**: Manifest successfully registered, returns the computed hash\n- **400 Bad Request**: Invalid JSON format, unsupported kind, or validation failure\n- **500 Internal Server Error**: Manifest store error\n\n## Error Codes\n- `INVALID_PAYLOAD_FORMAT`: Request JSON is malformed or invalid\n- `INVALID_MANIFEST`: Manifest JSON parsing or structure error\n- `MANIFEST_VALIDATION_ERROR`: Manifest validation failed (derived datasets only)\n- `MANIFEST_STORAGE_ERROR`: Failed to store manifest in object store\n- `MANIFEST_REGISTRATION_ERROR`: Failed to register manifest in metadata database\n\n## Registration Process\nUnlike `POST /datasets`, this endpoint performs minimal registration:\n1. **Parse and validate**: Validates manifest structure and dependencies (for derived datasets)\n2. **Canonicalize**: Re-serializes manifest to canonical JSON format\n3. **Compute hash**: Generates content hash from canonical JSON\n4. **Store manifest**: Writes to object store and registers in metadata database\n\nThis handler:\n- Validates and extracts the manifest JSON from the request body\n- Parses and validates based on dataset kind\n- Stores the manifest in content-addressable storage\n- Returns the computed manifest hash", "operationId": "manifests_register", "requestBody": { "content": { @@ -2859,6 +2859,7 @@ "description": "Summary information for a single manifest", "required": [ "hash", + "kind", "dataset_count" ], "properties": { @@ -2871,6 +2872,10 @@ "hash": { "type": "string", "description": "Content-addressable hash (SHA-256)" + }, + "kind": { + "type": "string", + "description": "Dataset kind (e.g. \"evm-rpc\", \"solana\", \"firehose\", \"manifest\")" } } }, diff --git a/typescript/amp/src/api/Admin.ts b/typescript/amp/src/api/Admin.ts index fe6b6d9da..1722d5991 100644 --- a/typescript/amp/src/api/Admin.ts +++ b/typescript/amp/src/api/Admin.ts @@ -51,7 +51,6 @@ const registerDataset = HttpApiEndpoint.post("registerDataset")`/datasets` .addError(Error.InvalidPayloadFormat) .addError(Error.InvalidManifest) .addError(Error.ManifestValidationError) - .addError(Error.UnsupportedDatasetKind) .addError(Error.ManifestRegistrationError) .addError(Error.ManifestLinkingError) .addError(Error.VersionTaggingError) @@ -66,7 +65,6 @@ const registerDataset = HttpApiEndpoint.post("registerDataset")`/datasets` * - InvalidPayloadFormat: Request JSON is malformed or invalid. * - InvalidManifest: Manifest JSON is malformed or structurally invalid. * - ManifestValidationError: Manifest validation error (e.g., non-incremental operations). - * - UnsupportedDatasetKind: Dataset kind is not supported. * - ManifestRegistrationError: Failed to register manifest in system. * - ManifestLinkingError: Failed to link manifest to dataset. * - VersionTaggingError: Failed to tag version for the dataset. @@ -77,7 +75,6 @@ export type RegisterDatasetError = | Error.InvalidPayloadFormat | Error.InvalidManifest | Error.ManifestValidationError - | Error.UnsupportedDatasetKind | Error.ManifestRegistrationError | Error.ManifestLinkingError | Error.VersionTaggingError diff --git a/typescript/amp/src/api/Error.ts b/typescript/amp/src/api/Error.ts index e0ba826b1..6c671e341 100644 --- a/typescript/amp/src/api/Error.ts +++ b/typescript/amp/src/api/Error.ts @@ -434,29 +434,6 @@ export class ManifestValidationError extends Schema.Class("UnsupportedDatasetKind")( - { - code: Schema.Literal("UNSUPPORTED_DATASET_KIND").pipe(Schema.propertySignature, Schema.fromKey("error_code")), - message: Schema.String.pipe(Schema.propertySignature, Schema.fromKey("error_message")), - }, - { - [HttpApiSchema.AnnotationStatus]: 400, - }, -) { - readonly _tag = "UnsupportedDatasetKind" as const -} - /** * ManifestNotFound - Manifest with the provided hash not found. *