Skip to content
Merged
61 changes: 45 additions & 16 deletions crates/core/common/src/catalog/physical/for_dump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//!
//! This module provides physical catalog creation for derived dataset execution.
//! It resolves dependency tables from manifest deps and SQL table references,
//! then adds physical parquet locations.
//! then builds the catalog from resolved entries.

use std::{
collections::{BTreeMap, btree_map::Entry},
Expand All @@ -22,25 +22,31 @@ use crate::{
sql::TableReference,
};

/// Creates a full catalog with physical data access for derived dataset dumps.
/// A resolved table entry containing logical metadata, physical data access,
/// and the SQL schema name used in queries.
pub struct ResolvedTableEntry {
    /// Logical table metadata (carries the dataset reference and table definition).
    pub logical: LogicalTable,
    /// Physical table granting access to the table's underlying data
    /// (parquet locations resolved from the metadata store).
    pub physical: Arc<PhysicalTable>,
    /// Schema name under which this table is addressed in SQL queries
    /// (taken from the logical table's `sql_schema_name`).
    pub schema_name: Arc<str>,
}

/// Resolves external dependency table references into resolved table entries.
///
/// This function resolves dependency tables from manifest deps and SQL table references,
/// loads dataset metadata, builds physical table entries, and constructs the catalog.
/// loads dataset metadata, and builds physical table entries.
///
/// ## Parameters
///
/// - `datasets_cache`: Used to retrieve dataset metadata including start_block
/// - `data_store`: Used to query metadata database for physical parquet locations
/// - `manifest_deps`: Dependency alias → hash reference mappings from the manifest
/// - `table_refs`: Parsed SQL table references with dep alias schemas
/// - `udfs`: Pre-resolved self-ref UDFs (from logical catalog)
pub async fn create(
pub async fn resolve_external_deps(
datasets_cache: &DatasetsCache,
data_store: &DataStore,
manifest_deps: &BTreeMap<DepAlias, HashReference>,
table_refs: Vec<TableReference<DepAlias>>,
udfs: Vec<ScalarUDF>,
) -> Result<Catalog, CreateCatalogError> {
) -> Result<Vec<ResolvedTableEntry>, CreateCatalogError> {
// Resolve table references to LogicalTable instances
let mut tables_by_hash: BTreeMap<Hash, BTreeMap<TableReference<DepAlias>, LogicalTable>> =
Default::default();
Expand Down Expand Up @@ -98,7 +104,7 @@ pub async fn create(
.flat_map(|map| map.into_values())
.collect();

// Build physical catalog entries from resolved logical tables
// Build resolved entries from logical tables
let mut entries = Vec::new();
for table in &logical_tables {
let dataset_ref = table.dataset_reference();
Expand Down Expand Up @@ -136,24 +142,47 @@ pub async fn create(
Arc::clone(table_def),
revision,
);
entries.push((
Arc::from(physical_table),
Arc::from(table.sql_schema_name()),
));
entries.push(ResolvedTableEntry {
logical: table.clone(),
physical: Arc::from(physical_table),
schema_name: Arc::from(table.sql_schema_name()),
});
}

// Build dep_aliases map
Ok(entries)
}

/// Builds a physical catalog from resolved table entries.
///
/// Takes already-resolved table entries (from both external deps and self-ref
/// siblings) and constructs the final catalog.
///
/// ## Parameters
///
/// - `entries`: All resolved table entries (external deps + self-refs combined)
/// - `udfs`: Pre-resolved UDFs (from logical catalog)
/// - `manifest_deps`: Dependency alias → hash reference mappings from the manifest
pub fn build_catalog(
    entries: Vec<ResolvedTableEntry>,
    udfs: Vec<ScalarUDF>,
    manifest_deps: &BTreeMap<DepAlias, HashReference>,
) -> Catalog {
    // Split each resolved entry into its logical half and the
    // (physical table, schema name) pair expected by `Catalog::new`.
    let mut logical_tables = Vec::with_capacity(entries.len());
    let mut physical_entries = Vec::with_capacity(entries.len());
    for entry in entries {
        logical_tables.push(entry.logical);
        physical_entries.push((entry.physical, entry.schema_name));
    }

    // Stringify the manifest's dep aliases for the catalog's
    // alias → hash reference map.
    let dep_aliases: BTreeMap<String, HashReference> = manifest_deps
        .iter()
        .map(|(alias, reference)| (alias.to_string(), reference.clone()))
        .collect();

    Catalog::new(logical_tables, udfs, physical_entries, dep_aliases)
}

/// Errors that can occur when creating a physical catalog.
/// Errors that can occur when resolving external dependency table references.
///
/// Returned by [`create`] when catalog creation fails.
/// Returned by [`resolve_external_deps`] when resolution fails.
#[derive(Debug, thiserror::Error)]
pub enum CreateCatalogError {
/// Table is not qualified with a schema/dataset name.
Expand Down
10 changes: 10 additions & 0 deletions crates/core/common/src/self_schema_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{any::Any, collections::BTreeMap, sync::Arc};

use async_trait::async_trait;
use datafusion::{
arrow::datatypes::SchemaRef,
catalog::{
AsyncSchemaProvider as TableAsyncSchemaProvider, SchemaProvider as TableSchemaProvider,
TableProvider,
Expand Down Expand Up @@ -57,6 +58,15 @@ impl SelfSchemaProvider {
&self.udfs
}

/// Registers a table dynamically for progressive schema resolution.
///
/// After a table's schema is inferred during inter-table dependency processing,
/// call this method so that subsequent tables can reference it via `self.<table_name>`.
pub fn add_table(&self, name: impl Into<String>, schema: SchemaRef) {
    // Wrap the inferred schema in a plan-only table provider and cache it
    // under the given name so later lookups can resolve `self.<name>`.
    let provider = Arc::new(PlanTable::new(schema)) as Arc<dyn TableProvider>;
    self.table_cache.write().insert(name.into(), provider);
}

/// Creates a provider from manifest functions (no tables).
///
/// Functions are already validated at deserialization time, so this is
Expand Down
16 changes: 13 additions & 3 deletions crates/core/worker-datasets-derived/src/job_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,15 @@
pub mod query;
pub mod table;

use std::sync::Arc;
use std::{collections::BTreeMap, sync::Arc};

use amp_data_store::retryable::RetryableErrorExt as _;
use amp_worker_core::{
check::consistency_check, compaction::AmpCompactor, error_detail::ErrorDetailsProvider,
retryable::RetryableErrorExt, tasks::TryWaitAllError,
};
use common::{physical_table::PhysicalTable, retryable::RetryableErrorExt as _};
use datasets_common::hash_reference::HashReference;
use datasets_common::{hash_reference::HashReference, table_name::TableName};
use tracing::Instrument;

use self::table::{MaterializeTableError, materialize_table};
Expand Down Expand Up @@ -208,6 +208,14 @@ pub async fn execute(
})?;
}

// Collect all sibling tables for inter-table dependency resolution.
// Each materialize_table call receives this map so self-ref tables
// (e.g., `self.blocks_base`) can be resolved to sibling PhysicalTables.
let siblings: BTreeMap<TableName, Arc<PhysicalTable>> = tables
.iter()
.map(|(pt, _)| (pt.table_name().clone(), Arc::clone(pt)))
.collect();

// Process all tables in parallel using FailFastJoinSet
let mut join_set =
amp_worker_core::tasks::FailFastJoinSet::<Result<(), MaterializeTableError>>::new();
Expand All @@ -226,8 +234,9 @@ pub async fn execute(
let env = env.clone();
let table = Arc::clone(table);
let compactor = Arc::clone(compactor);
let opts = parquet_opts.clone();
let manifest = manifest.clone();
let siblings = siblings.clone();
let opts = parquet_opts.clone();

join_set.spawn(
async move {
Expand All @@ -241,6 +250,7 @@ pub async fn execute(
compactor,
opts.clone(),
end,
&siblings,
)
.await?;

Expand Down
3 changes: 1 addition & 2 deletions crates/core/worker-datasets-derived/src/job_impl/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,10 @@ use common::{
use datafusion::parquet::errors::ParquetError;
use futures::StreamExt as _;
use js_runtime::isolate_pool::IsolatePool;
use tracing::instrument;

use crate::job_ctx::Context;

#[instrument(skip_all, err)]
#[tracing::instrument(skip_all, err)]
#[expect(clippy::too_many_arguments)]
pub async fn materialize_sql_query(
ctx: &Context,
Expand Down
Loading
Loading