Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
294 changes: 284 additions & 10 deletions crates/integrations/datafusion/src/table/table_provider_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ use datafusion::error::Result as DFResult;
use datafusion::logical_expr::CreateExternalTable;
use datafusion::sql::TableReference;
use iceberg::io::{FileIOBuilder, LocalFsStorageFactory, StorageFactory};
use iceberg::table::StaticTable;
use iceberg::spec::Transform;
use iceberg::table::{StaticTable, Table};
use iceberg::{Error, ErrorKind, Result, TableIdent};

use super::IcebergStaticTableProvider;
Expand Down Expand Up @@ -87,15 +88,31 @@ use crate::to_datafusion_error;
/// }
/// ```
///
/// A `PARTITIONED BY` clause is supported for identity-partitioned tables. Because the
/// table's partitioning is defined entirely by the Iceberg metadata, and DataFusion only
/// accepts plain column names here (not transforms like `bucket[N]` or `day`), the clause
/// must list the identity partition columns of the table's default partition spec, in order:
///
/// ```sql
/// CREATE EXTERNAL TABLE my_iceberg_table
/// STORED AS ICEBERG LOCATION '/path/to/metadata.json'
/// PARTITIONED BY (event_date)
/// ```
///
/// Tables partitioned with non-identity transforms can still be registered by omitting the
/// `PARTITIONED BY` clause.
///
/// # Note
/// This factory is designed to work with the DataFusion query engine,
/// specifically for handling Iceberg tables in external table commands.
/// Currently, this implementation supports only reading Iceberg tables, with
/// the creation of new tables not yet available.
///
/// # Errors
/// An error will be returned if any unsupported feature, such as order expressions,
/// constraints, or column defaults, is detected in the table creation command, if a
/// `PARTITIONED BY` clause is used on a table with a non-identity transform, or if the
/// `PARTITIONED BY` columns do not match the table's identity partition columns.
#[derive(Debug, Default)]
pub struct IcebergTableProviderFactory {
storage_factory: Option<Arc<dyn StorageFactory>>,
Expand Down Expand Up @@ -146,6 +163,12 @@ impl TableProviderFactory for IcebergTableProviderFactory {
.map_err(to_datafusion_error)?
.into_table();

// The Iceberg metadata is the source of truth for partitioning, so a
// `PARTITIONED BY` clause is only accepted when it agrees with the
// table's default partition spec.
validate_partition_columns(&table, &cmd.table_partition_cols)
.map_err(to_datafusion_error)?;

let provider = IcebergStaticTableProvider::try_new_from_table(table)
.await
.map_err(to_datafusion_error)?;
Expand All @@ -157,7 +180,6 @@ impl TableProviderFactory for IcebergTableProviderFactory {
fn check_cmd(cmd: &CreateExternalTable) -> Result<()> {
let CreateExternalTable {
schema,
table_partition_cols,
order_exprs,
constraints,
column_defaults,
Expand All @@ -166,7 +188,6 @@ fn check_cmd(cmd: &CreateExternalTable) -> Result<()> {

// Check if any of the fields violate the constraints in a single condition
let is_invalid = !schema.fields().is_empty()
|| !table_partition_cols.is_empty()
|| !order_exprs.is_empty()
|| !constraints.is_empty()
|| !column_defaults.is_empty();
Expand All @@ -181,6 +202,64 @@ fn check_cmd(cmd: &CreateExternalTable) -> Result<()> {
Ok(())
}

/// Validates the `PARTITIONED BY` columns from a `CREATE EXTERNAL TABLE` command
/// against the table's default partition spec.
///
/// `CREATE EXTERNAL TABLE ... STORED AS ICEBERG` loads an existing table, so the
/// partitioning is fully determined by the Iceberg metadata. DataFusion's grammar only
/// accepts plain column names in `PARTITIONED BY` (it cannot express transforms such as
/// `bucket[N]` or `day`), so the clause is only supported for identity-partitioned tables.
/// For an identity transform the partition field name equals its source column name, so the
/// clause must list those column names in order.
///
/// An empty clause (no `PARTITIONED BY`) skips this check: any table, including one with
/// non-identity transforms, can still be registered for read-only access without declaring
/// its partitioning.
///
/// # Errors
/// - [`ErrorKind::FeatureUnsupported`] if a clause is given and any field of the default
///   partition spec uses a transform other than [`Transform::Identity`].
/// - [`ErrorKind::DataInvalid`] if the declared columns differ, in content or order, from
///   the partition field names of the default partition spec.
fn validate_partition_columns(table: &Table, declared_partition_cols: &[String]) -> Result<()> {
    // NOTE(review): the behavior for an omitted clause is open for discussion. Skipping
    // validation unblocks users who register a partitioned table (including partitioning
    // that DataFusion cannot express), at the cost of the SQL statement not strictly
    // describing the table's partitioning.
    if declared_partition_cols.is_empty() {
        return Ok(());
    }

    let spec = table.metadata().default_partition_spec();

    // DataFusion cannot express non-identity transforms in `PARTITIONED BY`, so a table
    // partitioned by any other transform cannot be described by this clause.
    //
    // TODO: support non-identity transforms (bucket/truncate/year/month/day/hour) once
    // DataFusion's `PARTITIONED BY` grammar can express transform functions such as
    // `bucket(16, id)` or `days(ts)`.
    if let Some(field) = spec
        .fields()
        .iter()
        .find(|f| f.transform != Transform::Identity)
    {
        return Err(Error::new(
            ErrorKind::FeatureUnsupported,
            format!(
                "PARTITIONED BY only supports identity-partitioned tables, but partition field \
                 '{}' uses the '{}' transform. Omit the PARTITIONED BY clause to register this table.",
                field.name, field.transform
            ),
        ));
    }

    // NOTE(review): the comparison uses the partition *field* names. This assumes an
    // identity partition field is always named after its source column — confirm, since
    // the partition field name is stored separately from the source column id.
    let actual: Vec<&str> = spec.fields().iter().map(|f| f.name.as_str()).collect();
    let declared: Vec<&str> = declared_partition_cols.iter().map(String::as_str).collect();

    // Order matters: the clause must list exactly the spec's fields, in spec order.
    if declared != actual {
        return Err(Error::new(
            ErrorKind::DataInvalid,
            format!(
                "PARTITIONED BY columns {declared:?} do not match the table's identity partition \
                 columns {actual:?}. The PARTITIONED BY clause must list those columns in order, \
                 or be omitted."
            ),
        ));
    }

    Ok(())
}

/// Complements the namespace of a table name if necessary.
///
/// # Note
Expand Down Expand Up @@ -244,15 +323,32 @@ mod tests {
])
}

fn table_metadata_location() -> String {
fn metadata_location(file_name: &str) -> String {
format!(
"{}/testdata/table_metadata/{}",
env!("CARGO_MANIFEST_DIR"),
"TableMetadataV2.json"
"{}/testdata/table_metadata/{file_name}",
env!("CARGO_MANIFEST_DIR")
)
}

/// Path of the `TableMetadataV2.json` fixture, the default table used by these tests.
fn table_metadata_location() -> String {
metadata_location("TableMetadataV2.json")
}

/// Path of the `TableMetadataV2WithBucketPartition.json` fixture, used by the tests that
/// exercise a table with a non-identity (bucket) partition transform.
fn bucket_partitioned_table_metadata_location() -> String {
metadata_location("TableMetadataV2WithBucketPartition.json")
}

/// Path of the `TableMetadataV2WithMultiIdentityPartition.json` fixture, used by the tests
/// that exercise a table with more than one identity partition column.
fn multi_identity_partitioned_table_metadata_location() -> String {
metadata_location("TableMetadataV2WithMultiIdentityPartition.json")
}

/// Builds the default `CREATE EXTERNAL TABLE` command with no `PARTITIONED BY` columns
/// (the partition column list is left at its `Default`).
fn create_external_table_cmd() -> CreateExternalTable {
create_external_table_cmd_with_partition_cols(Default::default())
}

fn create_external_table_cmd_with_partition_cols(
table_partition_cols: Vec<String>,
) -> CreateExternalTable {
let metadata_file_path = table_metadata_location();

CreateExternalTable {
Expand All @@ -261,7 +357,7 @@ mod tests {
schema: Arc::new(DFSchema::empty()),
file_type: "iceberg".to_string(),
options: Default::default(),
table_partition_cols: Default::default(),
table_partition_cols,
order_exprs: Default::default(),
constraints: Constraints::default(),
column_defaults: Default::default(),
Expand Down Expand Up @@ -324,4 +420,182 @@ mod tests {

assert_eq!(actual_schema.as_ref(), &expected_schema);
}

// `TableMetadataV2.json` has a default partition spec with a single identity
// partition field named "x".

#[tokio::test]
async fn test_partitioned_by_matching_partition_spec() {
    // Declaring exactly the table's single identity partition column must be accepted.
    let cmd = create_external_table_cmd_with_partition_cols(vec![String::from("x")]);
    let session_state = SessionStateBuilder::new().build();
    let provider_factory = IcebergTableProviderFactory::new();

    let provider = provider_factory
        .create(&session_state, &cmd)
        .await
        .expect("create table with matching PARTITIONED BY should succeed");

    // The clause must not alter the schema exposed by the provider.
    let actual_schema = provider.schema();
    let expected_schema = table_metadata_v2_schema();
    assert_eq!(actual_schema.as_ref(), &expected_schema);
}

#[tokio::test]
async fn test_partitioned_by_mismatching_partition_spec() {
    // "y" is not the partition column of the default fixture, so creation must fail.
    let cmd = create_external_table_cmd_with_partition_cols(vec![String::from("y")]);
    let session_state = SessionStateBuilder::new().build();
    let provider_factory = IcebergTableProviderFactory::new();

    let err = provider_factory
        .create(&session_state, &cmd)
        .await
        .expect_err("create table with mismatching PARTITIONED BY should fail");

    let reports_mismatch = err
        .to_string()
        .contains("do not match the table's identity partition columns");
    assert!(reports_mismatch, "unexpected error: {err}");
}

#[tokio::test]
async fn test_partitioned_by_multiple_identity_columns() {
    // The fixture is partitioned by identity(x), identity(y); declare both, in spec order.
    let partition_cols = vec![String::from("x"), String::from("y")];
    let mut cmd = create_external_table_cmd_with_partition_cols(partition_cols);
    cmd.location = multi_identity_partitioned_table_metadata_location();

    let session_state = SessionStateBuilder::new().build();
    let provider_factory = IcebergTableProviderFactory::new();
    let created = provider_factory.create(&session_state, &cmd).await;

    created.expect("PARTITIONED BY (x, y) should match the multi-column partition spec");
}

#[tokio::test]
async fn test_partitioned_by_identity_columns_mismatch_is_rejected() {
    // Every case targets the fixture partitioned by identity(x), identity(y) and declares
    // a column list that deviates from that spec, so each one has to be refused.
    let cases: [(&[&str], &str); 3] = [
        (&["y", "x"], "wrong order"),
        (&["x"], "subset / missing column"),
        (&["x", "y", "z"], "extra column"),
    ];

    for (declared_cols, description) in cases {
        let provider_factory = IcebergTableProviderFactory::new();
        let session_state = SessionStateBuilder::new().build();

        let partition_cols: Vec<String> =
            declared_cols.iter().copied().map(String::from).collect();
        let mut cmd = create_external_table_cmd_with_partition_cols(partition_cols);
        cmd.location = multi_identity_partitioned_table_metadata_location();

        let err = provider_factory
            .create(&session_state, &cmd)
            .await
            .unwrap_err();

        let reports_mismatch = err
            .to_string()
            .contains("do not match the table's identity partition columns");
        assert!(
            reports_mismatch,
            "case '{description}': unexpected error: {err}"
        );
    }
}

#[tokio::test]
async fn test_partitioned_by_rejects_non_identity_transform() {
    // The fixture is partitioned by `bucket[4](x)`; a plain column name in
    // `PARTITIONED BY` has no way to describe that transform.
    let mut cmd = create_external_table_cmd_with_partition_cols(vec![String::from("x_bucket")]);
    cmd.location = bucket_partitioned_table_metadata_location();

    let session_state = SessionStateBuilder::new().build();
    let provider_factory = IcebergTableProviderFactory::new();

    let msg = provider_factory
        .create(&session_state, &cmd)
        .await
        .expect_err("PARTITIONED BY on a bucket-partitioned table should fail")
        .to_string();

    // The error must explain the limitation and name the offending transform.
    let explains_limitation = msg.contains("only supports identity-partitioned tables");
    let names_transform = msg.contains("bucket[4]");
    assert!(
        explains_limitation && names_transform,
        "unexpected error: {msg}"
    );
}

#[tokio::test]
async fn test_non_identity_partition_table_registers_without_partitioned_by() {
    // With no PARTITIONED BY clause, a table using a non-identity transform has to remain
    // registrable for read-only access.
    let mut cmd = create_external_table_cmd();
    cmd.location = bucket_partitioned_table_metadata_location();

    let session_state = SessionStateBuilder::new().build();
    let provider_factory = IcebergTableProviderFactory::new();
    let created = provider_factory.create(&session_state, &cmd).await;

    created.expect("registering a bucket-partitioned table without PARTITIONED BY should succeed");
}

/// Builds a `SessionContext` whose session state has an `IcebergTableProviderFactory`
/// registered under the `ICEBERG` file type, so statements issued through it exercise the
/// full SQL pipeline (parser + planner + factory).
async fn iceberg_external_table_ctx() -> SessionContext {
    let iceberg_factory = Arc::new(IcebergTableProviderFactory::new());

    let mut session_state = SessionStateBuilder::new().with_default_features().build();
    session_state
        .table_factories_mut()
        .insert(String::from("ICEBERG"), iceberg_factory);

    SessionContext::new_with_state(session_state)
}

#[tokio::test]
async fn test_partitioned_by_via_sql() {
    // The default fixture is partitioned by identity(x), so `PARTITIONED BY (x)` matches.
    let location = table_metadata_location();
    let statement = format!(
        "CREATE EXTERNAL TABLE static_table STORED AS ICEBERG LOCATION '{}' PARTITIONED BY (x)",
        location
    );

    let session = iceberg_external_table_ctx().await;
    session
        .sql(&statement)
        .await
        .expect("CREATE EXTERNAL TABLE ... PARTITIONED BY (x) should succeed");

    // The statement must have registered the table under the name it declared.
    let provider = session
        .table_provider(TableReference::bare("static_table"))
        .await
        .expect("table not found");

    let actual_schema = provider.schema();
    let expected_schema = table_metadata_v2_schema();
    assert_eq!(actual_schema.as_ref(), &expected_schema);
}

#[tokio::test]
async fn test_partitioned_by_via_sql_rejects_non_identity_transform() {
    // The fixture is partitioned by bucket[4](x), which `PARTITIONED BY` cannot express.
    let location = bucket_partitioned_table_metadata_location();
    let statement = format!(
        "CREATE EXTERNAL TABLE static_table STORED AS ICEBERG LOCATION '{}' PARTITIONED BY (x_bucket)",
        location
    );

    let session = iceberg_external_table_ctx().await;
    let err = session
        .sql(&statement)
        .await
        .expect_err("PARTITIONED BY on a bucket-partitioned table should fail");

    let explains_limitation = err
        .to_string()
        .contains("only supports identity-partitioned tables");
    assert!(explains_limitation, "unexpected error: {err}");
}
}
Loading
Loading