From 7bee23cb56c5c6b2011b71ae43895a15e747ae8f Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Sat, 11 Apr 2026 17:25:06 +0800 Subject: [PATCH 1/5] feat(datafusion): Add DDL support with PRIMARY KEY constraint syntax - Add PaimonDdlHandler for CREATE TABLE, ALTER TABLE (ADD/DROP/RENAME COLUMN, RENAME TABLE) - Add PaimonTableFactory for CREATE EXTERNAL TABLE via DataFusion TableProviderFactory - Extend PaimonCatalogProvider/SchemaProvider with CREATE/DROP SCHEMA and DROP TABLE - Add arrow_to_paimon_type and arrow_fields_to_paimon to paimon::arrow for Arrow-to-Paimon type conversion - Support PRIMARY KEY (col, ...) constraint syntax in CREATE TABLE DDL --- crates/integrations/datafusion/src/catalog.rs | 78 ++++ crates/integrations/datafusion/src/ddl.rs | 387 +++++++++++++++++ crates/integrations/datafusion/src/lib.rs | 4 + .../datafusion/src/table_factory.rs | 137 ++++++ .../datafusion/tests/ddl_tests.rs | 402 ++++++++++++++++++ crates/paimon/src/arrow/mod.rs | 324 +++++++++++++- 6 files changed, 1331 insertions(+), 1 deletion(-) create mode 100644 crates/integrations/datafusion/src/ddl.rs create mode 100644 crates/integrations/datafusion/src/table_factory.rs create mode 100644 crates/integrations/datafusion/tests/ddl_tests.rs diff --git a/crates/integrations/datafusion/src/catalog.rs b/crates/integrations/datafusion/src/catalog.rs index 626a47f3..be75ec44 100644 --- a/crates/integrations/datafusion/src/catalog.rs +++ b/crates/integrations/datafusion/src/catalog.rs @@ -18,6 +18,7 @@ //! Paimon catalog integration for DataFusion. 
use std::any::Any; +use std::collections::HashMap; use std::fmt::Debug; use std::sync::Arc; @@ -86,6 +87,50 @@ impl CatalogProvider for PaimonCatalogProvider { "paimon catalog access thread panicked", ) } + + fn register_schema( + &self, + name: &str, + _schema: Arc, + ) -> DFResult>> { + let catalog = Arc::clone(&self.catalog); + let name = name.to_string(); + block_on_with_runtime( + async move { + catalog + .create_database(&name, false, HashMap::new()) + .await + .map_err(to_datafusion_error)?; + Ok(Some( + Arc::new(PaimonSchemaProvider::new(Arc::clone(&catalog), name)) + as Arc, + )) + }, + "paimon catalog access thread panicked", + ) + } + + fn deregister_schema( + &self, + name: &str, + cascade: bool, + ) -> DFResult>> { + let catalog = Arc::clone(&self.catalog); + let name = name.to_string(); + block_on_with_runtime( + async move { + catalog + .drop_database(&name, false, cascade) + .await + .map_err(to_datafusion_error)?; + Ok(Some( + Arc::new(PaimonSchemaProvider::new(Arc::clone(&catalog), name)) + as Arc, + )) + }, + "paimon catalog access thread panicked", + ) + } } /// Represents a [`SchemaProvider`] for the Paimon [`Catalog`], managing @@ -159,4 +204,37 @@ impl SchemaProvider for PaimonSchemaProvider { "paimon catalog access thread panicked", ) } + + fn register_table( + &self, + _name: String, + table: Arc, + ) -> DFResult>> { + // The table is already created in the Paimon catalog by PaimonTableFactory. + // DataFusion calls register_table after the factory returns, so we just + // acknowledge it here. + Ok(Some(table)) + } + + fn deregister_table(&self, name: &str) -> DFResult>> { + let catalog = Arc::clone(&self.catalog); + let identifier = Identifier::new(self.database.clone(), name); + block_on_with_runtime( + async move { + // Try to get the table first so we can return it. + let table = match catalog.get_table(&identifier).await { + Ok(t) => t, + Err(paimon::Error::TableNotExist { .. 
}) => return Ok(None), + Err(e) => return Err(to_datafusion_error(e)), + }; + let provider = PaimonTableProvider::try_new(table)?; + catalog + .drop_table(&identifier, false) + .await + .map_err(to_datafusion_error)?; + Ok(Some(Arc::new(provider) as Arc)) + }, + "paimon catalog access thread panicked", + ) + } } diff --git a/crates/integrations/datafusion/src/ddl.rs b/crates/integrations/datafusion/src/ddl.rs new file mode 100644 index 00000000..02a319f4 --- /dev/null +++ b/crates/integrations/datafusion/src/ddl.rs @@ -0,0 +1,387 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! DDL support for Paimon tables. +//! +//! DataFusion does not natively support all DDL statements needed by Paimon. +//! This module provides [`PaimonDdlHandler`] which intercepts CREATE TABLE and +//! ALTER TABLE SQL, translates them to Paimon catalog operations, and delegates +//! everything else (SELECT, CREATE/DROP SCHEMA, DROP TABLE, etc.) to the +//! underlying [`SessionContext`]. +//! +//! Supported DDL: +//! - `CREATE TABLE db.t (col TYPE, ..., PRIMARY KEY (col, ...)) [PARTITIONED BY (col TYPE, ...)] [WITH ('key' = 'val')]` +//! - `ALTER TABLE db.t ADD COLUMN col TYPE` +//! - `ALTER TABLE db.t DROP COLUMN col` +//! 
- `ALTER TABLE db.t RENAME COLUMN old TO new`
+//! - `ALTER TABLE db.t RENAME TO new_name`
+
+use std::sync::Arc;
+
+use datafusion::arrow::array::StringArray;
+use datafusion::arrow::datatypes::{DataType as ArrowDataType, Field, Schema};
+use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::error::{DataFusionError, Result as DFResult};
+use datafusion::prelude::{DataFrame, SessionContext};
+use datafusion::sql::sqlparser::ast::{
+    AlterTableOperation, ColumnDef, CreateTable, CreateTableOptions, HiveDistributionStyle,
+    ObjectName, RenameTableNameKind, SqlOption, Statement,
+};
+use datafusion::sql::sqlparser::dialect::GenericDialect;
+use datafusion::sql::sqlparser::parser::Parser;
+use paimon::catalog::{Catalog, Identifier};
+use paimon::spec::SchemaChange;
+
+use crate::error::to_datafusion_error;
+use paimon::arrow::arrow_to_paimon_type;
+
+/// Wraps a [`SessionContext`] and a Paimon [`Catalog`] to handle DDL statements
+/// that DataFusion does not natively support (e.g. ALTER TABLE).
+///
+/// For all other SQL, it delegates to the inner `SessionContext`.
+///
+/// # Example
+/// ```ignore
+/// let handler = PaimonDdlHandler::new(ctx, catalog);
+/// let df = handler.sql("ALTER TABLE paimon.db.t ADD COLUMN age INT").await?;
+/// ```
+pub struct PaimonDdlHandler {
+    ctx: SessionContext,
+    catalog: Arc<dyn Catalog>,
+    /// The catalog name registered in the SessionContext (used to strip the catalog prefix).
+    catalog_name: String,
+}
+
+impl PaimonDdlHandler {
+    pub fn new(
+        ctx: SessionContext,
+        catalog: Arc<dyn Catalog>,
+        catalog_name: impl Into<String>,
+    ) -> Self {
+        Self {
+            ctx,
+            catalog,
+            catalog_name: catalog_name.into(),
+        }
+    }
+
+    /// Returns a reference to the inner [`SessionContext`].
+    pub fn ctx(&self) -> &SessionContext {
+        &self.ctx
+    }
+
+    /// Execute a SQL statement. CREATE TABLE and ALTER TABLE are handled by
+    /// Paimon directly; everything else is delegated to DataFusion.
+ pub async fn sql(&self, sql: &str) -> DFResult { + let dialect = GenericDialect {}; + let statements = Parser::parse_sql(&dialect, sql) + .map_err(|e| DataFusionError::Plan(format!("SQL parse error: {e}")))?; + + if statements.len() != 1 { + return Err(DataFusionError::Plan( + "Expected exactly one SQL statement".to_string(), + )); + } + + match &statements[0] { + Statement::CreateTable(create_table) => self.handle_create_table(create_table).await, + Statement::AlterTable { + name, operations, .. + } => self.handle_alter_table(name, operations).await, + _ => self.ctx.sql(sql).await, + } + } + + async fn handle_create_table(&self, ct: &CreateTable) -> DFResult { + if ct.location.is_some() { + return Err(DataFusionError::Plan( + "LOCATION is not supported for Paimon tables. Table path is determined by the catalog warehouse.".to_string(), + )); + } + if ct.query.is_some() { + return Err(DataFusionError::Plan( + "CREATE TABLE AS SELECT is not yet supported for Paimon tables.".to_string(), + )); + } + + let identifier = self.resolve_table_name(&ct.name)?; + + let mut builder = paimon::spec::Schema::builder(); + + // Columns + for col in &ct.columns { + let arrow_type = sql_data_type_to_arrow(&col.data_type)?; + let nullable = !col.options.iter().any(|opt| { + matches!( + opt.option, + datafusion::sql::sqlparser::ast::ColumnOption::NotNull + ) + }); + let paimon_type = + arrow_to_paimon_type(&arrow_type, nullable).map_err(to_datafusion_error)?; + builder = builder.column(col.name.value.clone(), paimon_type); + } + + // Primary key from constraints: PRIMARY KEY (col, ...) + for constraint in &ct.constraints { + if let datafusion::sql::sqlparser::ast::TableConstraint::PrimaryKey { + columns, .. + } = constraint + { + let pk_cols: Vec = + columns.iter().map(|c| c.column.expr.to_string()).collect(); + builder = builder.primary_key(pk_cols); + } + } + + // Partition keys from PARTITIONED BY (col, ...) 
+ if let HiveDistributionStyle::PARTITIONED { columns } = &ct.hive_distribution { + let partition_keys: Vec = + columns.iter().map(|c| c.name.value.clone()).collect(); + builder = builder.partition_keys(partition_keys); + } + + // Table options from WITH ('key' = 'value', ...) + for (k, v) in extract_options(&ct.table_options)? { + builder = builder.option(k, v); + } + + let schema = builder.build().map_err(to_datafusion_error)?; + + self.catalog + .create_table(&identifier, schema, ct.if_not_exists) + .await + .map_err(to_datafusion_error)?; + + ok_result(&self.ctx) + } + + async fn handle_alter_table( + &self, + name: &ObjectName, + operations: &[AlterTableOperation], + ) -> DFResult { + let identifier = self.resolve_table_name(name)?; + + let mut changes = Vec::new(); + let mut rename_to: Option = None; + + for op in operations { + match op { + AlterTableOperation::AddColumn { column_def, .. } => { + let change = column_def_to_add_column(column_def)?; + changes.push(change); + } + AlterTableOperation::DropColumn { + column_names, + if_exists: _, + .. 
+ } => { + for col in column_names { + changes.push(SchemaChange::drop_column(col.value.clone())); + } + } + AlterTableOperation::RenameColumn { + old_column_name, + new_column_name, + } => { + changes.push(SchemaChange::rename_column( + old_column_name.value.clone(), + new_column_name.value.clone(), + )); + } + AlterTableOperation::RenameTable { table_name } => { + let new_name = match table_name { + RenameTableNameKind::To(name) | RenameTableNameKind::As(name) => { + object_name_to_string(name) + } + }; + rename_to = Some(Identifier::new(identifier.database().to_string(), new_name)); + } + other => { + return Err(DataFusionError::Plan(format!( + "Unsupported ALTER TABLE operation: {other}" + ))); + } + } + } + + if let Some(new_identifier) = rename_to { + self.catalog + .rename_table(&identifier, &new_identifier, false) + .await + .map_err(to_datafusion_error)?; + } + + if !changes.is_empty() { + self.catalog + .alter_table(&identifier, changes, false) + .await + .map_err(to_datafusion_error)?; + } + + ok_result(&self.ctx) + } + + /// Resolve an ObjectName like `paimon.db.table` or `db.table` to a Paimon Identifier. + fn resolve_table_name(&self, name: &ObjectName) -> DFResult { + let parts: Vec = name + .0 + .iter() + .filter_map(|p| p.as_ident().map(|id| id.value.clone())) + .collect(); + match parts.len() { + 3 => { + // catalog.database.table — strip catalog prefix + if parts[0] != self.catalog_name { + return Err(DataFusionError::Plan(format!( + "Unknown catalog '{}', expected '{}'", + parts[0], self.catalog_name + ))); + } + Ok(Identifier::new(parts[1].clone(), parts[2].clone())) + } + 2 => Ok(Identifier::new(parts[0].clone(), parts[1].clone())), + 1 => Err(DataFusionError::Plan(format!( + "ALTER TABLE requires at least database.table, got: {}", + parts[0] + ))), + _ => Err(DataFusionError::Plan(format!( + "Invalid table reference: {name}" + ))), + } + } +} + +/// Convert a sqlparser [`ColumnDef`] to a Paimon [`SchemaChange::AddColumn`]. 
+fn column_def_to_add_column(col: &ColumnDef) -> DFResult { + let arrow_type = sql_data_type_to_arrow(&col.data_type)?; + let nullable = !col.options.iter().any(|opt| { + matches!( + opt.option, + datafusion::sql::sqlparser::ast::ColumnOption::NotNull + ) + }); + let paimon_type = arrow_to_paimon_type(&arrow_type, nullable).map_err(to_datafusion_error)?; + Ok(SchemaChange::add_column( + col.name.value.clone(), + paimon_type, + )) +} + +/// Minimal conversion from sqlparser SQL data types to Arrow data types. +fn sql_data_type_to_arrow( + sql_type: &datafusion::sql::sqlparser::ast::DataType, +) -> DFResult { + use datafusion::sql::sqlparser::ast::DataType as SqlType; + match sql_type { + SqlType::Boolean => Ok(ArrowDataType::Boolean), + SqlType::TinyInt(_) => Ok(ArrowDataType::Int8), + SqlType::SmallInt(_) => Ok(ArrowDataType::Int16), + SqlType::Int(_) | SqlType::Integer(_) => Ok(ArrowDataType::Int32), + SqlType::BigInt(_) => Ok(ArrowDataType::Int64), + SqlType::Float(_) => Ok(ArrowDataType::Float32), + SqlType::Real => Ok(ArrowDataType::Float32), + SqlType::Double(_) | SqlType::DoublePrecision => Ok(ArrowDataType::Float64), + SqlType::Varchar(_) | SqlType::CharVarying(_) | SqlType::Text | SqlType::String(_) => { + Ok(ArrowDataType::Utf8) + } + SqlType::Char(_) | SqlType::Character(_) => Ok(ArrowDataType::Utf8), + SqlType::Binary(_) | SqlType::Varbinary(_) | SqlType::Blob(_) | SqlType::Bytea => { + Ok(ArrowDataType::Binary) + } + SqlType::Date => Ok(ArrowDataType::Date32), + SqlType::Timestamp(precision, tz_info) => { + use datafusion::sql::sqlparser::ast::TimezoneInfo; + let unit = match precision { + Some(0) => datafusion::arrow::datatypes::TimeUnit::Second, + Some(1..=3) | None => datafusion::arrow::datatypes::TimeUnit::Millisecond, + Some(4..=6) => datafusion::arrow::datatypes::TimeUnit::Microsecond, + _ => datafusion::arrow::datatypes::TimeUnit::Nanosecond, + }; + let tz = match tz_info { + TimezoneInfo::None | TimezoneInfo::WithoutTimeZone => None, + _ => 
Some("UTC".into()),
+            };
+            Ok(ArrowDataType::Timestamp(unit, tz))
+        }
+        SqlType::Decimal(info) => {
+            use datafusion::sql::sqlparser::ast::ExactNumberInfo;
+            let (p, s) = match info {
+                ExactNumberInfo::PrecisionAndScale(p, s) => (*p as u8, *s as i8),
+                ExactNumberInfo::Precision(p) => (*p as u8, 0),
+                ExactNumberInfo::None => (10, 0),
+            };
+            Ok(ArrowDataType::Decimal128(p, s))
+        }
+        _ => Err(DataFusionError::Plan(format!(
+            "Unsupported SQL data type for ALTER TABLE: {sql_type}"
+        ))),
+    }
+}
+
+fn object_name_to_string(name: &ObjectName) -> String {
+    name.0
+        .iter()
+        .filter_map(|p| p.as_ident().map(|id| id.value.clone()))
+        .collect::<Vec<_>>()
+        .join(".")
+}
+
+/// Extract key-value pairs from [`CreateTableOptions`].
+fn extract_options(opts: &CreateTableOptions) -> DFResult<Vec<(String, String)>> {
+    let sql_options = match opts {
+        CreateTableOptions::With(options)
+        | CreateTableOptions::Options(options)
+        | CreateTableOptions::TableProperties(options)
+        | CreateTableOptions::Plain(options) => options,
+        CreateTableOptions::None => return Ok(Vec::new()),
+    };
+    sql_options
+        .iter()
+        .map(|opt| match opt {
+            SqlOption::KeyValue { key, value } => {
+                let v = value.to_string();
+                // Strip surrounding quotes from the value if present.
+                let v = v
+                    .strip_prefix('\'')
+                    .and_then(|s| s.strip_suffix('\''))
+                    .unwrap_or(&v)
+                    .to_string();
+                Ok((key.value.clone(), v))
+            }
+            other => Err(DataFusionError::Plan(format!(
+                "Unsupported table option: {other}"
+            ))),
+        })
+        .collect()
+}
+
+/// Return a one-row DataFrame with a single "result" column containing "OK".
+fn ok_result(ctx: &SessionContext) -> DFResult { + let schema = Arc::new(Schema::new(vec![Field::new( + "result", + ArrowDataType::Utf8, + false, + )])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(StringArray::from(vec!["OK"]))], + )?; + let df = ctx.read_batch(batch)?; + Ok(df) +} diff --git a/crates/integrations/datafusion/src/lib.rs b/crates/integrations/datafusion/src/lib.rs index 4e9fdb3b..82755382 100644 --- a/crates/integrations/datafusion/src/lib.rs +++ b/crates/integrations/datafusion/src/lib.rs @@ -37,6 +37,7 @@ //! translatable partition-only conjuncts from DataFusion filters. mod catalog; +mod ddl; mod error; mod filter_pushdown; #[cfg(feature = "fulltext")] @@ -45,11 +46,14 @@ mod physical_plan; mod relation_planner; pub mod runtime; mod table; +mod table_factory; pub use catalog::{PaimonCatalogProvider, PaimonSchemaProvider}; +pub use ddl::PaimonDdlHandler; pub use error::to_datafusion_error; #[cfg(feature = "fulltext")] pub use full_text_search::{register_full_text_search, FullTextSearchFunction}; pub use physical_plan::PaimonTableScan; pub use relation_planner::PaimonRelationPlanner; pub use table::PaimonTableProvider; +pub use table_factory::PaimonTableFactory; diff --git a/crates/integrations/datafusion/src/table_factory.rs b/crates/integrations/datafusion/src/table_factory.rs new file mode 100644 index 00000000..d6e12c62 --- /dev/null +++ b/crates/integrations/datafusion/src/table_factory.rs @@ -0,0 +1,137 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`TableProviderFactory`] implementation for creating Paimon tables via +//! `CREATE EXTERNAL TABLE`. + +use std::fmt::Debug; +use std::sync::Arc; + +use async_trait::async_trait; +use datafusion::catalog::{Session, TableProvider, TableProviderFactory}; +use datafusion::common::TableReference; +use datafusion::error::{DataFusionError, Result as DFResult}; +use datafusion::logical_expr::CreateExternalTable; +use paimon::catalog::{Catalog, Identifier}; +use paimon::spec::Schema; + +use crate::error::to_datafusion_error; +use crate::table::PaimonTableProvider; +use paimon::arrow::arrow_to_paimon_type; + +/// A [`TableProviderFactory`] that creates Paimon tables. 
+/// +/// Register with: +/// ```ignore +/// ctx.state_mut().table_factories_mut() +/// .insert("PAIMON".to_string(), Arc::new(PaimonTableFactory::new(catalog))); +/// ``` +/// +/// Then use: +/// ```sql +/// CREATE EXTERNAL TABLE paimon.my_db.my_table ( +/// id INT NOT NULL, +/// name STRING, +/// dt STRING, +/// PRIMARY KEY (id, dt) +/// ) PARTITIONED BY (dt) +/// WITH ('bucket' = '2'); +/// ``` +pub struct PaimonTableFactory { + catalog: Arc, +} + +impl Debug for PaimonTableFactory { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PaimonTableFactory").finish() + } +} + +impl PaimonTableFactory { + pub fn new(catalog: Arc) -> Self { + Self { catalog } + } +} + +#[async_trait] +impl TableProviderFactory for PaimonTableFactory { + async fn create( + &self, + _state: &dyn Session, + cmd: &CreateExternalTable, + ) -> DFResult> { + if !cmd.location.is_empty() { + return Err(DataFusionError::Plan( + "LOCATION is not supported for Paimon tables. Table path is determined by the catalog warehouse.".to_string(), + )); + } + + let identifier = resolve_identifier(&cmd.name)?; + + // Build Paimon schema from the CREATE EXTERNAL TABLE command. + let arrow_schema = cmd.schema.as_arrow(); + let mut builder = Schema::builder(); + + for field in arrow_schema.fields() { + let paimon_type = arrow_to_paimon_type(field.data_type(), field.is_nullable()) + .map_err(to_datafusion_error)?; + builder = builder.column(field.name().clone(), paimon_type); + } + + if !cmd.table_partition_cols.is_empty() { + builder = builder.partition_keys(cmd.table_partition_cols.clone()); + } + + // Pass all OPTIONS through to Paimon (includes 'bucket', etc.). + // DataFusion prefixes options with "format." — strip that prefix for Paimon. 
+ for (k, v) in &cmd.options { + let key = k.strip_prefix("format.").unwrap_or(k); + builder = builder.option(key.to_string(), v.clone()); + } + + let schema = builder.build().map_err(to_datafusion_error)?; + + self.catalog + .create_table(&identifier, schema, cmd.if_not_exists) + .await + .map_err(to_datafusion_error)?; + + // Return the newly created table as a provider. + let table = self + .catalog + .get_table(&identifier) + .await + .map_err(to_datafusion_error)?; + let provider = PaimonTableProvider::try_new(table)?; + Ok(Arc::new(provider)) + } +} + +/// Extract a Paimon [`Identifier`] (database, table) from a DataFusion [`TableReference`]. +fn resolve_identifier(name: &TableReference) -> DFResult { + match name { + TableReference::Full { + schema, table, .. + } => Ok(Identifier::new(schema.to_string(), table.to_string())), + TableReference::Partial { schema, table } => { + Ok(Identifier::new(schema.to_string(), table.to_string())) + } + TableReference::Bare { table } => Err(DataFusionError::Plan(format!( + "CREATE EXTERNAL TABLE requires a fully qualified name (catalog.database.table or database.table), got: {table}" + ))), + } +} diff --git a/crates/integrations/datafusion/tests/ddl_tests.rs b/crates/integrations/datafusion/tests/ddl_tests.rs new file mode 100644 index 00000000..207583d6 --- /dev/null +++ b/crates/integrations/datafusion/tests/ddl_tests.rs @@ -0,0 +1,402 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! DDL integration tests for paimon-datafusion. + +use std::sync::Arc; + +use datafusion::catalog::CatalogProvider; +use datafusion::prelude::SessionContext; +use paimon::catalog::Identifier; +use paimon::{Catalog, CatalogOptions, FileSystemCatalog, Options}; +use paimon_datafusion::{PaimonCatalogProvider, PaimonDdlHandler, PaimonRelationPlanner}; +use tempfile::TempDir; + +fn create_test_env() -> (TempDir, Arc) { + let temp_dir = TempDir::new().expect("Failed to create temp dir"); + let warehouse = format!("file://{}", temp_dir.path().display()); + let mut options = Options::new(); + options.set(CatalogOptions::WAREHOUSE, warehouse); + let catalog = FileSystemCatalog::new(options).expect("Failed to create catalog"); + (temp_dir, Arc::new(catalog)) +} + +fn create_handler(catalog: Arc) -> PaimonDdlHandler { + let ctx = SessionContext::new(); + ctx.register_catalog( + "paimon", + Arc::new(PaimonCatalogProvider::new(catalog.clone())), + ); + ctx.register_relation_planner(Arc::new(PaimonRelationPlanner::new())) + .expect("Failed to register relation planner"); + PaimonDdlHandler::new(ctx, catalog, "paimon") +} + +// ======================= CREATE / DROP SCHEMA ======================= + +#[tokio::test] +async fn test_create_schema() { + let (_tmp, catalog) = create_test_env(); + let handler = create_handler(catalog.clone()); + + handler + .sql("CREATE SCHEMA paimon.test_db") + .await + .expect("CREATE SCHEMA should succeed"); + + let databases = catalog.list_databases().await.unwrap(); + assert!( + databases.contains(&"test_db".to_string()), + 
"Database test_db should exist after CREATE SCHEMA" + ); +} + +#[tokio::test] +async fn test_drop_schema() { + let (_tmp, catalog) = create_test_env(); + let handler = create_handler(catalog.clone()); + + catalog + .create_database("drop_me", false, Default::default()) + .await + .unwrap(); + + handler + .sql("DROP SCHEMA paimon.drop_me CASCADE") + .await + .expect("DROP SCHEMA should succeed"); + + let databases = catalog.list_databases().await.unwrap(); + assert!( + !databases.contains(&"drop_me".to_string()), + "Database drop_me should not exist after DROP SCHEMA" + ); +} + +#[tokio::test] +async fn test_schema_names_via_catalog_provider() { + let (_tmp, catalog) = create_test_env(); + let provider = PaimonCatalogProvider::new(catalog.clone()); + + catalog + .create_database("db_a", false, Default::default()) + .await + .unwrap(); + catalog + .create_database("db_b", false, Default::default()) + .await + .unwrap(); + + let names = provider.schema_names(); + assert!(names.contains(&"db_a".to_string())); + assert!(names.contains(&"db_b".to_string())); +} + +// ======================= CREATE TABLE ======================= + +#[tokio::test] +async fn test_create_table() { + let (_tmp, catalog) = create_test_env(); + let handler = create_handler(catalog.clone()); + + catalog + .create_database("mydb", false, Default::default()) + .await + .unwrap(); + + handler + .sql( + "CREATE TABLE paimon.mydb.users ( + id INT NOT NULL, + name STRING, + age INT, + PRIMARY KEY (id) + )", + ) + .await + .expect("CREATE TABLE should succeed"); + + let tables = catalog.list_tables("mydb").await.unwrap(); + assert!( + tables.contains(&"users".to_string()), + "Table users should exist after CREATE TABLE" + ); + + // Verify schema + let table = catalog + .get_table(&Identifier::new("mydb", "users")) + .await + .unwrap(); + let schema = table.schema(); + assert_eq!(schema.fields().len(), 3); + assert_eq!(schema.primary_keys(), &["id"]); +} + +#[tokio::test] +async fn 
test_create_table_with_partition() { + let (_tmp, catalog) = create_test_env(); + let handler = create_handler(catalog.clone()); + + catalog + .create_database("mydb", false, Default::default()) + .await + .unwrap(); + + handler + .sql( + "CREATE TABLE paimon.mydb.events ( + id INT NOT NULL, + name STRING, + dt STRING, + PRIMARY KEY (id, dt) + ) PARTITIONED BY (dt STRING) + WITH ('bucket' = '2')", + ) + .await + .expect("CREATE TABLE with partition should succeed"); + + let table = catalog + .get_table(&Identifier::new("mydb", "events")) + .await + .unwrap(); + let schema = table.schema(); + assert_eq!(schema.partition_keys(), &["dt"]); + assert_eq!(schema.primary_keys(), &["id", "dt"]); + assert_eq!( + schema.options().get("bucket"), + Some(&"2".to_string()), + "Table option 'bucket' should be preserved" + ); +} + +#[tokio::test] +async fn test_create_table_if_not_exists() { + let (_tmp, catalog) = create_test_env(); + let handler = create_handler(catalog.clone()); + + catalog + .create_database("mydb", false, Default::default()) + .await + .unwrap(); + + let sql = "CREATE TABLE IF NOT EXISTS paimon.mydb.t1 ( + id INT NOT NULL + )"; + + // First create should succeed + handler.sql(sql).await.expect("First CREATE should succeed"); + + // Second create with IF NOT EXISTS should also succeed + handler + .sql(sql) + .await + .expect("Second CREATE with IF NOT EXISTS should succeed"); +} + +#[tokio::test] +async fn test_create_table_with_location_rejected() { + let (_tmp, catalog) = create_test_env(); + let handler = create_handler(catalog.clone()); + + catalog + .create_database("mydb", false, Default::default()) + .await + .unwrap(); + + // EXTERNAL TABLE with LOCATION is the only way sqlparser populates the location field + let result = handler + .sql( + "CREATE EXTERNAL TABLE paimon.mydb.bad ( + id INT NOT NULL + ) STORED AS PARQUET + LOCATION '/some/path'", + ) + .await; + + assert!( + result.is_err(), + "LOCATION should be rejected for Paimon tables" + ); + let 
err_msg = result.unwrap_err().to_string(); + assert!( + err_msg.contains("LOCATION is not supported"), + "Error should mention LOCATION is not supported, got: {err_msg}" + ); +} + +// ======================= DROP TABLE ======================= + +#[tokio::test] +async fn test_drop_table() { + let (_tmp, catalog) = create_test_env(); + let handler = create_handler(catalog.clone()); + + catalog + .create_database("mydb", false, Default::default()) + .await + .unwrap(); + + // Create a table first + let schema = paimon::spec::Schema::builder() + .column( + "id", + paimon::spec::DataType::Int(paimon::spec::IntType::new()), + ) + .build() + .unwrap(); + catalog + .create_table(&Identifier::new("mydb", "to_drop"), schema, false) + .await + .unwrap(); + + assert!(catalog + .list_tables("mydb") + .await + .unwrap() + .contains(&"to_drop".to_string())); + + handler + .sql("DROP TABLE paimon.mydb.to_drop") + .await + .expect("DROP TABLE should succeed"); + + assert!( + !catalog + .list_tables("mydb") + .await + .unwrap() + .contains(&"to_drop".to_string()), + "Table should not exist after DROP TABLE" + ); +} + +// ======================= ALTER TABLE ======================= + +#[tokio::test] +async fn test_alter_table_add_column() { + let (_tmp, catalog) = create_test_env(); + let handler = create_handler(catalog.clone()); + + catalog + .create_database("mydb", false, Default::default()) + .await + .unwrap(); + + let schema = paimon::spec::Schema::builder() + .column( + "id", + paimon::spec::DataType::Int(paimon::spec::IntType::new()), + ) + .column( + "name", + paimon::spec::DataType::VarChar(paimon::spec::VarCharType::string_type()), + ) + .build() + .unwrap(); + catalog + .create_table(&Identifier::new("mydb", "alter_test"), schema, false) + .await + .unwrap(); + + // ALTER TABLE is not yet implemented in FileSystemCatalog, so we expect an error + let result = handler + .sql("ALTER TABLE paimon.mydb.alter_test ADD COLUMN age INT") + .await; + + // FileSystemCatalog returns 
Unsupported for alter_table, which is expected + assert!( + result.is_err(), + "ALTER TABLE should fail because FileSystemCatalog does not implement alter_table yet" + ); + let err_msg = result.unwrap_err().to_string(); + assert!( + err_msg.contains("not yet implemented") || err_msg.contains("Unsupported"), + "Error should indicate alter_table is not implemented, got: {err_msg}" + ); +} + +#[tokio::test] +async fn test_alter_table_rename() { + let (_tmp, catalog) = create_test_env(); + let handler = create_handler(catalog.clone()); + + catalog + .create_database("mydb", false, Default::default()) + .await + .unwrap(); + + let schema = paimon::spec::Schema::builder() + .column( + "id", + paimon::spec::DataType::Int(paimon::spec::IntType::new()), + ) + .build() + .unwrap(); + catalog + .create_table(&Identifier::new("mydb", "old_name"), schema, false) + .await + .unwrap(); + + handler + .sql("ALTER TABLE mydb.old_name RENAME TO new_name") + .await + .expect("ALTER TABLE RENAME should succeed"); + + let tables = catalog.list_tables("mydb").await.unwrap(); + assert!( + !tables.contains(&"old_name".to_string()), + "old_name should not exist after rename" + ); + assert!( + tables.contains(&"new_name".to_string()), + "new_name should exist after rename" + ); +} + +#[tokio::test] +async fn test_ddl_handler_delegates_select() { + let (_tmp, catalog) = create_test_env(); + let handler = create_handler(catalog.clone()); + + catalog + .create_database("mydb", false, Default::default()) + .await + .unwrap(); + + let schema = paimon::spec::Schema::builder() + .column( + "id", + paimon::spec::DataType::Int(paimon::spec::IntType::new()), + ) + .build() + .unwrap(); + catalog + .create_table(&Identifier::new("mydb", "t1"), schema, false) + .await + .unwrap(); + + // SELECT should be delegated to DataFusion + let df = handler + .sql("SELECT * FROM paimon.mydb.t1") + .await + .expect("SELECT should be delegated to DataFusion"); + + let batches = df.collect().await.expect("SELECT 
should execute"); + // Empty table, but should succeed + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 0, "Empty table should return 0 rows"); +} diff --git a/crates/paimon/src/arrow/mod.rs b/crates/paimon/src/arrow/mod.rs index f48b9da3..e2f60fc4 100644 --- a/crates/paimon/src/arrow/mod.rs +++ b/crates/paimon/src/arrow/mod.rs @@ -22,7 +22,11 @@ pub(crate) mod schema_evolution; pub use crate::arrow::reader::ArrowReaderBuilder; -use crate::spec::{DataField, DataType as PaimonDataType}; +use crate::spec::{ + ArrayType, BigIntType, BooleanType, DataField, DataType as PaimonDataType, DateType, + DecimalType, DoubleType, FloatType, IntType, LocalZonedTimestampType, MapType, RowType, + SmallIntType, TimeType, TimestampType, TinyIntType, VarBinaryType, VarCharType, +}; use arrow_schema::DataType as ArrowDataType; use arrow_schema::{Field as ArrowField, Schema as ArrowSchema, TimeUnit}; use std::sync::Arc; @@ -128,6 +132,114 @@ fn timestamp_time_unit(precision: u32) -> crate::Result { } } +/// Convert an Arrow [`DataType`](ArrowDataType) to a Paimon [`DataType`](PaimonDataType). 
+pub fn arrow_to_paimon_type( + arrow_type: &ArrowDataType, + nullable: bool, +) -> crate::Result { + match arrow_type { + ArrowDataType::Boolean => Ok(PaimonDataType::Boolean(BooleanType::with_nullable( + nullable, + ))), + ArrowDataType::Int8 => Ok(PaimonDataType::TinyInt(TinyIntType::with_nullable( + nullable, + ))), + ArrowDataType::Int16 => Ok(PaimonDataType::SmallInt(SmallIntType::with_nullable( + nullable, + ))), + ArrowDataType::Int32 => Ok(PaimonDataType::Int(IntType::with_nullable(nullable))), + ArrowDataType::Int64 => Ok(PaimonDataType::BigInt(BigIntType::with_nullable(nullable))), + ArrowDataType::Float32 => Ok(PaimonDataType::Float(FloatType::with_nullable(nullable))), + ArrowDataType::Float64 => Ok(PaimonDataType::Double(DoubleType::with_nullable(nullable))), + ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 | ArrowDataType::Utf8View => { + Ok(PaimonDataType::VarChar(VarCharType::with_nullable( + nullable, + VarCharType::MAX_LENGTH, + )?)) + } + ArrowDataType::Binary | ArrowDataType::LargeBinary | ArrowDataType::BinaryView => Ok( + PaimonDataType::VarBinary(VarBinaryType::try_new(nullable, VarBinaryType::MAX_LENGTH)?), + ), + ArrowDataType::Date32 => Ok(PaimonDataType::Date(DateType::with_nullable(nullable))), + ArrowDataType::Timestamp(unit, tz) => { + let precision = match unit { + TimeUnit::Second => 0, + TimeUnit::Millisecond => 3, + TimeUnit::Microsecond => 6, + TimeUnit::Nanosecond => 9, + }; + if tz.is_some() { + Ok(PaimonDataType::LocalZonedTimestamp( + LocalZonedTimestampType::with_nullable(nullable, precision)?, + )) + } else { + Ok(PaimonDataType::Timestamp(TimestampType::with_nullable( + nullable, precision, + )?)) + } + } + ArrowDataType::Time32(_) | ArrowDataType::Time64(_) => { + let precision = match arrow_type { + ArrowDataType::Time32(TimeUnit::Second) => 0, + ArrowDataType::Time32(TimeUnit::Millisecond) => 3, + ArrowDataType::Time64(TimeUnit::Microsecond) => 6, + ArrowDataType::Time64(TimeUnit::Nanosecond) => 9, + _ => 0, + }; + 
Ok(PaimonDataType::Time(TimeType::with_nullable( + nullable, precision, + )?)) + } + ArrowDataType::Decimal128(p, s) => Ok(PaimonDataType::Decimal(DecimalType::with_nullable( + nullable, *p as u32, *s as u32, + )?)), + ArrowDataType::List(field) | ArrowDataType::LargeList(field) => { + let element = arrow_to_paimon_type(field.data_type(), field.is_nullable())?; + Ok(PaimonDataType::Array(ArrayType::with_nullable( + nullable, element, + ))) + } + ArrowDataType::Map(entries_field, _) => { + if let ArrowDataType::Struct(fields) = entries_field.data_type() { + if fields.len() == 2 { + let key = arrow_to_paimon_type(fields[0].data_type(), fields[0].is_nullable())?; + let value = + arrow_to_paimon_type(fields[1].data_type(), fields[1].is_nullable())?; + return Ok(PaimonDataType::Map(MapType::with_nullable( + nullable, key, value, + ))); + } + } + Err(crate::Error::Unsupported { + message: format!("Unsupported Map structure: {arrow_type:?}"), + }) + } + ArrowDataType::Struct(fields) => { + let field_slice: Vec = fields.iter().map(|f| f.as_ref().clone()).collect(); + let paimon_fields = arrow_fields_to_paimon(&field_slice)?; + Ok(PaimonDataType::Row(RowType::with_nullable( + nullable, + paimon_fields, + ))) + } + _ => Err(crate::Error::Unsupported { + message: format!("Unsupported Arrow type for Paimon conversion: {arrow_type:?}"), + }), + } +} + +/// Convert Arrow fields to Paimon [`DataField`]s with auto-assigned IDs starting from 0. +pub fn arrow_fields_to_paimon(fields: &[ArrowField]) -> crate::Result> { + fields + .iter() + .enumerate() + .map(|(i, f)| { + let paimon_type = arrow_to_paimon_type(f.data_type(), f.is_nullable())?; + Ok(DataField::new(i as i32, f.name().clone(), paimon_type)) + }) + .collect() +} + /// Build an Arrow [`Schema`](ArrowSchema) from Paimon [`DataField`]s. 
pub fn build_target_arrow_schema(fields: &[DataField]) -> crate::Result> { let arrow_fields: Vec = fields @@ -143,3 +255,213 @@ pub fn build_target_arrow_schema(fields: &[DataField]) -> crate::Result>>()?; Ok(Arc::new(ArrowSchema::new(arrow_fields))) } + +#[cfg(test)] +mod tests { + use super::*; + use crate::spec::*; + + /// Helper: paimon -> arrow -> paimon roundtrip, assert the arrow type matches expected. + fn assert_paimon_to_arrow(paimon: &PaimonDataType, expected_arrow: &ArrowDataType) { + let arrow = paimon_type_to_arrow(paimon).unwrap(); + assert_eq!(&arrow, expected_arrow, "paimon_type_to_arrow mismatch"); + } + + /// Helper: arrow -> paimon, assert the paimon type variant matches. + fn assert_arrow_to_paimon( + arrow: &ArrowDataType, + nullable: bool, + expected_paimon: &PaimonDataType, + ) { + let paimon = arrow_to_paimon_type(arrow, nullable).unwrap(); + assert_eq!(&paimon, expected_paimon, "arrow_to_paimon_type mismatch"); + } + + #[test] + fn test_primitive_roundtrip() { + let cases: Vec<(PaimonDataType, ArrowDataType)> = vec![ + ( + PaimonDataType::Boolean(BooleanType::new()), + ArrowDataType::Boolean, + ), + ( + PaimonDataType::TinyInt(TinyIntType::new()), + ArrowDataType::Int8, + ), + ( + PaimonDataType::SmallInt(SmallIntType::new()), + ArrowDataType::Int16, + ), + (PaimonDataType::Int(IntType::new()), ArrowDataType::Int32), + ( + PaimonDataType::BigInt(BigIntType::new()), + ArrowDataType::Int64, + ), + ( + PaimonDataType::Float(FloatType::new()), + ArrowDataType::Float32, + ), + ( + PaimonDataType::Double(DoubleType::new()), + ArrowDataType::Float64, + ), + (PaimonDataType::Date(DateType::new()), ArrowDataType::Date32), + ]; + for (paimon, arrow) in &cases { + assert_paimon_to_arrow(paimon, arrow); + assert_arrow_to_paimon(arrow, true, paimon); + } + } + + #[test] + fn test_string_types() { + let varchar = PaimonDataType::VarChar(VarCharType::new(VarCharType::MAX_LENGTH).unwrap()); + assert_paimon_to_arrow(&varchar, &ArrowDataType::Utf8); + + // 
All string-like arrow types map to VarChar + for arrow in &[ + ArrowDataType::Utf8, + ArrowDataType::LargeUtf8, + ArrowDataType::Utf8View, + ] { + assert_arrow_to_paimon(arrow, true, &varchar); + } + } + + #[test] + fn test_binary_types() { + let varbinary = PaimonDataType::VarBinary( + VarBinaryType::try_new(true, VarBinaryType::MAX_LENGTH).unwrap(), + ); + assert_paimon_to_arrow(&varbinary, &ArrowDataType::Binary); + + for arrow in &[ + ArrowDataType::Binary, + ArrowDataType::LargeBinary, + ArrowDataType::BinaryView, + ] { + assert_arrow_to_paimon(arrow, true, &varbinary); + } + } + + #[test] + fn test_timestamp_roundtrip() { + // millisecond precision + let ts3 = PaimonDataType::Timestamp(TimestampType::new(3).unwrap()); + assert_paimon_to_arrow(&ts3, &ArrowDataType::Timestamp(TimeUnit::Millisecond, None)); + assert_arrow_to_paimon( + &ArrowDataType::Timestamp(TimeUnit::Millisecond, None), + true, + &ts3, + ); + + // microsecond precision + let ts6 = PaimonDataType::Timestamp(TimestampType::new(6).unwrap()); + assert_paimon_to_arrow(&ts6, &ArrowDataType::Timestamp(TimeUnit::Microsecond, None)); + assert_arrow_to_paimon( + &ArrowDataType::Timestamp(TimeUnit::Microsecond, None), + true, + &ts6, + ); + + // nanosecond precision + let ts9 = PaimonDataType::Timestamp(TimestampType::new(9).unwrap()); + assert_paimon_to_arrow(&ts9, &ArrowDataType::Timestamp(TimeUnit::Nanosecond, None)); + assert_arrow_to_paimon( + &ArrowDataType::Timestamp(TimeUnit::Nanosecond, None), + true, + &ts9, + ); + } + + #[test] + fn test_local_zoned_timestamp() { + let lzts = PaimonDataType::LocalZonedTimestamp(LocalZonedTimestampType::new(3).unwrap()); + let arrow = ArrowDataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())); + assert_paimon_to_arrow(&lzts, &arrow); + assert_arrow_to_paimon(&arrow, true, &lzts); + } + + #[test] + fn test_decimal_roundtrip() { + let dec = PaimonDataType::Decimal(DecimalType::new(10, 2).unwrap()); + assert_paimon_to_arrow(&dec, 
&ArrowDataType::Decimal128(10, 2)); + assert_arrow_to_paimon(&ArrowDataType::Decimal128(10, 2), true, &dec); + } + + #[test] + fn test_array_roundtrip() { + let paimon_arr = PaimonDataType::Array(ArrayType::new(PaimonDataType::Int(IntType::new()))); + let arrow_list = ArrowDataType::List(Arc::new(ArrowField::new( + "element", + ArrowDataType::Int32, + true, + ))); + assert_paimon_to_arrow(&paimon_arr, &arrow_list); + + // arrow -> paimon: element field name doesn't matter + let arrow_list2 = ArrowDataType::List(Arc::new(ArrowField::new( + "item", + ArrowDataType::Int32, + true, + ))); + let result = arrow_to_paimon_type(&arrow_list2, true).unwrap(); + assert!(matches!(result, PaimonDataType::Array(_))); + } + + #[test] + fn test_map_roundtrip() { + let paimon_map = PaimonDataType::Map(MapType::new( + PaimonDataType::VarChar(VarCharType::new(VarCharType::MAX_LENGTH).unwrap()), + PaimonDataType::Int(IntType::new()), + )); + let arrow_map = paimon_type_to_arrow(&paimon_map).unwrap(); + let back = arrow_to_paimon_type(&arrow_map, true).unwrap(); + assert!(matches!(back, PaimonDataType::Map(_))); + } + + #[test] + fn test_row_roundtrip() { + let row = PaimonDataType::Row(RowType::new(vec![ + DataField::new(0, "a".to_string(), PaimonDataType::Int(IntType::new())), + DataField::new( + 1, + "b".to_string(), + PaimonDataType::VarChar(VarCharType::new(VarCharType::MAX_LENGTH).unwrap()), + ), + ])); + let arrow = paimon_type_to_arrow(&row).unwrap(); + let back = arrow_to_paimon_type(&arrow, true).unwrap(); + assert!(matches!(back, PaimonDataType::Row(_))); + } + + #[test] + fn test_not_nullable() { + let paimon = arrow_to_paimon_type(&ArrowDataType::Int32, false).unwrap(); + assert!(!paimon.is_nullable()); + + let paimon = arrow_to_paimon_type(&ArrowDataType::Int32, true).unwrap(); + assert!(paimon.is_nullable()); + } + + #[test] + fn test_unsupported_arrow_type() { + let result = arrow_to_paimon_type(&ArrowDataType::Duration(TimeUnit::Second), true); + 
assert!(result.is_err()); + } + + #[test] + fn test_arrow_fields_to_paimon_ids() { + let fields = vec![ + ArrowField::new("x", ArrowDataType::Int32, true), + ArrowField::new("y", ArrowDataType::Utf8, false), + ]; + let paimon_fields = arrow_fields_to_paimon(&fields).unwrap(); + assert_eq!(paimon_fields.len(), 2); + assert_eq!(paimon_fields[0].id(), 0); + assert_eq!(paimon_fields[0].name(), "x"); + assert_eq!(paimon_fields[1].id(), 1); + assert_eq!(paimon_fields[1].name(), "y"); + assert!(!paimon_fields[1].data_type().is_nullable()); + } +} From b5a59ef7cda639bbba4b07839b257c0a0bdef80e Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Sun, 12 Apr 2026 09:17:44 +0800 Subject: [PATCH 2/5] Fix comments --- crates/integrations/datafusion/src/ddl.rs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/crates/integrations/datafusion/src/ddl.rs b/crates/integrations/datafusion/src/ddl.rs index 02a319f4..2dcb6d10 100644 --- a/crates/integrations/datafusion/src/ddl.rs +++ b/crates/integrations/datafusion/src/ddl.rs @@ -100,8 +100,11 @@ impl PaimonDdlHandler { match &statements[0] { Statement::CreateTable(create_table) => self.handle_create_table(create_table).await, Statement::AlterTable { - name, operations, .. - } => self.handle_alter_table(name, operations).await, + name, + operations, + if_exists, + .. 
+ } => self.handle_alter_table(name, operations, *if_exists).await, _ => self.ctx.sql(sql).await, } } @@ -174,6 +177,7 @@ impl PaimonDdlHandler { &self, name: &ObjectName, operations: &[AlterTableOperation], + if_exists: bool, ) -> DFResult { let identifier = self.resolve_table_name(name)?; @@ -222,14 +226,14 @@ impl PaimonDdlHandler { if let Some(new_identifier) = rename_to { self.catalog - .rename_table(&identifier, &new_identifier, false) + .rename_table(&identifier, &new_identifier, if_exists) .await .map_err(to_datafusion_error)?; } if !changes.is_empty() { self.catalog - .alter_table(&identifier, changes, false) + .alter_table(&identifier, changes, if_exists) .await .map_err(to_datafusion_error)?; } From cd1780ffb5c9c337e62abfe0e0c9b3dd29e6e996 Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Sun, 12 Apr 2026 09:41:00 +0800 Subject: [PATCH 3/5] Add more tests --- crates/integrations/datafusion/src/ddl.rs | 701 ++++++++++++++++++++++ 1 file changed, 701 insertions(+) diff --git a/crates/integrations/datafusion/src/ddl.rs b/crates/integrations/datafusion/src/ddl.rs index 2dcb6d10..08868b3f 100644 --- a/crates/integrations/datafusion/src/ddl.rs +++ b/crates/integrations/datafusion/src/ddl.rs @@ -389,3 +389,704 @@ fn ok_result(ctx: &SessionContext) -> DFResult { let df = ctx.read_batch(batch)?; Ok(df) } + +#[cfg(test)] +mod tests { + use super::*; + use std::collections::HashMap; + use std::sync::Mutex; + + use async_trait::async_trait; + use datafusion::arrow::datatypes::TimeUnit; + use paimon::catalog::Database; + use paimon::spec::Schema as PaimonSchema; + use paimon::table::Table; + + // ==================== Mock Catalog ==================== + + #[allow(clippy::enum_variant_names)] + #[derive(Debug)] + enum CatalogCall { + CreateTable { + identifier: Identifier, + schema: PaimonSchema, + ignore_if_exists: bool, + }, + AlterTable { + identifier: Identifier, + changes: Vec, + ignore_if_not_exists: bool, + }, + RenameTable { + from: Identifier, + to: Identifier, 
+ ignore_if_not_exists: bool, + }, + } + + struct MockCatalog { + calls: Mutex>, + } + + impl MockCatalog { + fn new() -> Self { + Self { + calls: Mutex::new(Vec::new()), + } + } + + fn take_calls(&self) -> Vec { + std::mem::take(&mut *self.calls.lock().unwrap()) + } + } + + #[async_trait] + impl Catalog for MockCatalog { + async fn list_databases(&self) -> paimon::Result> { + Ok(vec![]) + } + async fn create_database( + &self, + _name: &str, + _ignore_if_exists: bool, + _properties: HashMap, + ) -> paimon::Result<()> { + Ok(()) + } + async fn get_database(&self, _name: &str) -> paimon::Result { + unimplemented!() + } + async fn drop_database( + &self, + _name: &str, + _ignore_if_not_exists: bool, + _cascade: bool, + ) -> paimon::Result<()> { + Ok(()) + } + async fn get_table(&self, _identifier: &Identifier) -> paimon::Result { + unimplemented!() + } + async fn list_tables(&self, _database_name: &str) -> paimon::Result> { + Ok(vec![]) + } + async fn create_table( + &self, + identifier: &Identifier, + creation: PaimonSchema, + ignore_if_exists: bool, + ) -> paimon::Result<()> { + self.calls.lock().unwrap().push(CatalogCall::CreateTable { + identifier: identifier.clone(), + schema: creation, + ignore_if_exists, + }); + Ok(()) + } + async fn drop_table( + &self, + _identifier: &Identifier, + _ignore_if_not_exists: bool, + ) -> paimon::Result<()> { + Ok(()) + } + async fn rename_table( + &self, + from: &Identifier, + to: &Identifier, + ignore_if_not_exists: bool, + ) -> paimon::Result<()> { + self.calls.lock().unwrap().push(CatalogCall::RenameTable { + from: from.clone(), + to: to.clone(), + ignore_if_not_exists, + }); + Ok(()) + } + async fn alter_table( + &self, + identifier: &Identifier, + changes: Vec, + ignore_if_not_exists: bool, + ) -> paimon::Result<()> { + self.calls.lock().unwrap().push(CatalogCall::AlterTable { + identifier: identifier.clone(), + changes, + ignore_if_not_exists, + }); + Ok(()) + } + } + + fn make_handler(catalog: Arc) -> PaimonDdlHandler { + 
PaimonDdlHandler::new(SessionContext::new(), catalog, "paimon") + } + + // ==================== sql_data_type_to_arrow tests ==================== + + #[test] + fn test_sql_type_boolean() { + use datafusion::sql::sqlparser::ast::DataType as SqlType; + assert_eq!( + sql_data_type_to_arrow(&SqlType::Boolean).unwrap(), + ArrowDataType::Boolean + ); + } + + #[test] + fn test_sql_type_integers() { + use datafusion::sql::sqlparser::ast::DataType as SqlType; + assert_eq!( + sql_data_type_to_arrow(&SqlType::TinyInt(None)).unwrap(), + ArrowDataType::Int8 + ); + assert_eq!( + sql_data_type_to_arrow(&SqlType::SmallInt(None)).unwrap(), + ArrowDataType::Int16 + ); + assert_eq!( + sql_data_type_to_arrow(&SqlType::Int(None)).unwrap(), + ArrowDataType::Int32 + ); + assert_eq!( + sql_data_type_to_arrow(&SqlType::Integer(None)).unwrap(), + ArrowDataType::Int32 + ); + assert_eq!( + sql_data_type_to_arrow(&SqlType::BigInt(None)).unwrap(), + ArrowDataType::Int64 + ); + } + + #[test] + fn test_sql_type_floats() { + use datafusion::sql::sqlparser::ast::{DataType as SqlType, ExactNumberInfo}; + assert_eq!( + sql_data_type_to_arrow(&SqlType::Float(ExactNumberInfo::None)).unwrap(), + ArrowDataType::Float32 + ); + assert_eq!( + sql_data_type_to_arrow(&SqlType::Real).unwrap(), + ArrowDataType::Float32 + ); + assert_eq!( + sql_data_type_to_arrow(&SqlType::DoublePrecision).unwrap(), + ArrowDataType::Float64 + ); + } + + #[test] + fn test_sql_type_string_variants() { + use datafusion::sql::sqlparser::ast::DataType as SqlType; + for sql_type in [SqlType::Varchar(None), SqlType::Text, SqlType::String(None)] { + assert_eq!( + sql_data_type_to_arrow(&sql_type).unwrap(), + ArrowDataType::Utf8, + "failed for {sql_type:?}" + ); + } + } + + #[test] + fn test_sql_type_binary() { + use datafusion::sql::sqlparser::ast::DataType as SqlType; + assert_eq!( + sql_data_type_to_arrow(&SqlType::Bytea).unwrap(), + ArrowDataType::Binary + ); + } + + #[test] + fn test_sql_type_date() { + use 
datafusion::sql::sqlparser::ast::DataType as SqlType; + assert_eq!( + sql_data_type_to_arrow(&SqlType::Date).unwrap(), + ArrowDataType::Date32 + ); + } + + #[test] + fn test_sql_type_timestamp_default() { + use datafusion::sql::sqlparser::ast::{DataType as SqlType, TimezoneInfo}; + let result = sql_data_type_to_arrow(&SqlType::Timestamp(None, TimezoneInfo::None)).unwrap(); + assert_eq!( + result, + ArrowDataType::Timestamp(TimeUnit::Millisecond, None) + ); + } + + #[test] + fn test_sql_type_timestamp_with_precision() { + use datafusion::sql::sqlparser::ast::{DataType as SqlType, TimezoneInfo}; + // precision 0 => Second + assert_eq!( + sql_data_type_to_arrow(&SqlType::Timestamp(Some(0), TimezoneInfo::None)).unwrap(), + ArrowDataType::Timestamp(TimeUnit::Second, None) + ); + // precision 3 => Millisecond + assert_eq!( + sql_data_type_to_arrow(&SqlType::Timestamp(Some(3), TimezoneInfo::None)).unwrap(), + ArrowDataType::Timestamp(TimeUnit::Millisecond, None) + ); + // precision 6 => Microsecond + assert_eq!( + sql_data_type_to_arrow(&SqlType::Timestamp(Some(6), TimezoneInfo::None)).unwrap(), + ArrowDataType::Timestamp(TimeUnit::Microsecond, None) + ); + // precision 9 => Nanosecond + assert_eq!( + sql_data_type_to_arrow(&SqlType::Timestamp(Some(9), TimezoneInfo::None)).unwrap(), + ArrowDataType::Timestamp(TimeUnit::Nanosecond, None) + ); + } + + #[test] + fn test_sql_type_timestamp_with_tz() { + use datafusion::sql::sqlparser::ast::{DataType as SqlType, TimezoneInfo}; + let result = + sql_data_type_to_arrow(&SqlType::Timestamp(None, TimezoneInfo::WithTimeZone)).unwrap(); + assert_eq!( + result, + ArrowDataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())) + ); + } + + #[test] + fn test_sql_type_decimal() { + use datafusion::sql::sqlparser::ast::{DataType as SqlType, ExactNumberInfo}; + assert_eq!( + sql_data_type_to_arrow(&SqlType::Decimal(ExactNumberInfo::PrecisionAndScale(18, 2))) + .unwrap(), + ArrowDataType::Decimal128(18, 2) + ); + assert_eq!( + 
sql_data_type_to_arrow(&SqlType::Decimal(ExactNumberInfo::Precision(10))).unwrap(), + ArrowDataType::Decimal128(10, 0) + ); + assert_eq!( + sql_data_type_to_arrow(&SqlType::Decimal(ExactNumberInfo::None)).unwrap(), + ArrowDataType::Decimal128(10, 0) + ); + } + + #[test] + fn test_sql_type_unsupported() { + use datafusion::sql::sqlparser::ast::DataType as SqlType; + assert!(sql_data_type_to_arrow(&SqlType::Regclass).is_err()); + } + + // ==================== resolve_table_name tests ==================== + + #[test] + fn test_resolve_three_part_name() { + let catalog = Arc::new(MockCatalog::new()); + let handler = make_handler(catalog); + let dialect = GenericDialect {}; + let stmts = Parser::parse_sql(&dialect, "SELECT * FROM paimon.mydb.mytable").unwrap(); + if let Statement::Query(q) = &stmts[0] { + if let datafusion::sql::sqlparser::ast::SetExpr::Select(sel) = q.body.as_ref() { + if let datafusion::sql::sqlparser::ast::TableFactor::Table { name, .. } = + &sel.from[0].relation + { + let id = handler.resolve_table_name(name).unwrap(); + assert_eq!(id.database(), "mydb"); + assert_eq!(id.object(), "mytable"); + } + } + } + } + + #[test] + fn test_resolve_two_part_name() { + let catalog = Arc::new(MockCatalog::new()); + let handler = make_handler(catalog); + let dialect = GenericDialect {}; + let stmts = Parser::parse_sql(&dialect, "SELECT * FROM mydb.mytable").unwrap(); + if let Statement::Query(q) = &stmts[0] { + if let datafusion::sql::sqlparser::ast::SetExpr::Select(sel) = q.body.as_ref() { + if let datafusion::sql::sqlparser::ast::TableFactor::Table { name, .. 
} = + &sel.from[0].relation + { + let id = handler.resolve_table_name(name).unwrap(); + assert_eq!(id.database(), "mydb"); + assert_eq!(id.object(), "mytable"); + } + } + } + } + + #[test] + fn test_resolve_wrong_catalog_name() { + let catalog = Arc::new(MockCatalog::new()); + let handler = make_handler(catalog); + let dialect = GenericDialect {}; + let stmts = Parser::parse_sql(&dialect, "SELECT * FROM other.mydb.mytable").unwrap(); + if let Statement::Query(q) = &stmts[0] { + if let datafusion::sql::sqlparser::ast::SetExpr::Select(sel) = q.body.as_ref() { + if let datafusion::sql::sqlparser::ast::TableFactor::Table { name, .. } = + &sel.from[0].relation + { + let err = handler.resolve_table_name(name).unwrap_err(); + assert!(err.to_string().contains("Unknown catalog")); + } + } + } + } + + #[test] + fn test_resolve_single_part_name_error() { + let catalog = Arc::new(MockCatalog::new()); + let handler = make_handler(catalog); + let dialect = GenericDialect {}; + let stmts = Parser::parse_sql(&dialect, "SELECT * FROM mytable").unwrap(); + if let Statement::Query(q) = &stmts[0] { + if let datafusion::sql::sqlparser::ast::SetExpr::Select(sel) = q.body.as_ref() { + if let datafusion::sql::sqlparser::ast::TableFactor::Table { name, .. 
} = + &sel.from[0].relation + { + let err = handler.resolve_table_name(name).unwrap_err(); + assert!(err.to_string().contains("at least database.table")); + } + } + } + } + + // ==================== extract_options tests ==================== + + #[test] + fn test_extract_options_none() { + let opts = extract_options(&CreateTableOptions::None).unwrap(); + assert!(opts.is_empty()); + } + + #[test] + fn test_extract_options_with_kv() { + // Parse a CREATE TABLE with WITH options to get a real CreateTableOptions + let dialect = GenericDialect {}; + let stmts = + Parser::parse_sql(&dialect, "CREATE TABLE t (id INT) WITH ('bucket' = '4')").unwrap(); + if let Statement::CreateTable(ct) = &stmts[0] { + let opts = extract_options(&ct.table_options).unwrap(); + assert_eq!(opts.len(), 1); + assert_eq!(opts[0].0, "bucket"); + assert_eq!(opts[0].1, "4"); + } else { + panic!("expected CreateTable"); + } + } + + // ==================== PaimonDdlHandler::sql integration tests ==================== + + #[tokio::test] + async fn test_create_table_basic() { + let catalog = Arc::new(MockCatalog::new()); + let handler = make_handler(catalog.clone()); + + handler + .sql("CREATE TABLE mydb.t1 (id INT NOT NULL, name VARCHAR, PRIMARY KEY (id))") + .await + .unwrap(); + + let calls = catalog.take_calls(); + assert_eq!(calls.len(), 1); + if let CatalogCall::CreateTable { + identifier, + schema, + ignore_if_exists, + } = &calls[0] + { + assert_eq!(identifier.database(), "mydb"); + assert_eq!(identifier.object(), "t1"); + assert!(!ignore_if_exists); + assert_eq!(schema.primary_keys(), &["id"]); + } else { + panic!("expected CreateTable call"); + } + } + + #[tokio::test] + async fn test_create_table_if_not_exists() { + let catalog = Arc::new(MockCatalog::new()); + let handler = make_handler(catalog.clone()); + + handler + .sql("CREATE TABLE IF NOT EXISTS mydb.t1 (id INT)") + .await + .unwrap(); + + let calls = catalog.take_calls(); + assert_eq!(calls.len(), 1); + if let CatalogCall::CreateTable 
{ + ignore_if_exists, .. + } = &calls[0] + { + assert!(ignore_if_exists); + } else { + panic!("expected CreateTable call"); + } + } + + #[tokio::test] + async fn test_create_table_with_options() { + let catalog = Arc::new(MockCatalog::new()); + let handler = make_handler(catalog.clone()); + + handler + .sql("CREATE TABLE mydb.t1 (id INT) WITH ('bucket' = '4', 'file.format' = 'parquet')") + .await + .unwrap(); + + let calls = catalog.take_calls(); + assert_eq!(calls.len(), 1); + if let CatalogCall::CreateTable { schema, .. } = &calls[0] { + let opts = schema.options(); + assert_eq!(opts.get("bucket").unwrap(), "4"); + assert_eq!(opts.get("file.format").unwrap(), "parquet"); + } else { + panic!("expected CreateTable call"); + } + } + + #[tokio::test] + async fn test_create_table_three_part_name() { + let catalog = Arc::new(MockCatalog::new()); + let handler = make_handler(catalog.clone()); + + handler + .sql("CREATE TABLE paimon.mydb.t1 (id INT)") + .await + .unwrap(); + + let calls = catalog.take_calls(); + if let CatalogCall::CreateTable { identifier, .. } = &calls[0] { + assert_eq!(identifier.database(), "mydb"); + assert_eq!(identifier.object(), "t1"); + } else { + panic!("expected CreateTable call"); + } + } + + #[tokio::test] + async fn test_alter_table_add_column() { + let catalog = Arc::new(MockCatalog::new()); + let handler = make_handler(catalog.clone()); + + handler + .sql("ALTER TABLE mydb.t1 ADD COLUMN age INT") + .await + .unwrap(); + + let calls = catalog.take_calls(); + assert_eq!(calls.len(), 1); + if let CatalogCall::AlterTable { + identifier, + changes, + .. + } = &calls[0] + { + assert_eq!(identifier.database(), "mydb"); + assert_eq!(identifier.object(), "t1"); + assert_eq!(changes.len(), 1); + assert!( + matches!(&changes[0], SchemaChange::AddColumn { field_name, .. 
} if field_name == "age") + ); + } else { + panic!("expected AlterTable call"); + } + } + + #[tokio::test] + async fn test_alter_table_drop_column() { + let catalog = Arc::new(MockCatalog::new()); + let handler = make_handler(catalog.clone()); + + handler + .sql("ALTER TABLE mydb.t1 DROP COLUMN age") + .await + .unwrap(); + + let calls = catalog.take_calls(); + assert_eq!(calls.len(), 1); + if let CatalogCall::AlterTable { changes, .. } = &calls[0] { + assert_eq!(changes.len(), 1); + assert!( + matches!(&changes[0], SchemaChange::DropColumn { field_name } if field_name == "age") + ); + } else { + panic!("expected AlterTable call"); + } + } + + #[tokio::test] + async fn test_alter_table_rename_column() { + let catalog = Arc::new(MockCatalog::new()); + let handler = make_handler(catalog.clone()); + + handler + .sql("ALTER TABLE mydb.t1 RENAME COLUMN old_name TO new_name") + .await + .unwrap(); + + let calls = catalog.take_calls(); + assert_eq!(calls.len(), 1); + if let CatalogCall::AlterTable { changes, .. } = &calls[0] { + assert_eq!(changes.len(), 1); + assert!(matches!( + &changes[0], + SchemaChange::RenameColumn { field_name, new_name } + if field_name == "old_name" && new_name == "new_name" + )); + } else { + panic!("expected AlterTable call"); + } + } + + #[tokio::test] + async fn test_alter_table_rename_table() { + let catalog = Arc::new(MockCatalog::new()); + let handler = make_handler(catalog.clone()); + + handler + .sql("ALTER TABLE mydb.t1 RENAME TO t2") + .await + .unwrap(); + + let calls = catalog.take_calls(); + assert_eq!(calls.len(), 1); + if let CatalogCall::RenameTable { from, to, .. 
} = &calls[0] { + assert_eq!(from.database(), "mydb"); + assert_eq!(from.object(), "t1"); + assert_eq!(to.database(), "mydb"); + assert_eq!(to.object(), "t2"); + } else { + panic!("expected RenameTable call"); + } + } + + #[tokio::test] + async fn test_alter_table_if_exists_add_column() { + let catalog = Arc::new(MockCatalog::new()); + let handler = make_handler(catalog.clone()); + + handler + .sql("ALTER TABLE IF EXISTS mydb.t1 ADD COLUMN age INT") + .await + .unwrap(); + + let calls = catalog.take_calls(); + assert_eq!(calls.len(), 1); + if let CatalogCall::AlterTable { + ignore_if_not_exists, + .. + } = &calls[0] + { + assert!(ignore_if_not_exists); + } else { + panic!("expected AlterTable call"); + } + } + + #[tokio::test] + async fn test_alter_table_without_if_exists() { + let catalog = Arc::new(MockCatalog::new()); + let handler = make_handler(catalog.clone()); + + handler + .sql("ALTER TABLE mydb.t1 ADD COLUMN age INT") + .await + .unwrap(); + + let calls = catalog.take_calls(); + if let CatalogCall::AlterTable { + ignore_if_not_exists, + .. 
+ } = &calls[0] + { + assert!(!ignore_if_not_exists); + } else { + panic!("expected AlterTable call"); + } + } + + #[tokio::test] + async fn test_alter_table_if_exists_rename() { + let catalog = Arc::new(MockCatalog::new()); + let handler = make_handler(catalog.clone()); + + handler + .sql("ALTER TABLE IF EXISTS mydb.t1 RENAME TO t2") + .await + .unwrap(); + + let calls = catalog.take_calls(); + assert_eq!(calls.len(), 1); + if let CatalogCall::RenameTable { + from, + to, + ignore_if_not_exists, + } = &calls[0] + { + assert!(ignore_if_not_exists); + assert_eq!(from.object(), "t1"); + assert_eq!(to.object(), "t2"); + } else { + panic!("expected RenameTable call"); + } + } + + #[tokio::test] + async fn test_alter_table_rename_three_part_name() { + let catalog = Arc::new(MockCatalog::new()); + let handler = make_handler(catalog.clone()); + + handler + .sql("ALTER TABLE paimon.mydb.t1 RENAME TO t2") + .await + .unwrap(); + + let calls = catalog.take_calls(); + assert_eq!(calls.len(), 1); + if let CatalogCall::RenameTable { from, to, .. 
} = &calls[0] { + assert_eq!(from.database(), "mydb"); + assert_eq!(from.object(), "t1"); + assert_eq!(to.database(), "mydb"); + assert_eq!(to.object(), "t2"); + } else { + panic!("expected RenameTable call"); + } + } + + #[tokio::test] + async fn test_sql_parse_error() { + let catalog = Arc::new(MockCatalog::new()); + let handler = make_handler(catalog); + let result = handler.sql("NOT VALID SQL !!!").await; + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("SQL parse error")); + } + + #[tokio::test] + async fn test_multiple_statements_error() { + let catalog = Arc::new(MockCatalog::new()); + let handler = make_handler(catalog); + let result = handler.sql("SELECT 1; SELECT 2").await; + assert!(result.is_err()); + assert!(result + .unwrap_err() + .to_string() + .contains("exactly one SQL statement")); + } + + #[tokio::test] + async fn test_non_ddl_delegates_to_datafusion() { + let catalog = Arc::new(MockCatalog::new()); + let handler = make_handler(catalog.clone()); + // SELECT should be delegated to DataFusion, not intercepted + let df = handler.sql("SELECT 1 AS x").await.unwrap(); + let batches = df.collect().await.unwrap(); + assert_eq!(batches.len(), 1); + assert_eq!(batches[0].num_rows(), 1); + // No catalog calls + assert!(catalog.take_calls().is_empty()); + } +} From 9e9106864ebfe2df5bd4f2350b6ed787e7ff1d9e Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Sun, 12 Apr 2026 17:13:40 +0800 Subject: [PATCH 4/5] ban external --- crates/integrations/datafusion/src/catalog.rs | 3 +- crates/integrations/datafusion/src/ddl.rs | 19 +++ crates/integrations/datafusion/src/lib.rs | 2 - .../datafusion/src/table_factory.rs | 137 ------------------ .../datafusion/tests/ddl_tests.rs | 12 +- 5 files changed, 24 insertions(+), 149 deletions(-) delete mode 100644 crates/integrations/datafusion/src/table_factory.rs diff --git a/crates/integrations/datafusion/src/catalog.rs b/crates/integrations/datafusion/src/catalog.rs index be75ec44..7ac66d43 100644 
--- a/crates/integrations/datafusion/src/catalog.rs +++ b/crates/integrations/datafusion/src/catalog.rs @@ -210,8 +210,7 @@ impl SchemaProvider for PaimonSchemaProvider { _name: String, table: Arc, ) -> DFResult>> { - // The table is already created in the Paimon catalog by PaimonTableFactory. - // DataFusion calls register_table after the factory returns, so we just + // DataFusion calls register_table after table creation, so we just // acknowledge it here. Ok(Some(table)) } diff --git a/crates/integrations/datafusion/src/ddl.rs b/crates/integrations/datafusion/src/ddl.rs index 08868b3f..75bcf305 100644 --- a/crates/integrations/datafusion/src/ddl.rs +++ b/crates/integrations/datafusion/src/ddl.rs @@ -110,6 +110,11 @@ impl PaimonDdlHandler { } async fn handle_create_table(&self, ct: &CreateTable) -> DFResult { + if ct.external { + return Err(DataFusionError::Plan( + "CREATE EXTERNAL TABLE is not supported. Use CREATE TABLE instead.".to_string(), + )); + } if ct.location.is_some() { return Err(DataFusionError::Plan( "LOCATION is not supported for Paimon tables. 
Table path is determined by the catalog warehouse.".to_string(), @@ -1077,6 +1082,20 @@ mod tests { .contains("exactly one SQL statement")); } + #[tokio::test] + async fn test_create_external_table_rejected() { + let catalog = Arc::new(MockCatalog::new()); + let handler = make_handler(catalog); + let result = handler + .sql("CREATE EXTERNAL TABLE mydb.t1 (id INT) STORED AS PARQUET") + .await; + assert!(result.is_err()); + assert!(result + .unwrap_err() + .to_string() + .contains("CREATE EXTERNAL TABLE is not supported")); + } + #[tokio::test] async fn test_non_ddl_delegates_to_datafusion() { let catalog = Arc::new(MockCatalog::new()); diff --git a/crates/integrations/datafusion/src/lib.rs b/crates/integrations/datafusion/src/lib.rs index 82755382..e803153e 100644 --- a/crates/integrations/datafusion/src/lib.rs +++ b/crates/integrations/datafusion/src/lib.rs @@ -46,7 +46,6 @@ mod physical_plan; mod relation_planner; pub mod runtime; mod table; -mod table_factory; pub use catalog::{PaimonCatalogProvider, PaimonSchemaProvider}; pub use ddl::PaimonDdlHandler; @@ -56,4 +55,3 @@ pub use full_text_search::{register_full_text_search, FullTextSearchFunction}; pub use physical_plan::PaimonTableScan; pub use relation_planner::PaimonRelationPlanner; pub use table::PaimonTableProvider; -pub use table_factory::PaimonTableFactory; diff --git a/crates/integrations/datafusion/src/table_factory.rs b/crates/integrations/datafusion/src/table_factory.rs deleted file mode 100644 index d6e12c62..00000000 --- a/crates/integrations/datafusion/src/table_factory.rs +++ /dev/null @@ -1,137 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! [`TableProviderFactory`] implementation for creating Paimon tables via -//! `CREATE EXTERNAL TABLE`. - -use std::fmt::Debug; -use std::sync::Arc; - -use async_trait::async_trait; -use datafusion::catalog::{Session, TableProvider, TableProviderFactory}; -use datafusion::common::TableReference; -use datafusion::error::{DataFusionError, Result as DFResult}; -use datafusion::logical_expr::CreateExternalTable; -use paimon::catalog::{Catalog, Identifier}; -use paimon::spec::Schema; - -use crate::error::to_datafusion_error; -use crate::table::PaimonTableProvider; -use paimon::arrow::arrow_to_paimon_type; - -/// A [`TableProviderFactory`] that creates Paimon tables. 
-/// -/// Register with: -/// ```ignore -/// ctx.state_mut().table_factories_mut() -/// .insert("PAIMON".to_string(), Arc::new(PaimonTableFactory::new(catalog))); -/// ``` -/// -/// Then use: -/// ```sql -/// CREATE EXTERNAL TABLE paimon.my_db.my_table ( -/// id INT NOT NULL, -/// name STRING, -/// dt STRING, -/// PRIMARY KEY (id, dt) -/// ) PARTITIONED BY (dt) -/// WITH ('bucket' = '2'); -/// ``` -pub struct PaimonTableFactory { - catalog: Arc, -} - -impl Debug for PaimonTableFactory { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("PaimonTableFactory").finish() - } -} - -impl PaimonTableFactory { - pub fn new(catalog: Arc) -> Self { - Self { catalog } - } -} - -#[async_trait] -impl TableProviderFactory for PaimonTableFactory { - async fn create( - &self, - _state: &dyn Session, - cmd: &CreateExternalTable, - ) -> DFResult> { - if !cmd.location.is_empty() { - return Err(DataFusionError::Plan( - "LOCATION is not supported for Paimon tables. Table path is determined by the catalog warehouse.".to_string(), - )); - } - - let identifier = resolve_identifier(&cmd.name)?; - - // Build Paimon schema from the CREATE EXTERNAL TABLE command. - let arrow_schema = cmd.schema.as_arrow(); - let mut builder = Schema::builder(); - - for field in arrow_schema.fields() { - let paimon_type = arrow_to_paimon_type(field.data_type(), field.is_nullable()) - .map_err(to_datafusion_error)?; - builder = builder.column(field.name().clone(), paimon_type); - } - - if !cmd.table_partition_cols.is_empty() { - builder = builder.partition_keys(cmd.table_partition_cols.clone()); - } - - // Pass all OPTIONS through to Paimon (includes 'bucket', etc.). - // DataFusion prefixes options with "format." — strip that prefix for Paimon. 
- for (k, v) in &cmd.options { - let key = k.strip_prefix("format.").unwrap_or(k); - builder = builder.option(key.to_string(), v.clone()); - } - - let schema = builder.build().map_err(to_datafusion_error)?; - - self.catalog - .create_table(&identifier, schema, cmd.if_not_exists) - .await - .map_err(to_datafusion_error)?; - - // Return the newly created table as a provider. - let table = self - .catalog - .get_table(&identifier) - .await - .map_err(to_datafusion_error)?; - let provider = PaimonTableProvider::try_new(table)?; - Ok(Arc::new(provider)) - } -} - -/// Extract a Paimon [`Identifier`] (database, table) from a DataFusion [`TableReference`]. -fn resolve_identifier(name: &TableReference) -> DFResult { - match name { - TableReference::Full { - schema, table, .. - } => Ok(Identifier::new(schema.to_string(), table.to_string())), - TableReference::Partial { schema, table } => { - Ok(Identifier::new(schema.to_string(), table.to_string())) - } - TableReference::Bare { table } => Err(DataFusionError::Plan(format!( - "CREATE EXTERNAL TABLE requires a fully qualified name (catalog.database.table or database.table), got: {table}" - ))), - } -} diff --git a/crates/integrations/datafusion/tests/ddl_tests.rs b/crates/integrations/datafusion/tests/ddl_tests.rs index 207583d6..90211edd 100644 --- a/crates/integrations/datafusion/tests/ddl_tests.rs +++ b/crates/integrations/datafusion/tests/ddl_tests.rs @@ -208,7 +208,7 @@ async fn test_create_table_if_not_exists() { } #[tokio::test] -async fn test_create_table_with_location_rejected() { +async fn test_create_external_table_rejected() { let (_tmp, catalog) = create_test_env(); let handler = create_handler(catalog.clone()); @@ -217,7 +217,6 @@ async fn test_create_table_with_location_rejected() { .await .unwrap(); - // EXTERNAL TABLE with LOCATION is the only way sqlparser populates the location field let result = handler .sql( "CREATE EXTERNAL TABLE paimon.mydb.bad ( @@ -227,14 +226,11 @@ async fn 
test_create_table_with_location_rejected() { ) .await; - assert!( - result.is_err(), - "LOCATION should be rejected for Paimon tables" - ); + assert!(result.is_err(), "CREATE EXTERNAL TABLE should be rejected"); let err_msg = result.unwrap_err().to_string(); assert!( - err_msg.contains("LOCATION is not supported"), - "Error should mention LOCATION is not supported, got: {err_msg}" + err_msg.contains("CREATE EXTERNAL TABLE is not supported"), + "Error should mention CREATE EXTERNAL TABLE is not supported, got: {err_msg}" ); } From 6456b51985879b3df7acfb3bc3f955a4fea7c358 Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Sun, 12 Apr 2026 19:32:22 +0800 Subject: [PATCH 5/5] Fix comments --- crates/integrations/datafusion/src/ddl.rs | 129 +++++++++++++++++- .../datafusion/tests/ddl_tests.rs | 99 ++++++++++++++ crates/paimon/src/spec/schema.rs | 68 +++++++-- 3 files changed, 278 insertions(+), 18 deletions(-) diff --git a/crates/integrations/datafusion/src/ddl.rs b/crates/integrations/datafusion/src/ddl.rs index 75bcf305..97eb900c 100644 --- a/crates/integrations/datafusion/src/ddl.rs +++ b/crates/integrations/datafusion/src/ddl.rs @@ -292,11 +292,11 @@ fn column_def_to_add_column(col: &ColumnDef) -> DFResult { )) } -/// Minimal conversion from sqlparser SQL data types to Arrow data types. +/// Convert a sqlparser SQL data type to an Arrow data type. 
fn sql_data_type_to_arrow( sql_type: &datafusion::sql::sqlparser::ast::DataType, ) -> DFResult { - use datafusion::sql::sqlparser::ast::DataType as SqlType; + use datafusion::sql::sqlparser::ast::{ArrayElemTypeDef, DataType as SqlType}; match sql_type { SqlType::Boolean => Ok(ArrowDataType::Boolean), SqlType::TinyInt(_) => Ok(ArrowDataType::Int8), @@ -337,8 +337,54 @@ fn sql_data_type_to_arrow( }; Ok(ArrowDataType::Decimal128(p, s)) } + SqlType::Array(elem_def) => { + let elem_type = match elem_def { + ArrayElemTypeDef::AngleBracket(t) + | ArrayElemTypeDef::SquareBracket(t, _) + | ArrayElemTypeDef::Parenthesis(t) => sql_data_type_to_arrow(t)?, + ArrayElemTypeDef::None => { + return Err(DataFusionError::Plan( + "ARRAY type requires an element type".to_string(), + )); + } + }; + Ok(ArrowDataType::List(Arc::new(Field::new( + "element", elem_type, true, + )))) + } + SqlType::Map(key_type, value_type) => { + let key = sql_data_type_to_arrow(key_type)?; + let value = sql_data_type_to_arrow(value_type)?; + let entries = Field::new( + "entries", + ArrowDataType::Struct( + vec![ + Field::new("key", key, false), + Field::new("value", value, true), + ] + .into(), + ), + false, + ); + Ok(ArrowDataType::Map(Arc::new(entries), false)) + } + SqlType::Struct(fields, _) => { + let arrow_fields: Vec = fields + .iter() + .map(|f| { + let name = f + .field_name + .as_ref() + .map(|n| n.value.clone()) + .unwrap_or_default(); + let dt = sql_data_type_to_arrow(&f.field_type)?; + Ok(Field::new(name, dt, true)) + }) + .collect::>()?; + Ok(ArrowDataType::Struct(arrow_fields.into())) + } _ => Err(DataFusionError::Plan(format!( - "Unsupported SQL data type for ALTER TABLE: {sql_type}" + "Unsupported SQL data type: {sql_type}" ))), } } @@ -680,6 +726,83 @@ mod tests { assert!(sql_data_type_to_arrow(&SqlType::Regclass).is_err()); } + #[test] + fn test_sql_type_array() { + use datafusion::sql::sqlparser::ast::{ArrayElemTypeDef, DataType as SqlType}; + let result = 
sql_data_type_to_arrow(&SqlType::Array(ArrayElemTypeDef::AngleBracket( + Box::new(SqlType::Int(None)), + ))) + .unwrap(); + assert_eq!( + result, + ArrowDataType::List(Arc::new(Field::new("element", ArrowDataType::Int32, true))) + ); + } + + #[test] + fn test_sql_type_array_no_element() { + use datafusion::sql::sqlparser::ast::{ArrayElemTypeDef, DataType as SqlType}; + assert!(sql_data_type_to_arrow(&SqlType::Array(ArrayElemTypeDef::None)).is_err()); + } + + #[test] + fn test_sql_type_map() { + use datafusion::sql::sqlparser::ast::DataType as SqlType; + let result = sql_data_type_to_arrow(&SqlType::Map( + Box::new(SqlType::Varchar(None)), + Box::new(SqlType::Int(None)), + )) + .unwrap(); + let expected = ArrowDataType::Map( + Arc::new(Field::new( + "entries", + ArrowDataType::Struct( + vec![ + Field::new("key", ArrowDataType::Utf8, false), + Field::new("value", ArrowDataType::Int32, true), + ] + .into(), + ), + false, + )), + false, + ); + assert_eq!(result, expected); + } + + #[test] + fn test_sql_type_struct() { + use datafusion::sql::sqlparser::ast::{ + DataType as SqlType, Ident, StructBracketKind, StructField, + }; + let result = sql_data_type_to_arrow(&SqlType::Struct( + vec![ + StructField { + field_name: Some(Ident::new("name")), + field_type: SqlType::Varchar(None), + options: None, + }, + StructField { + field_name: Some(Ident::new("age")), + field_type: SqlType::Int(None), + options: None, + }, + ], + StructBracketKind::AngleBrackets, + )) + .unwrap(); + assert_eq!( + result, + ArrowDataType::Struct( + vec![ + Field::new("name", ArrowDataType::Utf8, true), + Field::new("age", ArrowDataType::Int32, true), + ] + .into() + ) + ); + } + // ==================== resolve_table_name tests ==================== #[test] diff --git a/crates/integrations/datafusion/tests/ddl_tests.rs b/crates/integrations/datafusion/tests/ddl_tests.rs index 90211edd..c520f18e 100644 --- a/crates/integrations/datafusion/tests/ddl_tests.rs +++ 
b/crates/integrations/datafusion/tests/ddl_tests.rs @@ -22,6 +22,7 @@ use std::sync::Arc; use datafusion::catalog::CatalogProvider; use datafusion::prelude::SessionContext; use paimon::catalog::Identifier; +use paimon::spec::{ArrayType, DataType, IntType, MapType, VarCharType}; use paimon::{Catalog, CatalogOptions, FileSystemCatalog, Options}; use paimon_datafusion::{PaimonCatalogProvider, PaimonDdlHandler, PaimonRelationPlanner}; use tempfile::TempDir; @@ -234,6 +235,104 @@ async fn test_create_external_table_rejected() { ); } +// ======================= CREATE TABLE with complex types ======================= + +#[tokio::test] +async fn test_create_table_with_array_and_map() { + let (_tmp, catalog) = create_test_env(); + let handler = create_handler(catalog.clone()); + + catalog + .create_database("mydb", false, Default::default()) + .await + .unwrap(); + + handler + .sql( + "CREATE TABLE paimon.mydb.complex_types ( + id INT NOT NULL, + tags ARRAY, + props MAP(STRING, INT), + PRIMARY KEY (id) + )", + ) + .await + .expect("CREATE TABLE with ARRAY and MAP should succeed"); + + let table = catalog + .get_table(&Identifier::new("mydb", "complex_types")) + .await + .unwrap(); + let schema = table.schema(); + assert_eq!(schema.fields().len(), 3); + assert_eq!(schema.primary_keys(), &["id"]); + + // Verify ARRAY column + let tags_field = &schema.fields()[1]; + assert_eq!(tags_field.name(), "tags"); + assert_eq!( + *tags_field.data_type(), + DataType::Array(ArrayType::new( + DataType::VarChar(VarCharType::string_type()) + )) + ); + + // Verify MAP(STRING, INT) column + let props_field = &schema.fields()[2]; + assert_eq!(props_field.name(), "props"); + assert_eq!( + *props_field.data_type(), + DataType::Map(MapType::new( + DataType::VarChar(VarCharType::string_type()) + .copy_with_nullable(false) + .unwrap(), + DataType::Int(IntType::new()), + )) + ); +} + +#[tokio::test] +async fn test_create_table_with_row_type() { + let (_tmp, catalog) = create_test_env(); + let handler 
= create_handler(catalog.clone()); + + catalog + .create_database("mydb", false, Default::default()) + .await + .unwrap(); + + handler + .sql( + "CREATE TABLE paimon.mydb.row_table ( + id INT NOT NULL, + address STRUCT, + PRIMARY KEY (id) + )", + ) + .await + .expect("CREATE TABLE with STRUCT should succeed"); + + let table = catalog + .get_table(&Identifier::new("mydb", "row_table")) + .await + .unwrap(); + let schema = table.schema(); + assert_eq!(schema.fields().len(), 2); + + // Verify STRUCT column + let address_field = &schema.fields()[1]; + assert_eq!(address_field.name(), "address"); + if let DataType::Row(row) = address_field.data_type() { + assert_eq!(row.fields().len(), 2); + assert_eq!(row.fields()[0].name(), "city"); + assert!(matches!(row.fields()[0].data_type(), DataType::VarChar(_))); + assert_eq!(row.fields()[1].name(), "zip"); + assert!(matches!(row.fields()[1].data_type(), DataType::Int(_))); + } else { + panic!("expected Row type for address column"); + } +} + // ======================= DROP TABLE ======================= #[tokio::test] diff --git a/crates/paimon/src/spec/schema.rs b/crates/paimon/src/spec/schema.rs index b0baf662..f1dba002 100644 --- a/crates/paimon/src/spec/schema.rs +++ b/crates/paimon/src/spec/schema.rs @@ -16,7 +16,7 @@ // under the License. use crate::spec::core_options::CoreOptions; -use crate::spec::types::{DataType, RowType}; +use crate::spec::types::{ArrayType, DataType, MapType, MultisetType, RowType}; use serde::{Deserialize, Serialize}; use serde_with::serde_as; use std::collections::{HashMap, HashSet}; @@ -473,23 +473,16 @@ impl SchemaBuilder { } /// Add a column with optional description. - /// - /// TODO: Support RowType in schema columns with field ID assignment for nested fields. - /// See . 
pub fn column_with_description( mut self, column_name: impl Into, data_type: DataType, description: Option, ) -> Self { - if data_type.contains_row_type() { - todo!( - "Column type containing RowType is not supported yet: field ID assignment for nested row fields is not implemented. See https://github.com/apache/paimon/pull/1547" - ); - } let name = column_name.into(); let id = self.next_field_id; self.next_field_id += 1; + let data_type = Self::assign_nested_field_ids(data_type, &mut self.next_field_id); self.columns .push(DataField::new(id, name, data_type).with_description(description)); self @@ -535,6 +528,40 @@ impl SchemaBuilder { self.comment, ) } + + /// Recursively assign field IDs to nested fields in complex types. + fn assign_nested_field_ids(data_type: DataType, next_id: &mut i32) -> DataType { + let nullable = data_type.is_nullable(); + match data_type { + DataType::Row(row) => { + let fields = row + .fields() + .iter() + .map(|f| { + let id = *next_id; + *next_id += 1; + let typ = Self::assign_nested_field_ids(f.data_type().clone(), next_id); + DataField::new(id, f.name().to_string(), typ) + }) + .collect(); + DataType::Row(RowType::with_nullable(nullable, fields)) + } + DataType::Array(arr) => { + let element = Self::assign_nested_field_ids(arr.element_type().clone(), next_id); + DataType::Array(ArrayType::with_nullable(nullable, element)) + } + DataType::Map(map) => { + let key = Self::assign_nested_field_ids(map.key_type().clone(), next_id); + let value = Self::assign_nested_field_ids(map.value_type().clone(), next_id); + DataType::Map(MapType::with_nullable(nullable, key, value)) + } + DataType::Multiset(ms) => { + let element = Self::assign_nested_field_ids(ms.element_type().clone(), next_id); + DataType::Multiset(MultisetType::with_nullable(nullable, element)) + } + other => other, + } + } } impl Default for SchemaBuilder { @@ -718,18 +745,29 @@ mod tests { assert_eq!(schema.primary_keys(), &["a", "b"]); } - /// Adding a column whose type is or 
contains RowType panics (todo! until field ID assignment for nested row fields). - /// See . #[test] - #[should_panic(expected = "RowType")] - fn test_schema_builder_column_row_type_panics() { + fn test_schema_builder_column_row_type() { let row_type = RowType::new(vec![DataField::new( 0, "nested".into(), DataType::Int(IntType::new()), )]); - Schema::builder() + let schema = Schema::builder() .column("id", DataType::Int(IntType::new())) - .column("payload", DataType::Row(row_type)); + .column("payload", DataType::Row(row_type)) + .build() + .unwrap(); + + assert_eq!(schema.fields().len(), 2); + // id gets field_id=0, payload gets field_id=1, nested gets field_id=2 + assert_eq!(schema.fields()[0].id(), 0); + assert_eq!(schema.fields()[1].id(), 1); + if let DataType::Row(row) = schema.fields()[1].data_type() { + assert_eq!(row.fields().len(), 1); + assert_eq!(row.fields()[0].id(), 2); + assert_eq!(row.fields()[0].name(), "nested"); + } else { + panic!("expected Row type"); + } } }