Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion crates/core/common/src/catalog/physical/for_dump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
//! then builds the catalog from resolved entries.

use std::{
collections::{BTreeMap, btree_map::Entry},
collections::{BTreeMap, BTreeSet, btree_map::Entry},
sync::Arc,
};

Expand Down Expand Up @@ -135,11 +135,27 @@ pub async fn resolve_external_deps(
.get_table(table_name)
.expect("table validated in Phase 1");

// Resolve networks: raw tables have an intrinsic network; derived tables
// need resolution from the transitive dependency chain.
let networks = match table_def.network() {
Some(id) => BTreeSet::from([id.clone()]),
None => crate::datasets_cache::resolve_dataset_networks(
datasets_cache,
Arc::clone(&dataset),
)
.await
.map_err(|err| CreateCatalogError::ResolveNetworks {
dataset: dataset_ref.clone(),
source: err,
})?,
};

let physical_table = PhysicalTable::from_revision(
data_store.clone(),
table.dataset_reference().clone(),
dataset.start_block(),
Arc::clone(table_def),
networks,
revision,
);
entries.push(ResolvedTableEntry {
Expand Down Expand Up @@ -257,4 +273,16 @@ pub enum CreateCatalogError {
#[source]
source: GetDatasetError,
},

/// Failed to resolve networks from dependency chain.
///
/// This occurs when traversing a derived dataset's dependencies fails
/// to resolve the set of networks.
#[error("Failed to resolve networks for dataset {dataset}")]
ResolveNetworks {
/// The hash reference of the dataset
dataset: HashReference,
#[source]
source: crate::datasets_cache::DependencyTraversalError,
},
}
28 changes: 28 additions & 0 deletions crates/core/common/src/catalog/physical/for_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,27 @@ pub async fn create(
table: (**table).clone(),
})?;

// Resolve networks: raw tables have an intrinsic network; derived tables
// need resolution from the transitive dependency chain.
let networks = match dataset_table.network() {
Some(id) => BTreeSet::from([id.clone()]),
None => crate::datasets_cache::resolve_dataset_networks(
datasets_cache,
Arc::clone(&dataset),
)
.await
.map_err(|err| CreateCatalogError::ResolveNetworks {
dataset: hash_ref.clone(),
source: err,
})?,
};

let physical_table = PhysicalTable::from_revision(
data_store.clone(),
hash_ref,
dataset.start_block(),
dataset_table.clone(),
networks,
revision,
);
entries.push((
Expand Down Expand Up @@ -181,4 +197,16 @@ pub enum CreateCatalogError {
#[source]
source: GetDatasetError,
},

/// Failed to resolve networks from dependency chain.
///
/// This occurs when traversing a derived dataset's dependencies fails
/// to resolve the set of networks.
#[error("Failed to resolve networks for dataset {dataset}")]
ResolveNetworks {
/// The hash reference of the dataset
dataset: HashReference,
#[source]
source: crate::datasets_cache::DependencyTraversalError,
},
}
102 changes: 98 additions & 4 deletions crates/core/common/src/datasets_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//! through the datasets registry with in-memory caching.
// TODO: Move to providers-registry once the derived dataset constructor is decoupled from common

use std::sync::Arc;
use std::{collections::BTreeSet, sync::Arc};

pub use amp_datasets_registry::error::ResolveRevisionError;
use amp_datasets_registry::{
Expand All @@ -13,10 +13,13 @@ use amp_datasets_registry::{
};
use datafusion::common::HashMap;
use datasets_common::{
dataset_kind_str::DatasetKindStr, hash::Hash, hash_reference::HashReference,
manifest::Manifest as CommonManifest, reference::Reference,
dataset::Dataset, dataset_kind_str::DatasetKindStr, hash::Hash, hash_reference::HashReference,
manifest::Manifest as CommonManifest, network_id::NetworkId, reference::Reference,
};
use datasets_derived::{DerivedDatasetKind, Manifest as DerivedManifest};
use datasets_derived::{
DerivedDatasetKind, Manifest as DerivedManifest, dataset::Dataset as DerivedDataset,
};
use datasets_raw::dataset::Dataset as RawDataset;
use evm_rpc_datasets::{EvmRpcDatasetKind, Manifest as EvmRpcManifest};
use firehose_datasets::{FirehoseDatasetKind, Manifest as FirehoseManifest};
use parking_lot::RwLock;
Expand Down Expand Up @@ -229,6 +232,97 @@ impl crate::retryable::RetryableErrorExt for GetDerivedManifestError {
}
}

/// Collects all raw datasets reachable through a dataset's transitive dependency chain.
///
/// For a raw dataset, returns a singleton vec containing it. For a derived dataset,
/// traverses the dependency chain and returns all raw dataset leaves. For other
/// dataset types (e.g., static), returns an empty vec.
pub async fn collect_raw_datasets(
    datasets_cache: &DatasetsCache,
    dataset: Arc<dyn Dataset>,
) -> Result<Vec<Arc<RawDataset>>, DependencyTraversalError> {
    // Fast path: a raw dataset is its own single leaf; no traversal needed.
    if let Ok(raw) = dataset.clone().downcast_arc::<RawDataset>() {
        return Ok(vec![raw]);
    }

    // Iterative depth-first walk over the dependency graph. `seen` guards
    // against revisiting shared dependencies (diamond shapes) and cycles.
    let mut leaves = Vec::new();
    let mut seen = BTreeSet::new();
    let mut pending: Vec<Arc<dyn Dataset>> = vec![dataset];

    while let Some(node) = pending.pop() {
        // `insert` returns false when the reference was already present.
        if !seen.insert(node.reference().clone()) {
            continue;
        }

        // Raw datasets are the leaves being collected.
        if let Ok(raw) = node.clone().downcast_arc::<RawDataset>() {
            leaves.push(raw);
            continue;
        }

        // Only derived datasets contribute further dependencies; any other
        // dataset kind (e.g. static) has no raw leaves and is skipped.
        let derived = match node.downcast_ref::<DerivedDataset>() {
            Some(derived) => derived,
            None => continue,
        };

        for dep in derived.dependencies().values() {
            // Resolve the dependency reference to a pinned revision, then
            // load the dataset so the walk can continue through it.
            let hash_ref = datasets_cache
                .resolve_revision(dep.to_reference())
                .await
                .map_err(DependencyTraversalError::ResolveRevision)?
                .ok_or_else(|| {
                    DependencyTraversalError::NotFound(dep.to_reference().to_string())
                })?;
            let dep_dataset = datasets_cache
                .get_dataset(&hash_ref)
                .await
                .map_err(DependencyTraversalError::GetDataset)?;
            pending.push(dep_dataset);
        }
    }

    Ok(leaves)
}

/// Resolves the set of networks for a dataset by collecting all raw datasets
/// in its transitive dependency chain and extracting their networks.
///
/// For raw datasets, returns a singleton set. For derived datasets, returns
/// the union of all networks from reachable raw datasets.
///
/// This is used at catalog construction time to inject accurate network information
/// into `PhysicalTable` for derived datasets, whose `Table::network()` returns
/// `None` by default.
pub async fn resolve_dataset_networks(
    datasets_cache: &DatasetsCache,
    dataset: Arc<dyn Dataset>,
) -> Result<BTreeSet<NetworkId>, DependencyTraversalError> {
    // Union the networks of every reachable raw leaf; the BTreeSet
    // deduplicates leaves that share a network.
    let mut networks = BTreeSet::new();
    for raw in collect_raw_datasets(datasets_cache, dataset).await? {
        networks.insert(raw.network().clone());
    }
    Ok(networks)
}

/// Errors that occur when traversing dataset dependencies.
///
/// Produced by [`collect_raw_datasets`] (and by extension
/// [`resolve_dataset_networks`]) while walking a derived dataset's
/// transitive dependency chain.
#[derive(Debug, thiserror::Error)]
pub enum DependencyTraversalError {
    /// Failed to get dataset from dataset store.
    ///
    /// The dependency's revision resolved, but loading the dataset itself
    /// from the registry failed.
    #[error("failed to get dataset")]
    GetDataset(#[source] GetDatasetError),

    /// Failed to resolve revision.
    ///
    /// The registry lookup for a dependency's pinned revision failed.
    #[error("failed to resolve revision")]
    ResolveRevision(#[source] ResolveRevisionError),

    /// Dependency not found.
    ///
    /// Revision resolution succeeded but returned no entry for the
    /// referenced dependency; the payload is the dependency reference
    /// rendered as a string.
    #[error("dependency '{0}' not found")]
    NotFound(String),
}
impl crate::retryable::RetryableErrorExt for DependencyTraversalError {
    /// A traversal error is retryable exactly when its underlying registry
    /// error is; a missing dependency is a permanent failure.
    fn is_retryable(&self) -> bool {
        // Bring the registry's retryable trait into scope for the
        // delegated `is_retryable` calls on the wrapped source errors.
        use amp_datasets_registry::retryable::RetryableErrorExt as _;
        match self {
            Self::NotFound(_) => false,
            Self::GetDataset(source) => source.is_retryable(),
            Self::ResolveRevision(source) => source.is_retryable(),
        }
    }
}
/// Parses manifest content according to the dataset kind and creates the appropriate dataset implementation.
fn create_dataset_from_manifest(
kind: &DatasetKindStr,
Expand Down
18 changes: 11 additions & 7 deletions crates/core/common/src/physical_table/table.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{ops::RangeInclusive, sync::Arc};
use std::{collections::BTreeSet, ops::RangeInclusive, sync::Arc};

use amp_data_store::{DataStore, PhyTableRevision, physical_table::PhyTableRevisionPath};
use amp_parquet::{meta::ParquetMeta, writer::WriterTarget};
Expand Down Expand Up @@ -32,13 +32,16 @@ pub struct PhysicalTable {
/// Table name.
table_name: TableName,

/// Network identifier (if the table has one).
network: Option<NetworkId>,
/// Networks this table is associated with.
///
/// For raw tables this is a singleton set from the table definition.
/// For derived tables this is resolved from the transitive dependency chain.
networks: BTreeSet<NetworkId>,

/// Data store for accessing metadata database and object storage.
store: DataStore,

/// Table definition (schema, sorted_by, optional network).
/// Table definition (schema, sorted_by).
table: Arc<dyn Table>,
}

Expand All @@ -53,14 +56,15 @@ impl PhysicalTable {
dataset_reference: HashReference,
dataset_start_block: Option<BlockNum>,
table: Arc<dyn Table>,
networks: BTreeSet<NetworkId>,
revision: PhyTableRevision,
) -> Self {
Self {
revision,
dataset_reference,
dataset_start_block,
table_name: table.name().clone(),
network: table.network().cloned(),
networks,
store,
table,
}
Expand All @@ -77,8 +81,8 @@ impl PhysicalTable {
&self.table_name
}

pub fn network(&self) -> Option<&NetworkId> {
self.network.as_ref()
pub fn networks(&self) -> &BTreeSet<NetworkId> {
&self.networks
}

pub fn url(&self) -> &Url {
Expand Down
11 changes: 6 additions & 5 deletions crates/core/common/src/plan_visitors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -609,16 +609,15 @@ pub fn find_cross_network_join(
plan: &LogicalPlan,
catalog: &crate::catalog::physical::Catalog,
) -> Result<Option<CrossNetworkJoinInfo>, DataFusionError> {
let table_to_network: BTreeMap<TableReference, NetworkId> = catalog
let table_to_networks: BTreeMap<TableReference, BTreeSet<NetworkId>> = catalog
.entries()
.iter()
.filter_map(|(physical_table, sql_schema_name)| {
let network = physical_table.network()?.clone();
.map(|(physical_table, sql_schema_name)| {
let table_ref = TableReference::Partial {
schema: Arc::from(&**sql_schema_name),
table: Arc::from(physical_table.table_name().as_str()),
};
Some((table_ref, network))
(table_ref, physical_table.networks().clone())
})
.collect();

Expand All @@ -627,7 +626,9 @@ pub fn find_cross_network_join(
let table_refs = extract_table_references_from_plan(subtree)?;
Ok(table_refs
.into_iter()
.filter_map(|table_ref| table_to_network.get(&table_ref).cloned())
.filter_map(|table_ref| table_to_networks.get(&table_ref))
.flatten()
.cloned()
.collect())
};

Expand Down
Loading
Loading