diff --git a/crates/integrations/datafusion/src/table/table_provider_factory.rs b/crates/integrations/datafusion/src/table/table_provider_factory.rs index 9692340cac..e18e7f6f6e 100644 --- a/crates/integrations/datafusion/src/table/table_provider_factory.rs +++ b/crates/integrations/datafusion/src/table/table_provider_factory.rs @@ -25,7 +25,8 @@ use datafusion::error::Result as DFResult; use datafusion::logical_expr::CreateExternalTable; use datafusion::sql::TableReference; use iceberg::io::{FileIOBuilder, LocalFsStorageFactory, StorageFactory}; -use iceberg::table::StaticTable; +use iceberg::spec::Transform; +use iceberg::table::{StaticTable, Table}; use iceberg::{Error, ErrorKind, Result, TableIdent}; use super::IcebergStaticTableProvider; @@ -87,6 +88,20 @@ use crate::to_datafusion_error; /// } /// ``` /// +/// A `PARTITIONED BY` clause is supported for identity-partitioned tables. Because the +/// table's partitioning is defined entirely by the Iceberg metadata, and DataFusion only +/// accepts plain column names here (not transforms like `bucket[N]` or `day`), the clause +/// must list the identity partition columns of the table's default partition spec, in order: +/// +/// ```sql +/// CREATE EXTERNAL TABLE my_iceberg_table +/// STORED AS ICEBERG LOCATION '/path/to/metadata.json' +/// PARTITIONED BY (event_date) +/// ``` +/// +/// Tables partitioned with non-identity transforms can still be registered by omitting the +/// `PARTITIONED BY` clause. +/// /// # Note /// This factory is designed to work with the DataFusion query engine, /// specifically for handling Iceberg tables in external table commands. @@ -94,8 +109,10 @@ use crate::to_datafusion_error; /// the creation of new tables not yet available. /// /// # Errors -/// An error will be returned if any unsupported feature, such as partition columns, -/// order expressions, constraints, or column defaults, is detected in the table creation command. 
+/// An error will be returned if any unsupported feature, such as order expressions, +/// constraints, or column defaults, is detected in the table creation command, if a +/// `PARTITIONED BY` clause is used on a table with a non-identity transform, or if the +/// `PARTITIONED BY` columns do not match the table's identity partition columns. #[derive(Debug, Default)] pub struct IcebergTableProviderFactory { storage_factory: Option<Arc<dyn StorageFactory>>, @@ -146,6 +163,12 @@ impl TableProviderFactory for IcebergTableProviderFactory { .map_err(to_datafusion_error)? .into_table(); + // The Iceberg metadata is the source of truth for partitioning, so a + // `PARTITIONED BY` clause is only accepted when it agrees with the + // table's default partition spec. + validate_partition_columns(&table, &cmd.table_partition_cols) + .map_err(to_datafusion_error)?; + let provider = IcebergStaticTableProvider::try_new_from_table(table) .await .map_err(to_datafusion_error)?; @@ -157,7 +180,6 @@ impl TableProviderFactory for IcebergTableProviderFactory { fn check_cmd(cmd: &CreateExternalTable) -> Result<()> { let CreateExternalTable { schema, - table_partition_cols, order_exprs, constraints, column_defaults, @@ -166,7 +188,6 @@ fn check_cmd(cmd: &CreateExternalTable) -> Result<()> { // Check if any of the fields violate the constraints in a single condition let is_invalid = !schema.fields().is_empty() - || !table_partition_cols.is_empty() || !order_exprs.is_empty() || !constraints.is_empty() || !column_defaults.is_empty(); @@ -181,6 +202,64 @@ fn check_cmd(cmd: &CreateExternalTable) -> Result<()> { Ok(()) } +/// Validates the `PARTITIONED BY` columns from a `CREATE EXTERNAL TABLE` command +/// against the table's default partition spec. +/// +/// `CREATE EXTERNAL TABLE ... STORED AS ICEBERG` loads an existing table, so the +/// partitioning is fully determined by the Iceberg metadata. 
DataFusion's grammar only +/// accepts plain column names in `PARTITIONED BY` (it cannot express transforms such as +/// `bucket[N]` or `day`), so the clause is only supported for identity-partitioned tables. +/// For an identity transform the partition field name is expected to equal its source column +/// name; the clause is compared against the spec's partition field names, in order. +/// +/// An empty clause (no `PARTITIONED BY`) skips this check: any table, including one with +/// non-identity transforms, can still be registered for read-only access without declaring +/// its partitioning. +fn validate_partition_columns(table: &Table, declared_partition_cols: &[String]) -> Result<()> { + if declared_partition_cols.is_empty() { + return Ok(()); + } + + let spec = table.metadata().default_partition_spec(); + + // DataFusion cannot express non-identity transforms in `PARTITIONED BY`, so a table + // partitioned by any other transform cannot be described by this clause. + // + // TODO: support non-identity transforms (bucket/truncate/year/month/day/hour) once + // DataFusion's `PARTITIONED BY` grammar can express transform functions such as + // `bucket(16, id)` or `days(ts)`. + if let Some(field) = spec + .fields() + .iter() + .find(|f| f.transform != Transform::Identity) + { + return Err(Error::new( + ErrorKind::FeatureUnsupported, + format!( + "PARTITIONED BY only supports identity-partitioned tables, but partition field \ + '{}' uses the '{}' transform. Omit the PARTITIONED BY clause to register this table.", + field.name, field.transform + ), + )); + } + + let actual: Vec<&str> = spec.fields().iter().map(|f| f.name.as_str()).collect(); + let declared: Vec<&str> = declared_partition_cols.iter().map(String::as_str).collect(); + + if declared != actual { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "PARTITIONED BY columns {declared:?} do not match the table's identity partition \ + columns {actual:?}. 
The PARTITIONED BY clause must list those columns in order, \ + or be omitted." + ), + )); + } + + Ok(()) +} + /// Complements the namespace of a table name if necessary. /// /// # Note @@ -244,15 +323,32 @@ mod tests { ]) } - fn table_metadata_location() -> String { + fn metadata_location(file_name: &str) -> String { format!( - "{}/testdata/table_metadata/{}", - env!("CARGO_MANIFEST_DIR"), - "TableMetadataV2.json" + "{}/testdata/table_metadata/{file_name}", + env!("CARGO_MANIFEST_DIR") ) } + fn table_metadata_location() -> String { + metadata_location("TableMetadataV2.json") + } + + fn bucket_partitioned_table_metadata_location() -> String { + metadata_location("TableMetadataV2WithBucketPartition.json") + } + + fn multi_identity_partitioned_table_metadata_location() -> String { + metadata_location("TableMetadataV2WithMultiIdentityPartition.json") + } + fn create_external_table_cmd() -> CreateExternalTable { + create_external_table_cmd_with_partition_cols(Default::default()) + } + + fn create_external_table_cmd_with_partition_cols( + table_partition_cols: Vec<String>, + ) -> CreateExternalTable { let metadata_file_path = table_metadata_location(); CreateExternalTable { @@ -261,7 +357,7 @@ mod tests { schema: Arc::new(DFSchema::empty()), file_type: "iceberg".to_string(), options: Default::default(), - table_partition_cols: Default::default(), + table_partition_cols, order_exprs: Default::default(), constraints: Constraints::default(), column_defaults: Default::default(), @@ -324,4 +420,182 @@ mod tests { assert_eq!(actual_schema.as_ref(), &expected_schema); } + + // `TableMetadataV2.json` has a default partition spec with a single identity + // partition field named "x". 
+ + #[tokio::test] + async fn test_partitioned_by_matching_partition_spec() { + let factory = IcebergTableProviderFactory::new(); + let state = SessionStateBuilder::new().build(); + let cmd = create_external_table_cmd_with_partition_cols(vec!["x".to_string()]); + + let table_provider = factory + .create(&state, &cmd) + .await + .expect("create table with matching PARTITIONED BY should succeed"); + + assert_eq!( + table_provider.schema().as_ref(), + &table_metadata_v2_schema() + ); + } + + #[tokio::test] + async fn test_partitioned_by_mismatching_partition_spec() { + let factory = IcebergTableProviderFactory::new(); + let state = SessionStateBuilder::new().build(); + let cmd = create_external_table_cmd_with_partition_cols(vec!["y".to_string()]); + + let err = factory + .create(&state, &cmd) + .await + .expect_err("create table with mismatching PARTITIONED BY should fail"); + + assert!( + err.to_string() + .contains("do not match the table's identity partition columns"), + "unexpected error: {err}" + ); + } + + #[tokio::test] + async fn test_partitioned_by_multiple_identity_columns() { + let factory = IcebergTableProviderFactory::new(); + let state = SessionStateBuilder::new().build(); + + // The table is partitioned by identity(x), identity(y). + let mut cmd = + create_external_table_cmd_with_partition_cols(vec!["x".to_string(), "y".to_string()]); + cmd.location = multi_identity_partitioned_table_metadata_location(); + + factory + .create(&state, &cmd) + .await + .expect("PARTITIONED BY (x, y) should match the multi-column partition spec"); + } + + #[tokio::test] + async fn test_partitioned_by_identity_columns_mismatch_is_rejected() { + // All cases run against the table partitioned by identity(x), identity(y). + // Each declares partition columns that differ from the spec and must be rejected. 
+ let cases: [(&[&str], &str); 3] = [ + (&["y", "x"], "wrong order"), + (&["x"], "subset / missing column"), + (&["x", "y", "z"], "extra column"), + ]; + + for (declared_cols, description) in cases { + let factory = IcebergTableProviderFactory::new(); + let state = SessionStateBuilder::new().build(); + + let mut cmd = create_external_table_cmd_with_partition_cols( + declared_cols.iter().map(|s| s.to_string()).collect(), + ); + cmd.location = multi_identity_partitioned_table_metadata_location(); + + let err = factory.create(&state, &cmd).await.unwrap_err(); + + assert!( + err.to_string() + .contains("do not match the table's identity partition columns"), + "case '{description}': unexpected error: {err}" + ); + } + } + + #[tokio::test] + async fn test_partitioned_by_rejects_non_identity_transform() { + let factory = IcebergTableProviderFactory::new(); + let state = SessionStateBuilder::new().build(); + + // The table is partitioned by `bucket[4](x)`, which cannot be expressed as a + // plain column name in `PARTITIONED BY`. + let mut cmd = create_external_table_cmd_with_partition_cols(vec!["x_bucket".to_string()]); + cmd.location = bucket_partitioned_table_metadata_location(); + + let err = factory + .create(&state, &cmd) + .await + .expect_err("PARTITIONED BY on a bucket-partitioned table should fail"); + + let msg = err.to_string(); + assert!( + msg.contains("only supports identity-partitioned tables") && msg.contains("bucket[4]"), + "unexpected error: {msg}" + ); + } + + #[tokio::test] + async fn test_non_identity_partition_table_registers_without_partitioned_by() { + // Omitting PARTITIONED BY must still allow registering a non-identity partitioned + // table for read-only access. 
+ let factory = IcebergTableProviderFactory::new(); + let state = SessionStateBuilder::new().build(); + + let mut cmd = create_external_table_cmd(); + cmd.location = bucket_partitioned_table_metadata_location(); + + factory + .create(&state, &cmd) + .await + .expect("registering a bucket-partitioned table without PARTITIONED BY should succeed"); + } + + /// Registers the `IcebergTableProviderFactory` and runs the statement through the full + /// SQL pipeline (parser + planner + factory). + async fn iceberg_external_table_ctx() -> SessionContext { + let mut state = SessionStateBuilder::new().with_default_features().build(); + state.table_factories_mut().insert( + "ICEBERG".to_string(), + Arc::new(IcebergTableProviderFactory::new()), + ); + SessionContext::new_with_state(state) + } + + #[tokio::test] + async fn test_partitioned_by_via_sql() { + let ctx = iceberg_external_table_ctx().await; + + // The table is partitioned by identity(x). + let sql = format!( + "CREATE EXTERNAL TABLE static_table STORED AS ICEBERG LOCATION '{}' PARTITIONED BY (x)", + table_metadata_location() + ); + ctx.sql(&sql) + .await + .expect("CREATE EXTERNAL TABLE ... PARTITIONED BY (x) should succeed"); + + let table_provider = ctx + .table_provider(TableReference::bare("static_table")) + .await + .expect("table not found"); + + assert_eq!( + table_provider.schema().as_ref(), + &table_metadata_v2_schema() + ); + } + + #[tokio::test] + async fn test_partitioned_by_via_sql_rejects_non_identity_transform() { + let ctx = iceberg_external_table_ctx().await; + + // The table is partitioned by bucket[4](x), which `PARTITIONED BY` cannot express. 
+ let sql = format!( + "CREATE EXTERNAL TABLE static_table STORED AS ICEBERG LOCATION '{}' PARTITIONED BY (x_bucket)", + bucket_partitioned_table_metadata_location() + ); + + let err = ctx + .sql(&sql) + .await + .expect_err("PARTITIONED BY on a bucket-partitioned table should fail"); + + assert!( + err.to_string() + .contains("only supports identity-partitioned tables"), + "unexpected error: {err}" + ); + } } diff --git a/crates/integrations/datafusion/testdata/table_metadata/TableMetadataV2WithBucketPartition.json b/crates/integrations/datafusion/testdata/table_metadata/TableMetadataV2WithBucketPartition.json new file mode 100644 index 0000000000..ff4d3690e0 --- /dev/null +++ b/crates/integrations/datafusion/testdata/table_metadata/TableMetadataV2WithBucketPartition.json @@ -0,0 +1,78 @@ +{ + "format-version": 2, + "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c2", + "location": "s3://bucket/test/location", + "last-sequence-number": 34, + "last-updated-ms": 1602638573590, + "last-column-id": 3, + "current-schema-id": 1, + "schemas": [ + { + "type": "struct", + "schema-id": 0, + "fields": [ + { + "id": 1, + "name": "x", + "required": true, + "type": "long" + } + ] + }, + { + "type": "struct", + "schema-id": 1, + "identifier-field-ids": [ + 1, + 2 + ], + "fields": [ + { + "id": 1, + "name": "x", + "required": true, + "type": "long" + }, + { + "id": 2, + "name": "y", + "required": true, + "type": "long" + }, + { + "id": 3, + "name": "z", + "required": true, + "type": "long" + } + ] + } + ], + "default-spec-id": 0, + "partition-specs": [ + { + "spec-id": 0, + "fields": [ + { + "name": "x_bucket", + "transform": "bucket[4]", + "source-id": 1, + "field-id": 1000 + } + ] + } + ], + "last-partition-id": 1000, + "default-sort-order-id": 0, + "sort-orders": [ + { + "order-id": 0, + "fields": [] + } + ], + "properties": {}, + "current-snapshot-id": -1, + "snapshots": [], + "snapshot-log": [], + "metadata-log": [] +} diff --git 
a/crates/integrations/datafusion/testdata/table_metadata/TableMetadataV2WithMultiIdentityPartition.json b/crates/integrations/datafusion/testdata/table_metadata/TableMetadataV2WithMultiIdentityPartition.json new file mode 100644 index 0000000000..90ebfb9c38 --- /dev/null +++ b/crates/integrations/datafusion/testdata/table_metadata/TableMetadataV2WithMultiIdentityPartition.json @@ -0,0 +1,84 @@ +{ + "format-version": 2, + "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c3", + "location": "s3://bucket/test/location", + "last-sequence-number": 34, + "last-updated-ms": 1602638573590, + "last-column-id": 3, + "current-schema-id": 1, + "schemas": [ + { + "type": "struct", + "schema-id": 0, + "fields": [ + { + "id": 1, + "name": "x", + "required": true, + "type": "long" + } + ] + }, + { + "type": "struct", + "schema-id": 1, + "identifier-field-ids": [ + 1, + 2 + ], + "fields": [ + { + "id": 1, + "name": "x", + "required": true, + "type": "long" + }, + { + "id": 2, + "name": "y", + "required": true, + "type": "long" + }, + { + "id": 3, + "name": "z", + "required": true, + "type": "long" + } + ] + } + ], + "default-spec-id": 0, + "partition-specs": [ + { + "spec-id": 0, + "fields": [ + { + "name": "x", + "transform": "identity", + "source-id": 1, + "field-id": 1000 + }, + { + "name": "y", + "transform": "identity", + "source-id": 2, + "field-id": 1001 + } + ] + } + ], + "last-partition-id": 1001, + "default-sort-order-id": 0, + "sort-orders": [ + { + "order-id": 0, + "fields": [] + } + ], + "properties": {}, + "current-snapshot-id": -1, + "snapshots": [], + "snapshot-log": [], + "metadata-log": [] +}