From 1b2d9e7c3b7be5fe288704e54c7bf69d60f5c10d Mon Sep 17 00:00:00 2001 From: Mitchell Spencer Date: Tue, 10 Mar 2026 09:26:50 -0500 Subject: [PATCH 1/6] feat(admin-api): support inter-table dependencies in derived datasets Add self-qualified table references (self.table_name) enabling tables within a derived dataset to reference sibling tables. Includes topological ordering, cycle detection, and self-reference rejection. - Add `DepAliasOrSelfRef` type for parsing `self.`-qualified refs - Implement topological sort with `CyclicDepError` in `datasets-derived` - Register sibling schemas progressively via `SelfSchemaProvider` - Add `CYCLIC_DEPENDENCY`, `SELF_REFERENCING_TABLE`, `CATALOG_QUALIFIED_TABLE`, `INVALID_TABLE_NAME` error codes - Add runtime inter-table dependency support in worker-datasets-derived Signed-off-by: Mitchell Spencer --- .../common/src/catalog/physical/for_dump.rs | 61 +++-- .../core/common/src/self_schema_provider.rs | 10 + .../worker-datasets-derived/src/job_impl.rs | 30 +- .../src/job_impl/table.rs | 259 +++++++++++++++--- .../services/admin-api/src/handlers/common.rs | 157 ++++++++--- .../services/admin-api/src/handlers/schema.rs | 214 ++++++++++++--- docs/feat/data-inter-table-dependencies.md | 126 +++++++++ docs/openapi-specs/admin.spec.json | 2 +- .../config/packages/intra_deps/amp.config.ts | 4 +- tests/src/tests/it_admin_api_schema.rs | 226 ++++++++++++++- tests/src/tests/it_dependencies.rs | 1 - 11 files changed, 947 insertions(+), 143 deletions(-) create mode 100644 docs/feat/data-inter-table-dependencies.md diff --git a/crates/core/common/src/catalog/physical/for_dump.rs b/crates/core/common/src/catalog/physical/for_dump.rs index 63affa850..6f4a132fa 100644 --- a/crates/core/common/src/catalog/physical/for_dump.rs +++ b/crates/core/common/src/catalog/physical/for_dump.rs @@ -2,7 +2,7 @@ //! //! This module provides physical catalog creation for derived dataset execution. //! 
It resolves dependency tables from manifest deps and SQL table references, -//! then adds physical parquet locations. +//! then builds the catalog from resolved entries. use std::{ collections::{BTreeMap, btree_map::Entry}, @@ -22,10 +22,18 @@ use crate::{ sql::TableReference, }; -/// Creates a full catalog with physical data access for derived dataset dumps. +/// A resolved table entry containing logical metadata, physical data access, +/// and the SQL schema name used in queries. +pub struct ResolvedTableEntry { + pub logical: LogicalTable, + pub physical: Arc, + pub schema_name: Arc, +} + +/// Resolves external dependency table references into resolved table entries. /// /// This function resolves dependency tables from manifest deps and SQL table references, -/// loads dataset metadata, builds physical table entries, and constructs the catalog. +/// loads dataset metadata, and builds physical table entries. /// /// ## Parameters /// @@ -33,14 +41,12 @@ use crate::{ /// - `data_store`: Used to query metadata database for physical parquet locations /// - `manifest_deps`: Dependency alias → hash reference mappings from the manifest /// - `table_refs`: Parsed SQL table references with dep alias schemas -/// - `udfs`: Pre-resolved self-ref UDFs (from logical catalog) -pub async fn create( +pub async fn resolve_external_deps( datasets_cache: &DatasetsCache, data_store: &DataStore, manifest_deps: &BTreeMap, table_refs: Vec>, - udfs: Vec, -) -> Result { +) -> Result, CreateCatalogError> { // Resolve table references to LogicalTable instances let mut tables_by_hash: BTreeMap, LogicalTable>> = Default::default(); @@ -98,7 +104,7 @@ pub async fn create( .flat_map(|map| map.into_values()) .collect(); - // Build physical catalog entries from resolved logical tables + // Build resolved entries from logical tables let mut entries = Vec::new(); for table in &logical_tables { let dataset_ref = table.dataset_reference(); @@ -136,24 +142,47 @@ pub async fn create( 
Arc::clone(table_def), revision, ); - entries.push(( - Arc::from(physical_table), - Arc::from(table.sql_schema_name()), - )); + entries.push(ResolvedTableEntry { + logical: table.clone(), + physical: Arc::from(physical_table), + schema_name: Arc::from(table.sql_schema_name()), + }); } - // Build dep_aliases map + Ok(entries) +} + +/// Builds a physical catalog from resolved table entries. +/// +/// Takes already-resolved table entries (from both external deps and self-ref +/// siblings) and constructs the final catalog. +/// +/// ## Parameters +/// +/// - `entries`: All resolved table entries (external deps + self-refs combined) +/// - `udfs`: Pre-resolved UDFs (from logical catalog) +/// - `manifest_deps`: Dependency alias → hash reference mappings from the manifest +pub fn build_catalog( + entries: Vec, + udfs: Vec, + manifest_deps: &BTreeMap, +) -> Catalog { + let (logical_tables, physical_entries): (Vec<_>, Vec<_>) = entries + .into_iter() + .map(|e| (e.logical, (e.physical, e.schema_name))) + .unzip(); + let dep_aliases: BTreeMap = manifest_deps .iter() .map(|(alias, hash_ref)| (alias.to_string(), hash_ref.clone())) .collect(); - Ok(Catalog::new(logical_tables, udfs, entries, dep_aliases)) + Catalog::new(logical_tables, udfs, physical_entries, dep_aliases) } -/// Errors that can occur when creating a physical catalog. +/// Errors that can occur when resolving external dependency table references. /// -/// Returned by [`create`] when catalog creation fails. +/// Returned by [`resolve_external_deps`] when resolution fails. #[derive(Debug, thiserror::Error)] pub enum CreateCatalogError { /// Table is not qualified with a schema/dataset name. 
diff --git a/crates/core/common/src/self_schema_provider.rs b/crates/core/common/src/self_schema_provider.rs index 75ffe7a47..a23c56d77 100644 --- a/crates/core/common/src/self_schema_provider.rs +++ b/crates/core/common/src/self_schema_provider.rs @@ -7,6 +7,7 @@ use std::{any::Any, collections::BTreeMap, sync::Arc}; use async_trait::async_trait; use datafusion::{ + arrow::datatypes::SchemaRef, catalog::{ AsyncSchemaProvider as TableAsyncSchemaProvider, SchemaProvider as TableSchemaProvider, TableProvider, @@ -57,6 +58,15 @@ impl SelfSchemaProvider { &self.udfs } + /// Registers a table dynamically for progressive schema resolution. + /// + /// After a table's schema is inferred during inter-table dependency processing, + /// call this method so that subsequent tables can reference it via `self.`. + pub fn add_table(&self, name: impl Into, schema: SchemaRef) { + let table_provider: Arc = Arc::new(PlanTable::new(schema)); + self.table_cache.write().insert(name.into(), table_provider); + } + /// Creates a provider from manifest functions (no tables). /// /// Functions are already validated at deserialization time. 
diff --git a/crates/core/worker-datasets-derived/src/job_impl.rs b/crates/core/worker-datasets-derived/src/job_impl.rs index 82375a896..89cb06dfc 100644 --- a/crates/core/worker-datasets-derived/src/job_impl.rs +++ b/crates/core/worker-datasets-derived/src/job_impl.rs @@ -98,7 +98,7 @@ pub mod query; pub mod table; -use std::sync::Arc; +use std::{collections::BTreeMap, sync::Arc}; use amp_data_store::retryable::RetryableErrorExt as _; use amp_worker_core::{ @@ -106,7 +106,7 @@ use amp_worker_core::{ retryable::RetryableErrorExt, tasks::TryWaitAllError, }; use common::{physical_table::PhysicalTable, retryable::RetryableErrorExt as _}; -use datasets_common::hash_reference::HashReference; +use datasets_common::{hash_reference::HashReference, table_name::TableName}; use tracing::Instrument; use self::table::{MaterializeTableError, materialize_table}; @@ -208,6 +208,14 @@ pub async fn execute( })?; } + // Collect all sibling tables for inter-table dependency resolution. + // Each materialize_table call receives this map so self-ref tables + // (e.g., `self.blocks_base`) can be resolved to sibling PhysicalTables. 
+ let siblings: BTreeMap> = tables + .iter() + .map(|(pt, _)| (pt.table_name().clone(), Arc::clone(pt))) + .collect(); + // Process all tables in parallel using FailFastJoinSet let mut join_set = amp_worker_core::tasks::FailFastJoinSet::>::new(); @@ -227,8 +235,9 @@ pub async fn execute( let env = env.clone(); let table = Arc::clone(table); let compactor = Arc::clone(compactor); - let opts = parquet_opts.clone(); let manifest = manifest.clone(); + let siblings = siblings.clone(); + let opts = parquet_opts.clone(); join_set.spawn( async move { @@ -242,6 +251,7 @@ pub async fn execute( compactor, opts.clone(), end, + &siblings, ) .await?; @@ -477,6 +487,20 @@ mod tests { //* Then assert!(result, "DependencyNotFound should be fatal"); } + + #[test] + fn is_fatal_self_ref_table_not_found_returns_true() { + //* Given + let err = MaterializeTableError::SelfRefTableNotFound( + "missing_table".parse().expect("should parse table name"), + ); + + //* When + let result = err.is_fatal(); + + //* Then + assert!(result, "SelfRefTableNotFound should be fatal"); + } } mod materialize_table_spawn_error_is_fatal { diff --git a/crates/core/worker-datasets-derived/src/job_impl/table.rs b/crates/core/worker-datasets-derived/src/job_impl/table.rs index 27ba67640..137dedce9 100644 --- a/crates/core/worker-datasets-derived/src/job_impl/table.rs +++ b/crates/core/worker-datasets-derived/src/job_impl/table.rs @@ -12,7 +12,10 @@ use amp_worker_core::{ use common::{ BlockNum, amp_catalog_provider::{AMP_CATALOG_NAME, AmpCatalogProvider}, - catalog::physical::{EarliestBlockError, for_dump as physical_for_dump}, + catalog::{ + logical::LogicalTable, + physical::{EarliestBlockError, for_dump as physical_for_dump}, + }, context::{exec::ExecContextBuilder, plan::PlanContextBuilder}, cursor::Cursor, datasets_cache::ResolveRevisionError, @@ -20,12 +23,12 @@ use common::{ physical_table::{CanonicalChainError, PhysicalTable}, retryable::RetryableErrorExt as _, self_schema_provider::SelfSchemaProvider, 
- sql::{ParseSqlError, ResolveTableReferencesError, resolve_table_references}, + sql::{ParseSqlError, ResolveTableReferencesError, TableReference, resolve_table_references}, }; -use datasets_common::hash_reference::HashReference; +use datasets_common::{hash_reference::HashReference, table_name::TableName}; use datasets_derived::{ Manifest as DerivedManifest, - deps::{DepAlias, DepAliasError}, + deps::{DepAlias, DepAliasOrSelfRef, DepAliasOrSelfRefError, SELF_REF_KEYWORD}, manifest::TableInput, }; use tracing::Instrument as _; @@ -33,7 +36,40 @@ use tracing::Instrument as _; use super::query::{MaterializeSqlQueryError, materialize_sql_query}; use crate::job_ctx::Context; +/// Partitions table references into external dependency refs and self-ref table names. +/// +/// References with `self.` schema are collected as self-ref table names. +/// All other qualified references are converted to external `DepAlias` refs. +fn partition_table_refs( + refs: Vec>, +) -> (Vec>, Vec) { + let mut ext_refs: Vec> = Vec::new(); + let mut self_ref_tables: Vec = Vec::new(); + + for table_ref in refs { + match table_ref { + TableReference::Bare { table } => { + ext_refs.push(TableReference::Bare { table }); + } + TableReference::Partial { schema, table } => match schema.as_ref() { + DepAliasOrSelfRef::SelfRef => { + self_ref_tables.push(table.as_ref().clone()); + } + DepAliasOrSelfRef::DepAlias(alias) => { + ext_refs.push(TableReference::Partial { + schema: Arc::new(alias.clone()), + table, + }); + } + }, + } + } + + (ext_refs, self_ref_tables) +} + /// Materializes a derived dataset table +#[allow(clippy::too_many_arguments)] #[tracing::instrument(skip_all, fields(table = %table.table_name()), err)] pub async fn materialize_table( ctx: Context, @@ -43,6 +79,7 @@ pub async fn materialize_table( compactor: Arc, opts: Arc, end: EndBlock, + siblings: &BTreeMap>, ) -> Result<(), MaterializeTableError> { let materialize_start_time = Instant::now(); @@ -94,19 +131,51 @@ pub async fn 
materialize_table( let self_schema_provider = SelfSchemaProvider::from_manifest_udfs(env.isolate_pool.clone(), &manifest.functions); - let catalog = { - let table_refs = resolve_table_references::(&query) - .map_err(MaterializeTableError::ResolveTableReferences)?; - physical_for_dump::create( - &ctx.datasets_cache, - &ctx.data_store, - &dependencies, - table_refs, - self_schema_provider.udfs().to_vec(), - ) - .await - .map_err(MaterializeTableError::CreatePhysicalCatalog)? - }; + // Resolve and partition table references into external deps and self-refs. + let all_table_refs = resolve_table_references::(&query) + .map_err(MaterializeTableError::ResolveTableReferences)?; + let (ext_refs, self_ref_tables) = partition_table_refs(all_table_refs); + + // Resolve self-ref sibling tables for planning (schema) and execution (physical) + let mut pre_resolved = Vec::new(); + for self_ref_name in &self_ref_tables { + let sibling = siblings + .get(self_ref_name) + .ok_or_else(|| MaterializeTableError::SelfRefTableNotFound(self_ref_name.clone()))?; + + // Planning: register schema so DataFusion knows column types + self_schema_provider.add_table(self_ref_name.to_string(), sibling.table().schema().clone()); + + // Execution: build resolved entry so attach() can find real data + pre_resolved.push(physical_for_dump::ResolvedTableEntry { + logical: LogicalTable::new( + SELF_REF_KEYWORD.to_string(), + sibling.dataset_reference().clone(), + sibling.table().clone(), + ), + physical: Arc::clone(sibling), + schema_name: Arc::from(SELF_REF_KEYWORD), + }); + } + + // Resolve external deps, combine with self-ref entries, and build the catalog + let mut all_entries = physical_for_dump::resolve_external_deps( + &ctx.datasets_cache, + &ctx.data_store, + &dependencies, + ext_refs, + ) + .await + .map_err(MaterializeTableError::CreatePhysicalCatalog)?; + + all_entries.extend(pre_resolved); + + let catalog = physical_for_dump::build_catalog( + all_entries, + self_schema_provider.udfs().to_vec(), 
+ &dependencies, + ); + let dep_alias_map = catalog.dep_aliases().clone(); // Planning context: tables resolved lazily by AmpCatalogProvider with dep aliases. @@ -140,18 +209,40 @@ pub async fn materialize_table( }); } - let Some(dependency_earliest_block) = catalog - .earliest_block() - .await - .map_err(MaterializeTableSpawnError::EarliestBlock)? - else { - // If the dependencies have synced nothing, we have nothing to do. - tracing::warn!("no blocks to materialize for {table_name}, dependencies are empty"); - return Ok::<(), MaterializeTableSpawnError>(()); - }; + // Wait until at least one dependency has synced data, then use its + // earliest block as the start. For tables with only external deps this + // returns immediately. For tables with self-refs, this blocks on the + // notification pipeline until the sibling table produces data. + // No explicit timeout: if a sibling fails, FailFastJoinSet aborts this task. + let start = { + let mut receivers: BTreeMap<_, _> = BTreeMap::new(); + for pt in catalog.physical_tables() { + let loc = pt.location_id(); + receivers + .entry(loc) + .or_insert(ctx.notification_multiplexer.subscribe(loc).await); + } - // Derived datasets inherit start_block from their dependencies - let start = dependency_earliest_block; + loop { + match catalog + .earliest_block() + .await + .map_err(MaterializeTableSpawnError::EarliestBlock)? + { + Some(block) => break block, + None => { + tracing::debug!( + "dependencies not yet synced, blocking on notification" + ); + let futs: Vec<_> = receivers + .values_mut() + .map(|rx| Box::pin(rx.changed())) + .collect(); + let _ = futures::future::select_all(futs).await; + } + } + } + }; let resolved = resolve_end_block(&end, start, async { let query_ctx = ExecContextBuilder::new(env.clone()) @@ -299,7 +390,7 @@ pub enum MaterializeTableError { /// This occurs when extracting and resolving table references from the /// parsed SQL query fails. 
#[error("failed to resolve table references: {0}")] - ResolveTableReferences(#[source] ResolveTableReferencesError), + ResolveTableReferences(#[source] ResolveTableReferencesError), /// Failed to create the physical catalog for query execution /// @@ -308,6 +399,13 @@ pub enum MaterializeTableError { #[error("failed to create physical catalog: {0}")] CreatePhysicalCatalog(#[source] physical_for_dump::CreateCatalogError), + /// A self-ref table was not found among sibling tables + /// + /// This occurs when a SQL query references `self.table_name` but no sibling + /// table with that name exists in the dataset. + #[error("self-referenced table '{0}' not found in dataset")] + SelfRefTableNotFound(TableName), + /// A parallel materialization task failed during execution /// /// This occurs when one of the parallel tasks spawned for materializing table data @@ -335,6 +433,7 @@ impl RetryableErrorExt for MaterializeTableError { Self::DependencyNotFound { .. } => false, Self::ResolveTableReferences(_) => false, Self::CreatePhysicalCatalog(_) => false, + Self::SelfRefTableNotFound(_) => false, // Transient DB failure — recoverable Self::ResolveRevision(err) => err.0.is_retryable(), @@ -368,13 +467,6 @@ pub enum MaterializeTableSpawnError { source: common::incrementalizer::NonIncrementalQueryError, }, - /// Failed to determine the earliest block from dependencies - /// - /// This occurs when querying the catalog for the earliest available - /// block across all dependency datasets fails. 
- #[error("failed to get earliest block: {0}")] - EarliestBlock(#[source] EarliestBlockError), - /// Failed to resolve the end block for materialization /// /// This occurs when determining the target end block for the @@ -389,6 +481,13 @@ pub enum MaterializeTableSpawnError { #[error("failed to get canonical chain: {0}")] CanonicalChain(#[source] CanonicalChainError), + /// Failed to compute the earliest block from dependencies + /// + /// This occurs when snapshotting dependency tables to determine their + /// earliest synced block fails. + #[error("failed to compute earliest block: {0}")] + EarliestBlock(#[source] EarliestBlockError), + /// Failed to execute the SQL query materialization operation /// /// This occurs when the inner `materialize_sql_query` function fails. @@ -413,8 +512,8 @@ impl RetryableErrorExt for MaterializeTableSpawnError { Self::NonIncrementalQuery { .. } => false, // Delegate to inner error classification - Self::EarliestBlock(err) => err.is_retryable(), Self::CanonicalChain(err) => err.is_retryable(), + Self::EarliestBlock(err) => err.is_retryable(), // Block range resolution — inspect the source variant Self::ResolveEndBlock(err) => err.is_retryable(), @@ -424,3 +523,89 @@ impl RetryableErrorExt for MaterializeTableSpawnError { } } } + +#[cfg(test)] +mod tests { + use common::sql::resolve_table_references; + use datasets_derived::{deps::DepAliasOrSelfRef, sql_str::SqlStr}; + + use super::*; + + fn parse_and_partition(sql: &str) -> (Vec>, Vec) { + let sql_str: SqlStr = sql.parse().expect("sql should parse to SqlStr"); + let stmt = common::sql::parse(&sql_str).expect("sql should parse to statement"); + let refs = resolve_table_references::(&stmt) + .expect("table references should resolve"); + partition_table_refs(refs) + } + + #[test] + fn partition_table_refs_with_only_external_deps_returns_ext_refs_only() { + //* When + let (ext, self_refs) = parse_and_partition("SELECT * FROM eth_firehose.blocks"); + + //* Then + assert_eq!(ext.len(), 
1, "should have one external ref"); + assert!(self_refs.is_empty(), "should have no self-refs"); + assert_eq!(ext[0].to_string(), "eth_firehose.blocks"); + } + + #[test] + fn partition_table_refs_with_only_self_refs_returns_self_ref_tables_only() { + //* When + let (ext, self_refs) = parse_and_partition("SELECT * FROM self.blocks_base"); + + //* Then + assert!(ext.is_empty(), "should have no external refs"); + assert_eq!(self_refs.len(), 1, "should have one self-ref"); + assert_eq!(self_refs[0].to_string(), "blocks_base"); + } + + #[test] + fn partition_table_refs_with_mixed_refs_splits_correctly() { + //* When + let (ext, self_refs) = parse_and_partition( + "SELECT a.block_num, b.hash FROM eth_firehose.blocks a JOIN self.blocks_base b ON a.block_num = b.block_num", + ); + + //* Then + assert_eq!(ext.len(), 1, "should have one external ref"); + assert_eq!(self_refs.len(), 1, "should have one self-ref"); + assert_eq!(ext[0].to_string(), "eth_firehose.blocks"); + assert_eq!(self_refs[0].to_string(), "blocks_base"); + } + + #[test] + fn partition_table_refs_with_multiple_self_refs_captures_all() { + //* When + let (ext, self_refs) = parse_and_partition( + "SELECT * FROM self.blocks_base JOIN self.transactions ON self.blocks_base.block_num = self.transactions.block_num", + ); + + //* Then + assert!(ext.is_empty(), "should have no external refs"); + assert_eq!(self_refs.len(), 2, "should have two self-refs"); + assert!( + self_refs.iter().any(|t| *t == "blocks_base"), + "should contain blocks_base" + ); + assert!( + self_refs.iter().any(|t| *t == "transactions"), + "should contain transactions" + ); + } + + #[test] + fn is_retryable_with_self_ref_table_not_found_returns_false() { + //* Given + let err = MaterializeTableError::SelfRefTableNotFound( + "missing_table".parse().expect("should parse table name"), + ); + + //* When + let result = err.is_retryable(); + + //* Then + assert!(!result, "SelfRefTableNotFound should not be retryable"); + } +} diff --git 
a/crates/services/admin-api/src/handlers/common.rs b/crates/services/admin-api/src/handlers/common.rs index 8e7f7b26f..a275b263e 100644 --- a/crates/services/admin-api/src/handlers/common.rs +++ b/crates/services/admin-api/src/handlers/common.rs @@ -21,8 +21,9 @@ use datafusion::sql::parser::Statement; use datasets_common::{hash_reference::HashReference, table_name::TableName}; use datasets_derived::{ Manifest as DerivedDatasetManifest, - deps::{DepAlias, DepAliasError, DepAliasOrSelfRef, DepAliasOrSelfRefError}, + deps::{DepAlias, DepAliasOrSelfRef, DepAliasOrSelfRefError}, manifest::{TableInput, View}, + sorting::{self, CyclicDepError}, }; use futures::{StreamExt as _, stream}; use js_runtime::isolate_pool::IsolatePool; @@ -31,7 +32,7 @@ use js_runtime::isolate_pool::IsolatePool; type TableReferencesMap = BTreeMap< TableName, ( - Vec>, + Vec>, Vec>, ), >; @@ -232,6 +233,12 @@ pub enum ParseRawManifestError { /// - All tables used in SQL exist in their datasets /// - All functions used in SQL exist in their datasets /// - Table schemas are compatible with SQL queries +/// +/// # Panics +/// +/// Panics if `topological_sort` returns a table name that was not in the original +/// statements or manifest tables maps. This is structurally impossible because the +/// sort only returns keys from its input, which are derived from these maps. // TODO: This validation logic was moved here from datasets-derived as part of a refactoring // to break the dependency between datasets-derived and common. This should eventually be // moved to a more appropriate location. @@ -281,22 +288,23 @@ pub async fn validate_derived_manifest( source: err, })?; - // Extract table references - let table_refs = resolve_table_references::(&stmt).map_err(|err| match &err { - ResolveTableReferencesError::InvalidTableName { .. } => { - ManifestValidationError::InvalidTableName(err) - } - ResolveTableReferencesError::CatalogQualifiedTable { .. 
} => { - ManifestValidationError::CatalogQualifiedTableInSql { + // Extract table references (using DepAliasOrSelfRef to support `self.` inter-table refs) + let table_refs = + resolve_table_references::(&stmt).map_err(|err| match &err { + ResolveTableReferencesError::InvalidTableName { .. } => { + ManifestValidationError::InvalidTableName(err) + } + ResolveTableReferencesError::CatalogQualifiedTable { .. } => { + ManifestValidationError::CatalogQualifiedTableInSql { + table_name: table_name.clone(), + source: err, + } + } + _ => ManifestValidationError::TableReferenceResolution { table_name: table_name.clone(), source: err, - } - } - _ => ManifestValidationError::TableReferenceResolution { - table_name: table_name.clone(), - source: err, - }, - })?; + }, + })?; // Reject tables whose SQL references no source tables (e.g., `SELECT 1`). // Derived tables must reference at least one external dependency or sibling table. @@ -306,14 +314,16 @@ pub async fn validate_derived_manifest( }); } - // Validate dependency aliases in table references before catalog creation + // Validate dependency aliases in table references before catalog creation. + // Skip `self` schema refs — those are inter-table references, not external deps. for table_ref in &table_refs { if let TableReference::Partial { schema, .. 
} = table_ref - && !dependencies.contains_key(schema.as_ref()) + && let DepAliasOrSelfRef::DepAlias(dep_alias) = schema.as_ref() + && !dependencies.contains_key(dep_alias) { return Err(ManifestValidationError::DependencyAliasNotFound { table_name: table_name.clone(), - alias: schema.to_string(), + alias: dep_alias.to_string(), }); } } @@ -343,22 +353,49 @@ pub async fn validate_derived_manifest( statements.insert(table_name.clone(), stmt); } - // Step 3: Create planning context to validate all table and function references - // This validates: - // - All table references resolve to existing tables in dependencies - // - All function references resolve to existing functions in dependencies - // - Bare function references can be created as UDFs or are assumed to be built-ins - // - Table references use valid dataset aliases from dependencies - // - Schema compatibility across dependencies + // Step 2b: Extract inter-table dependencies and determine processing order. + // `self.`-qualified table references that match sibling table names are inter-table deps. + let table_order = { + let mut deps: BTreeMap> = BTreeMap::new(); + for (table_name, (table_refs, _func_refs)) in &references { + let mut table_deps = Vec::new(); + for table_ref in table_refs { + if let TableReference::Partial { schema, table } = table_ref + && schema.as_ref().is_self() + { + if table.as_ref() == table_name { + return Err(ManifestValidationError::SelfReferencingTable { + table_name: table_name.clone(), + }); + } + if manifest.tables.contains_key(table.as_ref()) { + table_deps.push(table.as_ref().clone()); + } else { + return Err(ManifestValidationError::SelfRefTableNotFound { + source_table: table_name.clone(), + referenced_table: table.as_ref().clone(), + }); + } + } + } + deps.insert(table_name.clone(), table_deps); + } + sorting::topological_sort(deps).map_err(ManifestValidationError::CyclicDependency)? + }; + + // Step 3: Create planning context to validate all table and function references. 
+ // Inter-table references use `self.` syntax, which resolves through the + // SelfSchemaProvider registered under the "self" schema in the AmpCatalogProvider. let session_config = default_session_config().map_err(ManifestValidationError::SessionConfig)?; let dep_aliases: BTreeMap = dependencies .iter() .map(|(alias, hash_ref)| (alias.to_string(), hash_ref.clone())) .collect(); - let self_schema: Arc = Arc::new( - SelfSchemaProvider::from_manifest_udfs(IsolatePool::dummy(), &manifest.functions), - ); + let self_schema_provider = Arc::new(SelfSchemaProvider::from_manifest_udfs( + IsolatePool::dummy(), + &manifest.functions, + )); let amp_catalog = Arc::new( AmpCatalogProvider::new( datasets_cache.clone(), @@ -366,18 +403,22 @@ pub async fn validate_derived_manifest( IsolatePool::dummy(), ) .with_dep_aliases(dep_aliases) - .with_self_schema(self_schema), + .with_self_schema(self_schema_provider.clone() as Arc), ); let planning_ctx = PlanContextBuilder::new(session_config) .with_table_catalog(AMP_CATALOG_NAME, amp_catalog.clone()) .with_func_catalog(AMP_CATALOG_NAME, amp_catalog) .build(); - // Step 4: Validate that all table SQL queries are incremental. - // Incremental processing is required for derived datasets to efficiently update - // as new blocks arrive. This check ensures no non-incremental operations are used. - // Use cached parsed statements from Step 2 to avoid re-parsing. - for (table_name, stmt) in statements { + // Step 4: Validate that all table SQL queries are incremental, in topological order. + // After validating each table, register its manifest schema so subsequent tables + // can reference it via `self.`. 
+ for table_name in table_order { + // topological_sort only returns keys from the input map + let stmt = statements + .remove(&table_name) + .expect("topological_sort returned unknown table"); + // Plan the SQL query to a logical plan let plan = planning_ctx.statement_to_plan(stmt).await.map_err(|err| { ManifestValidationError::SqlPlanningError { @@ -387,12 +428,20 @@ pub async fn validate_derived_manifest( })?; // Validate that the plan can be processed incrementally - // This checks for non-incremental operations like aggregations, sorts, limits, outer joins, etc. plan.is_incremental() .map_err(|err| ManifestValidationError::NonIncrementalSql { table_name: table_name.clone(), source: err, })?; + + // Register the table's manifest schema so subsequent tables can reference it. + // table_name comes from the manifest's own tables + let table = manifest + .tables + .get(&table_name) + .expect("manifest table missing after topological sort"); + let schema = table.schema.arrow.clone().into_schema_ref(); + self_schema_provider.add_table(table_name.as_str(), schema); } Ok(()) @@ -421,7 +470,7 @@ pub enum ManifestValidationError { /// The table whose SQL query contains unresolvable table references table_name: TableName, #[source] - source: ResolveTableReferencesError, + source: ResolveTableReferencesError, }, /// Failed to resolve function references from SQL query @@ -493,15 +542,15 @@ pub enum ManifestValidationError { /// The table whose SQL query contains a catalog-qualified table reference table_name: TableName, #[source] - source: ResolveTableReferencesError, + source: ResolveTableReferencesError, }, /// Invalid table name /// /// Table name does not conform to SQL identifier rules (must start with letter/underscore, /// contain only alphanumeric/underscore/dollar, and be <= 63 bytes). 
- #[error("Invalid table name in SQL query")] - InvalidTableName(#[source] ResolveTableReferencesError), + #[error("Invalid table name in SQL query: {0}")] + InvalidTableName(#[source] ResolveTableReferencesError), /// Dependency alias not found /// @@ -516,6 +565,34 @@ pub enum ManifestValidationError { alias: String, }, + /// A table references itself via `self.` + /// + /// This occurs when a table's SQL query contains `self.`, which would + /// create a trivial circular dependency. Tables cannot reference themselves. + #[error("Table '{table_name}' references itself via self.{table_name}")] + SelfReferencingTable { + /// The table that references itself + table_name: TableName, + }, + + /// A `self.`-qualified table reference targets a table that does not exist in the dataset. + #[error( + "Table '{source_table}' references non-existent sibling table 'self.{referenced_table}'" + )] + SelfRefTableNotFound { + /// The table whose SQL contains the invalid self-ref + source_table: TableName, + /// The referenced table name that does not exist + referenced_table: TableName, + }, + + /// Cyclic dependency detected among tables within the dataset + /// + /// This occurs when tables reference each other in a cycle (e.g., table_a references + /// table_b which references table_a). Inter-table dependencies must form a DAG. 
+ #[error("Cyclic dependency detected among inter-table references: {0}")] + CyclicDependency(#[source] CyclicDepError), + /// Non-incremental SQL operation in table query /// /// This occurs when a table's SQL query contains operations that cannot be diff --git a/crates/services/admin-api/src/handlers/schema.rs b/crates/services/admin-api/src/handlers/schema.rs index b6e706122..8198ecaee 100644 --- a/crates/services/admin-api/src/handlers/schema.rs +++ b/crates/services/admin-api/src/handlers/schema.rs @@ -12,18 +12,19 @@ use common::{ incrementalizer::NonIncrementalQueryError, plan_visitors::prepend_special_block_num_field, self_schema_provider::SelfSchemaProvider, - sql::{ResolveTableReferencesError, resolve_table_references}, + sql::{self, ResolveTableReferencesError, TableReference}, sql_str::SqlStr, }; -use datafusion::sql::parser::Statement; +use datafusion::{arrow, sql::parser::Statement}; use datasets_common::{ hash_reference::HashReference, network_id::NetworkId, table_name::TableName, }; use datasets_derived::{ - deps::{DepAlias, DepAliasError, DepReference, HashOrVersion}, + deps::{DepAlias, DepAliasOrSelfRef, DepAliasOrSelfRefError, DepReference, HashOrVersion}, func_name::FuncName, function::Function, manifest::TableSchema, + sorting::{self, CyclicDepError}, }; use js_runtime::isolate_pool::IsolatePool; use tracing::instrument; @@ -55,6 +56,8 @@ use crate::{ /// - `EMPTY_TABLES_AND_FUNCTIONS`: No tables or functions provided (at least one is required) /// - `INVALID_TABLE_SQL`: SQL syntax error in table definition /// - `NON_INCREMENTAL_QUERY`: SQL query is non-incremental +/// - `SELF_REFERENCING_TABLE`: A table references itself via `self.` +/// - `CYCLIC_DEPENDENCY`: Inter-table dependencies form a cycle /// - `TABLE_REFERENCE_RESOLUTION`: Failed to extract table references from SQL /// - `NO_TABLE_REFERENCES`: Table SQL does not reference any source tables /// - `FUNCTION_REFERENCE_RESOLUTION`: Failed to extract function references from SQL @@ 
-84,6 +87,12 @@ use crate::{ /// 4. **Infer Schema**: Uses DataFusion's query planner to determine output schema without executing the query /// 5. **Prepend Special Fields**: Adds `RESERVED_BLOCK_NUM_COLUMN_NAME` field to the output schema /// 6. **Extract Networks**: Identifies which blockchain networks are referenced by the query +/// +/// # Panics +/// +/// Panics if `topological_sort` returns a table name that was not in the original +/// statements map. This is structurally impossible because the sort only returns +/// keys from its input. #[instrument(skip_all, err)] #[cfg_attr( feature = "utoipa", @@ -192,7 +201,7 @@ pub async fn handler( }; // Parse all SQL queries from tables and extract table references and function names - let statements = { + let mut statements = { let mut statements: BTreeMap = BTreeMap::new(); for (table_name, sql_query) in tables { @@ -207,17 +216,82 @@ pub async fn handler( statements }; + // Extract inter-table dependencies and determine processing order. + // `self.`-qualified table references that match sibling table names are inter-table deps. + let table_order = { + let mut deps: BTreeMap> = BTreeMap::new(); + for (table_name, stmt) in &statements { + let table_refs = sql::resolve_table_references::(stmt).map_err( + |err| match &err { + ResolveTableReferencesError::InvalidTableName { .. } => { + Error::InvalidTableName { + table_name: table_name.clone(), + source: err, + } + } + ResolveTableReferencesError::CatalogQualifiedTable { .. } => { + Error::CatalogQualifiedTableInSql { + table_name: table_name.clone(), + source: err, + } + } + _ => Error::TableReferenceResolution { + table_name: table_name.clone(), + source: err, + }, + }, + )?; + + // Reject tables whose SQL references no source tables (e.g., `SELECT 1`). + // Derived tables must reference at least one external dependency or sibling table. 
+ if table_refs.is_empty() { + return Err(Error::NoTableReferences { + table_name: table_name.clone(), + } + .into()); + } + + let mut table_deps = Vec::new(); + for table_ref in table_refs { + if let TableReference::Partial { schema, table } = &table_ref + && schema.as_ref().is_self() + { + if table.as_ref() == table_name { + return Err(Error::SelfReferencingTable { + table_name: table_name.clone(), + } + .into()); + } + if statements.contains_key(table.as_ref()) { + table_deps.push(table.as_ref().clone()); + } else { + return Err(Error::SelfRefTableNotFound { + source_table: table_name.clone(), + referenced_table: table.as_ref().clone(), + } + .into()); + } + } + } + deps.insert(table_name.clone(), table_deps); + } + sorting::topological_sort(deps).map_err(Error::CyclicDependency)? + }; + // Build dep_aliases for AmpCatalogProvider before dependencies is consumed let dep_aliases: BTreeMap = dependencies .iter() .map(|(alias, hash_ref)| (alias.to_string(), hash_ref.clone())) .collect(); - // Create planning context with self-schema provider + // Create planning context with self-schema provider. + // Inter-table references use `self.` syntax, which resolves through the + // SelfSchemaProvider registered under the "self" schema in the AmpCatalogProvider. 
let session_config = default_session_config().map_err(Error::SessionConfig)?; - let self_schema: Arc = Arc::new( - SelfSchemaProvider::from_manifest_udfs(IsolatePool::dummy(), &functions), - ); + let self_schema_provider = Arc::new(SelfSchemaProvider::from_manifest_udfs( + IsolatePool::dummy(), + &functions, + )); let amp_catalog = Arc::new( AmpCatalogProvider::new( ctx.datasets_cache.clone(), @@ -225,30 +299,22 @@ pub async fn handler( IsolatePool::dummy(), ) .with_dep_aliases(dep_aliases) - .with_self_schema(self_schema), + .with_self_schema(self_schema_provider.clone() as Arc), ); let planning_ctx = PlanContextBuilder::new(session_config) .with_table_catalog(AMP_CATALOG_NAME, amp_catalog.clone()) .with_func_catalog(AMP_CATALOG_NAME, amp_catalog) .build(); - // Infer schema for each table and extract networks + // Infer schema for each table in topological order. + // After inferring a table's schema, register it with the self-schema provider + // so that subsequent tables can reference it via `self.`. 
let mut schemas = BTreeMap::new(); - for (table_name, stmt) in statements { - // Extract table references and reject tables with no source tables - let table_refs = resolve_table_references::(&stmt).map_err(|err| { - Error::TableReferenceResolution { - table_name: table_name.clone(), - source: err, - } - })?; - if table_refs.is_empty() { - return Err(Error::NoTableReferences { - table_name: table_name.clone(), - } - .into()); - } - + for table_name in table_order { + // topological_sort only returns keys from the input map + let stmt = statements + .remove(&table_name) + .expect("topological_sort returned unknown table"); let plan = planning_ctx.statement_to_plan(stmt).await.map_err(|err| { Error::SchemaPlanInference { table_name: table_name.clone(), @@ -267,6 +333,10 @@ pub async fn handler( // Prepend the special block number field let schema = prepend_special_block_num_field(&schema); + // Register the inferred schema so subsequent tables can reference this table + let arrow_schema: Arc = Arc::new(schema.as_arrow().clone()); + self_schema_provider.add_table(table_name.as_str(), arrow_schema); + schemas.insert( table_name, TableSchemaWithNetworks { @@ -376,13 +446,81 @@ enum Error { source: common::sql::ParseSqlError, }, + /// SQL query contains non-incremental operations + /// + /// This occurs when a table's SQL uses operations that cannot be processed incrementally + /// (e.g., aggregations, sorts, limits, outer joins, window functions, distinct). #[error("Table '{table_name}' contains non-incremental SQL: {source}")] NonIncrementalQuery { + /// The table whose SQL query contains non-incremental operations table_name: TableName, #[source] source: NonIncrementalQueryError, }, + /// A table references itself via `self.` + /// + /// This occurs when a table's SQL query contains `self.`, which would + /// create a trivial circular dependency. Tables cannot reference themselves. 
+ #[error("Table '{table_name}' references itself via self.{table_name}")] + SelfReferencingTable { + /// The table that references itself + table_name: TableName, + }, + + /// A `self.`-qualified table reference targets a table that does not exist in the dataset. + #[error( + "Table '{source_table}' references non-existent sibling table 'self.{referenced_table}'" + )] + SelfRefTableNotFound { + /// The table whose SQL contains the invalid self-ref + source_table: TableName, + /// The referenced table name that does not exist + referenced_table: TableName, + }, + + /// Cyclic dependency detected among tables within the dataset + /// + /// This occurs when tables reference each other in a cycle (e.g., table_a references + /// table_b which references table_a). Inter-table dependencies must form a DAG. + #[error("Cyclic dependency detected among inter-table references: {0}")] + CyclicDependency(#[source] CyclicDepError), + + /// Catalog-qualified tables (e.g., `catalog.schema.table`) are not supported. + /// + /// This occurs during SQL parsing when a 3-part table reference is detected. + #[error("Catalog-qualified table reference in table '{table_name}': {source}")] + CatalogQualifiedTableInSql { + /// The table whose SQL query contains a catalog-qualified table reference + table_name: TableName, + #[source] + source: ResolveTableReferencesError, + }, + + /// Table name in SQL reference has invalid format. + /// + /// This occurs when a table name extracted from SQL does not conform to identifier rules. + #[error("Invalid table name in table '{table_name}': {source}")] + InvalidTableName { + /// The table whose SQL query contains an invalid table name + table_name: TableName, + #[source] + source: ResolveTableReferencesError, + }, + + /// Failed to extract table references from SQL query + /// + /// This occurs when resolving table references from a parsed SQL statement fails + /// for reasons other than catalog qualification or invalid table names. 
+ #[error("Failed to extract table references from table '{table_name}': {source}")] + TableReferenceResolution { + /// The table whose SQL query failed reference extraction + table_name: TableName, + /// The underlying error + #[source] + source: ResolveTableReferencesError, + }, + /// Dependency not found in dataset store /// /// This occurs when: @@ -426,18 +564,6 @@ enum Error { source: amp_datasets_registry::error::ResolveRevisionError, }, - /// Failed to extract table references from SQL query - /// - /// This occurs when extracting table references from the parsed SQL fails, - /// typically due to unsupported table reference formats. - #[error("Failed to extract table references for table '{table_name}': {source}")] - TableReferenceResolution { - /// The table whose SQL query failed table reference extraction - table_name: TableName, - #[source] - source: ResolveTableReferencesError, - }, - /// Table SQL does not reference any source tables /// /// This occurs when a derived table's SQL query contains no table references @@ -494,10 +620,15 @@ impl IntoErrorResponse for Error { Error::EmptyTablesAndFunctions => "EMPTY_TABLES_AND_FUNCTIONS", Error::InvalidTableSql { .. } => "INVALID_TABLE_SQL", Error::NonIncrementalQuery { .. } => "NON_INCREMENTAL_QUERY", + Error::SelfReferencingTable { .. } => "SELF_REFERENCING_TABLE", + Error::SelfRefTableNotFound { .. } => "SELF_REF_TABLE_NOT_FOUND", + Error::CyclicDependency(_) => "CYCLIC_DEPENDENCY", + Error::CatalogQualifiedTableInSql { .. } => "CATALOG_QUALIFIED_TABLE", + Error::InvalidTableName { .. } => "INVALID_TABLE_NAME", + Error::TableReferenceResolution { .. } => "TABLE_REFERENCE_RESOLUTION", Error::DependencyNotFound { .. } => "DEPENDENCY_NOT_FOUND", Error::DependencyManifestLinkCheck { .. } => "DEPENDENCY_MANIFEST_LINK_CHECK", Error::DependencyVersionResolution { .. } => "DEPENDENCY_VERSION_RESOLUTION", - Error::TableReferenceResolution { .. } => "TABLE_REFERENCE_RESOLUTION", Error::NoTableReferences { .. 
} => "NO_TABLE_REFERENCES", Error::SessionConfig(_) => "SESSION_CONFIG_ERROR", Error::SchemaPlanInference { source, .. } if is_user_input_error(source) => { @@ -513,10 +644,15 @@ impl IntoErrorResponse for Error { Error::EmptyTablesAndFunctions => StatusCode::BAD_REQUEST, Error::InvalidTableSql { .. } => StatusCode::BAD_REQUEST, Error::NonIncrementalQuery { .. } => StatusCode::BAD_REQUEST, + Error::SelfReferencingTable { .. } => StatusCode::BAD_REQUEST, + Error::SelfRefTableNotFound { .. } => StatusCode::BAD_REQUEST, + Error::CyclicDependency(_) => StatusCode::BAD_REQUEST, + Error::CatalogQualifiedTableInSql { .. } => StatusCode::BAD_REQUEST, + Error::InvalidTableName { .. } => StatusCode::BAD_REQUEST, + Error::TableReferenceResolution { .. } => StatusCode::BAD_REQUEST, Error::DependencyNotFound { .. } => StatusCode::NOT_FOUND, Error::DependencyManifestLinkCheck { .. } => StatusCode::INTERNAL_SERVER_ERROR, Error::DependencyVersionResolution { .. } => StatusCode::INTERNAL_SERVER_ERROR, - Error::TableReferenceResolution { .. } => StatusCode::BAD_REQUEST, Error::NoTableReferences { .. } => StatusCode::BAD_REQUEST, Error::SessionConfig(_) => StatusCode::INTERNAL_SERVER_ERROR, Error::SchemaPlanInference { source, .. } if is_user_input_error(source) => { diff --git a/docs/feat/data-inter-table-dependencies.md b/docs/feat/data-inter-table-dependencies.md new file mode 100644 index 000000000..995c51a96 --- /dev/null +++ b/docs/feat/data-inter-table-dependencies.md @@ -0,0 +1,126 @@ +--- +name: "data-inter-table-dependencies" +description: "Inter-table dependencies within derived datasets: self-qualified table references, topological ordering, cycle detection. 
Load when asking about tables referencing other tables in the same dataset" +type: feature +status: unstable +components: "service:admin-api,service:server,crate:common,crate:datasets-derived,crate:worker-datasets-derived" +--- + +# Inter-Table Dependencies + +## Summary + +Derived datasets can contain tables that reference other tables within the same dataset. For example, `table_b` can `SELECT` from `table_a` using a `self.`-qualified table reference (e.g., `SELECT * FROM self.table_a`). This syntax is consistent with how UDFs reference same-dataset functions (e.g., `self.myFunction()`). The system validates that inter-table dependencies form a directed acyclic graph (DAG) and rejects cyclic references with a `CYCLIC_DEPENDENCY` error. + +## Table of Contents + +1. [Key Concepts](#key-concepts) +2. [Usage](#usage) +3. [Architecture](#architecture) +4. [API Reference](#api-reference) +5. [Implementation](#implementation) +6. [Limitations](#limitations) + +## Key Concepts + +- **Inter-table dependency**: A table within a derived dataset that references another table in the same dataset via its SQL query +- **Self-qualified table reference**: A `self.`-prefixed SQL table name (e.g., `SELECT * FROM self.my_table`) that resolves to a sibling table in the same dataset, consistent with the `self.` convention used for UDFs +- **Topological ordering**: Processing tables in dependency order so that each table is validated after all tables it depends on have been validated +- **Cycle detection**: Verification that inter-table dependencies do not form circular references (e.g., table_a depends on table_b which depends on table_a) + +## Usage + +### Defining inter-table dependencies + +Reference sibling tables using `self.`-qualified table names in SQL queries: + +```typescript +export default defineDataset(() => ({ + name: "my_dataset", + network: "mainnet", + dependencies: { + eth_firehose: "_/eth_firehose@0.0.0", + }, + tables: { + // Base table: references an external 
dependency (qualified by dep alias) + blocks_base: { + sql: "SELECT block_num, gas_limit, miner FROM eth_firehose.blocks", + network: "mainnet", + }, + // Derived table: references a sibling table (self-qualified) + blocks_summary: { + sql: "SELECT block_num, miner FROM self.blocks_base", + network: "mainnet", + }, + }, +})) +``` + +External dependencies use dependency-alias-qualified references (`eth_firehose.blocks`), while inter-table references use the `self.` prefix (`self.blocks_base`). This is consistent with how UDFs reference same-dataset functions (`self.myFunction()`). + +### Cycle detection errors + +If tables form a circular dependency, both the schema endpoint and manifest registration return a `CYCLIC_DEPENDENCY` error: + +```json +{ + "error_code": "CYCLIC_DEPENDENCY", + "error_message": "Cyclic dependency detected among inter-table references: ..." +} +``` + +## Architecture + +### Validation flow + +1. **Parse SQL** for all tables in the dataset +2. **Extract `self.`-qualified table references** from each table's SQL that match sibling table names +3. **Build dependency graph** mapping each table to its intra-dataset dependencies +4. **Topological sort** the graph, detecting cycles (returns `CYCLIC_DEPENDENCY` on failure) +5. **Process tables in topological order**: validate each table's SQL, infer its schema, and register it with the self-schema provider so subsequent tables can reference it + +### Self-schema resolution + +The `SelfSchemaProvider` is registered under the `"self"` schema in the `AmpCatalogProvider`. When a SQL query contains `self.table_name`, DataFusion resolves it through this provider, which holds schemas for already-processed sibling tables. This is the same mechanism used for UDF resolution (`self.functionName()`). 
+ +## API Reference + +Inter-table dependency validation occurs at two endpoints: + +| Endpoint | Method | Validation | +|----------|--------|------------| +| `/schema` | POST | Schema inference with inter-table resolution | +| `/manifests` | POST | Manifest registration with inter-table validation | + +### Error codes + +| Code | Status | Description | +|------|--------|-------------| +| `SELF_REFERENCING_TABLE` | 400 | A table's SQL references the table itself (e.g., `self.table_a` inside `table_a`) | +| `SELF_REF_TABLE_NOT_FOUND` | 400 | A `self.`-qualified reference targets a table that does not exist in the dataset | +| `CYCLIC_DEPENDENCY` | 400 | Inter-table references form a cycle | +| `NO_TABLE_REFERENCES` | 400 | Table SQL references no source tables (e.g., `SELECT 1`) | +| `CATALOG_QUALIFIED_TABLE` | 400 | Table uses unsupported 3-part catalog-qualified reference (e.g., `catalog.schema.table`) | +| `INVALID_TABLE_NAME` | 400 | Table name in SQL reference does not conform to identifier rules | + +## Implementation + +### Source files + +#### Validation (admin API) + +- `crates/core/common/src/self_schema_provider.rs` - `SelfSchemaProvider::add_table()` for progressive schema registration +- `crates/services/admin-api/src/handlers/schema.rs` - Schema endpoint with topological ordering and cycle detection +- `crates/services/admin-api/src/handlers/common.rs` - Manifest validation with topological ordering and cycle detection +- `crates/core/datasets-derived/src/sorting.rs` - `topological_sort()` and `CyclicDepError` used by both handlers + +#### Runtime (dump and query) + +- `crates/core/worker-datasets-derived/src/job_impl.rs` - Passes sibling `PhysicalTable` map to each table's materialization +- `crates/core/worker-datasets-derived/src/job_impl/table.rs` - Splits `self.` refs from external deps, registers sibling tables in both planning and execution phases +- `crates/core/common/src/catalog/physical/for_dump.rs` - `resolve_external_deps()` + `build_catalog()` for composable catalog construction + 
+## Limitations + +- Inter-table references must use the `self.` prefix; dataset-name-qualified self-references (e.g., `my_dataset.my_table`) are not supported because the dataset name is not yet assigned at validation time +- All tables referenced via `self.` must exist in the same dataset definition; `self.` references to non-existent tables are rejected during validation with a `SELF_REF_TABLE_NOT_FOUND` error diff --git a/docs/openapi-specs/admin.spec.json b/docs/openapi-specs/admin.spec.json index bac9c92f5..092acda72 100644 --- a/docs/openapi-specs/admin.spec.json +++ b/docs/openapi-specs/admin.spec.json @@ -2264,7 +2264,7 @@ "schema" ], "summary": "Handler for the `POST /schema` endpoint", - "description": "Analyzes SQL queries and returns the output schema without executing the query.\nPerforms comprehensive validation and schema inference using real registered datasets\nand their actual schemas.\n\n## Request Body\n- `dependencies`: External dataset dependencies mapped by alias\n- `tables`: Table definitions mapped by table name (optional if functions provided)\n- `functions`: Function names defined in dataset config (optional if tables provided)\n\n## Response\n- **200 OK**: Returns the inferred schema and networks referenced by the query\n- **400 Bad Request**: Invalid SQL syntax, table references, or function format\n- **404 Not Found**: Referenced dataset does not exist\n- **500 Internal Server Error**: Dataset store, planning, or internal errors\n\n## Error Codes\n- `INVALID_PAYLOAD_FORMAT`: Request JSON is malformed or missing required fields\n- `EMPTY_TABLES_AND_FUNCTIONS`: No tables or functions provided (at least one is required)\n- `INVALID_TABLE_SQL`: SQL syntax error in table definition\n- `NON_INCREMENTAL_QUERY`: SQL query is non-incremental\n- `TABLE_REFERENCE_RESOLUTION`: Failed to extract table references from SQL\n- `NO_TABLE_REFERENCES`: Table SQL does not reference any source tables\n- `FUNCTION_REFERENCE_RESOLUTION`: Failed to extract function references from SQL\n- 
`DEPENDENCY_NOT_FOUND`: Referenced dependency does not exist\n- `DEPENDENCY_MANIFEST_LINK_CHECK`: Failed to verify manifest link for dependency\n- `DEPENDENCY_VERSION_RESOLUTION`: Failed to resolve version for dependency\n- `CATALOG_QUALIFIED_TABLE`: Table uses unsupported catalog qualification\n- `UNQUALIFIED_TABLE`: Table missing required dataset qualification\n- `INVALID_TABLE_NAME`: Table name violates SQL identifier rules\n- `INVALID_DEPENDENCY_ALIAS_FOR_TABLE_REF`: Dependency alias in table reference is invalid\n- `INVALID_DEPENDENCY_ALIAS_FOR_FUNCTION_REF`: Dependency alias in function reference is invalid\n- `CATALOG_QUALIFIED_FUNCTION`: Function uses unsupported catalog qualification\n- `DEPENDENCY_ALIAS_NOT_FOUND`: Referenced alias not in dependencies\n- `DATASET_NOT_FOUND`: Referenced dataset does not exist\n- `GET_DATASET_ERROR`: Failed to retrieve dataset from store\n- `ETH_CALL_UDF_CREATION_ERROR`: Failed to create eth_call UDF\n- `TABLE_NOT_FOUND_IN_DATASET`: Table not found in referenced dataset\n- `FUNCTION_NOT_FOUND_IN_DATASET`: Function not found in referenced dataset\n- `ETH_CALL_NOT_AVAILABLE`: eth_call function not available for dataset\n- `SESSION_CONFIG_ERROR`: Failed to create DataFusion session configuration\n- `SCHEMA_INFERENCE`: Failed to infer output schema from query\n\n## Schema Analysis Process\n1. **Parse SQL**: Validates syntax using DataFusion's SQL parser\n2. **Load Datasets**: Retrieves dataset definitions from the registry for all referenced datasets\n3. **Create Planning Context**: Builds planning context with real table schemas from stored datasets\n4. **Infer Schema**: Uses DataFusion's query planner to determine output schema without executing the query\n5. **Prepend Special Fields**: Adds `RESERVED_BLOCK_NUM_COLUMN_NAME` field to the output schema\n6. 
**Extract Networks**: Identifies which blockchain networks are referenced by the query", + "description": "Analyzes SQL queries and returns the output schema without executing the query.\nPerforms comprehensive validation and schema inference using real registered datasets\nand their actual schemas.\n\n## Request Body\n- `dependencies`: External dataset dependencies mapped by alias\n- `tables`: Table definitions mapped by table name (optional if functions provided)\n- `functions`: Function names defined in dataset config (optional if tables provided)\n\n## Response\n- **200 OK**: Returns the inferred schema and networks referenced by the query\n- **400 Bad Request**: Invalid SQL syntax, table references, or function format\n- **404 Not Found**: Referenced dataset does not exist\n- **500 Internal Server Error**: Dataset store, planning, or internal errors\n\n## Error Codes\n- `INVALID_PAYLOAD_FORMAT`: Request JSON is malformed or missing required fields\n- `EMPTY_TABLES_AND_FUNCTIONS`: No tables or functions provided (at least one is required)\n- `INVALID_TABLE_SQL`: SQL syntax error in table definition\n- `NON_INCREMENTAL_QUERY`: SQL query is non-incremental\n- `SELF_REFERENCING_TABLE`: A table references itself via `self.`\n- `CYCLIC_DEPENDENCY`: Inter-table dependencies form a cycle\n- `TABLE_REFERENCE_RESOLUTION`: Failed to extract table references from SQL\n- `NO_TABLE_REFERENCES`: Table SQL does not reference any source tables\n- `FUNCTION_REFERENCE_RESOLUTION`: Failed to extract function references from SQL\n- `DEPENDENCY_NOT_FOUND`: Referenced dependency does not exist\n- `DEPENDENCY_MANIFEST_LINK_CHECK`: Failed to verify manifest link for dependency\n- `DEPENDENCY_VERSION_RESOLUTION`: Failed to resolve version for dependency\n- `CATALOG_QUALIFIED_TABLE`: Table uses unsupported catalog qualification\n- `UNQUALIFIED_TABLE`: Table missing required dataset qualification\n- `INVALID_TABLE_NAME`: Table name violates SQL identifier rules\n- 
`INVALID_DEPENDENCY_ALIAS_FOR_TABLE_REF`: Dependency alias in table reference is invalid\n- `INVALID_DEPENDENCY_ALIAS_FOR_FUNCTION_REF`: Dependency alias in function reference is invalid\n- `CATALOG_QUALIFIED_FUNCTION`: Function uses unsupported catalog qualification\n- `DEPENDENCY_ALIAS_NOT_FOUND`: Referenced alias not in dependencies\n- `DATASET_NOT_FOUND`: Referenced dataset does not exist\n- `GET_DATASET_ERROR`: Failed to retrieve dataset from store\n- `ETH_CALL_UDF_CREATION_ERROR`: Failed to create eth_call UDF\n- `TABLE_NOT_FOUND_IN_DATASET`: Table not found in referenced dataset\n- `FUNCTION_NOT_FOUND_IN_DATASET`: Function not found in referenced dataset\n- `ETH_CALL_NOT_AVAILABLE`: eth_call function not available for dataset\n- `SESSION_CONFIG_ERROR`: Failed to create DataFusion session configuration\n- `SCHEMA_INFERENCE`: Failed to infer output schema from query\n\n## Schema Analysis Process\n1. **Parse SQL**: Validates syntax using DataFusion's SQL parser\n2. **Load Datasets**: Retrieves dataset definitions from the registry for all referenced datasets\n3. **Create Planning Context**: Builds planning context with real table schemas from stored datasets\n4. **Infer Schema**: Uses DataFusion's query planner to determine output schema without executing the query\n5. **Prepend Special Fields**: Adds `RESERVED_BLOCK_NUM_COLUMN_NAME` field to the output schema\n6. **Extract Networks**: Identifies which blockchain networks are referenced by the query\n\n# Panics\n\nPanics if `topological_sort` returns a table name that was not in the original\nstatements map. 
This is structurally impossible because the sort only returns\nkeys from its input.", "operationId": "schema_analyze", "requestBody": { "content": { diff --git a/tests/config/packages/intra_deps/amp.config.ts b/tests/config/packages/intra_deps/amp.config.ts index fcf129555..fc14071f7 100644 --- a/tests/config/packages/intra_deps/amp.config.ts +++ b/tests/config/packages/intra_deps/amp.config.ts @@ -14,12 +14,12 @@ export default defineDataset(() => ({ }, // derived table -- Added aa prefix, it should be dumped after zz_blocks_base aa_blocks_derived: { - sql: "SELECT block_num, gas_limit, gas_used, miner, hash, parent_hash FROM intra_deps.zz_blocks_base", + sql: "SELECT block_num, gas_limit, gas_used, miner, hash, parent_hash FROM self.zz_blocks_base", network: "mainnet", }, // derived table -- Added mm prefix, it should be dumped after aa_blocks_derived mm_blocks_derived: { - sql: "SELECT block_num, miner, hash, parent_hash FROM intra_deps.aa_blocks_derived", + sql: "SELECT block_num, miner, hash, parent_hash FROM self.aa_blocks_derived", network: "mainnet", }, }, diff --git a/tests/src/tests/it_admin_api_schema.rs b/tests/src/tests/it_admin_api_schema.rs index 9445e26cd..54c8cbe59 100644 --- a/tests/src/tests/it_admin_api_schema.rs +++ b/tests/src/tests/it_admin_api_schema.rs @@ -77,8 +77,8 @@ async fn resolve_schema_with_catalog_qualified_table_fails() { .expect("failed to parse error response JSON"); assert_eq!( - response.error_code, "TABLE_REFERENCE_RESOLUTION", - "should return TABLE_REFERENCE_RESOLUTION for catalog-qualified table" + response.error_code, "CATALOG_QUALIFIED_TABLE", + "should return CATALOG_QUALIFIED_TABLE for catalog-qualified table" ); assert!( response @@ -652,8 +652,8 @@ async fn multiple_tables_catalog_qualified_fails() { .expect("failed to parse error response JSON"); assert_eq!( - response.error_code, "TABLE_REFERENCE_RESOLUTION", - "should return TABLE_REFERENCE_RESOLUTION for catalog-qualified table" + response.error_code, 
"CATALOG_QUALIFIED_TABLE", + "should return CATALOG_QUALIFIED_TABLE for catalog-qualified table" ); } @@ -1760,6 +1760,224 @@ async fn resolve_schema_with_no_table_references_fails() { ); } +// ============================================================================ +// Inter-table dependency tests +// ============================================================================ + +#[tokio::test] +async fn resolve_schema_with_self_qualified_inter_table_ref_succeeds() { + //* Given + let ctx = TestCtx::setup( + "inter_table_self_ref", + [("eth_firehose", "_/eth_firehose@0.0.1")], + ) + .await; + + //* When — table_b references table_a via self. + let resp = ctx + .send_schema_request_with_tables_and_deps( + [ + ("table_a", "SELECT block_num, hash FROM eth.blocks"), + ("table_b", "SELECT block_num FROM self.table_a"), + ], + [("eth", "_/eth_firehose@0.0.1")], + ) + .await; + + //* Then + assert_eq!( + resp.status(), + StatusCode::OK, + "schema resolution should succeed with self-qualified inter-table reference" + ); + + let _response: SchemaResponse = resp + .json() + .await + .expect("failed to parse schema response JSON"); +} + +#[tokio::test] +async fn resolve_schema_with_chained_inter_table_deps_succeeds() { + //* Given + let ctx = TestCtx::setup( + "inter_table_chain_three", + [("eth_firehose", "_/eth_firehose@0.0.1")], + ) + .await; + + //* When — C depends on B, B depends on A + let resp = ctx + .send_schema_request_with_tables_and_deps( + [ + ("table_a", "SELECT block_num, hash FROM eth.blocks"), + ("table_b", "SELECT block_num, hash FROM self.table_a"), + ("table_c", "SELECT block_num FROM self.table_b"), + ], + [("eth", "_/eth_firehose@0.0.1")], + ) + .await; + + //* Then + assert_eq!( + resp.status(), + StatusCode::OK, + "schema resolution should succeed with chain of three inter-table deps" + ); + + let _response: SchemaResponse = resp + .json() + .await + .expect("failed to parse schema response JSON"); +} + +#[tokio::test] +async fn 
resolve_schema_with_cyclic_inter_table_deps_fails() { + //* Given + let ctx = TestCtx::setup( + "inter_table_cycle", + [("eth_firehose", "_/eth_firehose@0.0.1")], + ) + .await; + + //* When — A references B and B references A (cycle) + let resp = ctx + .send_schema_request_with_tables_and_deps( + [ + ("table_a", "SELECT block_num FROM self.table_b"), + ("table_b", "SELECT block_num FROM self.table_a"), + ], + [("eth", "_/eth_firehose@0.0.1")], + ) + .await; + + //* Then + assert_eq!( + resp.status(), + StatusCode::BAD_REQUEST, + "schema resolution should reject cyclic inter-table dependencies" + ); + + let response: ErrorResponse = resp + .json() + .await + .expect("failed to parse error response JSON"); + + assert_eq!( + response.error_code, "CYCLIC_DEPENDENCY", + "should return CYCLIC_DEPENDENCY for inter-table cycle" + ); +} + +#[tokio::test] +async fn resolve_schema_with_self_referencing_table_fails() { + //* Given + let ctx = TestCtx::setup( + "inter_table_self_ref_cycle", + [("eth_firehose", "_/eth_firehose@0.0.1")], + ) + .await; + + //* When — table references itself + let resp = ctx + .send_schema_request_with_tables_and_deps( + [("table_a", "SELECT block_num FROM self.table_a")], + [("eth", "_/eth_firehose@0.0.1")], + ) + .await; + + //* Then — self-references are detected during dependency extraction and rejected + assert_eq!( + resp.status(), + StatusCode::BAD_REQUEST, + "schema resolution should fail for self-referencing table" + ); + + let response: ErrorResponse = resp + .json() + .await + .expect("failed to parse error response JSON"); + + assert_eq!( + response.error_code, "SELF_REFERENCING_TABLE", + "should return SELF_REFERENCING_TABLE for table referencing itself" + ); +} + +#[tokio::test] +async fn resolve_schema_with_self_ref_to_nonexistent_table_fails() { + //* Given + let ctx = TestCtx::setup( + "inter_table_nonexistent_ref", + [("eth_firehose", "_/eth_firehose@0.0.1")], + ) + .await; + + //* When — table_b references a sibling that doesn't
exist + let resp = ctx + .send_schema_request_with_tables_and_deps( + [ + ("table_a", "SELECT block_num, hash FROM eth.blocks"), + ("table_b", "SELECT block_num FROM self.nonexistent_table"), + ], + [("eth", "_/eth_firehose@0.0.1")], + ) + .await; + + //* Then + assert_eq!( + resp.status(), + StatusCode::BAD_REQUEST, + "schema resolution should fail for self-ref to nonexistent sibling" + ); + + let response: ErrorResponse = resp + .json() + .await + .expect("failed to parse error response JSON"); + + assert_eq!( + response.error_code, "SELF_REF_TABLE_NOT_FOUND", + "should return SELF_REF_TABLE_NOT_FOUND for reference to nonexistent sibling" + ); +} + +#[tokio::test] +async fn resolve_schema_with_mixed_inter_table_and_external_deps_succeeds() { + //* Given + let ctx = TestCtx::setup( + "inter_table_mixed_deps", + [("eth_firehose", "_/eth_firehose@0.0.1")], + ) + .await; + + //* When — table_a uses external dep, table_b uses both external and self ref + let resp = ctx + .send_schema_request_with_tables_and_deps( + [ + ("table_a", "SELECT block_num, hash FROM eth.blocks"), + ( + "table_b", + "SELECT a.block_num, t.gas_used FROM self.table_a a JOIN eth.transactions t ON a.block_num = t.block_num", + ), + ], + [("eth", "_/eth_firehose@0.0.1")], + ) + .await; + + //* Then + assert_eq!( + resp.status(), + StatusCode::OK, + "schema resolution should succeed with mixed inter-table and external deps" + ); + + let _response: SchemaResponse = resp + .json() + .await + .expect("failed to parse schema response JSON"); +} + struct TestCtx { _ctx: crate::testlib::ctx::TestCtx, client: reqwest::Client, diff --git a/tests/src/tests/it_dependencies.rs b/tests/src/tests/it_dependencies.rs index c7642d6db..c234c38b1 100644 --- a/tests/src/tests/it_dependencies.rs +++ b/tests/src/tests/it_dependencies.rs @@ -3,7 +3,6 @@ use monitoring::logging; use crate::{steps::run_spec, testlib::ctx::TestCtxBuilder}; #[tokio::test] -#[ignore = "The intra-deps resolution functionality is broken. 
Enable this test once fixed"] async fn intra_deps_test() { logging::init(); From 1b7dc5c168e214571e5ac44178c3e836d5a82208 Mon Sep 17 00:00:00 2001 From: Mitchell Spencer Date: Tue, 10 Mar 2026 15:48:09 -0500 Subject: [PATCH 2/6] refactor(admin-api): extract shared inter-table dependency resolution Deduplicate inter-table dep logic that was copy-pasted between manifest validation and schema inference handlers. - Add `resolve_inter_table_order` shared function in `common.rs` - Add `InterTableDepError` enum with `error_code()` to preserve API error codes - Replace inline dep extraction in both `common.rs` and `schema.rs` with shared function - Consolidate 6 duplicated error variants into 3 defined once Signed-off-by: Mitchell Spencer --- .../services/admin-api/src/handlers/common.rs | 156 ++++++++++++------ .../services/admin-api/src/handlers/schema.rs | 149 ++++++----------- 2 files changed, 157 insertions(+), 148 deletions(-) diff --git a/crates/services/admin-api/src/handlers/common.rs b/crates/services/admin-api/src/handlers/common.rs index a275b263e..9c1d90eb0 100644 --- a/crates/services/admin-api/src/handlers/common.rs +++ b/crates/services/admin-api/src/handlers/common.rs @@ -1,6 +1,9 @@ //! Common utilities for HTTP handlers -use std::{collections::BTreeMap, sync::Arc}; +use std::{ + collections::{BTreeMap, BTreeSet}, + sync::Arc, +}; use amp_data_store::{DataStore, PhyTableRevision}; use amp_datasets_registry::error::ResolveRevisionError; @@ -37,6 +40,91 @@ type TableReferencesMap = BTreeMap< ), >; +/// Extracts inter-table dependencies from parsed table references and returns +/// a topologically sorted processing order. +/// +/// For each table, inspects `self.`-qualified table references, validates that +/// they don't self-reference or target nonexistent siblings, builds a dependency +/// graph, and topologically sorts it. 
+/// +/// ## Parameters +/// - `table_refs`: Iterator of `(table_name, table_references)` pairs +/// - `known_tables`: Set of all table names in the dataset (used to validate +/// that `self.` targets exist) +pub fn resolve_inter_table_order<'a>( + table_refs: impl IntoIterator])>, + known_tables: &BTreeSet, +) -> Result, InterTableDepError> { + let mut deps: BTreeMap> = BTreeMap::new(); + for (table_name, refs) in table_refs { + let mut table_deps = Vec::new(); + for table_ref in refs { + if let TableReference::Partial { schema, table } = table_ref + && schema.as_ref().is_self() + { + if table.as_ref() == table_name { + return Err(InterTableDepError::SelfReferencingTable { + table_name: table_name.clone(), + }); + } + if known_tables.contains(table.as_ref()) { + table_deps.push(table.as_ref().clone()); + } else { + return Err(InterTableDepError::SelfRefTableNotFound { + source_table: table_name.clone(), + referenced_table: table.as_ref().clone(), + }); + } + } + } + deps.insert(table_name.clone(), table_deps); + } + sorting::topological_sort(deps).map_err(InterTableDepError::CyclicDependency) +} + +/// Errors from extracting and validating inter-table dependencies. +#[derive(Debug, thiserror::Error)] +pub enum InterTableDepError { + /// A table references itself via `self.` + /// + /// This occurs when a table's SQL query contains `self.`, which would + /// create a trivial circular dependency. Tables cannot reference themselves. + #[error("Table '{table_name}' references itself via self.{table_name}")] + SelfReferencingTable { + /// The table that references itself + table_name: TableName, + }, + + /// A `self.`-qualified table reference targets a table that does not exist in the dataset. 
+ #[error( + "Table '{source_table}' references non-existent sibling table 'self.{referenced_table}'" + )] + SelfRefTableNotFound { + /// The table whose SQL contains the invalid self-ref + source_table: TableName, + /// The referenced table name that does not exist + referenced_table: TableName, + }, + + /// Cyclic dependency detected among tables within the dataset + /// + /// This occurs when tables reference each other in a cycle (e.g., table_a references + /// table_b which references table_a). Inter-table dependencies must form a DAG. + #[error("Cyclic dependency detected among inter-table references: {0}")] + CyclicDependency(#[source] CyclicDepError), +} + +impl InterTableDepError { + /// Returns the machine-readable error code for this error. + pub fn error_code(&self) -> &'static str { + match self { + Self::SelfReferencingTable { .. } => "SELF_REFERENCING_TABLE", + Self::SelfRefTableNotFound { .. } => "SELF_REF_TABLE_NOT_FOUND", + Self::CyclicDependency(_) => "CYCLIC_DEPENDENCY", + } + } +} + /// A string wrapper that ensures the value is not empty or whitespace-only /// /// This invariant-holding _new-type_ validates that strings contain at least one non-whitespace character. @@ -355,33 +443,14 @@ pub async fn validate_derived_manifest( // Step 2b: Extract inter-table dependencies and determine processing order. // `self.`-qualified table references that match sibling table names are inter-table deps. 
- let table_order = { - let mut deps: BTreeMap> = BTreeMap::new(); - for (table_name, (table_refs, _func_refs)) in &references { - let mut table_deps = Vec::new(); - for table_ref in table_refs { - if let TableReference::Partial { schema, table } = table_ref - && schema.as_ref().is_self() - { - if table.as_ref() == table_name { - return Err(ManifestValidationError::SelfReferencingTable { - table_name: table_name.clone(), - }); - } - if manifest.tables.contains_key(table.as_ref()) { - table_deps.push(table.as_ref().clone()); - } else { - return Err(ManifestValidationError::SelfRefTableNotFound { - source_table: table_name.clone(), - referenced_table: table.as_ref().clone(), - }); - } - } - } - deps.insert(table_name.clone(), table_deps); - } - sorting::topological_sort(deps).map_err(ManifestValidationError::CyclicDependency)? - }; + let known_tables: BTreeSet = manifest.tables.keys().cloned().collect(); + let table_order = resolve_inter_table_order( + references + .iter() + .map(|(name, (refs, _))| (name, refs.as_slice())), + &known_tables, + ) + .map_err(ManifestValidationError::InterTableDep)?; // Step 3: Create planning context to validate all table and function references. // Inter-table references use `self.` syntax, which resolves through the @@ -565,33 +634,12 @@ pub enum ManifestValidationError { alias: String, }, - /// A table references itself via `self.` - /// - /// This occurs when a table's SQL query contains `self.`, which would - /// create a trivial circular dependency. Tables cannot reference themselves. - #[error("Table '{table_name}' references itself via self.{table_name}")] - SelfReferencingTable { - /// The table that references itself - table_name: TableName, - }, - - /// A `self.`-qualified table reference targets a table that does not exist in the dataset. 
- #[error( - "Table '{source_table}' references non-existent sibling table 'self.{referenced_table}'" - )] - SelfRefTableNotFound { - /// The table whose SQL contains the invalid self-ref - source_table: TableName, - /// The referenced table name that does not exist - referenced_table: TableName, - }, - - /// Cyclic dependency detected among tables within the dataset + /// Inter-table dependency validation failed /// - /// This occurs when tables reference each other in a cycle (e.g., table_a references - /// table_b which references table_a). Inter-table dependencies must form a DAG. - #[error("Cyclic dependency detected among inter-table references: {0}")] - CyclicDependency(#[source] CyclicDepError), + /// This occurs when validating `self.`-qualified table references within the + /// dataset: self-referencing tables, nonexistent sibling targets, or cyclic deps. + #[error("Inter-table dependency error")] + InterTableDep(#[source] InterTableDepError), /// Non-incremental SQL operation in table query /// diff --git a/crates/services/admin-api/src/handlers/schema.rs b/crates/services/admin-api/src/handlers/schema.rs index 8198ecaee..71fb328bc 100644 --- a/crates/services/admin-api/src/handlers/schema.rs +++ b/crates/services/admin-api/src/handlers/schema.rs @@ -1,4 +1,7 @@ -use std::{collections::BTreeMap, sync::Arc}; +use std::{ + collections::{BTreeMap, BTreeSet}, + sync::Arc, +}; use axum::{ Json, @@ -12,7 +15,7 @@ use common::{ incrementalizer::NonIncrementalQueryError, plan_visitors::prepend_special_block_num_field, self_schema_provider::SelfSchemaProvider, - sql::{self, ResolveTableReferencesError, TableReference}, + sql::{self, ResolveTableReferencesError}, sql_str::SqlStr, }; use datafusion::{arrow, sql::parser::Statement}; @@ -24,14 +27,16 @@ use datasets_derived::{ func_name::FuncName, function::Function, manifest::TableSchema, - sorting::{self, CyclicDepError}, }; use js_runtime::isolate_pool::IsolatePool; use tracing::instrument; use crate::{ ctx::Ctx, 
- handlers::error::{ErrorResponse, IntoErrorResponse}, + handlers::{ + common::{InterTableDepError, resolve_inter_table_order}, + error::{ErrorResponse, IntoErrorResponse}, + }, }; /// Handler for the `POST /schema` endpoint @@ -90,9 +95,9 @@ use crate::{ /// /// # Panics /// -/// Panics if `topological_sort` returns a table name that was not in the original -/// statements map. This is structurally impossible because the sort only returns -/// keys from its input. +/// Panics if `resolve_inter_table_order` returns a table name that was not in the +/// original statements map. This is structurally impossible because the sort only +/// returns keys from its input. #[instrument(skip_all, err)] #[cfg_attr( feature = "utoipa", @@ -216,67 +221,48 @@ pub async fn handler( statements }; - // Extract inter-table dependencies and determine processing order. - // `self.`-qualified table references that match sibling table names are inter-table deps. - let table_order = { - let mut deps: BTreeMap> = BTreeMap::new(); - for (table_name, stmt) in &statements { - let table_refs = sql::resolve_table_references::(stmt).map_err( - |err| match &err { - ResolveTableReferencesError::InvalidTableName { .. } => { - Error::InvalidTableName { - table_name: table_name.clone(), - source: err, - } - } - ResolveTableReferencesError::CatalogQualifiedTable { .. } => { - Error::CatalogQualifiedTableInSql { - table_name: table_name.clone(), - source: err, - } - } - _ => Error::TableReferenceResolution { + // Extract table references per table, validate non-empty, then determine + // inter-table processing order via the shared helper. + let mut parsed_refs: BTreeMap>> = + BTreeMap::new(); + for (table_name, stmt) in &statements { + let table_refs = + sql::resolve_table_references::(stmt).map_err(|err| match &err { + ResolveTableReferencesError::InvalidTableName { .. 
} => Error::InvalidTableName { + table_name: table_name.clone(), + source: err, + }, + ResolveTableReferencesError::CatalogQualifiedTable { .. } => { + Error::CatalogQualifiedTableInSql { table_name: table_name.clone(), source: err, - }, - }, - )?; - - // Reject tables whose SQL references no source tables (e.g., `SELECT 1`). - // Derived tables must reference at least one external dependency or sibling table. - if table_refs.is_empty() { - return Err(Error::NoTableReferences { - table_name: table_name.clone(), - } - .into()); - } - - let mut table_deps = Vec::new(); - for table_ref in table_refs { - if let TableReference::Partial { schema, table } = &table_ref - && schema.as_ref().is_self() - { - if table.as_ref() == table_name { - return Err(Error::SelfReferencingTable { - table_name: table_name.clone(), - } - .into()); - } - if statements.contains_key(table.as_ref()) { - table_deps.push(table.as_ref().clone()); - } else { - return Err(Error::SelfRefTableNotFound { - source_table: table_name.clone(), - referenced_table: table.as_ref().clone(), - } - .into()); } } + _ => Error::TableReferenceResolution { + table_name: table_name.clone(), + source: err, + }, + })?; + + // Reject tables whose SQL references no source tables (e.g., `SELECT 1`). + if table_refs.is_empty() { + return Err(Error::NoTableReferences { + table_name: table_name.clone(), } - deps.insert(table_name.clone(), table_deps); + .into()); } - sorting::topological_sort(deps).map_err(Error::CyclicDependency)? 
- }; + + parsed_refs.insert(table_name.clone(), table_refs); + } + + let known_tables: BTreeSet = statements.keys().cloned().collect(); + let table_order = resolve_inter_table_order( + parsed_refs + .iter() + .map(|(name, refs)| (name, refs.as_slice())), + &known_tables, + ) + .map_err(Error::InterTableDep)?; // Build dep_aliases for AmpCatalogProvider before dependencies is consumed let dep_aliases: BTreeMap = dependencies @@ -458,33 +444,12 @@ enum Error { source: NonIncrementalQueryError, }, - /// A table references itself via `self.` - /// - /// This occurs when a table's SQL query contains `self.`, which would - /// create a trivial circular dependency. Tables cannot reference themselves. - #[error("Table '{table_name}' references itself via self.{table_name}")] - SelfReferencingTable { - /// The table that references itself - table_name: TableName, - }, - - /// A `self.`-qualified table reference targets a table that does not exist in the dataset. - #[error( - "Table '{source_table}' references non-existent sibling table 'self.{referenced_table}'" - )] - SelfRefTableNotFound { - /// The table whose SQL contains the invalid self-ref - source_table: TableName, - /// The referenced table name that does not exist - referenced_table: TableName, - }, - - /// Cyclic dependency detected among tables within the dataset + /// Inter-table dependency validation failed /// - /// This occurs when tables reference each other in a cycle (e.g., table_a references - /// table_b which references table_a). Inter-table dependencies must form a DAG. - #[error("Cyclic dependency detected among inter-table references: {0}")] - CyclicDependency(#[source] CyclicDepError), + /// This occurs when validating `self.`-qualified table references within the + /// dataset: self-referencing tables, nonexistent sibling targets, or cyclic deps. 
+ #[error("Inter-table dependency error")] + InterTableDep(#[source] InterTableDepError), /// Catalog-qualified tables (e.g., `catalog.schema.table`) are not supported. /// @@ -620,9 +585,7 @@ impl IntoErrorResponse for Error { Error::EmptyTablesAndFunctions => "EMPTY_TABLES_AND_FUNCTIONS", Error::InvalidTableSql { .. } => "INVALID_TABLE_SQL", Error::NonIncrementalQuery { .. } => "NON_INCREMENTAL_QUERY", - Error::SelfReferencingTable { .. } => "SELF_REFERENCING_TABLE", - Error::SelfRefTableNotFound { .. } => "SELF_REF_TABLE_NOT_FOUND", - Error::CyclicDependency(_) => "CYCLIC_DEPENDENCY", + Error::InterTableDep(inner) => inner.error_code(), Error::CatalogQualifiedTableInSql { .. } => "CATALOG_QUALIFIED_TABLE", Error::InvalidTableName { .. } => "INVALID_TABLE_NAME", Error::TableReferenceResolution { .. } => "TABLE_REFERENCE_RESOLUTION", @@ -644,9 +607,7 @@ impl IntoErrorResponse for Error { Error::EmptyTablesAndFunctions => StatusCode::BAD_REQUEST, Error::InvalidTableSql { .. } => StatusCode::BAD_REQUEST, Error::NonIncrementalQuery { .. } => StatusCode::BAD_REQUEST, - Error::SelfReferencingTable { .. } => StatusCode::BAD_REQUEST, - Error::SelfRefTableNotFound { .. } => StatusCode::BAD_REQUEST, - Error::CyclicDependency(_) => StatusCode::BAD_REQUEST, + Error::InterTableDep(_) => StatusCode::BAD_REQUEST, Error::CatalogQualifiedTableInSql { .. } => StatusCode::BAD_REQUEST, Error::InvalidTableName { .. } => StatusCode::BAD_REQUEST, Error::TableReferenceResolution { .. } => StatusCode::BAD_REQUEST, From c6a983dcbce10cd9cfe1e4c878ce8dc2e3aec0d5 Mon Sep 17 00:00:00 2001 From: Mitchell Spencer Date: Tue, 10 Mar 2026 16:09:19 -0500 Subject: [PATCH 3/6] refactor(admin-api): simplify resolve_inter_table_order signature Replace generic iterator parameter with concrete BTreeMap reference for clarity and caller ergonomics. 
- Change `impl IntoIterator])>` to `&BTreeMap>>` - Simplify schema.rs call site from `.iter().map()` chain to direct `&parsed_refs` - Build explicit `table_refs_only` map in common.rs where tuple destructuring is needed Signed-off-by: Mitchell Spencer --- .../services/admin-api/src/handlers/common.rs | 19 +++++++++---------- .../services/admin-api/src/handlers/schema.rs | 9 ++------- 2 files changed, 11 insertions(+), 17 deletions(-) diff --git a/crates/services/admin-api/src/handlers/common.rs b/crates/services/admin-api/src/handlers/common.rs index 9c1d90eb0..9f3ed5b2b 100644 --- a/crates/services/admin-api/src/handlers/common.rs +++ b/crates/services/admin-api/src/handlers/common.rs @@ -48,11 +48,11 @@ type TableReferencesMap = BTreeMap< /// graph, and topologically sorts it. /// /// ## Parameters -/// - `table_refs`: Iterator of `(table_name, table_references)` pairs +/// - `table_refs`: Map of table names to their parsed SQL table references /// - `known_tables`: Set of all table names in the dataset (used to validate /// that `self.` targets exist) -pub fn resolve_inter_table_order<'a>( - table_refs: impl IntoIterator])>, +pub fn resolve_inter_table_order( + table_refs: &BTreeMap>>, known_tables: &BTreeSet, ) -> Result, InterTableDepError> { let mut deps: BTreeMap> = BTreeMap::new(); @@ -444,13 +444,12 @@ pub async fn validate_derived_manifest( // Step 2b: Extract inter-table dependencies and determine processing order. // `self.`-qualified table references that match sibling table names are inter-table deps. 
let known_tables: BTreeSet = manifest.tables.keys().cloned().collect(); - let table_order = resolve_inter_table_order( - references - .iter() - .map(|(name, (refs, _))| (name, refs.as_slice())), - &known_tables, - ) - .map_err(ManifestValidationError::InterTableDep)?; + let table_refs_only: BTreeMap>> = references + .iter() + .map(|(name, (refs, _))| (name.clone(), refs.clone())) + .collect(); + let table_order = resolve_inter_table_order(&table_refs_only, &known_tables) + .map_err(ManifestValidationError::InterTableDep)?; // Step 3: Create planning context to validate all table and function references. // Inter-table references use `self.` syntax, which resolves through the diff --git a/crates/services/admin-api/src/handlers/schema.rs b/crates/services/admin-api/src/handlers/schema.rs index 71fb328bc..81c17c8a1 100644 --- a/crates/services/admin-api/src/handlers/schema.rs +++ b/crates/services/admin-api/src/handlers/schema.rs @@ -256,13 +256,8 @@ pub async fn handler( } let known_tables: BTreeSet = statements.keys().cloned().collect(); - let table_order = resolve_inter_table_order( - parsed_refs - .iter() - .map(|(name, refs)| (name, refs.as_slice())), - &known_tables, - ) - .map_err(Error::InterTableDep)?; + let table_order = + resolve_inter_table_order(&parsed_refs, &known_tables).map_err(Error::InterTableDep)?; // Build dep_aliases for AmpCatalogProvider before dependencies is consumed let dep_aliases: BTreeMap = dependencies From 6a192d545e895898518e8ab79fc1f0ae69b4c810 Mon Sep 17 00:00:00 2001 From: Mitchell Spencer Date: Tue, 10 Mar 2026 17:26:19 -0500 Subject: [PATCH 4/6] chore(admin-api): regenerate OpenAPI spec Update generated spec to reflect renamed resolve_inter_table_order function in schema handler panics documentation. 
--- docs/openapi-specs/admin.spec.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/openapi-specs/admin.spec.json b/docs/openapi-specs/admin.spec.json index 092acda72..25f5dea57 100644 --- a/docs/openapi-specs/admin.spec.json +++ b/docs/openapi-specs/admin.spec.json @@ -2264,7 +2264,7 @@ "schema" ], "summary": "Handler for the `POST /schema` endpoint", - "description": "Analyzes SQL queries and returns the output schema without executing the query.\nPerforms comprehensive validation and schema inference using real registered datasets\nand their actual schemas.\n\n## Request Body\n- `dependencies`: External dataset dependencies mapped by alias\n- `tables`: Table definitions mapped by table name (optional if functions provided)\n- `functions`: Function names defined in dataset config (optional if tables provided)\n\n## Response\n- **200 OK**: Returns the inferred schema and networks referenced by the query\n- **400 Bad Request**: Invalid SQL syntax, table references, or function format\n- **404 Not Found**: Referenced dataset does not exist\n- **500 Internal Server Error**: Dataset store, planning, or internal errors\n\n## Error Codes\n- `INVALID_PAYLOAD_FORMAT`: Request JSON is malformed or missing required fields\n- `EMPTY_TABLES_AND_FUNCTIONS`: No tables or functions provided (at least one is required)\n- `INVALID_TABLE_SQL`: SQL syntax error in table definition\n- `NON_INCREMENTAL_QUERY`: SQL query is non-incremental\n- `SELF_REFERENCING_TABLE`: A table references itself via `self.`\n- `CYCLIC_DEPENDENCY`: Inter-table dependencies form a cycle\n- `TABLE_REFERENCE_RESOLUTION`: Failed to extract table references from SQL\n- `NO_TABLE_REFERENCES`: Table SQL does not reference any source tables\n- `FUNCTION_REFERENCE_RESOLUTION`: Failed to extract function references from SQL\n- `DEPENDENCY_NOT_FOUND`: Referenced dependency does not exist\n- `DEPENDENCY_MANIFEST_LINK_CHECK`: Failed to verify manifest link for dependency\n- 
`DEPENDENCY_VERSION_RESOLUTION`: Failed to resolve version for dependency\n- `CATALOG_QUALIFIED_TABLE`: Table uses unsupported catalog qualification\n- `UNQUALIFIED_TABLE`: Table missing required dataset qualification\n- `INVALID_TABLE_NAME`: Table name violates SQL identifier rules\n- `INVALID_DEPENDENCY_ALIAS_FOR_TABLE_REF`: Dependency alias in table reference is invalid\n- `INVALID_DEPENDENCY_ALIAS_FOR_FUNCTION_REF`: Dependency alias in function reference is invalid\n- `CATALOG_QUALIFIED_FUNCTION`: Function uses unsupported catalog qualification\n- `DEPENDENCY_ALIAS_NOT_FOUND`: Referenced alias not in dependencies\n- `DATASET_NOT_FOUND`: Referenced dataset does not exist\n- `GET_DATASET_ERROR`: Failed to retrieve dataset from store\n- `ETH_CALL_UDF_CREATION_ERROR`: Failed to create eth_call UDF\n- `TABLE_NOT_FOUND_IN_DATASET`: Table not found in referenced dataset\n- `FUNCTION_NOT_FOUND_IN_DATASET`: Function not found in referenced dataset\n- `ETH_CALL_NOT_AVAILABLE`: eth_call function not available for dataset\n- `SESSION_CONFIG_ERROR`: Failed to create DataFusion session configuration\n- `SCHEMA_INFERENCE`: Failed to infer output schema from query\n\n## Schema Analysis Process\n1. **Parse SQL**: Validates syntax using DataFusion's SQL parser\n2. **Load Datasets**: Retrieves dataset definitions from the registry for all referenced datasets\n3. **Create Planning Context**: Builds planning context with real table schemas from stored datasets\n4. **Infer Schema**: Uses DataFusion's query planner to determine output schema without executing the query\n5. **Prepend Special Fields**: Adds `RESERVED_BLOCK_NUM_COLUMN_NAME` field to the output schema\n6. **Extract Networks**: Identifies which blockchain networks are referenced by the query\n\n# Panics\n\nPanics if `topological_sort` returns a table name that was not in the original\nstatements map. 
This is structurally impossible because the sort only returns\nkeys from its input.", + "description": "Analyzes SQL queries and returns the output schema without executing the query.\nPerforms comprehensive validation and schema inference using real registered datasets\nand their actual schemas.\n\n## Request Body\n- `dependencies`: External dataset dependencies mapped by alias\n- `tables`: Table definitions mapped by table name (optional if functions provided)\n- `functions`: Function names defined in dataset config (optional if tables provided)\n\n## Response\n- **200 OK**: Returns the inferred schema and networks referenced by the query\n- **400 Bad Request**: Invalid SQL syntax, table references, or function format\n- **404 Not Found**: Referenced dataset does not exist\n- **500 Internal Server Error**: Dataset store, planning, or internal errors\n\n## Error Codes\n- `INVALID_PAYLOAD_FORMAT`: Request JSON is malformed or missing required fields\n- `EMPTY_TABLES_AND_FUNCTIONS`: No tables or functions provided (at least one is required)\n- `INVALID_TABLE_SQL`: SQL syntax error in table definition\n- `NON_INCREMENTAL_QUERY`: SQL query is non-incremental\n- `SELF_REFERENCING_TABLE`: A table references itself via `self.`\n- `CYCLIC_DEPENDENCY`: Inter-table dependencies form a cycle\n- `TABLE_REFERENCE_RESOLUTION`: Failed to extract table references from SQL\n- `NO_TABLE_REFERENCES`: Table SQL does not reference any source tables\n- `FUNCTION_REFERENCE_RESOLUTION`: Failed to extract function references from SQL\n- `DEPENDENCY_NOT_FOUND`: Referenced dependency does not exist\n- `DEPENDENCY_MANIFEST_LINK_CHECK`: Failed to verify manifest link for dependency\n- `DEPENDENCY_VERSION_RESOLUTION`: Failed to resolve version for dependency\n- `CATALOG_QUALIFIED_TABLE`: Table uses unsupported catalog qualification\n- `UNQUALIFIED_TABLE`: Table missing required dataset qualification\n- `INVALID_TABLE_NAME`: Table name violates SQL identifier rules\n- 
`INVALID_DEPENDENCY_ALIAS_FOR_TABLE_REF`: Dependency alias in table reference is invalid\n- `INVALID_DEPENDENCY_ALIAS_FOR_FUNCTION_REF`: Dependency alias in function reference is invalid\n- `CATALOG_QUALIFIED_FUNCTION`: Function uses unsupported catalog qualification\n- `DEPENDENCY_ALIAS_NOT_FOUND`: Referenced alias not in dependencies\n- `DATASET_NOT_FOUND`: Referenced dataset does not exist\n- `GET_DATASET_ERROR`: Failed to retrieve dataset from store\n- `ETH_CALL_UDF_CREATION_ERROR`: Failed to create eth_call UDF\n- `TABLE_NOT_FOUND_IN_DATASET`: Table not found in referenced dataset\n- `FUNCTION_NOT_FOUND_IN_DATASET`: Function not found in referenced dataset\n- `ETH_CALL_NOT_AVAILABLE`: eth_call function not available for dataset\n- `SESSION_CONFIG_ERROR`: Failed to create DataFusion session configuration\n- `SCHEMA_INFERENCE`: Failed to infer output schema from query\n\n## Schema Analysis Process\n1. **Parse SQL**: Validates syntax using DataFusion's SQL parser\n2. **Load Datasets**: Retrieves dataset definitions from the registry for all referenced datasets\n3. **Create Planning Context**: Builds planning context with real table schemas from stored datasets\n4. **Infer Schema**: Uses DataFusion's query planner to determine output schema without executing the query\n5. **Prepend Special Fields**: Adds `RESERVED_BLOCK_NUM_COLUMN_NAME` field to the output schema\n6. **Extract Networks**: Identifies which blockchain networks are referenced by the query\n\n# Panics\n\nPanics if `resolve_inter_table_order` returns a table name that was not in the\noriginal statements map. 
This is structurally impossible because the sort only\nreturns keys from its input.", "operationId": "schema_analyze", "requestBody": { "content": { From 81dc4338970882d4fc51db1ed70ad63f3db52577 Mon Sep 17 00:00:00 2001 From: Mitchell Spencer Date: Wed, 11 Mar 2026 21:54:34 -0500 Subject: [PATCH 5/6] refactor(worker-datasets-derived): address PR review feedback - Remove trivial is_fatal test for SelfRefTableNotFound - Move partition_table_refs to end of module per ordering guidelines - Move parse_and_partition test helper to end of tests module --- .../worker-datasets-derived/src/job_impl.rs | 14 ---- .../src/job_impl/table.rs | 80 +++++++++---------- 2 files changed, 40 insertions(+), 54 deletions(-) diff --git a/crates/core/worker-datasets-derived/src/job_impl.rs b/crates/core/worker-datasets-derived/src/job_impl.rs index 5cb9979e5..3ab0625b1 100644 --- a/crates/core/worker-datasets-derived/src/job_impl.rs +++ b/crates/core/worker-datasets-derived/src/job_impl.rs @@ -486,20 +486,6 @@ mod tests { //* Then assert!(result, "DependencyNotFound should be fatal"); } - - #[test] - fn is_fatal_self_ref_table_not_found_returns_true() { - //* Given - let err = MaterializeTableError::SelfRefTableNotFound( - "missing_table".parse().expect("should parse table name"), - ); - - //* When - let result = err.is_fatal(); - - //* Then - assert!(result, "SelfRefTableNotFound should be fatal"); - } } mod materialize_table_spawn_error_is_fatal { diff --git a/crates/core/worker-datasets-derived/src/job_impl/table.rs b/crates/core/worker-datasets-derived/src/job_impl/table.rs index 7abdf5996..846d6eb20 100644 --- a/crates/core/worker-datasets-derived/src/job_impl/table.rs +++ b/crates/core/worker-datasets-derived/src/job_impl/table.rs @@ -36,38 +36,6 @@ use tracing::Instrument as _; use super::query::{MaterializeSqlQueryError, materialize_sql_query}; use crate::job_ctx::Context; -/// Partitions table references into external dependency refs and self-ref table names. 
-/// -/// References with `self.` schema are collected as self-ref table names. -/// All other qualified references are converted to external `DepAlias` refs. -fn partition_table_refs( - refs: Vec>, -) -> (Vec>, Vec) { - let mut ext_refs: Vec> = Vec::new(); - let mut self_ref_tables: Vec = Vec::new(); - - for table_ref in refs { - match table_ref { - TableReference::Bare { table } => { - ext_refs.push(TableReference::Bare { table }); - } - TableReference::Partial { schema, table } => match schema.as_ref() { - DepAliasOrSelfRef::SelfRef => { - self_ref_tables.push(table.as_ref().clone()); - } - DepAliasOrSelfRef::DepAlias(alias) => { - ext_refs.push(TableReference::Partial { - schema: Arc::new(alias.clone()), - table, - }); - } - }, - } - } - - (ext_refs, self_ref_tables) -} - /// Materializes a derived dataset table #[allow(clippy::too_many_arguments)] #[tracing::instrument(skip_all, fields(table = %table.table_name()), err)] @@ -521,6 +489,38 @@ impl RetryableErrorExt for MaterializeTableSpawnError { } } +/// Partitions table references into external dependency refs and self-ref table names. +/// +/// References with `self.` schema are collected as self-ref table names. +/// All other qualified references are converted to external `DepAlias` refs. 
+fn partition_table_refs( + refs: Vec>, +) -> (Vec>, Vec) { + let mut ext_refs: Vec> = Vec::new(); + let mut self_ref_tables: Vec = Vec::new(); + + for table_ref in refs { + match table_ref { + TableReference::Bare { table } => { + ext_refs.push(TableReference::Bare { table }); + } + TableReference::Partial { schema, table } => match schema.as_ref() { + DepAliasOrSelfRef::SelfRef => { + self_ref_tables.push(table.as_ref().clone()); + } + DepAliasOrSelfRef::DepAlias(alias) => { + ext_refs.push(TableReference::Partial { + schema: Arc::new(alias.clone()), + table, + }); + } + }, + } + } + + (ext_refs, self_ref_tables) +} + #[cfg(test)] mod tests { use common::sql::resolve_table_references; @@ -528,14 +528,6 @@ mod tests { use super::*; - fn parse_and_partition(sql: &str) -> (Vec>, Vec) { - let sql_str: SqlStr = sql.parse().expect("sql should parse to SqlStr"); - let stmt = common::sql::parse(&sql_str).expect("sql should parse to statement"); - let refs = resolve_table_references::(&stmt) - .expect("table references should resolve"); - partition_table_refs(refs) - } - #[test] fn partition_table_refs_with_only_external_deps_returns_ext_refs_only() { //* When @@ -605,4 +597,12 @@ mod tests { //* Then assert!(!result, "SelfRefTableNotFound should not be retryable"); } + + fn parse_and_partition(sql: &str) -> (Vec>, Vec) { + let sql_str: SqlStr = sql.parse().expect("sql should parse to SqlStr"); + let stmt = common::sql::parse(&sql_str).expect("sql should parse to statement"); + let refs = resolve_table_references::(&stmt) + .expect("table references should resolve"); + partition_table_refs(refs) + } } From fbd81addab9861f910d3d38e7bd4c033bfdc2513 Mon Sep 17 00:00:00 2001 From: Mitchell Spencer Date: Wed, 11 Mar 2026 22:15:27 -0500 Subject: [PATCH 6/6] docs: move inter-table dependencies doc to datasets-derived hierarchy Rename data-inter-table-dependencies.md to datasets-derived-inter-table-dependencies.md and add reference from the parent datasets-derived.md doc. 
--- ...ndencies.md => datasets-derived-inter-table-dependencies.md} | 2 +- docs/feat/datasets-derived.md | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) rename docs/feat/{data-inter-table-dependencies.md => datasets-derived-inter-table-dependencies.md} (99%) diff --git a/docs/feat/data-inter-table-dependencies.md b/docs/feat/datasets-derived-inter-table-dependencies.md similarity index 99% rename from docs/feat/data-inter-table-dependencies.md rename to docs/feat/datasets-derived-inter-table-dependencies.md index 995c51a96..2f29723ce 100644 --- a/docs/feat/data-inter-table-dependencies.md +++ b/docs/feat/datasets-derived-inter-table-dependencies.md @@ -1,5 +1,5 @@ --- -name: "data-inter-table-dependencies" +name: "datasets-derived-inter-table-dependencies" description: "Inter-table dependencies within derived datasets: self-qualified table references, topological ordering, cycle detection. Load when asking about tables referencing other tables in the same dataset" type: feature status: unstable diff --git a/docs/feat/datasets-derived.md b/docs/feat/datasets-derived.md index bd733dc9e..3443424a1 100644 --- a/docs/feat/datasets-derived.md +++ b/docs/feat/datasets-derived.md @@ -126,3 +126,4 @@ Custom functions can be declared in the `functions` field and used in SQL views. - [datasets](datasets.md) - Base: Dataset system overview - [datasets-manifest](datasets-manifest.md) - Related: Manifest format +- [datasets-derived-inter-table-dependencies](datasets-derived-inter-table-dependencies.md) - Child: Inter-table dependency resolution