From 2617d13d056c178b4c39fa020830f08ab98327ae Mon Sep 17 00:00:00 2001 From: Theo Butler Date: Thu, 12 Mar 2026 16:22:02 -0400 Subject: [PATCH 1/3] refactor(common): add multi-network support to PhysicalTable Derived tables will soon depend on raw datasets from multiple networks. Store resolved networks as BTreeSet on PhysicalTable so downstream consumers have accurate network information. - Add `PhysicalTable.networks: BTreeSet` populated at construction time; raw tables contribute a singleton, derived tables resolve from transitive dependencies - Extract `collect_raw_datasets` shared traversal in `datasets_cache` and rewrite `resolve_dataset_networks` and `resolve_raw_dataset_from_dependencies` on top of it - Resolve networks per-table at each `from_revision` call site, keeping `Table::network()` as the intrinsic single-network accessor - Update `find_cross_network_join` to compare `BTreeSet` per table instead of a single optional network --- .../common/src/catalog/physical/for_dump.rs | 30 +++++- .../common/src/catalog/physical/for_query.rs | 28 ++++++ crates/core/common/src/datasets_cache.rs | 93 ++++++++++++++++++- .../core/common/src/physical_table/table.rs | 18 ++-- crates/core/common/src/plan_visitors.rs | 11 ++- crates/core/common/src/streaming_query.rs | 68 ++++---------- .../tests/it_session_async_resolution.rs | 7 +- crates/core/datasets-common/src/dataset.rs | 6 +- .../worker-datasets-derived/src/job_impl.rs | 32 ++++++- .../core/worker-datasets-raw/src/job_impl.rs | 1 + .../src/handlers/datasets/restore.rs | 1 + .../admin-api/src/handlers/jobs/progress.rs | 1 + tests/src/testlib/helpers.rs | 3 + 13 files changed, 229 insertions(+), 70 deletions(-) diff --git a/crates/core/common/src/catalog/physical/for_dump.rs b/crates/core/common/src/catalog/physical/for_dump.rs index 6f4a132fa..acc265d90 100644 --- a/crates/core/common/src/catalog/physical/for_dump.rs +++ b/crates/core/common/src/catalog/physical/for_dump.rs @@ -5,7 +5,7 @@ //! then builds the catalog from resolved entries. use std::{ - collections::{BTreeMap, btree_map::Entry}, + collections::{BTreeMap, BTreeSet, btree_map::Entry}, sync::Arc, }; @@ -135,11 +135,27 @@ pub async fn resolve_external_deps( .get_table(table_name) .expect("table validated in Phase 1"); + // Resolve networks: raw tables have an intrinsic network; derived tables + // need resolution from the transitive dependency chain. + let networks = match table_def.network() { + Some(id) => BTreeSet::from([id.clone()]), + None => crate::datasets_cache::resolve_dataset_networks( + datasets_cache, + Arc::clone(&dataset), + ) + .await + .map_err(|err| CreateCatalogError::ResolveNetworks { + dataset: dataset_ref.clone(), + source: err, + })?, + }; + let physical_table = PhysicalTable::from_revision( data_store.clone(), table.dataset_reference().clone(), dataset.start_block(), Arc::clone(table_def), + networks, revision, ); entries.push(ResolvedTableEntry { @@ -257,4 +273,16 @@ pub enum CreateCatalogError { #[source] source: GetDatasetError, }, + + /// Failed to resolve networks from dependency chain. + /// + /// This occurs when traversing a derived dataset's dependencies fails + /// to resolve the set of networks. 
+ #[error("Failed to resolve networks for dataset {dataset}")] + ResolveNetworks { + /// The hash reference of the dataset + dataset: HashReference, + #[source] + source: crate::datasets_cache::DependencyTraversalError, + }, } diff --git a/crates/core/common/src/catalog/physical/for_query.rs b/crates/core/common/src/catalog/physical/for_query.rs index e9ce411f5..8522fff60 100644 --- a/crates/core/common/src/catalog/physical/for_query.rs +++ b/crates/core/common/src/catalog/physical/for_query.rs @@ -97,11 +97,27 @@ pub async fn create( table: (**table).clone(), })?; + // Resolve networks: raw tables have an intrinsic network; derived tables + // need resolution from the transitive dependency chain. + let networks = match dataset_table.network() { + Some(id) => BTreeSet::from([id.clone()]), + None => crate::datasets_cache::resolve_dataset_networks( + datasets_cache, + Arc::clone(&dataset), + ) + .await + .map_err(|err| CreateCatalogError::ResolveNetworks { + dataset: hash_ref.clone(), + source: err, + })?, + }; + let physical_table = PhysicalTable::from_revision( data_store.clone(), hash_ref, dataset.start_block(), dataset_table.clone(), + networks, revision, ); entries.push(( @@ -181,4 +197,16 @@ pub enum CreateCatalogError { #[source] source: GetDatasetError, }, + + /// Failed to resolve networks from dependency chain. + /// + /// This occurs when traversing a derived dataset's dependencies fails + /// to resolve the set of networks. + #[error("Failed to resolve networks for dataset {dataset}")] + ResolveNetworks { + /// The hash reference of the dataset + dataset: HashReference, + #[source] + source: crate::datasets_cache::DependencyTraversalError, + }, } diff --git a/crates/core/common/src/datasets_cache.rs b/crates/core/common/src/datasets_cache.rs index 061b307ec..935097c1d 100644 --- a/crates/core/common/src/datasets_cache.rs +++ b/crates/core/common/src/datasets_cache.rs @@ -4,7 +4,7 @@ //! through the datasets registry with in-memory caching. // TODO: Move to providers-registry once the derived dataset constructor is decoupled from common -use std::sync::Arc; +use std::{collections::BTreeSet, sync::Arc}; pub use amp_datasets_registry::error::ResolveRevisionError; use amp_datasets_registry::{ @@ -13,10 +13,13 @@ use amp_datasets_registry::{ }; use datafusion::common::HashMap; use datasets_common::{ - dataset_kind_str::DatasetKindStr, hash::Hash, hash_reference::HashReference, - manifest::Manifest as CommonManifest, reference::Reference, + dataset::Dataset, dataset_kind_str::DatasetKindStr, hash::Hash, hash_reference::HashReference, + manifest::Manifest as CommonManifest, network_id::NetworkId, reference::Reference, }; -use datasets_derived::{DerivedDatasetKind, Manifest as DerivedManifest}; +use datasets_derived::{ + DerivedDatasetKind, Manifest as DerivedManifest, dataset::Dataset as DerivedDataset, +}; +use datasets_raw::dataset::Dataset as RawDataset; use evm_rpc_datasets::{EvmRpcDatasetKind, Manifest as EvmRpcManifest}; use firehose_datasets::{FirehoseDatasetKind, Manifest as FirehoseManifest}; use parking_lot::RwLock; @@ -229,6 +232,88 @@ impl crate::retryable::RetryableErrorExt for GetDerivedManifestError { } } +/// Collects all raw datasets reachable through a dataset's transitive dependency chain. +/// +/// For a raw dataset, returns a singleton vec containing it. For a derived dataset, +/// traverses the dependency chain and returns all raw dataset leaves. For other +/// dataset types (e.g., static), returns an empty vec. 
+pub async fn collect_raw_datasets(
+    datasets_cache: &DatasetsCache,
+    dataset: Arc<dyn Dataset>,
+) -> Result<Vec<Arc<RawDataset>>, DependencyTraversalError> {
+    if let Ok(raw) = dataset.clone().downcast_arc::<RawDataset>() {
+        return Ok(vec![raw]);
+    }
+
+    let mut raw_datasets = Vec::new();
+    let mut stack: Vec<Arc<dyn Dataset>> = vec![dataset];
+    let mut visited = BTreeSet::new();
+
+    while let Some(current) = stack.pop() {
+        let current_ref = current.reference().clone();
+        if !visited.insert(current_ref) {
+            continue;
+        }
+
+        if let Ok(raw) = current.clone().downcast_arc::<RawDataset>() {
+            raw_datasets.push(raw);
+            continue;
+        }
+
+        if let Some(derived) = current.downcast_ref::<DerivedDataset>() {
+            for dep in derived.dependencies().values() {
+                let hash_ref = datasets_cache
+                    .resolve_revision(dep.to_reference())
+                    .await
+                    .map_err(DependencyTraversalError::ResolveRevision)?
+                    .ok_or_else(|| {
+                        DependencyTraversalError::NotFound(dep.to_reference().to_string())
+                    })?;
+                let dep_dataset = datasets_cache
+                    .get_dataset(&hash_ref)
+                    .await
+                    .map_err(DependencyTraversalError::GetDataset)?;
+                stack.push(dep_dataset);
+            }
+        }
+    }
+
+    Ok(raw_datasets)
+}
+
+/// Resolves the set of networks for a dataset by collecting all raw datasets
+/// in its transitive dependency chain and extracting their networks.
+///
+/// For raw datasets, returns a singleton set. For derived datasets, returns
+/// the union of all networks from reachable raw datasets.
+///
+/// This is used at catalog construction time to inject accurate network information
+/// into `PhysicalTable` for derived datasets, whose `Table::network()` returns
+/// `None` by default.
+pub async fn resolve_dataset_networks(
+    datasets_cache: &DatasetsCache,
+    dataset: Arc<dyn Dataset>,
+) -> Result<BTreeSet<NetworkId>, DependencyTraversalError> {
+    let raw_datasets = collect_raw_datasets(datasets_cache, dataset).await?;
+    Ok(raw_datasets.iter().map(|r| r.network().clone()).collect())
+}
+
+/// Errors that occur when traversing dataset dependencies.
+#[derive(Debug, thiserror::Error)]
+pub enum DependencyTraversalError {
+    /// Failed to get dataset from dataset store.
+    #[error("failed to get dataset")]
+    GetDataset(#[source] GetDatasetError),
+
+    /// Failed to resolve revision.
+    #[error("failed to resolve revision")]
+    ResolveRevision(#[source] ResolveRevisionError),
+
+    /// Dependency not found.
+    #[error("dependency '{0}' not found")]
+    NotFound(String),
+}
+
 /// Parses manifest content according to the dataset kind and creates the appropriate dataset implementation.
 fn create_dataset_from_manifest(
     kind: &DatasetKindStr,
diff --git a/crates/core/common/src/physical_table/table.rs b/crates/core/common/src/physical_table/table.rs
index 361fb1229..6f4acacd9 100644
--- a/crates/core/common/src/physical_table/table.rs
+++ b/crates/core/common/src/physical_table/table.rs
@@ -1,4 +1,4 @@
-use std::{ops::RangeInclusive, sync::Arc};
+use std::{collections::BTreeSet, ops::RangeInclusive, sync::Arc};
 
 use amp_data_store::{DataStore, PhyTableRevision, physical_table::PhyTableRevisionPath};
 use amp_parquet::{meta::ParquetMeta, writer::WriterTarget};
@@ -32,13 +32,16 @@ pub struct PhysicalTable {
     /// Table name.
     table_name: TableName,
 
-    /// Network identifier (if the table has one).
-    network: Option<NetworkId>,
+    /// Networks this table is associated with.
+    ///
+    /// For raw tables this is a singleton set from the table definition.
+    /// For derived tables this is resolved from the transitive dependency chain.
+    networks: BTreeSet<NetworkId>,
 
     /// Data store for accessing metadata database and object storage.
     store: DataStore,
 
-    /// Table definition (schema, sorted_by, optional network).
+    /// Table definition (schema, sorted_by).
     table: Arc<dyn Table>,
 }
 
@@ -53,6 +56,7 @@ impl PhysicalTable {
         dataset_reference: HashReference,
         dataset_start_block: Option,
         table: Arc<dyn Table>,
+        networks: BTreeSet<NetworkId>,
         revision: PhyTableRevision,
     ) -> Self {
         Self {
@@ -60,7 +64,7 @@
             dataset_reference,
             dataset_start_block,
             table_name: table.name().clone(),
-            network: table.network().cloned(),
+            networks,
             store,
             table,
         }
@@ -77,8 +81,8 @@
         &self.table_name
     }
 
-    pub fn network(&self) -> Option<&NetworkId> {
-        self.network.as_ref()
+    pub fn networks(&self) -> &BTreeSet<NetworkId> {
+        &self.networks
     }
 
     pub fn url(&self) -> &Url {
diff --git a/crates/core/common/src/plan_visitors.rs b/crates/core/common/src/plan_visitors.rs
index b20982c14..9ffd653ba 100644
--- a/crates/core/common/src/plan_visitors.rs
+++ b/crates/core/common/src/plan_visitors.rs
@@ -609,16 +609,15 @@ pub fn find_cross_network_join(
     plan: &LogicalPlan,
     catalog: &crate::catalog::physical::Catalog,
 ) -> Result, DataFusionError> {
-    let table_to_network: BTreeMap<TableReference, NetworkId> = catalog
+    let table_to_networks: BTreeMap<TableReference, BTreeSet<NetworkId>> = catalog
         .entries()
         .iter()
-        .filter_map(|(physical_table, sql_schema_name)| {
-            let network = physical_table.network()?.clone();
+        .map(|(physical_table, sql_schema_name)| {
             let table_ref = TableReference::Partial {
                 schema: Arc::from(&**sql_schema_name),
                 table: Arc::from(physical_table.table_name().as_str()),
             };
-            Some((table_ref, network))
+            (table_ref, physical_table.networks().clone())
         })
         .collect();
 
@@ -627,7 +626,9 @@
         let table_refs = extract_table_references_from_plan(subtree)?;
         Ok(table_refs
             .into_iter()
-            .filter_map(|table_ref| table_to_network.get(&table_ref).cloned())
+            .filter_map(|table_ref| table_to_networks.get(&table_ref))
+            .flatten()
+            .cloned()
             .collect())
     };
 
diff --git a/crates/core/common/src/streaming_query.rs b/crates/core/common/src/streaming_query.rs
index 925e2f956..f444b8104 100644
--- a/crates/core/common/src/streaming_query.rs
+++ b/crates/core/common/src/streaming_query.rs
@@ -1,7 +1,7 @@
 pub mod message_stream_with_block_complete;
 
 use std::{
-    collections::{BTreeMap, BTreeSet, VecDeque},
+    collections::{BTreeMap, BTreeSet},
     sync::Arc,
     time::Duration,
 };
@@ -16,7 +16,7 @@ use datasets_common::{
     network_id::NetworkId, table_name::TableName,
 };
-use datasets_derived::{dataset::Dataset as DerivedDataset, deps::SELF_REF_KEYWORD};
+use datasets_derived::deps::SELF_REF_KEYWORD;
 use datasets_raw::dataset::Dataset as RawDataset;
 use futures::stream::{self, BoxStream, StreamExt};
 use js_runtime::isolate_pool::IsolatePool;
@@ -45,7 +45,7 @@ use crate::{
         plan::PlanContextBuilder,
     },
     cursor::{Cursor, CursorNetworkNotFoundError, NetworkCursor, Watermark},
-    datasets_cache::{DatasetsCache, ResolveRevisionError},
+    datasets_cache::DatasetsCache,
     detached_logical_plan::DetachedLogicalPlan,
     exec_env::ExecEnv,
     incrementalizer::incrementalize_plan,
@@ -119,7 +119,7 @@ pub enum SpawnError {
 
     /// Failed to resolve raw dataset from dependencies
     ///
-    /// This occurs when BFS through dataset dependencies fails to find a raw
+    /// This occurs when traversing dataset dependencies fails to find a raw
     /// (non-derived) dataset whose network can be used for the streaming query.
#[error("failed to resolve raw dataset from dependencies")] ResolveRawDataset(#[source] ResolveRawDatasetError), @@ -1247,11 +1247,13 @@ async fn resolve_blocks_table( })?; let sql_schema_name = dataset.reference().to_reference().to_string(); + let networks = table.network().into_iter().cloned().collect(); let physical_table = PhysicalTable::from_revision( data_store, dataset.reference().clone(), dataset.start_block(), Arc::clone(table) as Arc, + networks, revision, ); Ok(( @@ -1284,63 +1286,37 @@ pub enum ResolveBlocksTableError { TableNotSynced(String, String), } -/// Resolve the raw dataset and its network by BFS through dataset dependencies. +/// Resolve the raw dataset from a set of root dataset references. /// -/// Returns the first raw (non-derived) dataset found and its network, validating that all raw -/// datasets in the dependency tree belong to the same network. +/// Traverses transitive dependencies from each root, collecting raw datasets. +/// Returns the first raw dataset found, validating that all raw datasets in +/// the dependency tree belong to the same network. async fn resolve_raw_dataset_from_dependencies( datasets_cache: &DatasetsCache, root_dataset_refs: impl Iterator, ) -> Result, ResolveRawDatasetError> { let mut found: Option> = None; - let mut queue: VecDeque> = VecDeque::new(); for hash_ref in root_dataset_refs { let dataset = datasets_cache .get_dataset(hash_ref) .await .map_err(ResolveRawDatasetError::GetDataset)?; - queue.push_back(dataset); - } - let mut visited = BTreeSet::new(); - while let Some(dataset) = queue.pop_front() { - let dataset_ref = dataset.reference().clone(); - if !visited.insert(dataset_ref) { - continue; - } + let raw_datasets = crate::datasets_cache::collect_raw_datasets(datasets_cache, dataset) + .await + .map_err(ResolveRawDatasetError::Traversal)?; - // Raw dataset: record its network, fail if a second network appears - if let Ok(raw) = dataset.clone().downcast_arc::() { + for raw in raw_datasets { match &found { - None => { - found = Some(raw); - } + None => found = Some(raw), Some(first) if *first.network() != *raw.network() => { return Err(ResolveRawDatasetError::MultipleNetworks { first: first.network().clone(), second: raw.network().clone(), }); } - Some(_) => {} // same network, continue BFS - } - } - - // Derived dataset: enqueue dependencies - if let Some(derived) = dataset.downcast_ref::() { - for dep in derived.dependencies().values() { - let hash_ref = datasets_cache - .resolve_revision(dep.to_reference()) - .await - .map_err(ResolveRawDatasetError::ResolveRevision)? - .ok_or_else(|| { - ResolveRawDatasetError::NotFound(dep.to_reference().to_string()) - })?; - let dataset = datasets_cache - .get_dataset(&hash_ref) - .await - .map_err(ResolveRawDatasetError::GetDataset)?; - queue.push_back(dataset); + Some(_) => {} // same network } } } @@ -1351,17 +1327,13 @@ async fn resolve_raw_dataset_from_dependencies( /// Errors that occur when resolving the raw dataset from dependencies. #[derive(Debug, thiserror::Error)] pub enum ResolveRawDatasetError { - /// Failed to get dataset from dataset store. + /// Failed to get root dataset from dataset store. #[error("failed to get dataset")] GetDataset(#[source] crate::datasets_cache::GetDatasetError), - /// Failed to resolve revision. - #[error("failed to resolve revision")] - ResolveRevision(#[source] ResolveRevisionError), - - /// Dependency not found. - #[error("dependency '{0}' not found")] - NotFound(String), + /// Failed to traverse dataset dependencies. 
+ #[error("failed to traverse dependencies")] + Traversal(#[source] crate::datasets_cache::DependencyTraversalError), /// Multiple networks found in the dependency tree. #[error("multiple networks in dependency tree: {first} and {second}")] diff --git a/crates/core/common/tests/it_session_async_resolution.rs b/crates/core/common/tests/it_session_async_resolution.rs index c7e9bcb63..d5f6bea74 100644 --- a/crates/core/common/tests/it_session_async_resolution.rs +++ b/crates/core/common/tests/it_session_async_resolution.rs @@ -1,6 +1,6 @@ use std::{ any::Any, - collections::BTreeMap, + collections::{BTreeMap, BTreeSet}, sync::{Arc, Mutex}, }; @@ -540,11 +540,14 @@ async fn exec_statement_to_plan_with_overlapping_async_and_physical_tables_succe url: revision_url, }; + let dataset_table = Arc::new(dataset_table); + let networks = BTreeSet::from([dataset_table.network_ref().clone()]); let physical_table = Arc::new(PhysicalTable::from_revision( data_store.clone(), hash_ref, None, - Arc::new(dataset_table), + dataset_table, + networks, revision, )); diff --git a/crates/core/datasets-common/src/dataset.rs b/crates/core/datasets-common/src/dataset.rs index bf9680c73..9a74c9353 100644 --- a/crates/core/datasets-common/src/dataset.rs +++ b/crates/core/datasets-common/src/dataset.rs @@ -84,9 +84,11 @@ pub trait Table: DowncastSync + std::fmt::Debug { /// Returns the Arrow schema defining this table's columns and types. fn schema(&self) -> &SchemaRef; - /// Returns the network this table is associated with, if any. + /// Returns the intrinsic network this table belongs to, if any. /// - /// Raw tables always have a network; derived tables return `None`. + /// Raw tables return `Some` (they always belong to exactly one network). + /// Derived and static tables return `None` (their networks are resolved + /// from the dependency chain at catalog construction time). fn network(&self) -> Option<&NetworkId> { None } diff --git a/crates/core/worker-datasets-derived/src/job_impl.rs b/crates/core/worker-datasets-derived/src/job_impl.rs index 3ab0625b1..5e431ab84 100644 --- a/crates/core/worker-datasets-derived/src/job_impl.rs +++ b/crates/core/worker-datasets-derived/src/job_impl.rs @@ -98,7 +98,10 @@ pub mod query; pub mod table; -use std::{collections::BTreeMap, sync::Arc}; +use std::{ + collections::{BTreeMap, BTreeSet}, + sync::Arc, +}; use amp_data_store::retryable::RetryableErrorExt as _; use amp_worker_core::{ @@ -165,11 +168,24 @@ pub async fn execute( .map_err(Error::RegisterNewPhysicalTable)?, }; + // Resolve networks: raw tables have an intrinsic network; derived tables + // need resolution from the transitive dependency chain. + let networks = match table_def.network() { + Some(id) => BTreeSet::from([id.clone()]), + None => common::datasets_cache::resolve_dataset_networks( + &ctx.datasets_cache, + Arc::clone(&dataset), + ) + .await + .map_err(Error::ResolveNetworks)?, + }; + let physical_table = Arc::new(PhysicalTable::from_revision( ctx.data_store.clone(), dataset.reference().clone(), dataset.start_block(), table_def.clone(), + networks, revision, )); @@ -339,6 +355,16 @@ pub enum Error { source: amp_worker_core::check::ConsistencyError, }, + /// Failed to resolve networks from dependency chain + /// + /// This occurs when traversing the derived dataset's dependencies fails + /// to resolve all raw dataset networks. 
Common causes: + /// - Dependency dataset not found + /// - Failed to resolve dependency revision + /// - Dataset store connectivity issues + #[error("Failed to resolve networks from dependencies")] + ResolveNetworks(#[source] common::datasets_cache::DependencyTraversalError), + /// Failed to retrieve derived dataset manifest /// /// This occurs when the manifest for a derived dataset cannot be fetched from @@ -388,6 +414,9 @@ impl RetryableErrorExt for Error { // Delegate to inner error classification Self::GetDataset(err) => err.is_retryable(), + // Network resolution depends on dataset store — transient + Self::ResolveNetworks(_) => true, + // Transient DB/store lookup failures Self::GetActivePhysicalTable(err) => err.is_retryable(), Self::RegisterNewPhysicalTable(_) => true, @@ -425,6 +454,7 @@ impl amp_worker_core::retryable::JobErrorExt for Error { fn error_code(&self) -> &'static str { match self { Self::GetDataset(_) => "GET_DATASET", + Self::ResolveNetworks(_) => "RESOLVE_NETWORKS", Self::GetActivePhysicalTable(_) => "GET_ACTIVE_PHYSICAL_TABLE", Self::RegisterNewPhysicalTable(_) => "REGISTER_NEW_PHYSICAL_TABLE", Self::LockRevisionsForWriter(_) => "LOCK_REVISIONS_FOR_WRITER", diff --git a/crates/core/worker-datasets-raw/src/job_impl.rs b/crates/core/worker-datasets-raw/src/job_impl.rs index 79f151825..1edc0a3a8 100644 --- a/crates/core/worker-datasets-raw/src/job_impl.rs +++ b/crates/core/worker-datasets-raw/src/job_impl.rs @@ -169,6 +169,7 @@ pub async fn execute( dataset.reference().clone(), dataset.start_block(), table_def.clone(), + table_def.network().into_iter().cloned().collect(), revision, )); diff --git a/crates/services/admin-api/src/handlers/datasets/restore.rs b/crates/services/admin-api/src/handlers/datasets/restore.rs index f6fe55c02..89a0b1cda 100644 --- a/crates/services/admin-api/src/handlers/datasets/restore.rs +++ b/crates/services/admin-api/src/handlers/datasets/restore.rs @@ -173,6 +173,7 @@ pub async fn handler( dataset_ref.clone(), start_block, table_def.clone(), + table_def.network().into_iter().cloned().collect(), info, ); diff --git a/crates/services/admin-api/src/handlers/jobs/progress.rs b/crates/services/admin-api/src/handlers/jobs/progress.rs index 784c6c33b..65715f1e0 100644 --- a/crates/services/admin-api/src/handlers/jobs/progress.rs +++ b/crates/services/admin-api/src/handlers/jobs/progress.rs @@ -203,6 +203,7 @@ pub async fn handler( hash_ref.clone(), dataset.start_block(), table_config.clone(), + table_config.network().into_iter().cloned().collect(), revision, ) }); diff --git a/tests/src/testlib/helpers.rs b/tests/src/testlib/helpers.rs index 4d7778d25..e16288dd9 100644 --- a/tests/src/testlib/helpers.rs +++ b/tests/src/testlib/helpers.rs @@ -203,6 +203,7 @@ pub async fn load_physical_tables( dataset_ref.clone(), dataset.start_block(), table_def.clone(), + table_def.network().into_iter().cloned().collect(), revision, ); dumped_tables.push(physical_table.into()); @@ -295,6 +296,7 @@ pub async fn restore_dataset_snapshot( dataset_ref.clone(), dataset.start_block(), table_def.clone(), + table_def.network().into_iter().cloned().collect(), revision, ); tables.push(physical_table.into()); @@ -360,6 +362,7 @@ pub async fn catalog_for_dataset( dataset_ref.clone(), dataset.start_block(), table_def.clone(), + table_def.network().into_iter().cloned().collect(), revision, ); tables.push(( From 7784a1c74a9f7fcc9e9e09dcaf7dee85706c4221 Mon Sep 17 00:00:00 2001 From: Theo Butler Date: Fri, 13 Mar 2026 09:36:46 -0400 Subject: [PATCH 2/3] Update 
crates/core/common/src/datasets_cache.rs Co-authored-by: Shiyas Mohammed <83513144+shiyasmohd@users.noreply.github.com> Signed-off-by: Theo Butler --- crates/core/common/src/datasets_cache.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/crates/core/common/src/datasets_cache.rs b/crates/core/common/src/datasets_cache.rs index 935097c1d..79983602f 100644 --- a/crates/core/common/src/datasets_cache.rs +++ b/crates/core/common/src/datasets_cache.rs @@ -313,7 +313,16 @@ pub enum DependencyTraversalError { #[error("dependency '{0}' not found")] NotFound(String), } - +impl crate::retryable::RetryableErrorExt for DependencyTraversalError { + fn is_retryable(&self) -> bool { + use amp_datasets_registry::retryable::RetryableErrorExt as _; + match self { + Self::GetDataset(err) => err.is_retryable(), + Self::ResolveRevision(err) => err.is_retryable(), + Self::NotFound(_) => false, + } + } +} /// Parses manifest content according to the dataset kind and creates the appropriate dataset implementation. fn create_dataset_from_manifest( kind: &DatasetKindStr, From c023c7ecfb436bda23dd6369dff814da55344cf0 Mon Sep 17 00:00:00 2001 From: Theo Butler Date: Fri, 13 Mar 2026 09:37:01 -0400 Subject: [PATCH 3/3] Update crates/core/worker-datasets-derived/src/job_impl.rs Co-authored-by: Shiyas Mohammed <83513144+shiyasmohd@users.noreply.github.com> Signed-off-by: Theo Butler --- crates/core/worker-datasets-derived/src/job_impl.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/core/worker-datasets-derived/src/job_impl.rs b/crates/core/worker-datasets-derived/src/job_impl.rs index 5e431ab84..3a09c0441 100644 --- a/crates/core/worker-datasets-derived/src/job_impl.rs +++ b/crates/core/worker-datasets-derived/src/job_impl.rs @@ -415,7 +415,7 @@ impl RetryableErrorExt for Error { Self::GetDataset(err) => err.is_retryable(), // Network resolution depends on dataset store — transient - Self::ResolveNetworks(_) => true, + Self::ResolveNetworks(err) => err.is_retryable(), // Transient DB/store lookup failures Self::GetActivePhysicalTable(err) => err.is_retryable(),