diff --git a/crates/integrations/datafusion/src/catalog.rs b/crates/integrations/datafusion/src/catalog.rs
index 626a47f3..2289083a 100644
--- a/crates/integrations/datafusion/src/catalog.rs
+++ b/crates/integrations/datafusion/src/catalog.rs
@@ -29,6 +29,7 @@ use paimon::catalog::{Catalog, Identifier};
 
 use crate::error::to_datafusion_error;
 use crate::runtime::{await_with_runtime, block_on_with_runtime};
+use crate::system_tables;
 use crate::table::PaimonTableProvider;
 
 /// Provides an interface to manage and access multiple schemas (databases)
@@ -130,8 +131,19 @@ impl SchemaProvider for PaimonSchemaProvider {
     }
 
     async fn table(&self, name: &str) -> DFResult<Option<Arc<dyn TableProvider>>> {
+        let (base, system_name) = system_tables::split_object_name(name);
+        if let Some(system_name) = system_name {
+            return await_with_runtime(system_tables::load(
+                Arc::clone(&self.catalog),
+                self.database.clone(),
+                base.to_string(),
+                system_name.to_string(),
+            ))
+            .await;
+        }
+
         let catalog = Arc::clone(&self.catalog);
-        let identifier = Identifier::new(self.database.clone(), name);
+        let identifier = Identifier::new(self.database.clone(), base);
         await_with_runtime(async move {
             match catalog.get_table(&identifier).await {
                 Ok(table) => {
@@ -146,8 +158,15 @@ impl SchemaProvider for PaimonSchemaProvider {
     }
 
     fn table_exist(&self, name: &str) -> bool {
+        let (base, system_name) = system_tables::split_object_name(name);
+        if let Some(system_name) = system_name {
+            if !system_tables::is_registered(system_name) {
+                return false;
+            }
+        }
+
         let catalog = Arc::clone(&self.catalog);
-        let identifier = Identifier::new(self.database.clone(), name);
+        let identifier = Identifier::new(self.database.clone(), base);
         block_on_with_runtime(
             async move {
                 match catalog.get_table(&identifier).await {
diff --git a/crates/integrations/datafusion/src/lib.rs b/crates/integrations/datafusion/src/lib.rs
index abcf7448..2af7a2e9 100644
--- a/crates/integrations/datafusion/src/lib.rs
+++ b/crates/integrations/datafusion/src/lib.rs
@@ -44,6 +44,7 @@ mod full_text_search;
 mod physical_plan;
 mod relation_planner;
 pub mod runtime;
+mod system_tables;
 mod table;
 
 pub use catalog::{PaimonCatalogProvider, PaimonSchemaProvider};
diff --git a/crates/integrations/datafusion/src/system_tables/mod.rs b/crates/integrations/datafusion/src/system_tables/mod.rs
new file mode 100644
index 00000000..fddb3b9c
--- /dev/null
+++ b/crates/integrations/datafusion/src/system_tables/mod.rs
@@ -0,0 +1,161 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Paimon system tables (`<table>$<name>`) as DataFusion table providers.
+//!
+//! Mirrors Java [SystemTableLoader](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java):
+//! `TABLES` maps each system-table name to its builder function.
+
+use std::sync::Arc;
+
+use datafusion::datasource::TableProvider;
+use datafusion::error::{DataFusionError, Result as DFResult};
+use paimon::catalog::{Catalog, Identifier, SYSTEM_BRANCH_PREFIX, SYSTEM_TABLE_SPLITTER};
+use paimon::table::Table;
+
+use crate::error::to_datafusion_error;
+
+mod options;
+
+type Builder = fn(Table) -> DFResult<Arc<dyn TableProvider>>;
+
+const TABLES: &[(&str, Builder)] = &[("options", options::build)];
+
+/// Parse a Paimon object name into `(base_table, optional system_table_name)`.
+///
+/// Mirrors Java [Identifier.splitObjectName](https://github.com/apache/paimon/blob/release-1.3/paimon-api/src/main/java/org/apache/paimon/catalog/Identifier.java).
+///
+/// - `t` → `("t", None)`
+/// - `t$options` → `("t", Some("options"))`
+/// - `t$branch_main` → `("t", None)` (branch reference, not a system table)
+/// - `t$branch_main$options` → `("t", Some("options"))` (branch + system table)
+pub(crate) fn split_object_name(name: &str) -> (&str, Option<&str>) {
+    let mut parts = name.splitn(3, SYSTEM_TABLE_SPLITTER);
+    let base = parts.next().unwrap_or(name);
+    match (parts.next(), parts.next()) {
+        (None, _) => (base, None),
+        (Some(second), None) => {
+            if second.starts_with(SYSTEM_BRANCH_PREFIX) {
+                (base, None)
+            } else {
+                (base, Some(second))
+            }
+        }
+        (Some(second), Some(third)) => {
+            if second.starts_with(SYSTEM_BRANCH_PREFIX) {
+                (base, Some(third))
+            } else {
+                // `$` is legal in table names, so `t$foo$bar` is passed through
+                // whole; using `t` as the base would silently resolve to table `t`.
+                (name, None)
+            }
+        }
+    }
+}
+
+/// Returns true if `name` is a recognised Paimon system table suffix.
+pub(crate) fn is_registered(name: &str) -> bool {
+    TABLES.iter().any(|(n, _)| name.eq_ignore_ascii_case(n))
+}
+
+/// Wraps an already-loaded base table as the system table `name`.
+fn wrap_to_system_table(name: &str, base_table: Table) -> Option<DFResult<Arc<dyn TableProvider>>> {
+    TABLES
+        .iter()
+        .find(|(n, _)| name.eq_ignore_ascii_case(n))
+        .map(|(_, build)| build(base_table))
+}
+
+/// Loads `<base>$<system_name>` from the catalog and wraps it as a system
+/// table provider.
+///
+/// - Unknown `system_name` → `Ok(None)` (DataFusion reports "table not found")
+/// - Base table missing → `Err(Plan)` so users can distinguish it from an
+///   unknown system name
+pub(crate) async fn load(
+    catalog: Arc<dyn Catalog>,
+    database: String,
+    base: String,
+    system_name: String,
+) -> DFResult<Option<Arc<dyn TableProvider>>> {
+    if !is_registered(&system_name) {
+        return Ok(None);
+    }
+    let identifier = Identifier::new(database, base.clone());
+    match catalog.get_table(&identifier).await {
+        // `transpose` maps a missing builder to `Ok(None)` rather than panicking,
+        // so this stays panic-free even if the two `TABLES` lookups ever diverge.
+        Ok(table) => wrap_to_system_table(&system_name, table).transpose(),
+        Err(paimon::Error::TableNotExist { .. }) => Err(DataFusionError::Plan(format!(
+            "Cannot read system table `${system_name}`: \
+             base table `{base}` does not exist"
+        ))),
+        Err(e) => Err(to_datafusion_error(e)),
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::{is_registered, split_object_name};
+
+    #[test]
+    fn is_registered_is_case_insensitive() {
+        assert!(is_registered("options"));
+        assert!(is_registered("Options"));
+        assert!(is_registered("OPTIONS"));
+        assert!(!is_registered("nonsense"));
+    }
+
+    #[test]
+    fn plain_table_name() {
+        assert_eq!(split_object_name("orders"), ("orders", None));
+    }
+
+    #[test]
+    fn system_table_only() {
+        assert_eq!(
+            split_object_name("orders$options"),
+            ("orders", Some("options"))
+        );
+    }
+
+    #[test]
+    fn branch_reference_is_not_a_system_table() {
+        assert_eq!(split_object_name("orders$branch_main"), ("orders", None));
+    }
+
+    #[test]
+    fn branch_plus_system_table() {
+        assert_eq!(
+            split_object_name("orders$branch_main$options"),
+            ("orders", Some("options"))
+        );
+    }
+
+    #[test]
+    fn three_parts_without_branch_prefix_is_not_a_system_table() {
+        assert_eq!(split_object_name("orders$foo$bar"), ("orders$foo$bar", None));
+    }
+
+    #[test]
+    fn system_table_name_preserves_case() {
+        assert_eq!(
+            split_object_name("orders$Options"),
+            ("orders", Some("Options"))
+        );
+    }
+}
diff --git a/crates/integrations/datafusion/src/system_tables/options.rs b/crates/integrations/datafusion/src/system_tables/options.rs
new file mode 100644
index 00000000..fe86c92c
--- /dev/null
+++ b/crates/integrations/datafusion/src/system_tables/options.rs
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Mirrors Java [OptionsTable](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java).
+
+use std::any::Any;
+use std::sync::{Arc, OnceLock};
+
+use async_trait::async_trait;
+use datafusion::arrow::array::{RecordBatch, StringArray};
+use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use datafusion::catalog::Session;
+use datafusion::datasource::memory::MemorySourceConfig;
+use datafusion::datasource::{TableProvider, TableType};
+use datafusion::error::Result as DFResult;
+use datafusion::logical_expr::Expr;
+use datafusion::physical_plan::ExecutionPlan;
+use paimon::table::Table;
+
+pub(super) fn build(table: Table) -> DFResult<Arc<dyn TableProvider>> {
+    Ok(Arc::new(OptionsTable { table }))
+}
+
+fn options_schema() -> SchemaRef {
+    static SCHEMA: OnceLock<SchemaRef> = OnceLock::new();
+    SCHEMA
+        .get_or_init(|| {
+            Arc::new(Schema::new(vec![
+                Field::new("key", DataType::Utf8, false),
+                Field::new("value", DataType::Utf8, false),
+            ]))
+        })
+        .clone()
+}
+
+#[derive(Debug)]
+struct OptionsTable {
+    table: Table,
+}
+
+#[async_trait]
+impl TableProvider for OptionsTable {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        options_schema()
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::View
+    }
+
+    async fn scan(
+        &self,
+        _state: &dyn Session,
+        projection: Option<&Vec<usize>>,
+        _filters: &[Expr],
+        _limit: Option<usize>,
+    ) -> DFResult<Arc<dyn ExecutionPlan>> {
+        // Java uses LinkedHashMap insertion order; HashMap has none — sort for stable output.
+        let mut entries: Vec<(&String, &String)> = self.table.schema().options().iter().collect();
+        entries.sort_by(|a, b| a.0.cmp(b.0));
+
+        let keys = StringArray::from_iter_values(entries.iter().map(|(k, _)| k.as_str()));
+        let values = StringArray::from_iter_values(entries.iter().map(|(_, v)| v.as_str()));
+
+        let schema = options_schema();
+        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(keys), Arc::new(values)])?;
+
+        Ok(MemorySourceConfig::try_new_exec(
+            &[vec![batch]],
+            schema,
+            projection.cloned(),
+        )?)
+    }
+}
diff --git a/crates/integrations/datafusion/tests/system_tables.rs b/crates/integrations/datafusion/tests/system_tables.rs
new file mode 100644
index 00000000..c3292e39
--- /dev/null
+++ b/crates/integrations/datafusion/tests/system_tables.rs
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Paimon `$options` system table end-to-end via DataFusion SQL.
+
+use std::sync::Arc;
+
+use datafusion::arrow::array::{Array, StringArray};
+use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::prelude::SessionContext;
+use paimon::catalog::Identifier;
+use paimon::{Catalog, CatalogOptions, FileSystemCatalog, Options};
+use paimon_datafusion::PaimonCatalogProvider;
+
+const FIXTURE_TABLE: &str = "test_tantivy_fulltext";
+
+fn extract_test_warehouse() -> (tempfile::TempDir, String) {
+    let archive_path = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
+        .join("testdata/test_tantivy_fulltext.tar.gz");
+    let file = std::fs::File::open(&archive_path)
+        .unwrap_or_else(|e| panic!("Failed to open {}: {e}", archive_path.display()));
+    let decoder = flate2::read::GzDecoder::new(file);
+    let mut archive = tar::Archive::new(decoder);
+
+    let tmp = tempfile::tempdir().expect("Failed to create temp dir");
+    let db_dir = tmp.path().join("default.db");
+    std::fs::create_dir_all(&db_dir).unwrap();
+    archive.unpack(&db_dir).unwrap();
+
+    let warehouse = format!("file://{}", tmp.path().display());
+    (tmp, warehouse)
+}
+
+async fn create_context() -> (SessionContext, Arc<dyn Catalog>, tempfile::TempDir) {
+    let (tmp, warehouse) = extract_test_warehouse();
+    let mut options = Options::new();
+    options.set(CatalogOptions::WAREHOUSE, warehouse);
+    let catalog = FileSystemCatalog::new(options).expect("Failed to create catalog");
+    let catalog: Arc<dyn Catalog> = Arc::new(catalog);
+
+    let ctx = SessionContext::new();
+    ctx.register_catalog(
+        "paimon",
+        Arc::new(PaimonCatalogProvider::new(Arc::clone(&catalog))),
+    );
+    (ctx, catalog, tmp)
+}
+
+async fn run_sql(ctx: &SessionContext, sql: &str) -> Vec<RecordBatch> {
+    ctx.sql(sql)
+        .await
+        .unwrap_or_else(|e| panic!("Failed to plan `{sql}`: {e}"))
+        .collect()
+        .await
+        .unwrap_or_else(|e| panic!("Failed to execute `{sql}`: {e}"))
+}
+
+#[tokio::test]
+async fn test_options_system_table() {
+    let (ctx, catalog, _tmp) = create_context().await;
+    let sql = format!("SELECT key, value FROM paimon.default.{FIXTURE_TABLE}$options");
+    let batches = run_sql(&ctx, &sql).await;
+
+    assert!(!batches.is_empty(), "$options should return ≥1 batch");
+    let schema = batches[0].schema();
+    assert_eq!(schema.field(0).name(), "key");
+    assert_eq!(schema.field(1).name(), "value");
+
+    let mut actual: Vec<(String, String)> = Vec::new();
+    for batch in &batches {
+        let keys = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .expect("key column is Utf8");
+        let values = batch
+            .column(1)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .expect("value column is Utf8");
+        for i in 0..batch.num_rows() {
+            actual.push((keys.value(i).to_string(), values.value(i).to_string()));
+        }
+    }
+    actual.sort();
+
+    let identifier = Identifier::new("default".to_string(), FIXTURE_TABLE.to_string());
+    let table = catalog
+        .get_table(&identifier)
+        .await
+        .expect("fixture table should load");
+    let mut expected: Vec<(String, String)> = table
+        .schema()
+        .options()
+        .iter()
+        .map(|(k, v)| (k.clone(), v.clone()))
+        .collect();
+    expected.sort();
+
+    assert_eq!(actual, expected, "$options rows should match table options");
+}
+
+#[tokio::test]
+async fn test_unknown_system_table_name_returns_not_found() {
+    let (ctx, _catalog, _tmp) = create_context().await;
+    let sql = format!("SELECT * FROM paimon.default.{FIXTURE_TABLE}$nonsense");
+    let err = ctx
+        .sql(&sql)
+        .await
+        .expect_err("unknown system table should not resolve");
+    let msg = err.to_string();
+    assert!(
+        msg.contains("nonsense") || msg.to_lowercase().contains("not found"),
+        "unexpected error for unknown system table: {msg}"
+    );
+}
+
+#[tokio::test]
+async fn test_missing_base_table_for_system_table_errors() {
+    let (ctx, _catalog, _tmp) = create_context().await;
+    let sql = "SELECT * FROM paimon.default.does_not_exist$options";
+    let err = ctx
+        .sql(sql)
+        .await
+        .expect_err("missing base table should error");
+    let msg = err.to_string();
+    assert!(
+        msg.contains("does_not_exist") && msg.contains("$options"),
+        "expected error to mention both base table and system name, got: {msg}"
+    );
+}