From 9f99d2acb528586f3df5f48c6083b54236c5b650 Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Sat, 11 Apr 2026 10:42:10 +0800 Subject: [PATCH 1/4] feat(write): add write pipeline with DataFusion INSERT INTO/OVERWRITE support Add TableWrite for writing Arrow RecordBatches to Paimon append-only tables. Each (partition, bucket) pair gets its own DataFileWriter with direct writes (matching delta-rs DeltaWriter pattern). File rolling uses tokio::spawn for background close, and prepare_commit uses try_join_all for parallel finalization across partition writers. Key components: - TableWrite: routes batches by partition/bucket, holds DataFileWriters - DataFileWriter: manages parquet file lifecycle with rolling support - WriteBuilder: creates TableWrite and TableCommit instances - PaimonDataSink: DataFusion DataSink integration for INSERT/OVERWRITE - FormatFileWriter: extended with flush() and in_progress_size() Configurable options via CoreOptions: - file.compression (default: zstd) - target-file-size (default: 256MB) - write.parquet-buffer-size (default: 256MB) Includes E2E integration tests for unpartitioned, partitioned, fixed-bucket, multi-commit, column projection, and bucket filtering. 
--- Cargo.toml | 2 + crates/integration_tests/Cargo.toml | 1 + .../integration_tests/tests/append_tables.rs | 614 ++++++++++++++ crates/integrations/datafusion/src/lib.rs | 2 +- .../datafusion/src/physical_plan/mod.rs | 2 + .../datafusion/src/physical_plan/sink.rs | 109 +++ .../integrations/datafusion/src/table/mod.rs | 271 +++++- crates/paimon/Cargo.toml | 4 +- crates/paimon/src/arrow/format/mod.rs | 47 +- crates/paimon/src/arrow/format/parquet.rs | 173 +++- crates/paimon/src/io/file_io.rs | 20 +- crates/paimon/src/lib.rs | 3 +- crates/paimon/src/spec/binary_row.rs | 168 ++++ crates/paimon/src/spec/core_options.rs | 61 +- crates/paimon/src/spec/partition_utils.rs | 1 + crates/paimon/src/spec/schema.rs | 20 + crates/paimon/src/table/mod.rs | 2 + crates/paimon/src/table/table_commit.rs | 120 ++- crates/paimon/src/table/table_write.rs | 783 ++++++++++++++++++ crates/paimon/src/table/write_builder.rs | 24 +- 20 files changed, 2357 insertions(+), 70 deletions(-) create mode 100644 crates/integration_tests/tests/append_tables.rs create mode 100644 crates/integrations/datafusion/src/physical_plan/sink.rs create mode 100644 crates/paimon/src/table/table_write.rs diff --git a/Cargo.toml b/Cargo.toml index 5677a5eb..83fa44b0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,7 +34,9 @@ arrow-buffer = "57.0" arrow-schema = "57.0" arrow-cast = "57.0" arrow-ord = "57.0" +arrow-select = "57.0" datafusion = "52.3.0" datafusion-ffi = "52.3.0" parquet = "57.0" tokio = "1.39.2" +tokio-util = "0.7" diff --git a/crates/integration_tests/Cargo.toml b/crates/integration_tests/Cargo.toml index 092ad949..7c60a532 100644 --- a/crates/integration_tests/Cargo.toml +++ b/crates/integration_tests/Cargo.toml @@ -26,6 +26,7 @@ homepage.workspace = true [dependencies] paimon = { path = "../paimon" } arrow-array = { workspace = true } +arrow-schema = { workspace = true } tokio = { version = "1", features = ["macros", "rt-multi-thread"] } futures = "0.3" diff --git 
a/crates/integration_tests/tests/append_tables.rs b/crates/integration_tests/tests/append_tables.rs new file mode 100644 index 00000000..b2185de9 --- /dev/null +++ b/crates/integration_tests/tests/append_tables.rs @@ -0,0 +1,614 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! E2E integration tests for append-only (no primary key) tables. +//! +//! Covers: unpartitioned, partitioned, bucket=-1, fixed bucket, +//! multiple commits, column projection, and bucket predicate filtering. 
+ +use arrow_array::{Array, Int32Array, RecordBatch, StringArray}; +use arrow_schema::{DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema}; +use futures::TryStreamExt; +use paimon::catalog::Identifier; +use paimon::io::FileIOBuilder; +use paimon::spec::{DataType, IntType, Schema, TableSchema, VarCharType}; +use paimon::table::Table; +use std::sync::Arc; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +fn memory_file_io() -> paimon::io::FileIO { + FileIOBuilder::new("memory").build().unwrap() +} + +async fn setup_dirs(file_io: &paimon::io::FileIO, table_path: &str) { + file_io + .mkdirs(&format!("{table_path}/snapshot/")) + .await + .unwrap(); + file_io + .mkdirs(&format!("{table_path}/manifest/")) + .await + .unwrap(); +} + +fn make_table(file_io: &paimon::io::FileIO, table_path: &str, schema: TableSchema) -> Table { + Table::new( + file_io.clone(), + Identifier::new("default", "test"), + table_path.to_string(), + schema, + None, + ) +} + +fn int_batch(ids: Vec, values: Vec) -> RecordBatch { + let schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("id", ArrowDataType::Int32, false), + ArrowField::new("value", ArrowDataType::Int32, false), + ])); + RecordBatch::try_new( + schema, + vec![ + Arc::new(Int32Array::from(ids)), + Arc::new(Int32Array::from(values)), + ], + ) + .unwrap() +} + +fn partitioned_batch(pts: Vec<&str>, ids: Vec) -> RecordBatch { + let schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("pt", ArrowDataType::Utf8, false), + ArrowField::new("id", ArrowDataType::Int32, false), + ])); + RecordBatch::try_new( + schema, + vec![ + Arc::new(StringArray::from(pts)), + Arc::new(Int32Array::from(ids)), + ], + ) + .unwrap() +} + +fn collect_int_col(batches: &[RecordBatch], col: &str) -> Vec { + let mut vals: Vec = batches + .iter() + .flat_map(|b| { + let idx = b.schema().index_of(col).unwrap(); + 
b.column(idx) + .as_any() + .downcast_ref::() + .unwrap() + .values() + .to_vec() + }) + .collect(); + vals.sort(); + vals +} + +fn collect_string_col(batches: &[RecordBatch], col: &str) -> Vec { + let mut vals: Vec = batches + .iter() + .flat_map(|b| { + let idx = b.schema().index_of(col).unwrap(); + let arr = b + .column(idx) + .as_any() + .downcast_ref::() + .unwrap(); + (0..arr.len()) + .map(|i| arr.value(i).to_string()) + .collect::>() + }) + .collect(); + vals.sort(); + vals +} + +/// Write batches → commit → scan → read, return all batches. +async fn write_commit_read(table: &Table, batches: Vec) -> Vec { + let wb = table.new_write_builder(); + let mut tw = wb.new_write().unwrap(); + for batch in &batches { + tw.write_arrow_batch(batch).await.unwrap(); + } + wb.new_commit() + .commit(tw.prepare_commit().await.unwrap()) + .await + .unwrap(); + + let rb = table.new_read_builder(); + let plan = rb.new_scan().plan().await.unwrap(); + let read = rb.new_read().unwrap(); + read.to_arrow(plan.splits()) + .unwrap() + .try_collect() + .await + .unwrap() +} + +// --------------------------------------------------------------------------- +// Unpartitioned, bucket = -1 (default) +// --------------------------------------------------------------------------- + +fn unpartitioned_schema() -> TableSchema { + let schema = Schema::builder() + .column("id", DataType::Int(IntType::new())) + .column("value", DataType::Int(IntType::new())) + .build() + .unwrap(); + TableSchema::new(0, &schema) +} + +#[tokio::test] +async fn test_unpartitioned_single_batch() { + let file_io = memory_file_io(); + let path = "memory:/append_unpart_single"; + setup_dirs(&file_io, path).await; + let table = make_table(&file_io, path, unpartitioned_schema()); + + let result = write_commit_read(&table, vec![int_batch(vec![1, 2, 3], vec![10, 20, 30])]).await; + + assert_eq!(collect_int_col(&result, "id"), vec![1, 2, 3]); + assert_eq!(collect_int_col(&result, "value"), vec![10, 20, 30]); +} + 
+#[tokio::test] +async fn test_unpartitioned_multiple_batches() { + let file_io = memory_file_io(); + let path = "memory:/append_unpart_multi"; + setup_dirs(&file_io, path).await; + let table = make_table(&file_io, path, unpartitioned_schema()); + + let result = write_commit_read( + &table, + vec![ + int_batch(vec![1, 2], vec![10, 20]), + int_batch(vec![3, 4, 5], vec![30, 40, 50]), + ], + ) + .await; + + assert_eq!(collect_int_col(&result, "id"), vec![1, 2, 3, 4, 5]); +} + +#[tokio::test] +async fn test_unpartitioned_two_commits() { + let file_io = memory_file_io(); + let path = "memory:/append_unpart_two_commits"; + setup_dirs(&file_io, path).await; + let table = make_table(&file_io, path, unpartitioned_schema()); + + // First commit + let wb = table.new_write_builder(); + let mut tw = wb.new_write().unwrap(); + tw.write_arrow_batch(&int_batch(vec![1, 2], vec![10, 20])) + .await + .unwrap(); + wb.new_commit() + .commit(tw.prepare_commit().await.unwrap()) + .await + .unwrap(); + + // Second commit + let mut tw2 = wb.new_write().unwrap(); + tw2.write_arrow_batch(&int_batch(vec![3, 4], vec![30, 40])) + .await + .unwrap(); + wb.new_commit() + .commit(tw2.prepare_commit().await.unwrap()) + .await + .unwrap(); + + // Read all + let rb = table.new_read_builder(); + let plan = rb.new_scan().plan().await.unwrap(); + let read = rb.new_read().unwrap(); + let result: Vec = read + .to_arrow(plan.splits()) + .unwrap() + .try_collect() + .await + .unwrap(); + + assert_eq!(collect_int_col(&result, "id"), vec![1, 2, 3, 4]); +} + +#[tokio::test] +async fn test_unpartitioned_projection() { + let file_io = memory_file_io(); + let path = "memory:/append_unpart_proj"; + setup_dirs(&file_io, path).await; + let table = make_table(&file_io, path, unpartitioned_schema()); + + // Write + let wb = table.new_write_builder(); + let mut tw = wb.new_write().unwrap(); + tw.write_arrow_batch(&int_batch(vec![1, 2, 3], vec![10, 20, 30])) + .await + .unwrap(); + wb.new_commit() + 
.commit(tw.prepare_commit().await.unwrap()) + .await + .unwrap(); + + // Read with projection + let mut rb = table.new_read_builder(); + rb.with_projection(&["value"]); + let plan = rb.new_scan().plan().await.unwrap(); + let read = rb.new_read().unwrap(); + let result: Vec = read + .to_arrow(plan.splits()) + .unwrap() + .try_collect() + .await + .unwrap(); + + assert_eq!(result[0].schema().fields().len(), 1); + assert_eq!(result[0].schema().field(0).name(), "value"); + assert_eq!(collect_int_col(&result, "value"), vec![10, 20, 30]); +} + +// --------------------------------------------------------------------------- +// Unpartitioned, fixed bucket +// --------------------------------------------------------------------------- + +fn fixed_bucket_schema(buckets: i32) -> TableSchema { + let schema = Schema::builder() + .column("id", DataType::Int(IntType::new())) + .column("value", DataType::Int(IntType::new())) + .option("bucket", buckets.to_string()) + .option("bucket-key", "id") + .build() + .unwrap(); + TableSchema::new(0, &schema) +} + +#[tokio::test] +async fn test_fixed_bucket_write_read() { + let file_io = memory_file_io(); + let path = "memory:/append_fixed_bucket"; + setup_dirs(&file_io, path).await; + let table = make_table(&file_io, path, fixed_bucket_schema(4)); + + let result = write_commit_read( + &table, + vec![int_batch( + vec![1, 2, 3, 4, 5, 6, 7, 8], + vec![10, 20, 30, 40, 50, 60, 70, 80], + )], + ) + .await; + + assert_eq!(collect_int_col(&result, "id"), vec![1, 2, 3, 4, 5, 6, 7, 8]); +} + +#[tokio::test] +async fn test_fixed_bucket_scan_filters_by_bucket() { + use paimon::spec::{Datum, PredicateBuilder}; + + let file_io = memory_file_io(); + let path = "memory:/append_bucket_filter"; + setup_dirs(&file_io, path).await; + let table = make_table(&file_io, path, fixed_bucket_schema(4)); + + // Write enough data to spread across buckets + let wb = table.new_write_builder(); + let mut tw = wb.new_write().unwrap(); + tw.write_arrow_batch(&int_batch( + 
vec![1, 2, 3, 4, 5, 6, 7, 8], + vec![10, 20, 30, 40, 50, 60, 70, 80], + )) + .await + .unwrap(); + wb.new_commit() + .commit(tw.prepare_commit().await.unwrap()) + .await + .unwrap(); + + // Full scan — should have multiple buckets + let full_rb = table.new_read_builder(); + let full_plan = full_rb.new_scan().plan().await.unwrap(); + let all_buckets: std::collections::HashSet = + full_plan.splits().iter().map(|s| s.bucket()).collect(); + + if all_buckets.len() <= 1 { + // All rows hashed to same bucket — can't test filtering + return; + } + + // Filter by id = 1 — should narrow to one bucket + let pb = PredicateBuilder::new(table.schema().fields()); + let filter = pb.equal("id", Datum::Int(1)).unwrap(); + + let mut rb = table.new_read_builder(); + rb.with_filter(filter); + let plan = rb.new_scan().plan().await.unwrap(); + let filtered_buckets: std::collections::HashSet = + plan.splits().iter().map(|s| s.bucket()).collect(); + + assert_eq!( + filtered_buckets.len(), + 1, + "Bucket predicate should narrow to one bucket, got: {filtered_buckets:?}" + ); + assert!(filtered_buckets.is_subset(&all_buckets)); + + // Read and verify id=1 is in the result + let read = rb.new_read().unwrap(); + let result: Vec = read + .to_arrow(plan.splits()) + .unwrap() + .try_collect() + .await + .unwrap(); + let ids = collect_int_col(&result, "id"); + assert!(ids.contains(&1)); +} + +// --------------------------------------------------------------------------- +// Partitioned, bucket = -1 +// --------------------------------------------------------------------------- + +fn partitioned_schema() -> TableSchema { + let schema = Schema::builder() + .column("pt", DataType::VarChar(VarCharType::string_type())) + .column("id", DataType::Int(IntType::new())) + .partition_keys(["pt"]) + .build() + .unwrap(); + TableSchema::new(0, &schema) +} + +#[tokio::test] +async fn test_partitioned_write_read() { + let file_io = memory_file_io(); + let path = "memory:/append_partitioned"; + 
setup_dirs(&file_io, path).await; + let table = make_table(&file_io, path, partitioned_schema()); + + let result = write_commit_read( + &table, + vec![partitioned_batch( + vec!["a", "b", "a", "b"], + vec![1, 2, 3, 4], + )], + ) + .await; + + let total: usize = result.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total, 4); + assert_eq!(collect_int_col(&result, "id"), vec![1, 2, 3, 4]); + assert_eq!(collect_string_col(&result, "pt"), vec!["a", "a", "b", "b"]); +} + +#[tokio::test] +async fn test_partitioned_two_commits() { + let file_io = memory_file_io(); + let path = "memory:/append_part_two_commits"; + setup_dirs(&file_io, path).await; + let table = make_table(&file_io, path, partitioned_schema()); + + let wb = table.new_write_builder(); + + // First commit: partition "a" + let mut tw1 = wb.new_write().unwrap(); + tw1.write_arrow_batch(&partitioned_batch(vec!["a", "a"], vec![1, 2])) + .await + .unwrap(); + wb.new_commit() + .commit(tw1.prepare_commit().await.unwrap()) + .await + .unwrap(); + + // Second commit: partition "b" + let mut tw2 = wb.new_write().unwrap(); + tw2.write_arrow_batch(&partitioned_batch(vec!["b", "b"], vec![3, 4])) + .await + .unwrap(); + wb.new_commit() + .commit(tw2.prepare_commit().await.unwrap()) + .await + .unwrap(); + + // Read all + let rb = table.new_read_builder(); + let plan = rb.new_scan().plan().await.unwrap(); + let read = rb.new_read().unwrap(); + let result: Vec = read + .to_arrow(plan.splits()) + .unwrap() + .try_collect() + .await + .unwrap(); + + assert_eq!(collect_int_col(&result, "id"), vec![1, 2, 3, 4]); + assert_eq!(collect_string_col(&result, "pt"), vec!["a", "a", "b", "b"]); +} + +#[tokio::test] +async fn test_partitioned_scan_partition_filter() { + use paimon::spec::{Datum, PredicateBuilder}; + + let file_io = memory_file_io(); + let path = "memory:/append_part_filter"; + setup_dirs(&file_io, path).await; + let table = make_table(&file_io, path, partitioned_schema()); + + // Write data to two partitions + let wb = 
table.new_write_builder(); + let mut tw = wb.new_write().unwrap(); + tw.write_arrow_batch(&partitioned_batch( + vec!["a", "b", "a", "b"], + vec![1, 2, 3, 4], + )) + .await + .unwrap(); + wb.new_commit() + .commit(tw.prepare_commit().await.unwrap()) + .await + .unwrap(); + + // Filter by pt = "a" + let pb = PredicateBuilder::new(table.schema().fields()); + let filter = pb.equal("pt", Datum::String("a".into())).unwrap(); + + let mut rb = table.new_read_builder(); + rb.with_filter(filter); + let plan = rb.new_scan().plan().await.unwrap(); + + // Only partition "a" splits should survive + for split in plan.splits() { + let pt = split.partition().get_string(0).unwrap().to_string(); + assert_eq!(pt, "a"); + } + + let read = rb.new_read().unwrap(); + let result: Vec = read + .to_arrow(plan.splits()) + .unwrap() + .try_collect() + .await + .unwrap(); + + assert_eq!(collect_int_col(&result, "id"), vec![1, 3]); + assert_eq!(collect_string_col(&result, "pt"), vec!["a", "a"]); +} + +// --------------------------------------------------------------------------- +// Partitioned + fixed bucket +// --------------------------------------------------------------------------- + +fn partitioned_bucket_schema(buckets: i32) -> TableSchema { + let schema = Schema::builder() + .column("pt", DataType::VarChar(VarCharType::string_type())) + .column("id", DataType::Int(IntType::new())) + .column("value", DataType::Int(IntType::new())) + .partition_keys(["pt"]) + .option("bucket", buckets.to_string()) + .option("bucket-key", "id") + .build() + .unwrap(); + TableSchema::new(0, &schema) +} + +fn partitioned_value_batch(pts: Vec<&str>, ids: Vec, values: Vec) -> RecordBatch { + let schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("pt", ArrowDataType::Utf8, false), + ArrowField::new("id", ArrowDataType::Int32, false), + ArrowField::new("value", ArrowDataType::Int32, false), + ])); + RecordBatch::try_new( + schema, + vec![ + Arc::new(StringArray::from(pts)), + 
Arc::new(Int32Array::from(ids)), + Arc::new(Int32Array::from(values)), + ], + ) + .unwrap() +} + +#[tokio::test] +async fn test_partitioned_fixed_bucket_write_read() { + let file_io = memory_file_io(); + let path = "memory:/append_part_bucket"; + setup_dirs(&file_io, path).await; + let table = make_table(&file_io, path, partitioned_bucket_schema(2)); + + let wb = table.new_write_builder(); + let mut tw = wb.new_write().unwrap(); + tw.write_arrow_batch(&partitioned_value_batch( + vec!["a", "a", "b", "b"], + vec![1, 2, 3, 4], + vec![10, 20, 30, 40], + )) + .await + .unwrap(); + wb.new_commit() + .commit(tw.prepare_commit().await.unwrap()) + .await + .unwrap(); + + let rb = table.new_read_builder(); + let plan = rb.new_scan().plan().await.unwrap(); + let read = rb.new_read().unwrap(); + let result: Vec = read + .to_arrow(plan.splits()) + .unwrap() + .try_collect() + .await + .unwrap(); + + assert_eq!(collect_int_col(&result, "id"), vec![1, 2, 3, 4]); + assert_eq!(collect_int_col(&result, "value"), vec![10, 20, 30, 40]); +} + +// --------------------------------------------------------------------------- +// Unsupported: primary key table should be rejected +// --------------------------------------------------------------------------- + +#[tokio::test] +async fn test_reject_primary_key_table() { + let schema = Schema::builder() + .column("id", DataType::Int(IntType::new())) + .column("value", DataType::Int(IntType::new())) + .primary_key(["id"]) + .build() + .unwrap(); + let table_schema = TableSchema::new(0, &schema); + + let file_io = memory_file_io(); + let path = "memory:/append_reject_pk"; + let table = make_table(&file_io, path, table_schema); + + let result = table.new_write_builder().new_write(); + assert!(result.is_err()); + let err = result.err().unwrap(); + assert!( + matches!(&err, paimon::Error::Unsupported { message } if message.contains("primary keys")), + "Expected Unsupported error for PK table, got: {err:?}" + ); +} + +#[tokio::test] +async fn 
test_reject_fixed_bucket_without_bucket_key() { + let schema = Schema::builder() + .column("id", DataType::Int(IntType::new())) + .column("value", DataType::Int(IntType::new())) + .option("bucket", "4") + .build() + .unwrap(); + let table_schema = TableSchema::new(0, &schema); + + let file_io = memory_file_io(); + let path = "memory:/append_reject_no_bucket_key"; + let table = make_table(&file_io, path, table_schema); + + let result = table.new_write_builder().new_write(); + assert!(result.is_err()); + let err = result.err().unwrap(); + assert!( + matches!(&err, paimon::Error::Unsupported { message } if message.contains("bucket-key")), + "Expected Unsupported error for missing bucket-key, got: {err:?}" + ); +} diff --git a/crates/integrations/datafusion/src/lib.rs b/crates/integrations/datafusion/src/lib.rs index abcf7448..4e9fdb3b 100644 --- a/crates/integrations/datafusion/src/lib.rs +++ b/crates/integrations/datafusion/src/lib.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! Apache Paimon DataFusion Integration (read-only). +//! Apache Paimon DataFusion Integration. //! //! Register a Paimon table as a DataFusion table provider to query it with SQL or DataFrame API. //! diff --git a/crates/integrations/datafusion/src/physical_plan/mod.rs b/crates/integrations/datafusion/src/physical_plan/mod.rs index 48aa5469..2fa35bf1 100644 --- a/crates/integrations/datafusion/src/physical_plan/mod.rs +++ b/crates/integrations/datafusion/src/physical_plan/mod.rs @@ -16,5 +16,7 @@ // under the License. 
pub(crate) mod scan; +pub(crate) mod sink; pub use scan::PaimonTableScan; +pub use sink::PaimonDataSink; diff --git a/crates/integrations/datafusion/src/physical_plan/sink.rs b/crates/integrations/datafusion/src/physical_plan/sink.rs new file mode 100644 index 00000000..a0c5c9b6 --- /dev/null +++ b/crates/integrations/datafusion/src/physical_plan/sink.rs @@ -0,0 +1,109 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! DataSink implementation for writing to Paimon tables via DataFusion. + +use std::any::Any; +use std::fmt; +use std::sync::Arc; + +use async_trait::async_trait; +use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef; +use datafusion::datasource::sink::DataSink; +use datafusion::error::Result as DFResult; +use datafusion::execution::SendableRecordBatchStream; +use datafusion::execution::TaskContext; +use datafusion::physical_plan::DisplayAs; +use futures::StreamExt; +use paimon::table::Table; + +use crate::error::to_datafusion_error; + +/// DataSink that writes RecordBatches to a Paimon table. +/// +/// Uses the Paimon write pipeline: `WriteBuilder` → `TableWrite` → `TableCommit`. 
+/// Internal parallelism is handled by `TableWrite` which routes each +/// (partition, bucket) to its own background tokio task. +#[derive(Debug)] +pub struct PaimonDataSink { + table: Table, + schema: ArrowSchemaRef, + overwrite: bool, +} + +impl PaimonDataSink { + pub fn new(table: Table, schema: ArrowSchemaRef, overwrite: bool) -> Self { + Self { + table, + schema, + overwrite, + } + } +} + +impl DisplayAs for PaimonDataSink { + fn fmt_as( + &self, + _t: datafusion::physical_plan::DisplayFormatType, + f: &mut fmt::Formatter, + ) -> fmt::Result { + write!(f, "PaimonDataSink: table={}", self.table.identifier()) + } +} + +#[async_trait] +impl DataSink for PaimonDataSink { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> &ArrowSchemaRef { + &self.schema + } + + async fn write_all( + &self, + mut data: SendableRecordBatchStream, + _context: &Arc, + ) -> DFResult { + let wb = self.table.new_write_builder(); + let mut tw = wb.new_write().map_err(to_datafusion_error)?; + let mut row_count = 0u64; + + while let Some(batch) = data.next().await { + let batch = batch?; + row_count += batch.num_rows() as u64; + tw.write_arrow_batch(&batch) + .await + .map_err(to_datafusion_error)?; + } + + let messages = tw.prepare_commit().await.map_err(to_datafusion_error)?; + let commit = wb.new_commit(); + + if self.overwrite { + commit + .overwrite(messages) + .await + .map_err(to_datafusion_error)?; + } else { + commit.commit(messages).await.map_err(to_datafusion_error)?; + } + + Ok(row_count) + } +} diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index 65eb07fc..275f943d 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! Paimon table provider for DataFusion (read-only). +//! Paimon table provider for DataFusion. 
use std::any::Any; use std::sync::Arc; @@ -23,12 +23,16 @@ use std::sync::Arc; use async_trait::async_trait; use datafusion::arrow::datatypes::{Field, Schema, SchemaRef as ArrowSchemaRef}; use datafusion::catalog::Session; +use datafusion::datasource::sink::DataSinkExec; use datafusion::datasource::{TableProvider, TableType}; use datafusion::error::Result as DFResult; +use datafusion::logical_expr::dml::InsertOp; use datafusion::logical_expr::{Expr, TableProviderFilterPushDown}; use datafusion::physical_plan::ExecutionPlan; use paimon::table::Table; +use crate::physical_plan::PaimonDataSink; + use crate::error::to_datafusion_error; use crate::filter_pushdown::{build_pushed_predicate, classify_filter_pushdown}; use crate::physical_plan::PaimonTableScan; @@ -178,6 +182,25 @@ impl TableProvider for PaimonTableProvider { ) } + async fn insert_into( + &self, + _state: &dyn Session, + input: Arc, + insert_op: InsertOp, + ) -> DFResult> { + let overwrite = match insert_op { + InsertOp::Append => false, + InsertOp::Overwrite => true, + other => { + return Err(datafusion::error::DataFusionError::NotImplemented(format!( + "{other} is not supported for Paimon tables" + ))); + } + }; + let sink = PaimonDataSink::new(self.table.clone(), self.schema.clone(), overwrite); + Ok(Arc::new(DataSinkExec::new(input, Arc::new(sink), None))) + } + fn supports_filters_pushdown( &self, filters: &[&Expr], @@ -373,4 +396,250 @@ mod tests { assert_eq!(scan.pushed_predicate(), Some(&expected)); } + + #[tokio::test] + async fn test_insert_into_and_read_back() { + use paimon::io::FileIOBuilder; + use paimon::spec::{DataType, IntType, Schema as PaimonSchema, TableSchema}; + + let file_io = FileIOBuilder::new("memory").build().unwrap(); + let table_path = "memory:/test_df_insert_into"; + file_io + .mkdirs(&format!("{table_path}/snapshot/")) + .await + .unwrap(); + file_io + .mkdirs(&format!("{table_path}/manifest/")) + .await + .unwrap(); + + let schema = PaimonSchema::builder() + .column("id", 
DataType::Int(IntType::new())) + .column("value", DataType::Int(IntType::new())) + .build() + .unwrap(); + let table_schema = TableSchema::new(0, &schema); + let table = paimon::table::Table::new( + file_io, + Identifier::new("default", "test_insert"), + table_path.to_string(), + table_schema, + None, + ); + + let provider = PaimonTableProvider::try_new(table).unwrap(); + let ctx = SessionContext::new(); + ctx.register_table("t", Arc::new(provider)).unwrap(); + + // INSERT INTO + let result = ctx + .sql("INSERT INTO t VALUES (1, 10), (2, 20), (3, 30)") + .await + .unwrap() + .collect() + .await + .unwrap(); + + // Verify count output + let count_array = result[0] + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(count_array.value(0), 3); + + // Read back + let batches = ctx + .sql("SELECT id, value FROM t ORDER BY id") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let mut rows = Vec::new(); + for batch in &batches { + let ids = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let vals = batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + for i in 0..batch.num_rows() { + rows.push((ids.value(i), vals.value(i))); + } + } + assert_eq!(rows, vec![(1, 10), (2, 20), (3, 30)]); + } + + #[tokio::test] + async fn test_insert_overwrite() { + use paimon::io::FileIOBuilder; + use paimon::spec::{DataType, IntType, Schema as PaimonSchema, TableSchema, VarCharType}; + + let file_io = FileIOBuilder::new("memory").build().unwrap(); + let table_path = "memory:/test_df_insert_overwrite"; + file_io + .mkdirs(&format!("{table_path}/snapshot/")) + .await + .unwrap(); + file_io + .mkdirs(&format!("{table_path}/manifest/")) + .await + .unwrap(); + + let schema = PaimonSchema::builder() + .column("pt", DataType::VarChar(VarCharType::string_type())) + .column("id", DataType::Int(IntType::new())) + .partition_keys(["pt"]) + .build() + .unwrap(); + let table_schema = TableSchema::new(0, &schema); + let table = 
paimon::table::Table::new( + file_io, + Identifier::new("default", "test_overwrite"), + table_path.to_string(), + table_schema, + None, + ); + + let provider = PaimonTableProvider::try_new(table).unwrap(); + let ctx = SessionContext::new(); + ctx.register_table("t", Arc::new(provider)).unwrap(); + + // Initial INSERT: partition "a" and "b" + ctx.sql("INSERT INTO t VALUES ('a', 1), ('a', 2), ('b', 3), ('b', 4)") + .await + .unwrap() + .collect() + .await + .unwrap(); + + // INSERT OVERWRITE with only partition "a" data + // Should overwrite partition "a" but leave partition "b" intact + ctx.sql("INSERT OVERWRITE t VALUES ('a', 10), ('a', 20)") + .await + .unwrap() + .collect() + .await + .unwrap(); + + // Read back + let batches = ctx + .sql("SELECT pt, id FROM t ORDER BY pt, id") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let mut rows = Vec::new(); + for batch in &batches { + let pts = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let ids = batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + for i in 0..batch.num_rows() { + rows.push((pts.value(i).to_string(), ids.value(i))); + } + } + // Partition "a" overwritten with new data, partition "b" untouched + assert_eq!( + rows, + vec![ + ("a".to_string(), 10), + ("a".to_string(), 20), + ("b".to_string(), 3), + ("b".to_string(), 4), + ] + ); + } + + #[tokio::test] + async fn test_insert_overwrite_unpartitioned() { + use paimon::io::FileIOBuilder; + use paimon::spec::{DataType, IntType, Schema as PaimonSchema, TableSchema}; + + let file_io = FileIOBuilder::new("memory").build().unwrap(); + let table_path = "memory:/test_df_insert_overwrite_unpart"; + file_io + .mkdirs(&format!("{table_path}/snapshot/")) + .await + .unwrap(); + file_io + .mkdirs(&format!("{table_path}/manifest/")) + .await + .unwrap(); + + let schema = PaimonSchema::builder() + .column("id", DataType::Int(IntType::new())) + .column("value", DataType::Int(IntType::new())) + .build() + .unwrap(); + let 
table_schema = TableSchema::new(0, &schema); + let table = paimon::table::Table::new( + file_io, + Identifier::new("default", "test_overwrite_unpart"), + table_path.to_string(), + table_schema, + None, + ); + + let provider = PaimonTableProvider::try_new(table).unwrap(); + let ctx = SessionContext::new(); + ctx.register_table("t", Arc::new(provider)).unwrap(); + + // Initial INSERT + ctx.sql("INSERT INTO t VALUES (1, 10), (2, 20), (3, 30)") + .await + .unwrap() + .collect() + .await + .unwrap(); + + // INSERT OVERWRITE on unpartitioned table — full table overwrite + ctx.sql("INSERT OVERWRITE t VALUES (4, 40), (5, 50)") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let batches = ctx + .sql("SELECT id, value FROM t ORDER BY id") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let mut rows = Vec::new(); + for batch in &batches { + let ids = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let vals = batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + for i in 0..batch.num_rows() { + rows.push((ids.value(i), vals.value(i))); + } + } + // Old data fully replaced + assert_eq!(rows, vec![(4, 40), (5, 50)]); + } } diff --git a/crates/paimon/Cargo.toml b/crates/paimon/Cargo.toml index 10e5fa0d..908781f7 100644 --- a/crates/paimon/Cargo.toml +++ b/crates/paimon/Cargo.toml @@ -64,8 +64,10 @@ arrow-buffer = { workspace = true } arrow-cast = { workspace = true } arrow-ord = { workspace = true } arrow-schema = { workspace = true } +arrow-select = { workspace = true } futures = "0.3" -parquet = { workspace = true, features = ["async", "zstd"] } +tokio-util = { workspace = true, features = ["compat"] } +parquet = { workspace = true, features = ["async", "zstd", "lz4", "snap"] } orc-rust = "0.7.0" async-stream = "0.3.6" reqwest = { version = "0.12", features = ["json"] } diff --git a/crates/paimon/src/arrow/format/mod.rs b/crates/paimon/src/arrow/format/mod.rs index 63fd4e0e..454e6217 100644 --- 
a/crates/paimon/src/arrow/format/mod.rs +++ b/crates/paimon/src/arrow/format/mod.rs @@ -19,10 +19,12 @@ mod avro; mod orc; mod parquet; -use crate::io::FileRead; +use crate::io::{FileRead, OutputFile}; use crate::spec::{DataField, Predicate}; use crate::table::{ArrowRecordBatchStream, RowRange}; use crate::Error; +use arrow_array::RecordBatch; +use arrow_schema::SchemaRef; use async_trait::async_trait; /// Predicates with the file-level field context needed for pushdown. @@ -58,6 +60,30 @@ pub(crate) trait FormatFileReader: Send + Sync { ) -> crate::Result; } +/// Format-agnostic file writer that streams Arrow RecordBatches directly to storage. +/// +/// Each implementation (Parquet, ORC, ...) handles format-specific encoding. +/// Usage: create via [`create_format_writer`], call [`write`](FormatFileWriter::write) +/// for each batch, then [`close`](FormatFileWriter::close) to finalize the file. +#[async_trait] +pub(crate) trait FormatFileWriter: Send { + /// Write a RecordBatch to the underlying storage. + async fn write(&mut self, batch: &RecordBatch) -> crate::Result<()>; + + /// Number of bytes written so far (approximate, before close). + fn num_bytes(&self) -> usize; + + /// Number of bytes buffered in the current row group (not yet flushed). + fn in_progress_size(&self) -> usize; + + /// Flush the current row group to storage without closing the file. + async fn flush(&mut self) -> crate::Result<()>; + + /// Flush and close the writer, finalizing the file on storage. + /// Returns the total number of bytes written. + async fn close(self: Box) -> crate::Result; +} + /// Create a format reader based on the file extension. 
pub(crate) fn create_format_reader(path: &str) -> crate::Result> { if path.to_ascii_lowercase().ends_with(".parquet") { @@ -74,3 +100,22 @@ pub(crate) fn create_format_reader(path: &str) -> crate::Result crate::Result> { + let path = output.location(); + if path.to_ascii_lowercase().ends_with(".parquet") { + Ok(Box::new( + parquet::ParquetFormatWriter::new(output, schema, compression, zstd_level).await?, + )) + } else { + Err(Error::Unsupported { + message: format!("unsupported write format: expected .parquet, got: {path}"), + }) + } +} diff --git a/crates/paimon/src/arrow/format/parquet.rs b/crates/paimon/src/arrow/format/parquet.rs index b0aa0ec2..74de8a28 100644 --- a/crates/paimon/src/arrow/format/parquet.rs +++ b/crates/paimon/src/arrow/format/parquet.rs @@ -15,9 +15,9 @@ // specific language governing permissions and limitations // under the License. -use super::{FilePredicates, FormatFileReader}; +use super::{FilePredicates, FormatFileReader, FormatFileWriter}; use crate::arrow::filtering::{predicates_may_match_with_schema, StatsAccessor}; -use crate::io::FileRead; +use crate::io::{FileRead, OutputFile}; use crate::spec::{DataField, DataType, Datum, Predicate, PredicateOperator}; use crate::table::{ArrowRecordBatchStream, RowRange}; use crate::Error; @@ -38,9 +38,11 @@ use parquet::arrow::arrow_reader::{ ArrowPredicate, ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection, RowSelector, }; use parquet::arrow::async_reader::{AsyncFileReader, MetadataFetch}; -use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask}; +use parquet::arrow::{AsyncArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask}; +use parquet::basic::{Compression, ZstdLevel}; use parquet::file::metadata::ParquetMetaDataReader; use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData}; +use parquet::file::properties::WriterProperties; use parquet::file::statistics::Statistics as ParquetStatistics; use std::collections::HashMap; use std::ops::Range; @@ -48,6 
+50,89 @@ use std::sync::Arc; pub(crate) struct ParquetFormatReader; +/// Parquet implementation of [`FormatFileWriter`]. +/// Streams data directly to storage via `AsyncArrowWriter` + opendal. +pub(crate) struct ParquetFormatWriter { + inner: AsyncArrowWriter>, +} + +impl ParquetFormatWriter { + pub(crate) async fn new( + output: &OutputFile, + schema: arrow_schema::SchemaRef, + compression: &str, + zstd_level: i32, + ) -> crate::Result { + let async_write = output.async_writer().await?; + let codec = parse_compression(compression, zstd_level); + let props = WriterProperties::builder().set_compression(codec).build(); + let inner = AsyncArrowWriter::try_new(async_write, schema, Some(props)).map_err(|e| { + crate::Error::DataInvalid { + message: format!("Failed to create parquet writer: {e}"), + source: None, + } + })?; + Ok(Self { inner }) + } +} + +/// Map Paimon `file.compression` value to parquet [`Compression`]. +fn parse_compression(codec: &str, zstd_level: i32) -> Compression { + match codec.to_ascii_lowercase().as_str() { + "zstd" => { + let level = ZstdLevel::try_new(zstd_level).unwrap_or_default(); + Compression::ZSTD(level) + } + "lz4" => Compression::LZ4_RAW, + "snappy" => Compression::SNAPPY, + "gzip" | "gz" => Compression::GZIP(Default::default()), + "none" | "uncompressed" => Compression::UNCOMPRESSED, + _ => Compression::UNCOMPRESSED, + } +} + +#[async_trait] +impl FormatFileWriter for ParquetFormatWriter { + async fn write(&mut self, batch: &RecordBatch) -> crate::Result<()> { + self.inner + .write(batch) + .await + .map_err(|e| crate::Error::DataInvalid { + message: format!("Failed to write parquet batch: {e}"), + source: None, + }) + } + + fn num_bytes(&self) -> usize { + self.inner.bytes_written() + self.inner.in_progress_size() + } + + fn in_progress_size(&self) -> usize { + self.inner.in_progress_size() + } + + async fn flush(&mut self) -> crate::Result<()> { + self.inner + .flush() + .await + .map_err(|e| crate::Error::DataInvalid { + message: 
format!("Failed to flush parquet writer: {e}"), + source: None, + }) + } + + async fn close(mut self: Box) -> crate::Result { + self.inner + .finish() + .await + .map_err(|e| crate::Error::DataInvalid { + message: format!("Failed to close parquet writer: {e}"), + source: None, + })?; + Ok(self.inner.bytes_written() as u64) + } +} + #[async_trait] impl FormatFileReader for ParquetFormatReader { async fn read_batch_stream( @@ -1050,7 +1135,12 @@ fn split_ranges_for_concurrency(merged: Vec>, concurrency: usize) -> #[cfg(test)] mod tests { use super::build_parquet_row_filter; + use super::ParquetFormatWriter; + use crate::arrow::format::FormatFileWriter; + use crate::io::FileIOBuilder; use crate::spec::{DataField, DataType, Datum, IntType, PredicateBuilder}; + use arrow_array::{Int32Array, RecordBatch}; + use arrow_schema::{DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema}; use parquet::schema::{parser::parse_message_type, types::SchemaDescriptor}; use std::sync::Arc; @@ -1241,4 +1331,81 @@ mod tests { let result = super::split_ranges_for_concurrency(merged, 4); assert!(result.is_empty()); } + + fn writer_arrow_schema() -> Arc { + Arc::new(ArrowSchema::new(vec![ + ArrowField::new("id", ArrowDataType::Int32, false), + ArrowField::new("value", ArrowDataType::Int32, false), + ])) + } + + fn writer_test_batch( + schema: &Arc, + ids: Vec, + values: Vec, + ) -> RecordBatch { + RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(ids)), + Arc::new(Int32Array::from(values)), + ], + ) + .unwrap() + } + + #[tokio::test] + async fn test_parquet_writer_write_and_close() { + let file_io = FileIOBuilder::new("memory").build().unwrap(); + let path = "memory:/test_parquet_writer_write_close.parquet"; + let output = file_io.new_output(path).unwrap(); + let schema = writer_arrow_schema(); + + let mut writer: Box = Box::new( + ParquetFormatWriter::new(&output, schema.clone(), "zstd", 1) + .await + .unwrap(), + ); + + let batch = 
writer_test_batch(&schema, vec![1, 2, 3], vec![10, 20, 30]); + writer.write(&batch).await.unwrap(); + writer.close().await.unwrap(); + + // Verify valid parquet by reading back + let bytes = file_io.new_input(path).unwrap().read().await.unwrap(); + let reader = + parquet::arrow::arrow_reader::ParquetRecordBatchReader::try_new(bytes, 1024).unwrap(); + let total_rows: usize = reader.into_iter().map(|r| r.unwrap().num_rows()).sum(); + assert_eq!(total_rows, 3); + } + + #[tokio::test] + async fn test_parquet_writer_multiple_batches() { + let file_io = FileIOBuilder::new("memory").build().unwrap(); + let path = "memory:/test_parquet_writer_multi.parquet"; + let output = file_io.new_output(path).unwrap(); + let schema = writer_arrow_schema(); + + let mut writer: Box = Box::new( + ParquetFormatWriter::new(&output, schema.clone(), "zstd", 1) + .await + .unwrap(), + ); + + writer + .write(&writer_test_batch(&schema, vec![1, 2], vec![10, 20])) + .await + .unwrap(); + writer + .write(&writer_test_batch(&schema, vec![3, 4, 5], vec![30, 40, 50])) + .await + .unwrap(); + writer.close().await.unwrap(); + + let bytes = file_io.new_input(path).unwrap().read().await.unwrap(); + let reader = + parquet::arrow::arrow_reader::ParquetRecordBatchReader::try_new(bytes, 1024).unwrap(); + let total_rows: usize = reader.into_iter().map(|r| r.unwrap().num_rows()).sum(); + assert_eq!(total_rows, 5); + } } diff --git a/crates/paimon/src/io/file_io.rs b/crates/paimon/src/io/file_io.rs index 6f41f11c..93758e87 100644 --- a/crates/paimon/src/io/file_io.rs +++ b/crates/paimon/src/io/file_io.rs @@ -26,6 +26,7 @@ use chrono::{DateTime, Utc}; use opendal::raw::normalize_root; use opendal::Operator; use snafu::ResultExt; +use tokio_util::compat::FuturesAsyncWriteCompatExt; use url::Url; use super::Storage; @@ -309,6 +310,11 @@ impl FileWrite for opendal::Writer { } } +/// Async streaming writer trait for format-level writers (e.g. parquet). 
+pub trait AsyncFileWrite: tokio::io::AsyncWrite + Unpin + Send {} + +impl AsyncFileWrite for T {} + #[derive(Clone, Debug)] pub struct FileStatus { pub size: u64, @@ -390,10 +396,22 @@ impl OutputFile { } pub async fn writer(&self) -> crate::Result> { + Ok(Box::new(self.opendal_writer().await?)) + } + + /// Get an async streaming writer for format-level writes (e.g. parquet). + pub(crate) async fn async_writer(&self) -> crate::Result> { Ok(Box::new( - self.op.writer(&self.path[self.relative_path_pos..]).await?, + self.opendal_writer() + .await? + .into_futures_async_write() + .compat_write(), )) } + + async fn opendal_writer(&self) -> crate::Result { + Ok(self.op.writer(&self.path[self.relative_path_pos..]).await?) + } } #[cfg(test)] diff --git a/crates/paimon/src/lib.rs b/crates/paimon/src/lib.rs index 3867d69b..b3223475 100644 --- a/crates/paimon/src/lib.rs +++ b/crates/paimon/src/lib.rs @@ -44,5 +44,6 @@ pub use catalog::FileSystemCatalog; pub use table::{ CommitMessage, DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket, Plan, RESTEnv, RESTSnapshotCommit, ReadBuilder, RenamingSnapshotCommit, RowRange, SnapshotCommit, - SnapshotManager, Table, TableCommit, TableRead, TableScan, TagManager, WriteBuilder, + SnapshotManager, Table, TableCommit, TableRead, TableScan, TableWrite, TagManager, + WriteBuilder, }; diff --git a/crates/paimon/src/spec/binary_row.rs b/crates/paimon/src/spec/binary_row.rs index 0ba6b782..630f189a 100644 --- a/crates/paimon/src/spec/binary_row.rs +++ b/crates/paimon/src/spec/binary_row.rs @@ -20,10 +20,15 @@ use crate::spec::murmur_hash::hash_by_words; use crate::spec::{DataType, Datum}; +use arrow_array::RecordBatch; use serde::{Deserialize, Serialize}; +use std::sync::LazyLock; pub const EMPTY_BINARY_ROW: BinaryRow = BinaryRow::new(0); +pub static EMPTY_SERIALIZED_ROW: LazyLock> = + LazyLock::new(|| BinaryRowBuilder::new(0).build_serialized()); + /// Highest bit mask for detecting inline vs variable-length encoding. 
const HIGHEST_FIRST_BIT: u64 = 0x80 << 56; @@ -607,6 +612,169 @@ pub fn datums_to_binary_row(datums: &[(&Option, &DataType)]) -> Vec { builder.build_serialized() } +/// Extract a Datum from an Arrow RecordBatch column at the given row index. +pub fn extract_datum_from_arrow( + batch: &RecordBatch, + row_idx: usize, + col_idx: usize, + data_type: &DataType, +) -> crate::Result> { + use arrow_array::Array; + + let col = batch.column(col_idx); + if col.is_null(row_idx) { + return Ok(None); + } + + let datum = match data_type { + DataType::Boolean(_) => { + let arr = col + .as_any() + .downcast_ref::() + .ok_or_else(|| type_mismatch_err("Boolean", col_idx))?; + Datum::Bool(arr.value(row_idx)) + } + DataType::TinyInt(_) => { + let arr = col + .as_any() + .downcast_ref::() + .ok_or_else(|| type_mismatch_err("TinyInt", col_idx))?; + Datum::TinyInt(arr.value(row_idx)) + } + DataType::SmallInt(_) => { + let arr = col + .as_any() + .downcast_ref::() + .ok_or_else(|| type_mismatch_err("SmallInt", col_idx))?; + Datum::SmallInt(arr.value(row_idx)) + } + DataType::Int(_) => { + let arr = col + .as_any() + .downcast_ref::() + .ok_or_else(|| type_mismatch_err("Int", col_idx))?; + Datum::Int(arr.value(row_idx)) + } + DataType::BigInt(_) => { + let arr = col + .as_any() + .downcast_ref::() + .ok_or_else(|| type_mismatch_err("BigInt", col_idx))?; + Datum::Long(arr.value(row_idx)) + } + DataType::Float(_) => { + let arr = col + .as_any() + .downcast_ref::() + .ok_or_else(|| type_mismatch_err("Float", col_idx))?; + Datum::Float(arr.value(row_idx)) + } + DataType::Double(_) => { + let arr = col + .as_any() + .downcast_ref::() + .ok_or_else(|| type_mismatch_err("Double", col_idx))?; + Datum::Double(arr.value(row_idx)) + } + DataType::Char(_) | DataType::VarChar(_) => { + let arr = col + .as_any() + .downcast_ref::() + .ok_or_else(|| type_mismatch_err("String", col_idx))?; + Datum::String(arr.value(row_idx).to_string()) + } + DataType::Date(_) => { + let arr = col + .as_any() + 
.downcast_ref::() + .ok_or_else(|| type_mismatch_err("Date", col_idx))?; + Datum::Date(arr.value(row_idx)) + } + DataType::Decimal(d) => { + let arr = col + .as_any() + .downcast_ref::() + .ok_or_else(|| type_mismatch_err("Decimal", col_idx))?; + Datum::Decimal { + unscaled: arr.value(row_idx), + precision: d.precision(), + scale: d.scale(), + } + } + DataType::Binary(_) | DataType::VarBinary(_) => { + let arr = col + .as_any() + .downcast_ref::() + .ok_or_else(|| type_mismatch_err("Binary", col_idx))?; + Datum::Bytes(arr.value(row_idx).to_vec()) + } + DataType::Timestamp(ts) => { + if ts.precision() <= 3 { + let arr = col + .as_any() + .downcast_ref::() + .ok_or_else(|| type_mismatch_err("Timestamp(ms)", col_idx))?; + Datum::Timestamp { + millis: arr.value(row_idx), + nanos: 0, + } + } else { + let arr = col + .as_any() + .downcast_ref::() + .ok_or_else(|| type_mismatch_err("Timestamp(us)", col_idx))?; + let micros = arr.value(row_idx); + Datum::Timestamp { + millis: micros / 1000, + nanos: ((micros % 1000) * 1000) as i32, + } + } + } + DataType::LocalZonedTimestamp(ts) => { + if ts.precision() <= 3 { + let arr = col + .as_any() + .downcast_ref::() + .ok_or_else(|| type_mismatch_err("LocalZonedTimestamp(ms)", col_idx))?; + Datum::LocalZonedTimestamp { + millis: arr.value(row_idx), + nanos: 0, + } + } else { + let arr = col + .as_any() + .downcast_ref::() + .ok_or_else(|| type_mismatch_err("LocalZonedTimestamp(us)", col_idx))?; + let micros = arr.value(row_idx); + Datum::LocalZonedTimestamp { + millis: micros / 1000, + nanos: ((micros % 1000) * 1000) as i32, + } + } + } + _ => { + return Err(crate::Error::Unsupported { + message: format!( + "Unsupported data type {:?} for Arrow extraction at column {}", + data_type, col_idx + ), + }); + } + }; + + Ok(Some(datum)) +} + +fn type_mismatch_err(expected: &str, col_idx: usize) -> crate::Error { + crate::Error::DataInvalid { + message: format!( + "Arrow column {} type mismatch: expected {} compatible array", + col_idx, 
expected + ), + source: None, + } +} + + #[cfg(test)] mod tests { + use super::*; diff --git a/crates/paimon/src/spec/core_options.rs b/crates/paimon/src/spec/core_options.rs index 17993d9e..bb520226 100644 --- a/crates/paimon/src/spec/core_options.rs +++ b/crates/paimon/src/spec/core_options.rs @@ -27,12 +27,15 @@ const PARTITION_LEGACY_NAME_OPTION: &str = "partition.legacy-name"; const BUCKET_KEY_OPTION: &str = "bucket-key"; const BUCKET_FUNCTION_TYPE_OPTION: &str = "bucket-function.type"; const BUCKET_OPTION: &str = "bucket"; -const DEFAULT_BUCKET: i32 = 1; +const DEFAULT_BUCKET: i32 = -1; const COMMIT_MAX_RETRIES_OPTION: &str = "commit.max-retries"; const COMMIT_TIMEOUT_OPTION: &str = "commit.timeout"; const COMMIT_MIN_RETRY_WAIT_OPTION: &str = "commit.min-retry-wait"; const COMMIT_MAX_RETRY_WAIT_OPTION: &str = "commit.max-retry-wait"; +const FILE_COMPRESSION_OPTION: &str = "file.compression"; +const FILE_COMPRESSION_ZSTD_LEVEL_OPTION: &str = "file.compression.zstd-level"; const ROW_TRACKING_ENABLED_OPTION: &str = "row-tracking.enabled"; +const WRITE_PARQUET_BUFFER_SIZE_OPTION: &str = "write.parquet-buffer-size"; const DEFAULT_COMMIT_MAX_RETRIES: u32 = 10; const DEFAULT_COMMIT_TIMEOUT_MS: u64 = 120_000; const DEFAULT_COMMIT_MIN_RETRY_WAIT_MS: u64 = 1_000; @@ -43,6 +46,8 @@ pub const SCAN_TAG_NAME_OPTION: &str = "scan.tag-name"; const DEFAULT_SOURCE_SPLIT_TARGET_SIZE: i64 = 128 * 1024 * 1024; const DEFAULT_SOURCE_SPLIT_OPEN_FILE_COST: i64 = 4 * 1024 * 1024; const DEFAULT_PARTITION_DEFAULT_NAME: &str = "__DEFAULT_PARTITION__"; +const DEFAULT_TARGET_FILE_SIZE: i64 = 256 * 1024 * 1024; +const DEFAULT_WRITE_PARQUET_BUFFER_SIZE: i64 = 256 * 1024 * 1024; /// Typed accessors for common table options. /// @@ -264,6 +269,41 @@ impl<'a> CoreOptions<'a> { .map(|v| v.eq_ignore_ascii_case("default")) .unwrap_or(true) } + + /// Target file size for data files. Default is 256MB. 
+ pub fn target_file_size(&self) -> i64 { + self.options + .get("target-file-size") + .and_then(|v| parse_memory_size(v)) + .unwrap_or(DEFAULT_TARGET_FILE_SIZE) + } + + /// File compression codec (e.g. "lz4", "zstd", "snappy", "none"). + /// Default is "zstd". + pub fn file_compression(&self) -> &str { + self.options + .get(FILE_COMPRESSION_OPTION) + .map(String::as_str) + .unwrap_or("zstd") + } + + /// Zstd compression level. Only meaningful when `file.compression` is `"zstd"`. + /// Default is 1 (matching Paimon Java). + pub fn file_compression_zstd_level(&self) -> i32 { + self.options + .get(FILE_COMPRESSION_ZSTD_LEVEL_OPTION) + .and_then(|v| v.parse().ok()) + .unwrap_or(1) + } + + /// Parquet writer in-progress buffer size limit. Default is 256MB. + /// When the buffered data exceeds this, the writer flushes the current row group. + pub fn write_parquet_buffer_size(&self) -> i64 { + self.options + .get(WRITE_PARQUET_BUFFER_SIZE_OPTION) + .and_then(|v| parse_memory_size(v)) + .unwrap_or(DEFAULT_WRITE_PARQUET_BUFFER_SIZE) + } } /// Parse a memory size string to bytes using binary (1024-based) semantics. 
@@ -421,7 +461,7 @@ mod tests { fn test_commit_options_defaults() { let options = HashMap::new(); let core = CoreOptions::new(&options); - assert_eq!(core.bucket(), 1); + assert_eq!(core.bucket(), -1); assert_eq!(core.commit_max_retries(), 10); assert_eq!(core.commit_timeout_ms(), 120_000); assert_eq!(core.commit_min_retry_wait_ms(), 1_000); @@ -477,4 +517,21 @@ mod tests { Some(TimeTravelSelector::TimestampMillis(1234)) ); } + + #[test] + fn test_write_options_defaults() { + let options = HashMap::new(); + let core = CoreOptions::new(&options); + assert_eq!(core.write_parquet_buffer_size(), 256 * 1024 * 1024); + } + + #[test] + fn test_write_options_custom() { + let options = HashMap::from([( + WRITE_PARQUET_BUFFER_SIZE_OPTION.to_string(), + "32mb".to_string(), + )]); + let core = CoreOptions::new(&options); + assert_eq!(core.write_parquet_buffer_size(), 32 * 1024 * 1024); + } } diff --git a/crates/paimon/src/spec/partition_utils.rs b/crates/paimon/src/spec/partition_utils.rs index fa41cfc7..5d05863c 100644 --- a/crates/paimon/src/spec/partition_utils.rs +++ b/crates/paimon/src/spec/partition_utils.rs @@ -38,6 +38,7 @@ const MILLIS_PER_DAY: i64 = 86_400_000; /// (escaped directory path). /// /// Reference: `org.apache.paimon.utils.InternalRowPartitionComputer` in Java Paimon. +#[derive(Debug)] pub(crate) struct PartitionComputer { partition_keys: Vec, partition_fields: Vec, diff --git a/crates/paimon/src/spec/schema.rs b/crates/paimon/src/spec/schema.rs index 62ab3fe1..b0baf662 100644 --- a/crates/paimon/src/spec/schema.rs +++ b/crates/paimon/src/spec/schema.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use crate::spec::core_options::CoreOptions; use crate::spec::types::{DataType, RowType}; use serde::{Deserialize, Serialize}; use serde_with::serde_as; @@ -115,6 +116,25 @@ impl TableSchema { pub fn time_millis(&self) -> i64 { self.time_millis } + + /// Compute the effective bucket key columns. 
+ /// + /// Priority: explicit `bucket-key` option > primary keys > all non-partition fields. + pub fn bucket_keys(&self) -> Vec { + let core_options = CoreOptions::new(&self.options); + if let Some(keys) = core_options.bucket_key() { + return keys; + } + if !self.primary_keys.is_empty() { + return self.primary_keys.clone(); + } + let partition_set: HashSet<&str> = self.partition_keys.iter().map(String::as_str).collect(); + self.fields + .iter() + .filter(|f| !partition_set.contains(f.name())) + .map(|f| f.name().to_string()) + .collect() + } } pub const ROW_ID_FIELD_NAME: &str = "_ROW_ID"; diff --git a/crates/paimon/src/table/mod.rs b/crates/paimon/src/table/mod.rs index c17ebbc7..2fbd0a86 100644 --- a/crates/paimon/src/table/mod.rs +++ b/crates/paimon/src/table/mod.rs @@ -33,6 +33,7 @@ mod source; mod stats_filter; pub(crate) mod table_commit; mod table_scan; +pub(crate) mod table_write; mod tag_manager; mod write_builder; @@ -52,6 +53,7 @@ pub use source::{ }; pub use table_commit::TableCommit; pub use table_scan::TableScan; +pub use table_write::TableWrite; pub use tag_manager::TagManager; pub use write_builder::WriteBuilder; diff --git a/crates/paimon/src/table/table_commit.rs b/crates/paimon/src/table/table_commit.rs index bc7f5c11..c36e9b46 100644 --- a/crates/paimon/src/table/table_commit.rs +++ b/crates/paimon/src/table/table_commit.rs @@ -48,7 +48,6 @@ pub struct TableCommit { snapshot_commit: Arc, commit_user: String, total_buckets: i32, - overwrite_partition: Option>, // commit config commit_max_retries: u32, commit_timeout_ms: u64, @@ -58,11 +57,7 @@ pub struct TableCommit { } impl TableCommit { - pub fn new( - table: Table, - commit_user: String, - overwrite_partition: Option>, - ) -> Self { + pub fn new(table: Table, commit_user: String) -> Self { let snapshot_manager = SnapshotManager::new(table.file_io.clone(), table.location.clone()); let snapshot_commit = if let Some(env) = &table.rest_env { env.snapshot_commit() @@ -84,7 +79,6 @@ impl 
TableCommit { snapshot_commit, commit_user, total_buckets, - overwrite_partition, commit_max_retries, commit_timeout_ms, commit_min_retry_wait_ms, @@ -93,36 +87,87 @@ impl TableCommit { } } - /// Commit new files. Uses OVERWRITE mode if overwrite_partition was set - /// in the constructor, otherwise uses APPEND mode. + /// Commit new files in APPEND mode. pub async fn commit(&self, commit_messages: Vec) -> Result<()> { if commit_messages.is_empty() { return Ok(()); } let commit_entries = self.messages_to_entries(&commit_messages); + self.try_commit( + CommitKind::APPEND, + CommitEntriesPlan::Static(commit_entries), + ) + .await + } - if let Some(overwrite_partition) = &self.overwrite_partition { - let partition_predicate = if overwrite_partition.is_empty() { - None - } else { - Some(self.build_partition_predicate(overwrite_partition)?) - }; - self.try_commit( - CommitKind::OVERWRITE, - CommitEntriesPlan::Overwrite { - partition_predicate, - new_entries: commit_entries, - }, - ) - .await - } else { - self.try_commit( - CommitKind::APPEND, - CommitEntriesPlan::Static(commit_entries), - ) - .await + /// Overwrite with dynamic partition detection. + /// + /// Extracts the set of partitions touched by `commit_messages` and overwrites + /// only those partitions. For unpartitioned tables this is a full table overwrite. + pub async fn overwrite(&self, commit_messages: Vec) -> Result<()> { + if commit_messages.is_empty() { + return Ok(()); + } + + let commit_entries = self.messages_to_entries(&commit_messages); + let partition_predicate = self.build_dynamic_partition_predicate(&commit_messages)?; + self.try_commit( + CommitKind::OVERWRITE, + CommitEntriesPlan::Overwrite { + partition_predicate, + new_entries: commit_entries, + }, + ) + .await + } + + /// Build a dynamic partition predicate from the partitions present in commit messages. + /// + /// Returns `None` for unpartitioned tables (full table overwrite). 
+ fn build_dynamic_partition_predicate( + &self, + commit_messages: &[CommitMessage], + ) -> Result> { + let partition_fields = self.table.schema().partition_fields(); + if partition_fields.is_empty() { + return Ok(None); } + + let data_types: Vec<_> = partition_fields + .iter() + .map(|f| f.data_type().clone()) + .collect(); + let partition_keys: Vec<_> = self + .table + .schema() + .partition_keys() + .iter() + .map(|s| s.to_string()) + .collect(); + + // Collect unique partition bytes + let mut seen = std::collections::HashSet::new(); + let mut partition_specs: Vec> = Vec::new(); + for msg in commit_messages { + if seen.insert(msg.partition.clone()) { + let row = BinaryRow::from_serialized_bytes(&msg.partition)?; + let mut spec = HashMap::new(); + for (i, key) in partition_keys.iter().enumerate() { + if let Some(datum) = extract_datum(&row, i, &data_types[i])? { + spec.insert(key.clone(), datum); + } + } + partition_specs.push(spec); + } + } + + let predicates: Vec = partition_specs + .iter() + .map(|p| self.build_partition_predicate(p)) + .collect::>>()?; + + Ok(Some(Predicate::or(predicates))) } /// Build a partition predicate from key-value pairs. 
@@ -747,12 +792,12 @@ mod tests { fn setup_commit(file_io: &FileIO, table_path: &str) -> TableCommit { let table = test_table(file_io, table_path); - TableCommit::new(table, "test-user".to_string(), None) + TableCommit::new(table, "test-user".to_string()) } fn setup_partitioned_commit(file_io: &FileIO, table_path: &str) -> TableCommit { let table = test_partitioned_table(file_io, table_path); - TableCommit::new(table, "test-user".to_string(), None) + TableCommit::new(table, "test-user".to_string()) } fn partition_bytes(pt: &str) -> Vec { @@ -923,16 +968,9 @@ mod tests { .await .unwrap(); - // Overwrite partition "a" with new data - let mut overwrite_partition = HashMap::new(); - overwrite_partition.insert("pt".to_string(), Datum::String("a".to_string())); - - let table = test_partitioned_table(&file_io, table_path); - let overwrite_commit = - TableCommit::new(table, "test-user".to_string(), Some(overwrite_partition)); - - overwrite_commit - .commit(vec![CommitMessage::new( + // Overwrite partition "a" with new data (dynamic partition overwrite) + commit + .overwrite(vec![CommitMessage::new( partition_bytes("a"), 0, vec![test_data_file("data-a2.parquet", 50)], diff --git a/crates/paimon/src/table/table_write.rs b/crates/paimon/src/table/table_write.rs new file mode 100644 index 00000000..eb0b58ee --- /dev/null +++ b/crates/paimon/src/table/table_write.rs @@ -0,0 +1,783 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! TableWrite for writing Arrow data to Paimon tables. +//! +//! Reference: [pypaimon TableWrite](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/write/table_write.py) +//! and [pypaimon FileStoreWrite](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/write/file_store_write.py) + +use crate::arrow::format::{create_format_writer, FormatFileWriter}; +use crate::io::FileIO; +use crate::spec::stats::BinaryTableStats; +use crate::spec::PartitionComputer; +use crate::spec::{ + extract_datum_from_arrow, BinaryRow, BinaryRowBuilder, CoreOptions, DataField, DataFileMeta, + DataType, Datum, EMPTY_SERIALIZED_ROW, +}; +use crate::table::commit_message::CommitMessage; +use crate::table::Table; +use crate::Result; +use arrow_array::RecordBatch; +use chrono::Utc; +use std::collections::HashMap; +use std::sync::Arc; +use tokio::task::JoinSet; + +type PartitionBucketKey = (Vec, i32); + +/// TableWrite writes Arrow RecordBatches to Paimon data files. +/// +/// Each (partition, bucket) pair gets its own `DataFileWriter` held in a HashMap. +/// Batches are routed to the correct writer based on partition/bucket. +/// +/// Call `prepare_commit()` to close all writers and collect +/// `CommitMessage`s for use with `TableCommit`. 
+/// +/// Reference: [pypaimon BatchTableWrite](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/write/table_write.py) +pub struct TableWrite { + table: Table, + partition_writers: HashMap, + partition_computer: PartitionComputer, + partition_keys: Vec, + partition_field_indices: Vec, + bucket_key_indices: Vec, + total_buckets: i32, + schema_id: i64, + target_file_size: i64, + file_compression: String, + file_compression_zstd_level: i32, + write_buffer_size: i64, +} + +impl TableWrite { + pub(crate) fn new(table: &Table) -> crate::Result { + let schema = table.schema(); + let core_options = CoreOptions::new(schema.options()); + + if !schema.primary_keys().is_empty() { + return Err(crate::Error::Unsupported { + message: "TableWrite does not support tables with primary keys".to_string(), + }); + } + if core_options.data_evolution_enabled() { + return Err(crate::Error::Unsupported { + message: "TableWrite does not support data-evolution.enabled mode".to_string(), + }); + } + + let total_buckets = core_options.bucket(); + if total_buckets != -1 && core_options.bucket_key().is_none() { + return Err(crate::Error::Unsupported { + message: "Append tables with fixed bucket must configure 'bucket-key'".to_string(), + }); + } + let target_file_size = core_options.target_file_size(); + let file_compression = core_options.file_compression().to_string(); + let file_compression_zstd_level = core_options.file_compression_zstd_level(); + let write_buffer_size = core_options.write_parquet_buffer_size(); + let partition_keys: Vec = schema.partition_keys().to_vec(); + let fields = schema.fields(); + + let partition_field_indices: Vec = partition_keys + .iter() + .filter_map(|pk| fields.iter().position(|f| f.name() == pk)) + .collect(); + + // Bucket keys: resolved by TableSchema + let bucket_keys = schema.bucket_keys(); + + let bucket_key_indices: Vec = bucket_keys + .iter() + .filter_map(|bk| fields.iter().position(|f| f.name() == bk)) + .collect(); + + let 
partition_computer = PartitionComputer::new( + &partition_keys, + fields, + core_options.partition_default_name(), + core_options.legacy_partition_name(), + ) + .unwrap(); + + Ok(Self { + table: table.clone(), + partition_writers: HashMap::new(), + partition_computer, + partition_keys, + partition_field_indices, + bucket_key_indices, + total_buckets, + schema_id: schema.id(), + target_file_size, + file_compression, + file_compression_zstd_level, + write_buffer_size, + }) + } + + /// Write an Arrow RecordBatch. Rows are routed to the correct partition and bucket. + pub async fn write_arrow_batch(&mut self, batch: &RecordBatch) -> Result<()> { + if batch.num_rows() == 0 { + return Ok(()); + } + + let grouped = self.divide_by_partition_bucket(batch)?; + for ((partition_bytes, bucket), sub_batch) in grouped { + self.write_bucket(partition_bytes, bucket, sub_batch) + .await?; + } + Ok(()) + } + + /// Group rows by (partition_bytes, bucket) and return sub-batches. + fn divide_by_partition_bucket( + &self, + batch: &RecordBatch, + ) -> Result> { + // Fast path: no partitions and single bucket — skip per-row routing + if self.partition_field_indices.is_empty() && self.total_buckets <= 1 { + return Ok(vec![((EMPTY_SERIALIZED_ROW.clone(), 0), batch.clone())]); + } + + let fields = self.table.schema().fields(); + let mut groups: HashMap> = HashMap::new(); + + for row_idx in 0..batch.num_rows() { + let (partition_bytes, bucket) = + self.extract_partition_bucket(batch, row_idx, fields)?; + groups + .entry((partition_bytes, bucket)) + .or_default() + .push(row_idx); + } + + let mut result = Vec::with_capacity(groups.len()); + for (key, row_indices) in groups { + let sub_batch = if row_indices.len() == batch.num_rows() { + batch.clone() + } else { + let indices = arrow_array::UInt32Array::from( + row_indices.iter().map(|&i| i as u32).collect::>(), + ); + let columns: Vec> = batch + .columns() + .iter() + .map(|col| arrow_select::take::take(col.as_ref(), &indices, None)) + 
.collect::, _>>() + .map_err(|e| crate::Error::DataInvalid { + message: format!("Failed to take rows: {e}"), + source: None, + })?; + RecordBatch::try_new(batch.schema(), columns).map_err(|e| { + crate::Error::DataInvalid { + message: format!("Failed to create sub-batch: {e}"), + source: None, + } + })? + }; + result.push((key, sub_batch)); + } + Ok(result) + } + + /// Write a batch directly to the DataFileWriter for the given (partition, bucket). + async fn write_bucket( + &mut self, + partition_bytes: Vec, + bucket: i32, + batch: RecordBatch, + ) -> Result<()> { + let key = (partition_bytes, bucket); + if !self.partition_writers.contains_key(&key) { + self.create_writer(key.0.clone(), key.1)?; + } + let writer = self.partition_writers.get_mut(&key).unwrap(); + writer.write(&batch).await + } + + /// Write multiple Arrow RecordBatches. + pub async fn write_arrow(&mut self, batches: &[RecordBatch]) -> Result<()> { + for batch in batches { + self.write_arrow_batch(batch).await?; + } + Ok(()) + } + + /// Close all writers and collect CommitMessages for use with TableCommit. + /// Writers are cleared after this call, allowing the TableWrite to be reused. + pub async fn prepare_commit(&mut self) -> Result> { + let writers: Vec<(PartitionBucketKey, DataFileWriter)> = + self.partition_writers.drain().collect(); + + let futures: Vec<_> = writers + .into_iter() + .map(|((partition_bytes, bucket), mut writer)| async move { + let files = writer.prepare_commit().await?; + Ok::<_, crate::Error>((partition_bytes, bucket, files)) + }) + .collect(); + + let results = futures::future::try_join_all(futures).await?; + + let mut messages = Vec::new(); + for (partition_bytes, bucket, files) in results { + if !files.is_empty() { + messages.push(CommitMessage::new(partition_bytes, bucket, files)); + } + } + Ok(messages) + } + + /// Extract partition bytes and bucket for a single row. 
+ fn extract_partition_bucket( + &self, + batch: &RecordBatch, + row_idx: usize, + fields: &[DataField], + ) -> Result { + // Build partition BinaryRow + let partition_bytes = if self.partition_field_indices.is_empty() { + EMPTY_SERIALIZED_ROW.clone() + } else { + let mut builder = BinaryRowBuilder::new(self.partition_field_indices.len() as i32); + for (pos, &field_idx) in self.partition_field_indices.iter().enumerate() { + let field = &fields[field_idx]; + match extract_datum_from_arrow(batch, row_idx, field_idx, field.data_type())? { + Some(datum) => builder.write_datum(pos, &datum, field.data_type()), + None => builder.set_null_at(pos), + } + } + builder.build_serialized() + }; + + // Compute bucket + let bucket = if self.total_buckets <= 1 || self.bucket_key_indices.is_empty() { + 0 + } else { + let mut datums: Vec<(Datum, DataType)> = Vec::new(); + for &field_idx in &self.bucket_key_indices { + let field = &fields[field_idx]; + let datum = extract_datum_from_arrow(batch, row_idx, field_idx, field.data_type())?; + if let Some(d) = datum { + datums.push((d, field.data_type().clone())); + } + } + let refs: Vec<(&Datum, &DataType)> = datums.iter().map(|(d, t)| (d, t)).collect(); + BinaryRow::compute_bucket_from_datums(&refs, self.total_buckets).unwrap_or(0) + }; + + Ok((partition_bytes, bucket)) + } + + fn create_writer(&mut self, partition_bytes: Vec, bucket: i32) -> Result<()> { + let partition_path = if self.partition_keys.is_empty() { + String::new() + } else { + let row = BinaryRow::from_serialized_bytes(&partition_bytes)?; + self.partition_computer.generate_partition_path(&row)? 
+ }; + + let writer = DataFileWriter::new( + self.table.file_io().clone(), + self.table.location().to_string(), + partition_path, + bucket, + self.schema_id, + self.target_file_size, + self.file_compression.clone(), + self.file_compression_zstd_level, + self.write_buffer_size, + ); + + self.partition_writers + .insert((partition_bytes, bucket), writer); + Ok(()) + } +} + +/// Internal writer that produces parquet data files for a single (partition, bucket). +/// +/// Batches are accumulated into a single `FormatFileWriter` that streams directly +/// to storage. Call `prepare_commit()` to finalize and collect file metadata. +struct DataFileWriter { + file_io: FileIO, + table_location: String, + partition_path: String, + bucket: i32, + schema_id: i64, + target_file_size: i64, + file_compression: String, + file_compression_zstd_level: i32, + write_buffer_size: i64, + written_files: Vec, + /// Background file close tasks spawned during rolling. + in_flight_closes: JoinSet>, + /// Current open format writer, lazily created on first write. + current_writer: Option>, + current_file_name: Option, + current_row_count: i64, +} + +impl DataFileWriter { + #[allow(clippy::too_many_arguments)] + fn new( + file_io: FileIO, + table_location: String, + partition_path: String, + bucket: i32, + schema_id: i64, + target_file_size: i64, + file_compression: String, + file_compression_zstd_level: i32, + write_buffer_size: i64, + ) -> Self { + Self { + file_io, + table_location, + partition_path, + bucket, + schema_id, + target_file_size, + file_compression, + file_compression_zstd_level, + write_buffer_size, + written_files: Vec::new(), + in_flight_closes: JoinSet::new(), + current_writer: None, + current_file_name: None, + current_row_count: 0, + } + } + + /// Write a RecordBatch. Rolls to a new file when target size is reached. 
+ async fn write(&mut self, batch: &RecordBatch) -> Result<()> { + if batch.num_rows() == 0 { + return Ok(()); + } + + if self.current_writer.is_none() { + self.open_new_file(batch.schema()).await?; + } + + self.current_row_count += batch.num_rows() as i64; + self.current_writer.as_mut().unwrap().write(batch).await?; + + // Roll to a new file if target size is reached — close in background + if self.current_writer.as_ref().unwrap().num_bytes() as i64 >= self.target_file_size { + self.roll_file(); + } + + // Flush row group if in-progress buffer exceeds write_buffer_size + if let Some(w) = self.current_writer.as_mut() { + if w.in_progress_size() as i64 >= self.write_buffer_size { + w.flush().await?; + } + } + + Ok(()) + } + + async fn open_new_file(&mut self, schema: arrow_schema::SchemaRef) -> Result<()> { + let file_name = format!( + "data-{}-{}.parquet", + uuid::Uuid::new_v4(), + self.written_files.len() + ); + + let bucket_dir = if self.partition_path.is_empty() { + format!("{}/bucket-{}", self.table_location, self.bucket) + } else { + format!( + "{}/{}/bucket-{}", + self.table_location, self.partition_path, self.bucket + ) + }; + self.file_io.mkdirs(&format!("{bucket_dir}/")).await?; + + let file_path = format!("{}/{}", bucket_dir, file_name); + let output = self.file_io.new_output(&file_path)?; + let writer = create_format_writer( + &output, + schema, + &self.file_compression, + self.file_compression_zstd_level, + ) + .await?; + self.current_writer = Some(writer); + self.current_file_name = Some(file_name); + self.current_row_count = 0; + Ok(()) + } + + /// Close the current file writer and record the file metadata. + async fn close_current_file(&mut self) -> Result<()> { + let writer = match self.current_writer.take() { + Some(w) => w, + None => return Ok(()), + }; + let file_name = self.current_file_name.take().unwrap(); + + let row_count = self.current_row_count; + self.current_row_count = 0; + let file_size = writer.close().await? 
as i64; + + let meta = Self::build_meta(file_name, file_size, row_count, self.schema_id); + self.written_files.push(meta); + Ok(()) + } + + /// Spawn the current writer's close in the background for non-blocking rolling. + fn roll_file(&mut self) { + let writer = match self.current_writer.take() { + Some(w) => w, + None => return, + }; + let file_name = self.current_file_name.take().unwrap(); + let row_count = self.current_row_count; + self.current_row_count = 0; + let schema_id = self.schema_id; + + self.in_flight_closes.spawn(async move { + let file_size = writer.close().await? as i64; + Ok(Self::build_meta(file_name, file_size, row_count, schema_id)) + }); + } + + /// Close the current writer and return all written file metadata. + async fn prepare_commit(&mut self) -> Result> { + self.close_current_file().await?; + while let Some(result) = self.in_flight_closes.join_next().await { + let meta = result.map_err(|e| crate::Error::DataInvalid { + message: format!("Background file close task panicked: {e}"), + source: None, + })??; + self.written_files.push(meta); + } + Ok(std::mem::take(&mut self.written_files)) + } + + fn build_meta( + file_name: String, + file_size: i64, + row_count: i64, + schema_id: i64, + ) -> DataFileMeta { + DataFileMeta { + file_name, + file_size, + row_count, + min_key: EMPTY_SERIALIZED_ROW.clone(), + max_key: EMPTY_SERIALIZED_ROW.clone(), + key_stats: BinaryTableStats::new( + EMPTY_SERIALIZED_ROW.clone(), + EMPTY_SERIALIZED_ROW.clone(), + vec![], + ), + value_stats: BinaryTableStats::new( + EMPTY_SERIALIZED_ROW.clone(), + EMPTY_SERIALIZED_ROW.clone(), + vec![], + ), + min_sequence_number: 0, + max_sequence_number: 0, + schema_id, + level: 0, + extra_files: vec![], + creation_time: Some(Utc::now()), + delete_row_count: Some(0), + embedded_index: None, + file_source: Some(0), // APPEND + value_stats_cols: Some(vec![]), + external_path: None, + first_row_id: None, + write_cols: None, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + 
use crate::catalog::Identifier; + use crate::io::FileIOBuilder; + use crate::spec::{IntType, Schema, TableSchema, VarCharType}; + use crate::table::{SnapshotManager, TableCommit}; + use arrow_array::Int32Array; + use arrow_schema::{DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema}; + use std::sync::Arc; + + fn test_file_io() -> FileIO { + FileIOBuilder::new("memory").build().unwrap() + } + + fn test_schema() -> TableSchema { + let schema = Schema::builder() + .column("id", DataType::Int(IntType::new())) + .column("value", DataType::Int(IntType::new())) + .build() + .unwrap(); + TableSchema::new(0, &schema) + } + + fn test_partitioned_schema() -> TableSchema { + let schema = Schema::builder() + .column("pt", DataType::VarChar(VarCharType::string_type())) + .column("id", DataType::Int(IntType::new())) + .partition_keys(["pt"]) + .build() + .unwrap(); + TableSchema::new(0, &schema) + } + + fn test_table(file_io: &FileIO, table_path: &str) -> Table { + Table::new( + file_io.clone(), + Identifier::new("default", "test_table"), + table_path.to_string(), + test_schema(), + None, + ) + } + + fn test_partitioned_table(file_io: &FileIO, table_path: &str) -> Table { + Table::new( + file_io.clone(), + Identifier::new("default", "test_table"), + table_path.to_string(), + test_partitioned_schema(), + None, + ) + } + + async fn setup_dirs(file_io: &FileIO, table_path: &str) { + file_io + .mkdirs(&format!("{table_path}/snapshot/")) + .await + .unwrap(); + file_io + .mkdirs(&format!("{table_path}/manifest/")) + .await + .unwrap(); + } + + fn make_batch(ids: Vec, values: Vec) -> RecordBatch { + let schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("id", ArrowDataType::Int32, false), + ArrowField::new("value", ArrowDataType::Int32, false), + ])); + RecordBatch::try_new( + schema, + vec![ + Arc::new(Int32Array::from(ids)), + Arc::new(Int32Array::from(values)), + ], + ) + .unwrap() + } + + fn make_partitioned_batch(pts: Vec<&str>, ids: Vec) -> RecordBatch 
{ + let schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("pt", ArrowDataType::Utf8, false), + ArrowField::new("id", ArrowDataType::Int32, false), + ])); + RecordBatch::try_new( + schema, + vec![ + Arc::new(arrow_array::StringArray::from(pts)), + Arc::new(Int32Array::from(ids)), + ], + ) + .unwrap() + } + + #[tokio::test] + async fn test_write_and_commit() { + let file_io = test_file_io(); + let table_path = "memory:/test_table_write"; + setup_dirs(&file_io, table_path).await; + + let table = test_table(&file_io, table_path); + let mut table_write = TableWrite::new(&table).unwrap(); + + let batch = make_batch(vec![1, 2, 3], vec![10, 20, 30]); + table_write.write_arrow_batch(&batch).await.unwrap(); + + let messages = table_write.prepare_commit().await.unwrap(); + assert_eq!(messages.len(), 1); + assert_eq!(messages[0].bucket, 0); + assert_eq!(messages[0].new_files.len(), 1); + assert_eq!(messages[0].new_files[0].row_count, 3); + + // Commit and verify snapshot + let commit = TableCommit::new(table, "test-user".to_string()); + commit.commit(messages).await.unwrap(); + + let snap_manager = SnapshotManager::new(file_io.clone(), table_path.to_string()); + let snapshot = snap_manager.get_latest_snapshot().await.unwrap().unwrap(); + assert_eq!(snapshot.id(), 1); + assert_eq!(snapshot.total_record_count(), Some(3)); + } + + #[tokio::test] + async fn test_write_partitioned() { + let file_io = test_file_io(); + let table_path = "memory:/test_table_write_partitioned"; + setup_dirs(&file_io, table_path).await; + + let table = test_partitioned_table(&file_io, table_path); + let mut table_write = TableWrite::new(&table).unwrap(); + + let batch = make_partitioned_batch(vec!["a", "b", "a"], vec![1, 2, 3]); + table_write.write_arrow_batch(&batch).await.unwrap(); + + let messages = table_write.prepare_commit().await.unwrap(); + // Should have 2 commit messages (one per partition) + assert_eq!(messages.len(), 2); + + let total_rows: i64 = messages + .iter() + .flat_map(|m| 
&m.new_files) + .map(|f| f.row_count) + .sum(); + assert_eq!(total_rows, 3); + + // Commit and verify + let commit = TableCommit::new(table, "test-user".to_string()); + commit.commit(messages).await.unwrap(); + + let snap_manager = SnapshotManager::new(file_io.clone(), table_path.to_string()); + let snapshot = snap_manager.get_latest_snapshot().await.unwrap().unwrap(); + assert_eq!(snapshot.id(), 1); + assert_eq!(snapshot.total_record_count(), Some(3)); + } + + #[tokio::test] + async fn test_write_empty_batch() { + let file_io = test_file_io(); + let table_path = "memory:/test_table_write_empty"; + let table = test_table(&file_io, table_path); + let mut table_write = TableWrite::new(&table).unwrap(); + + let batch = make_batch(vec![], vec![]); + table_write.write_arrow_batch(&batch).await.unwrap(); + + let messages = table_write.prepare_commit().await.unwrap(); + assert!(messages.is_empty()); + } + + #[tokio::test] + async fn test_prepare_commit_reusable() { + let file_io = test_file_io(); + let table_path = "memory:/test_table_write_reuse"; + setup_dirs(&file_io, table_path).await; + + let table = test_table(&file_io, table_path); + let mut table_write = TableWrite::new(&table).unwrap(); + + // First write + prepare_commit + table_write + .write_arrow_batch(&make_batch(vec![1, 2], vec![10, 20])) + .await + .unwrap(); + let messages1 = table_write.prepare_commit().await.unwrap(); + assert_eq!(messages1.len(), 1); + assert_eq!(messages1[0].new_files[0].row_count, 2); + + // Second write + prepare_commit (reuse) + table_write + .write_arrow_batch(&make_batch(vec![3, 4, 5], vec![30, 40, 50])) + .await + .unwrap(); + let messages2 = table_write.prepare_commit().await.unwrap(); + assert_eq!(messages2.len(), 1); + assert_eq!(messages2[0].new_files[0].row_count, 3); + + // Empty prepare_commit is fine + let messages3 = table_write.prepare_commit().await.unwrap(); + assert!(messages3.is_empty()); + } + + #[tokio::test] + async fn test_write_multiple_batches() { + let 
file_io = test_file_io(); + let table_path = "memory:/test_table_write_multi"; + setup_dirs(&file_io, table_path).await; + + let table = test_table(&file_io, table_path); + let mut table_write = TableWrite::new(&table).unwrap(); + + table_write + .write_arrow_batch(&make_batch(vec![1, 2], vec![10, 20])) + .await + .unwrap(); + table_write + .write_arrow_batch(&make_batch(vec![3, 4], vec![30, 40])) + .await + .unwrap(); + + let messages = table_write.prepare_commit().await.unwrap(); + assert_eq!(messages.len(), 1); + // Multiple batches accumulate into a single file + assert_eq!(messages[0].new_files.len(), 1); + + let total_rows: i64 = messages[0].new_files.iter().map(|f| f.row_count).sum(); + assert_eq!(total_rows, 4); + } + + #[tokio::test] + async fn test_write_rolling_on_target_file_size() { + let file_io = test_file_io(); + let table_path = "memory:/test_table_write_rolling"; + setup_dirs(&file_io, table_path).await; + + // Create table with very small target-file-size to trigger rolling + let schema = Schema::builder() + .column("id", DataType::Int(IntType::new())) + .column("value", DataType::Int(IntType::new())) + .option("target-file-size", "1b") + .build() + .unwrap(); + let table_schema = TableSchema::new(0, &schema); + let table = Table::new( + file_io.clone(), + Identifier::new("default", "test_table"), + table_path.to_string(), + table_schema, + None, + ); + + let mut table_write = TableWrite::new(&table).unwrap(); + + // Write multiple batches — each should roll to a new file + table_write + .write_arrow_batch(&make_batch(vec![1, 2], vec![10, 20])) + .await + .unwrap(); + table_write + .write_arrow_batch(&make_batch(vec![3, 4], vec![30, 40])) + .await + .unwrap(); + + let messages = table_write.prepare_commit().await.unwrap(); + assert_eq!(messages.len(), 1); + // With 1-byte target, each batch should produce a separate file + assert_eq!(messages[0].new_files.len(), 2); + + let total_rows: i64 = messages[0].new_files.iter().map(|f| 
f.row_count).sum(); + assert_eq!(total_rows, 4); + } +} diff --git a/crates/paimon/src/table/write_builder.rs b/crates/paimon/src/table/write_builder.rs index d6458cf8..45db333c 100644 --- a/crates/paimon/src/table/write_builder.rs +++ b/crates/paimon/src/table/write_builder.rs @@ -19,9 +19,7 @@ //! //! Reference: [pypaimon WriteBuilder](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/write/write_builder.py) -use crate::spec::Datum; -use crate::table::{Table, TableCommit}; -use std::collections::HashMap; +use crate::table::{Table, TableCommit, TableWrite}; use uuid::Uuid; /// Builder for creating table writers and committers. @@ -31,7 +29,6 @@ use uuid::Uuid; pub struct WriteBuilder<'a> { table: &'a Table, commit_user: String, - overwrite_partition: Option>, } impl<'a> WriteBuilder<'a> { @@ -39,25 +36,16 @@ impl<'a> WriteBuilder<'a> { Self { table, commit_user: Uuid::new_v4().to_string(), - overwrite_partition: None, } } - /// Set overwrite mode. If `partition` is None, overwrites the entire table. - /// If `partition` is Some, overwrites only the specified partition. - pub fn overwrite(&mut self, partition: Option>) -> &mut Self { - self.overwrite_partition = Some(partition.unwrap_or_default()); - self - } - /// Create a new TableCommit for committing write results. pub fn new_commit(&self) -> TableCommit { - TableCommit::new( - self.table.clone(), - self.commit_user.clone(), - self.overwrite_partition.clone(), - ) + TableCommit::new(self.table.clone(), self.commit_user.clone()) } - // TODO: pub fn new_write(&self) -> TableWrite { ... } + /// Create a new TableWrite for writing Arrow data. 
+ pub fn new_write(&self) -> crate::Result { + TableWrite::new(self.table) + } } From 5d59939ad7eb4c685843e579c273ba73cd69617c Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Sun, 12 Apr 2026 09:29:31 +0800 Subject: [PATCH 2/4] Fix comments --- crates/paimon/src/table/table_commit.rs | 92 +++++++++++++++++++++---- 1 file changed, 79 insertions(+), 13 deletions(-) diff --git a/crates/paimon/src/table/table_commit.rs b/crates/paimon/src/table/table_commit.rs index c36e9b46..06df5248 100644 --- a/crates/paimon/src/table/table_commit.rs +++ b/crates/paimon/src/table/table_commit.rs @@ -54,6 +54,7 @@ pub struct TableCommit { commit_min_retry_wait_ms: u64, commit_max_retry_wait_ms: u64, row_tracking_enabled: bool, + partition_default_name: String, } impl TableCommit { @@ -73,6 +74,7 @@ impl TableCommit { let commit_min_retry_wait_ms = core_options.commit_min_retry_wait_ms(); let commit_max_retry_wait_ms = core_options.commit_max_retry_wait_ms(); let row_tracking_enabled = core_options.row_tracking_enabled(); + let partition_default_name = core_options.partition_default_name().to_string(); Self { table, snapshot_manager, @@ -84,6 +86,7 @@ impl TableCommit { commit_min_retry_wait_ms, commit_max_retry_wait_ms, row_tracking_enabled, + partition_default_name, } } @@ -148,15 +151,13 @@ impl TableCommit { // Collect unique partition bytes let mut seen = std::collections::HashSet::new(); - let mut partition_specs: Vec> = Vec::new(); + let mut partition_specs: Vec>> = Vec::new(); for msg in commit_messages { if seen.insert(msg.partition.clone()) { let row = BinaryRow::from_serialized_bytes(&msg.partition)?; let mut spec = HashMap::new(); for (i, key) in partition_keys.iter().enumerate() { - if let Some(datum) = extract_datum(&row, i, &data_types[i])? 
{ - spec.insert(key.clone(), datum); - } + spec.insert(key.clone(), extract_datum(&row, i, &data_types[i])?); } partition_specs.push(spec); } @@ -170,18 +171,27 @@ impl TableCommit { Ok(Some(Predicate::or(predicates))) } - /// Build a partition predicate from key-value pairs. - fn build_partition_predicate(&self, partition: &HashMap) -> Result { + /// Build a partition predicate from key-value pairs, handling NULL via IS NULL. + fn build_partition_predicate( + &self, + partition: &HashMap>, + ) -> Result { let pb = PredicateBuilder::new(&self.table.schema().partition_fields()); let predicates: Vec = partition .iter() - .map(|(key, value)| pb.equal(key, value.clone())) + .map(|(key, value)| match value { + Some(v) => pb.equal(key, v.clone()), + None => pb.is_null(key), + }) .collect::>>()?; Ok(Predicate::and(predicates)) } /// Drop specific partitions (OVERWRITE with only deletes). - pub async fn truncate_partitions(&self, partitions: Vec>) -> Result<()> { + pub async fn truncate_partitions( + &self, + partitions: Vec>>, + ) -> Result<()> { if partitions.is_empty() { return Ok(()); } @@ -648,9 +658,11 @@ impl TableCommit { } let row = BinaryRow::from_serialized_bytes(partition_bytes)?; for (i, key) in partition_keys.iter().enumerate() { - if let Some(datum) = extract_datum(&row, i, &data_types[i])? { - spec.insert(key.clone(), datum.to_string()); - } + let value = match extract_datum(&row, i, &data_types[i])? 
{ + Some(datum) => datum.to_string(), + None => self.partition_default_name.clone(), + }; + spec.insert(key.clone(), value); } Ok(spec) } @@ -1018,8 +1030,8 @@ mod tests { // Drop partitions "a" and "c" let partitions = vec![ - HashMap::from([("pt".to_string(), Datum::String("a".to_string()))]), - HashMap::from([("pt".to_string(), Datum::String("c".to_string()))]), + HashMap::from([("pt".to_string(), Some(Datum::String("a".to_string())))]), + HashMap::from([("pt".to_string(), Some(Datum::String("c".to_string())))]), ]; commit.truncate_partitions(partitions).await.unwrap(); @@ -1030,4 +1042,58 @@ mod tests { // 600 - 100 (a) - 300 (c) = 200 assert_eq!(snapshot.total_record_count(), Some(200)); } + + fn null_partition_bytes() -> Vec { + let mut builder = BinaryRowBuilder::new(1); + builder.set_null_at(0); + builder.build_serialized() + } + + #[tokio::test] + async fn test_overwrite_null_partition() { + let file_io = test_file_io(); + let table_path = "memory:/test_overwrite_null_partition"; + setup_dirs(&file_io, table_path).await; + + let commit = setup_partitioned_commit(&file_io, table_path); + + // Append data for partition "a", "b", and NULL + commit + .commit(vec![ + CommitMessage::new( + partition_bytes("a"), + 0, + vec![test_data_file("data-a.parquet", 100)], + ), + CommitMessage::new( + partition_bytes("b"), + 0, + vec![test_data_file("data-b.parquet", 200)], + ), + CommitMessage::new( + null_partition_bytes(), + 0, + vec![test_data_file("data-null.parquet", 300)], + ), + ]) + .await + .unwrap(); + + // Overwrite NULL partition only — should NOT affect "a" or "b" + commit + .overwrite(vec![CommitMessage::new( + null_partition_bytes(), + 0, + vec![test_data_file("data-null2.parquet", 50)], + )]) + .await + .unwrap(); + + let snap_manager = SnapshotManager::new(file_io.clone(), table_path.to_string()); + let snapshot = snap_manager.get_latest_snapshot().await.unwrap().unwrap(); + assert_eq!(snapshot.id(), 2); + assert_eq!(snapshot.commit_kind(), 
&CommitKind::OVERWRITE); + // 600 - 300 (delete null) + 50 (add null2) = 350 + assert_eq!(snapshot.total_record_count(), Some(350)); + } } From b25cfa9ab536b0e98624e133ff3127cc15607768 Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Sun, 12 Apr 2026 17:31:59 +0800 Subject: [PATCH 3/4] Fix comment --- crates/paimon/src/table/table_write.rs | 236 +++++++++++++++++++++++-- 1 file changed, 226 insertions(+), 10 deletions(-) diff --git a/crates/paimon/src/table/table_write.rs b/crates/paimon/src/table/table_write.rs index eb0b58ee..a148ef51 100644 --- a/crates/paimon/src/table/table_write.rs +++ b/crates/paimon/src/table/table_write.rs @@ -26,7 +26,7 @@ use crate::spec::stats::BinaryTableStats; use crate::spec::PartitionComputer; use crate::spec::{ extract_datum_from_arrow, BinaryRow, BinaryRowBuilder, CoreOptions, DataField, DataFileMeta, - DataType, Datum, EMPTY_SERIALIZED_ROW, + EMPTY_SERIALIZED_ROW, }; use crate::table::commit_message::CommitMessage; use crate::table::Table; @@ -268,16 +268,16 @@ impl TableWrite { let bucket = if self.total_buckets <= 1 || self.bucket_key_indices.is_empty() { 0 } else { - let mut datums: Vec<(Datum, DataType)> = Vec::new(); - for &field_idx in &self.bucket_key_indices { + let mut builder = BinaryRowBuilder::new(self.bucket_key_indices.len() as i32); + for (pos, &field_idx) in self.bucket_key_indices.iter().enumerate() { let field = &fields[field_idx]; - let datum = extract_datum_from_arrow(batch, row_idx, field_idx, field.data_type())?; - if let Some(d) = datum { - datums.push((d, field.data_type().clone())); + match extract_datum_from_arrow(batch, row_idx, field_idx, field.data_type())? 
{ + Some(datum) => builder.write_datum(pos, &datum, field.data_type()), + None => builder.set_null_at(pos), } } - let refs: Vec<(&Datum, &DataType)> = datums.iter().map(|(d, t)| (d, t)).collect(); - BinaryRow::compute_bucket_from_datums(&refs, self.total_buckets).unwrap_or(0) + let row = builder.build(); + (row.hash_code() % self.total_buckets).abs() }; Ok((partition_bytes, bucket)) @@ -514,10 +514,15 @@ mod tests { use super::*; use crate::catalog::Identifier; use crate::io::FileIOBuilder; - use crate::spec::{IntType, Schema, TableSchema, VarCharType}; + use crate::spec::{ + DataType, DecimalType, IntType, LocalZonedTimestampType, Schema, TableSchema, TimestampType, + VarCharType, + }; use crate::table::{SnapshotManager, TableCommit}; use arrow_array::Int32Array; - use arrow_schema::{DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema}; + use arrow_schema::{ + DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema, TimeUnit, + }; use std::sync::Arc; fn test_file_io() -> FileIO { @@ -738,6 +743,217 @@ mod tests { assert_eq!(total_rows, 4); } + fn test_bucketed_schema() -> TableSchema { + let schema = Schema::builder() + .column("id", DataType::Int(IntType::new())) + .column("value", DataType::Int(IntType::new())) + .option("bucket", "4") + .option("bucket-key", "id") + .build() + .unwrap(); + TableSchema::new(0, &schema) + } + + fn test_bucketed_table(file_io: &FileIO, table_path: &str) -> Table { + Table::new( + file_io.clone(), + Identifier::new("default", "test_table"), + table_path.to_string(), + test_bucketed_schema(), + None, + ) + } + + /// Build a batch where the bucket-key column ("id") is nullable. 
+ fn make_nullable_id_batch(ids: Vec>, values: Vec) -> RecordBatch { + let schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("id", ArrowDataType::Int32, true), + ArrowField::new("value", ArrowDataType::Int32, false), + ])); + RecordBatch::try_new( + schema, + vec![ + Arc::new(Int32Array::from(ids)), + Arc::new(Int32Array::from(values)), + ], + ) + .unwrap() + } + + #[tokio::test] + async fn test_write_bucketed_with_null_bucket_key() { + let file_io = test_file_io(); + let table_path = "memory:/test_table_write_null_bk"; + setup_dirs(&file_io, table_path).await; + + let table = test_bucketed_table(&file_io, table_path); + let mut table_write = TableWrite::new(&table).unwrap(); + + // Row with NULL bucket key should not panic + let batch = make_nullable_id_batch(vec![None, Some(1), None], vec![10, 20, 30]); + table_write.write_arrow_batch(&batch).await.unwrap(); + + let messages = table_write.prepare_commit().await.unwrap(); + let total_rows: i64 = messages + .iter() + .flat_map(|m| &m.new_files) + .map(|f| f.row_count) + .sum(); + assert_eq!(total_rows, 3); + } + + #[tokio::test] + async fn test_null_bucket_key_routes_consistently() { + let file_io = test_file_io(); + let table_path = "memory:/test_table_write_null_bk_consistent"; + setup_dirs(&file_io, table_path).await; + + let table = test_bucketed_table(&file_io, table_path); + let mut table_write = TableWrite::new(&table).unwrap(); + + // Two NULLs should land in the same bucket + let batch = make_nullable_id_batch(vec![None, None], vec![10, 20]); + table_write.write_arrow_batch(&batch).await.unwrap(); + + let messages = table_write.prepare_commit().await.unwrap(); + // Both NULL-key rows must be in the same (partition, bucket) group + let null_bucket_rows: i64 = messages + .iter() + .flat_map(|m| &m.new_files) + .map(|f| f.row_count) + .sum(); + assert_eq!(null_bucket_rows, 2); + // All NULL-key rows go to exactly one bucket + assert_eq!(messages.len(), 1); + } + + #[tokio::test] + async fn 
test_null_vs_nonnull_bucket_key_differ() { + let file_io = test_file_io(); + let table_path = "memory:/test_table_write_null_vs_nonnull"; + setup_dirs(&file_io, table_path).await; + + let table = test_bucketed_table(&file_io, table_path); + + // Compute bucket for NULL key + let fields = table.schema().fields().to_vec(); + let tw = TableWrite::new(&table).unwrap(); + + let batch_null = make_nullable_id_batch(vec![None], vec![10]); + let (_, bucket_null) = tw + .extract_partition_bucket(&batch_null, 0, &fields) + .unwrap(); + + // Compute bucket for key = 0 (the value a null field's fixed bytes happen to be) + let batch_zero = make_nullable_id_batch(vec![Some(0)], vec![20]); + let (_, bucket_zero) = tw + .extract_partition_bucket(&batch_zero, 0, &fields) + .unwrap(); + + // A NULL bucket key must produce a BinaryRow with the null bit set, + // which hashes differently from a non-null 0 value. + // (With 4 buckets they could theoretically collide, but the hash codes differ.) + let mut builder_null = BinaryRowBuilder::new(1); + builder_null.set_null_at(0); + let hash_null = builder_null.build().hash_code(); + + let mut builder_zero = BinaryRowBuilder::new(1); + builder_zero.write_int(0, 0); + let hash_zero = builder_zero.build().hash_code(); + + assert_ne!(hash_null, hash_zero, "NULL and 0 should hash differently"); + // If hashes differ, buckets should differ (with 4 buckets, very likely) + // But we verify the hash difference is the important invariant + let _ = (bucket_null, bucket_zero); + } + + /// Mirrors Java's testUnCompactDecimalAndTimestampNullValueBucketNumber. + /// Non-compact types (Decimal(38,18), LocalZonedTimestamp(6), Timestamp(6)) + /// use variable-length encoding in BinaryRow — NULL handling must still work. 
+ #[tokio::test] + async fn test_non_compact_null_bucket_key() { + let file_io = test_file_io(); + + let bucket_cols = ["d", "ltz", "ntz"]; + let total_buckets = 16; + + for bucket_col in &bucket_cols { + let table_path = format!("memory:/test_null_bk_{bucket_col}"); + setup_dirs(&file_io, &table_path).await; + + let schema = Schema::builder() + .column("d", DataType::Decimal(DecimalType::new(38, 18).unwrap())) + .column( + "ltz", + DataType::LocalZonedTimestamp(LocalZonedTimestampType::new(6).unwrap()), + ) + .column("ntz", DataType::Timestamp(TimestampType::new(6).unwrap())) + .column("k", DataType::Int(IntType::new())) + .option("bucket", total_buckets.to_string()) + .option("bucket-key", *bucket_col) + .build() + .unwrap(); + let table_schema = TableSchema::new(0, &schema); + let table = Table::new( + file_io.clone(), + Identifier::new("default", "test_table"), + table_path.to_string(), + table_schema, + None, + ); + + let tw = TableWrite::new(&table).unwrap(); + let fields = table.schema().fields().to_vec(); + + // Build a batch: d=NULL, ltz=NULL, ntz=NULL, k=1 + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("d", ArrowDataType::Decimal128(38, 18), true), + ArrowField::new( + "ltz", + ArrowDataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), + true, + ), + ArrowField::new( + "ntz", + ArrowDataType::Timestamp(TimeUnit::Microsecond, None), + true, + ), + ArrowField::new("k", ArrowDataType::Int32, false), + ])); + let batch = RecordBatch::try_new( + arrow_schema, + vec![ + Arc::new( + arrow_array::Decimal128Array::from(vec![None::]) + .with_precision_and_scale(38, 18) + .unwrap(), + ), + Arc::new( + arrow_array::TimestampMicrosecondArray::from(vec![None::]) + .with_timezone("UTC"), + ), + Arc::new(arrow_array::TimestampMicrosecondArray::from(vec![ + None::, + ])), + Arc::new(Int32Array::from(vec![1])), + ], + ) + .unwrap(); + + let (_, bucket) = tw.extract_partition_bucket(&batch, 0, &fields).unwrap(); + + // Expected: BinaryRow 
+            // with 1 field, null at pos 0
+            let mut builder = BinaryRowBuilder::new(1);
+            builder.set_null_at(0);
+            let expected_bucket = (builder.build().hash_code() % total_buckets).abs();
+
+            assert_eq!(
+                bucket, expected_bucket,
+                "NULL bucket-key '{bucket_col}' should produce bucket {expected_bucket}, got {bucket}"
+            );
+        }
+    }
+
+    #[tokio::test]
+    async fn test_write_rolling_on_target_file_size() {
+        let file_io = test_file_io();

From 71f3463553b0fd4f8bf2ec5952308ef65cbe0b14 Mon Sep 17 00:00:00 2001
From: JingsongLi
Date: Sun, 12 Apr 2026 17:50:16 +0800
Subject: [PATCH 4/4] Fix more Null for bucket

---
 crates/paimon/src/spec/binary_row.rs     | 76 ++++--------------------
 crates/paimon/src/table/bucket_filter.rs | 26 ++++----
 crates/paimon/src/table/table_commit.rs  |  5 --
 crates/paimon/src/table/table_scan.rs    | 60 +++++++++++++++++++
 crates/paimon/src/table/table_write.rs   | 21 ++++---
 5 files changed, 94 insertions(+), 94 deletions(-)

diff --git a/crates/paimon/src/spec/binary_row.rs b/crates/paimon/src/spec/binary_row.rs
index 630f189a..6599484c 100644
--- a/crates/paimon/src/spec/binary_row.rs
+++ b/crates/paimon/src/spec/binary_row.rs
@@ -281,82 +281,28 @@ impl BinaryRow {
     }
 
     /// Build a BinaryRow from typed Datum values using `BinaryRowBuilder`.
-    pub fn from_datums(datums: &[(&crate::spec::Datum, &crate::spec::DataType)]) -> Option<Self> {
+    /// `None` entries are written as null fields.
+ pub fn from_datums(datums: &[(Option<&crate::spec::Datum>, &crate::spec::DataType)]) -> Self { let arity = datums.len() as i32; let mut builder = BinaryRowBuilder::new(arity); - for (pos, (datum, data_type)) in datums.iter().enumerate() { - match datum { - crate::spec::Datum::Bool(v) => builder.write_boolean(pos, *v), - crate::spec::Datum::TinyInt(v) => builder.write_byte(pos, *v), - crate::spec::Datum::SmallInt(v) => builder.write_short(pos, *v), - crate::spec::Datum::Int(v) - | crate::spec::Datum::Date(v) - | crate::spec::Datum::Time(v) => builder.write_int(pos, *v), - crate::spec::Datum::Long(v) => builder.write_long(pos, *v), - crate::spec::Datum::Float(v) => builder.write_float(pos, *v), - crate::spec::Datum::Double(v) => builder.write_double(pos, *v), - crate::spec::Datum::Timestamp { millis, nanos } => { - let precision = match data_type { - crate::spec::DataType::Timestamp(ts) => ts.precision(), - _ => 3, - }; - if precision <= 3 { - builder.write_timestamp_compact(pos, *millis); - } else { - builder.write_timestamp_non_compact(pos, *millis, *nanos); - } - } - crate::spec::Datum::LocalZonedTimestamp { millis, nanos } => { - let precision = match data_type { - crate::spec::DataType::LocalZonedTimestamp(ts) => ts.precision(), - _ => 3, - }; - if precision <= 3 { - builder.write_timestamp_compact(pos, *millis); - } else { - builder.write_timestamp_non_compact(pos, *millis, *nanos); - } - } - crate::spec::Datum::Decimal { - unscaled, - precision, - .. 
-                } => {
-                    if *precision <= 18 {
-                        builder.write_decimal_compact(pos, *unscaled as i64);
-                    } else {
-                        builder.write_decimal_var_len(pos, *unscaled);
-                    }
-                }
-                crate::spec::Datum::String(s) => {
-                    if s.len() <= 7 {
-                        builder.write_string_inline(pos, s);
-                    } else {
-                        builder.write_string(pos, s);
-                    }
-                }
-                crate::spec::Datum::Bytes(b) => {
-                    if b.len() <= 7 {
-                        builder.write_binary_inline(pos, b);
-                    } else {
-                        builder.write_binary(pos, b);
-                    }
-                }
+        for (pos, (datum_opt, data_type)) in datums.iter().enumerate() {
+            match datum_opt {
+                Some(datum) => builder.write_datum(pos, datum, data_type),
+                None => builder.set_null_at(pos),
             }
         }
 
-        let row = builder.build();
-        Some(row)
+        builder.build()
     }
 
     pub fn compute_bucket_from_datums(
-        datums: &[(&crate::spec::Datum, &crate::spec::DataType)],
+        datums: &[(Option<&crate::spec::Datum>, &crate::spec::DataType)],
         total_buckets: i32,
-    ) -> Option<i32> {
-        let row = Self::from_datums(datums)?;
+    ) -> i32 {
+        let row = Self::from_datums(datums);
         let hash = row.hash_code();
-        Some((hash % total_buckets).abs())
+        (hash % total_buckets).abs()
     }
 }
diff --git a/crates/paimon/src/table/bucket_filter.rs b/crates/paimon/src/table/bucket_filter.rs
index ea300dfd..80942d3e 100644
--- a/crates/paimon/src/table/bucket_filter.rs
+++ b/crates/paimon/src/table/bucket_filter.rs
@@ -101,14 +101,14 @@ pub(super) fn compute_target_buckets(
     }
 
     // Collect equal-value candidates per bucket key field (by projected index).
-    // Each field can have one value (Eq) or multiple values (In).
+    // Each field can have one value (Eq), multiple values (In), or NULL (IsNull).
     let num_keys = bucket_key_fields.len();
-    let mut field_candidates: Vec<Option<Vec<&Datum>>> = vec![None; num_keys];
+    let mut field_candidates: Vec<Option<Vec<Option<&Datum>>>> = vec![None; num_keys];
 
     collect_eq_candidates(bucket_predicate, &mut field_candidates);
 
     // All bucket key fields must have candidates.
-    let candidates: Vec<&Vec<&Datum>> =
+    let candidates: Vec<&Vec<Option<&Datum>>> =
         field_candidates.iter().filter_map(|c| c.as_ref()).collect();
     if candidates.len() != num_keys {
         return None;
     }
@@ -118,18 +118,15 @@
     let mut buckets = HashSet::new();
     let mut combo: Vec<usize> = vec![0; num_keys];
     loop {
-        let datums: Vec<(&Datum, &DataType)> = (0..num_keys)
+        let datums: Vec<(Option<&Datum>, &DataType)> = (0..num_keys)
             .map(|i| {
                 let vals = field_candidates[i].as_ref().unwrap();
                 (vals[combo[i]], bucket_key_fields[i].data_type())
             })
             .collect();
 
-        if let Some(bucket) = BinaryRow::compute_bucket_from_datums(&datums, total_buckets) {
-            buckets.insert(bucket);
-        } else {
-            return None;
-        }
+        let bucket = BinaryRow::compute_bucket_from_datums(&datums, total_buckets);
+        buckets.insert(bucket);
 
         // Advance the combination counter (rightmost first).
         let mut carry = true;
@@ -155,10 +152,10 @@
     }
 }
 
-/// Recursively collect Eq/In literal candidates from a predicate for each bucket key field.
+/// Recursively collect Eq/In/IsNull literal candidates from a predicate for each bucket key field.
 fn collect_eq_candidates<'a>(
     predicate: &'a Predicate,
-    field_candidates: &mut Vec<Option<Vec<&'a Datum>>>,
+    field_candidates: &mut Vec<Option<Vec<Option<&'a Datum>>>>,
 ) {
     match predicate {
         Predicate::And(children) => {
@@ -176,14 +173,17 @@
             match op {
                 PredicateOperator::Eq => {
                     if let Some(lit) = literals.first() {
-                        field_candidates[*index] = Some(vec![lit]);
+                        field_candidates[*index] = Some(vec![Some(lit)]);
                     }
                 }
                 PredicateOperator::In => {
                     if !literals.is_empty() {
-                        field_candidates[*index] = Some(literals.iter().collect());
+                        field_candidates[*index] = Some(literals.iter().map(Some).collect());
                     }
                 }
+                PredicateOperator::IsNull => {
+                    field_candidates[*index] = Some(vec![None]);
+                }
                 _ => {}
             }
         }
diff --git a/crates/paimon/src/table/table_commit.rs b/crates/paimon/src/table/table_commit.rs
index 06df5248..e14a4b77 100644
--- a/crates/paimon/src/table/table_commit.rs
+++ b/crates/paimon/src/table/table_commit.rs
@@ -813,11 +813,6 @@ mod tests {
     }
 
     fn partition_bytes(pt: &str) -> Vec<u8> {
-        use crate::spec::{DataType, VarCharType};
-        let datum = Datum::String(pt.to_string());
-        let dt = DataType::VarChar(VarCharType::string_type());
-        let datums = vec![(&datum, &dt)];
-        BinaryRow::from_datums(&datums).unwrap();
         let mut builder = BinaryRowBuilder::new(1);
         if pt.len() <= 7 {
             builder.write_string_inline(0, pt);
diff --git a/crates/paimon/src/table/table_scan.rs b/crates/paimon/src/table/table_scan.rs
index 06fe87a0..1d2f621d 100644
--- a/crates/paimon/src/table/table_scan.rs
+++ b/crates/paimon/src/table/table_scan.rs
@@ -1375,4 +1375,64 @@ mod tests {
         let bucket = *buckets.iter().next().unwrap();
         assert!((0..4).contains(&bucket));
     }
+
+    #[test]
+    fn test_compute_target_buckets_is_null() {
+        let fields = bucket_key_fields();
+        let pred = Predicate::Leaf {
+            column: "id".into(),
+            index: 0,
+            data_type: DataType::Int(IntType::new()),
+            op: PredicateOperator::IsNull,
+            literals: vec![],
+        };
+
+        let buckets = compute_target_buckets(&pred, &fields, 4);
+        assert!(buckets.is_some(), "IsNull
should determine a target bucket"); + let buckets = buckets.unwrap(); + assert_eq!(buckets.len(), 1); + let bucket = *buckets.iter().next().unwrap(); + assert!((0..4).contains(&bucket)); + + // Verify it matches the expected bucket from a null BinaryRow + let mut builder = BinaryRowBuilder::new(1); + builder.set_null_at(0); + let expected = (builder.build().hash_code() % 4).abs(); + assert_eq!(bucket, expected); + } + + #[test] + fn test_compute_target_buckets_composite_key_with_null() { + let fields = vec![ + DataField::new(0, "a".to_string(), DataType::Int(IntType::new())), + DataField::new(1, "b".to_string(), DataType::Int(IntType::new())), + ]; + // a = 1 AND b IS NULL + let pred = Predicate::And(vec![ + Predicate::Leaf { + column: "a".into(), + index: 0, + data_type: DataType::Int(IntType::new()), + op: PredicateOperator::Eq, + literals: vec![Datum::Int(1)], + }, + Predicate::Leaf { + column: "b".into(), + index: 1, + data_type: DataType::Int(IntType::new()), + op: PredicateOperator::IsNull, + literals: vec![], + }, + ]); + + let buckets = compute_target_buckets(&pred, &fields, 8); + assert!( + buckets.is_some(), + "Composite key with IsNull should determine a target bucket" + ); + let buckets = buckets.unwrap(); + assert_eq!(buckets.len(), 1); + let bucket = *buckets.iter().next().unwrap(); + assert!((0..8).contains(&bucket)); + } } diff --git a/crates/paimon/src/table/table_write.rs b/crates/paimon/src/table/table_write.rs index a148ef51..7d5252ee 100644 --- a/crates/paimon/src/table/table_write.rs +++ b/crates/paimon/src/table/table_write.rs @@ -26,7 +26,7 @@ use crate::spec::stats::BinaryTableStats; use crate::spec::PartitionComputer; use crate::spec::{ extract_datum_from_arrow, BinaryRow, BinaryRowBuilder, CoreOptions, DataField, DataFileMeta, - EMPTY_SERIALIZED_ROW, + DataType, Datum, EMPTY_SERIALIZED_ROW, }; use crate::table::commit_message::CommitMessage; use crate::table::Table; @@ -268,16 +268,15 @@ impl TableWrite { let bucket = if 
self.total_buckets <= 1 || self.bucket_key_indices.is_empty() {
             0
         } else {
-            let mut builder = BinaryRowBuilder::new(self.bucket_key_indices.len() as i32);
-            for (pos, &field_idx) in self.bucket_key_indices.iter().enumerate() {
+            let mut datums: Vec<(Option<Datum>, DataType)> = Vec::new();
+            for &field_idx in &self.bucket_key_indices {
                 let field = &fields[field_idx];
-                match extract_datum_from_arrow(batch, row_idx, field_idx, field.data_type())? {
-                    Some(datum) => builder.write_datum(pos, &datum, field.data_type()),
-                    None => builder.set_null_at(pos),
-                }
+                let datum = extract_datum_from_arrow(batch, row_idx, field_idx, field.data_type())?;
+                datums.push((datum, field.data_type().clone()));
             }
-            let row = builder.build();
-            (row.hash_code() % self.total_buckets).abs()
+            let refs: Vec<(Option<&Datum>, &DataType)> =
+                datums.iter().map(|(d, t)| (d.as_ref(), t)).collect();
+            BinaryRow::compute_bucket_from_datums(&refs, self.total_buckets)
         };
 
         Ok((partition_bytes, bucket))
@@ -515,8 +514,8 @@ mod tests {
     use crate::catalog::Identifier;
     use crate::io::FileIOBuilder;
     use crate::spec::{
-        DataType, DecimalType, IntType, LocalZonedTimestampType, Schema, TableSchema, TimestampType,
-        VarCharType,
+        DataType, DecimalType, IntType, LocalZonedTimestampType, Schema, TableSchema,
+        TimestampType, VarCharType,
     };
     use crate::table::{SnapshotManager, TableCommit};
     use arrow_array::Int32Array;