diff --git a/crates/integrations/datafusion/src/lib.rs b/crates/integrations/datafusion/src/lib.rs index e803153..dbe01e0 100644 --- a/crates/integrations/datafusion/src/lib.rs +++ b/crates/integrations/datafusion/src/lib.rs @@ -37,21 +37,23 @@ //! translatable partition-only conjuncts from DataFusion filters. mod catalog; -mod ddl; mod error; mod filter_pushdown; #[cfg(feature = "fulltext")] mod full_text_search; +mod merge_into; mod physical_plan; mod relation_planner; pub mod runtime; +mod sql_handler; mod table; +mod update; pub use catalog::{PaimonCatalogProvider, PaimonSchemaProvider}; -pub use ddl::PaimonDdlHandler; pub use error::to_datafusion_error; #[cfg(feature = "fulltext")] pub use full_text_search::{register_full_text_search, FullTextSearchFunction}; pub use physical_plan::PaimonTableScan; pub use relation_planner::PaimonRelationPlanner; +pub use sql_handler::PaimonSqlHandler; pub use table::PaimonTableProvider; diff --git a/crates/integrations/datafusion/src/merge_into.rs b/crates/integrations/datafusion/src/merge_into.rs new file mode 100644 index 0000000..3b04597 --- /dev/null +++ b/crates/integrations/datafusion/src/merge_into.rs @@ -0,0 +1,796 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +//! MERGE INTO execution for data evolution tables. +//! +//! This module provides the DataFusion-specific SQL parsing and JOIN execution layer. +//! The engine-agnostic merge logic (file metadata lookup, row grouping, reading originals, +//! applying updates, writing partial files, committing) lives in +//! [`paimon::table::DataEvolutionWriter`]. + +use std::collections::HashMap; +use std::sync::Arc; + +use datafusion::arrow::array::{RecordBatch, UInt64Array}; +use datafusion::arrow::compute; +use datafusion::arrow::datatypes::{DataType as ArrowDataType, Field, Schema}; +use datafusion::datasource::MemTable; +use datafusion::error::{DataFusionError, Result as DFResult}; +use datafusion::prelude::{DataFrame, SessionContext}; +use datafusion::sql::sqlparser::ast::{ + AssignmentTarget, Merge, MergeAction, MergeClauseKind, MergeInsertKind, TableFactor, +}; + +use paimon::table::{DataEvolutionWriter, Table}; + +use crate::error::to_datafusion_error; + +/// Maximum number of retries when MERGE INTO conflicts with concurrent compaction. +const MERGE_INTO_MAX_RETRIES: u32 = 5; + +/// Execute a MERGE INTO statement on a data evolution Paimon table. +/// +/// The `ctx` must already have the target and source tables registered. +/// The `merge` AST node comes from sqlparser. +/// +/// When a concurrent compaction rewrites files that this MERGE INTO references, +/// the commit will fail with a row ID conflict. In that case, the entire operation +/// (scan + write + commit) is retried from scratch against the new file layout. 
+pub(crate) async fn execute_merge_into( + ctx: &SessionContext, + merge: &Merge, + table: Table, +) -> DFResult { + for retry in 0..MERGE_INTO_MAX_RETRIES { + match execute_merge_into_once(ctx, merge, &table).await { + Ok(df) => return Ok(df), + Err(e) if is_row_id_conflict(&e) => { + if retry + 1 >= MERGE_INTO_MAX_RETRIES { + return Err(DataFusionError::External(Box::new(std::io::Error::other( + format!( + "MERGE INTO failed after {} retries due to concurrent compaction: {}", + MERGE_INTO_MAX_RETRIES, e + ), + )))); + } + // Retry: re-execute the entire MERGE INTO against the new file layout + continue; + } + Err(e) => return Err(e), + } + } + unreachable!() +} + +/// Check if a DataFusion error is caused by a row ID conflict during commit. +pub(crate) fn is_row_id_conflict(err: &DataFusionError) -> bool { + match err { + DataFusionError::External(e) => e.to_string().contains("Row ID conflict"), + _ => false, + } +} + +/// Single attempt of MERGE INTO execution. +async fn execute_merge_into_once( + ctx: &SessionContext, + merge: &Merge, + table: &Table, +) -> DFResult { + // 1. Parse all MERGE clauses + let parsed = extract_merge_clauses(merge)?; + + // Validate preconditions early and create writer (before executing any SQL) + let update_writer = if let Some(ref upd) = parsed.update { + Some(DataEvolutionWriter::new(table, upd.columns.clone()).map_err(to_datafusion_error)?) + } else { + None + }; + + let (target_ref, target_alias) = extract_table_ref(&merge.table)?; + let (source_ref, source_alias) = extract_source_ref(&merge.source)?; + let on_condition = merge.on.to_string(); + let t_alias = target_alias.as_deref().unwrap_or(&target_ref); + let s_alias = source_alias.as_deref().unwrap_or(&source_ref); + + // 2. 
Build a single LEFT JOIN: source LEFT JOIN target + // _ROW_ID IS NOT NULL → matched (UPDATE path) + // _ROW_ID IS NULL → not matched (INSERT path) + let mut select_parts = vec![format!("{t_alias}.\"_ROW_ID\"")]; + + // Add update expressions (prefixed to avoid collisions) + if let Some(ref upd) = parsed.update { + for (col, expr) in upd.columns.iter().zip(upd.exprs.iter()) { + select_parts.push(format!("{expr} AS \"__upd_{col}\"")); + } + } + + // Add source columns for INSERT path (all source columns via s.*) + // We also need insert expressions if they differ from source columns + if !parsed.inserts.is_empty() { + select_parts.push(format!("{s_alias}.*")); + } + + let select_clause = select_parts.join(", "); + // Safety: all interpolated values (select_clause, source_ref, s_alias, t_alias, on_condition) + // originate from sqlparser AST's Display impl, so they are well-formed SQL fragments. + let join_sql = format!( + "SELECT {select_clause} FROM {source_ref} AS {s_alias} \ + LEFT JOIN {target_ref} AS {t_alias} ON {on_condition}" + ); + + let join_result = ctx.sql(&join_sql).await?.collect().await?; + + // 3. Split by _ROW_ID null/not-null + let mut all_messages = Vec::new(); + let mut total_count: u64 = 0; + + // Separate matched and not-matched rows + let (matched_batches, not_matched_batches) = split_by_row_id(&join_result)?; + + // 4. 
Handle matched rows (UPDATE) + if let Some(mut writer) = update_writer { + let upd = parsed.update.as_ref().unwrap(); + let matched_count: usize = matched_batches.iter().map(|b| b.num_rows()).sum(); + if matched_count > 0 { + // Extract _ROW_ID + update columns (rename __upd_X → X) + let update_batches = project_update_columns(&matched_batches, &upd.columns)?; + for batch in update_batches { + writer + .add_matched_batch(batch) + .map_err(to_datafusion_error)?; + } + let update_messages = writer.prepare_commit().await.map_err(to_datafusion_error)?; + all_messages.extend(update_messages); + total_count += matched_count as u64; + } + } + + // 5. Handle not-matched rows (INSERT) + if !parsed.inserts.is_empty() { + // Collect the exact set of injected column names to strip from JOIN result + let mut injected_columns: Vec = vec!["_ROW_ID".to_string()]; + if let Some(ref upd) = parsed.update { + for col in &upd.columns { + injected_columns.push(format!("__upd_{col}")); + } + } + // Table schema field names for reordering INSERT columns + let table_fields: Vec = table + .schema() + .fields() + .iter() + .map(|f| f.name().to_string()) + .collect(); + let insert_batches = build_insert_batches( + ctx, + ¬_matched_batches, + &parsed.inserts, + s_alias, + &injected_columns, + &table_fields, + ) + .await?; + let insert_count: usize = insert_batches.iter().map(|b| b.num_rows()).sum(); + if insert_count > 0 { + let mut table_write = table + .new_write_builder() + .new_write() + .map_err(to_datafusion_error)?; + for batch in &insert_batches { + table_write + .write_arrow_batch(batch) + .await + .map_err(to_datafusion_error)?; + } + let insert_messages = table_write + .prepare_commit() + .await + .map_err(to_datafusion_error)?; + all_messages.extend(insert_messages); + total_count += insert_count as u64; + } + } + + // 6. 
Commit all messages atomically + if !all_messages.is_empty() { + let commit = table.new_write_builder().new_commit(); + commit + .commit(all_messages) + .await + .map_err(to_datafusion_error)?; + } + + ok_result(ctx, total_count) +} + +/// Split join result into matched (_ROW_ID not null) and not-matched (_ROW_ID null) batches. +fn split_by_row_id(batches: &[RecordBatch]) -> DFResult<(Vec, Vec)> { + let mut matched = Vec::new(); + let mut not_matched = Vec::new(); + + for batch in batches { + if batch.num_rows() == 0 { + continue; + } + let row_id_col = batch.column_by_name("_ROW_ID").ok_or_else(|| { + DataFusionError::Internal("_ROW_ID column not found in join result".to_string()) + })?; + + let is_not_null = compute::is_not_null(row_id_col)?; + let is_null = compute::is_null(row_id_col)?; + + let matched_batch = compute::filter_record_batch(batch, &is_not_null)?; + if matched_batch.num_rows() > 0 { + matched.push(matched_batch); + } + + let not_matched_batch = compute::filter_record_batch(batch, &is_null)?; + if not_matched_batch.num_rows() > 0 { + not_matched.push(not_matched_batch); + } + } + + Ok((matched, not_matched)) +} + +/// Extract _ROW_ID + __upd_X columns from matched batches, renaming __upd_X → X. 
+pub(crate) fn project_update_columns( + batches: &[RecordBatch], + update_columns: &[String], +) -> DFResult> { + let mut result = Vec::new(); + for batch in batches { + let row_id_idx = batch + .schema() + .index_of("_ROW_ID") + .map_err(|e| DataFusionError::Internal(format!("_ROW_ID not found: {e}")))?; + + let mut columns = vec![batch.column(row_id_idx).clone()]; + let mut fields = vec![batch.schema().field(row_id_idx).clone()]; + + for col in update_columns { + let prefixed = format!("__upd_{col}"); + let idx = batch.schema().index_of(&prefixed).map_err(|e| { + DataFusionError::Internal(format!("Column {prefixed} not found: {e}")) + })?; + columns.push(batch.column(idx).clone()); + fields.push(Field::new( + col, + batch.schema().field(idx).data_type().clone(), + true, + )); + } + + let schema = Arc::new(Schema::new(fields)); + let projected = RecordBatch::try_new(schema, columns)?; + result.push(projected); + } + Ok(result) +} + +/// Build insert batches from not-matched rows, applying INSERT clause projections and predicates. 
+async fn build_insert_batches( + ctx: &SessionContext, + not_matched_batches: &[RecordBatch], + inserts: &[MergeInsertClause], + s_alias: &str, + injected_columns: &[String], + table_fields: &[String], +) -> DFResult> { + if not_matched_batches.is_empty() || not_matched_batches.iter().all(|b| b.num_rows() == 0) { + return Ok(Vec::new()); + } + + // Strip injected columns (_ROW_ID, __upd_*) — keep only source columns + let source_batches = strip_non_source_columns(not_matched_batches, injected_columns)?; + + // Register as temp table for SQL-based projection/filtering + let first_schema = source_batches[0].schema(); + let mem_table = MemTable::try_new(first_schema, vec![source_batches])?; + let tmp_name = format!("__merge_not_matched_{}", std::process::id()); + ctx.register_table(&tmp_name, Arc::new(mem_table))?; + + let result = build_insert_batches_inner(ctx, inserts, s_alias, &tmp_name, table_fields).await; + + // Always clean up temp table, even on error + let _ = ctx.deregister_table(&tmp_name); + + result +} + +/// Execute INSERT clause queries against the registered temp table. 
+async fn build_insert_batches_inner( + ctx: &SessionContext, + inserts: &[MergeInsertClause], + s_alias: &str, + tmp_name: &str, + table_fields: &[String], +) -> DFResult> { + let mut all_batches = Vec::new(); + let mut consumed_predicates: Vec = Vec::new(); + + for ins in inserts { + let mut conditions = Vec::new(); + for prev in &consumed_predicates { + conditions.push(format!("NOT ({prev})")); + } + if let Some(ref pred) = ins.predicate { + conditions.push(pred.clone()); + consumed_predicates.push(pred.clone()); + } + + let where_clause = if conditions.is_empty() { + String::new() + } else { + format!(" WHERE {}", conditions.join(" AND ")) + }; + + let select_clause = insert_select_clause(ins, table_fields); + let sql = format!("SELECT {select_clause} FROM {tmp_name} AS {s_alias}{where_clause}"); + + let batches = ctx.sql(&sql).await?.collect().await?; + all_batches.extend(batches); + } + + Ok(all_batches) +} + +/// Remove injected columns from batches, keeping only source columns. +fn strip_non_source_columns( + batches: &[RecordBatch], + injected_columns: &[String], +) -> DFResult> { + let mut result = Vec::new(); + for batch in batches { + let schema = batch.schema(); + let mut indices = Vec::new(); + let mut fields = Vec::new(); + for (i, field) in schema.fields().iter().enumerate() { + if injected_columns.contains(field.name()) { + continue; + } + indices.push(i); + fields.push(field.as_ref().clone()); + } + let new_schema = Arc::new(Schema::new(fields)); + let columns: Vec<_> = indices.iter().map(|&i| batch.column(i).clone()).collect(); + let projected = RecordBatch::try_new(new_schema, columns)?; + result.push(projected); + } + Ok(result) +} + +/// Build the SELECT clause for an INSERT clause, ordered by table schema fields. 
+/// +/// When the INSERT specifies explicit columns (`INSERT (col2, col1) VALUES (expr2, expr1)`), +/// the output must be reordered to match the table schema so that `write_arrow_batch` +/// (which reads columns by positional index) maps them correctly. +fn insert_select_clause(ins: &MergeInsertClause, table_fields: &[String]) -> String { + if ins.columns.is_empty() && ins.value_exprs.is_empty() { + "*".to_string() + } else { + // Build column_name -> expression mapping from the INSERT clause + let col_expr_map: HashMap = ins + .columns + .iter() + .zip(ins.value_exprs.iter()) + .map(|(col, expr)| (col.to_lowercase(), expr.as_str())) + .collect(); + + // Emit SELECT in table schema order + table_fields + .iter() + .map(|field| { + let key = field.to_lowercase(); + match col_expr_map.get(&key) { + Some(expr) => format!("{expr} AS \"{field}\""), + // Column not in INSERT list — fill with NULL + None => format!("NULL AS \"{field}\""), + } + }) + .collect::>() + .join(", ") + } +} + +/// Parsed WHEN NOT MATCHED THEN INSERT clause. +struct MergeInsertClause { + /// Column names from INSERT (col1, col2). Empty means INSERT *. + columns: Vec, + /// SQL expressions from VALUES(...). + value_exprs: Vec, + /// Optional AND predicate (SQL string). + predicate: Option, +} + +/// Parsed WHEN MATCHED THEN UPDATE clause. +struct MergeUpdateClause { + columns: Vec, + exprs: Vec, +} + +/// Parsed merge clauses. +struct ParsedMergeClauses { + update: Option, + inserts: Vec, +} + +/// Extract UPDATE and INSERT clauses from the MERGE AST. 
+fn extract_merge_clauses(merge: &Merge) -> DFResult { + let mut update: Option = None; + let mut inserts: Vec = Vec::new(); + + for clause in &merge.clauses { + match clause.clause_kind { + MergeClauseKind::Matched => { + if update.is_some() { + return Err(DataFusionError::Plan( + "Multiple WHEN MATCHED clauses are not yet supported".to_string(), + )); + } + if clause.predicate.is_some() { + return Err(DataFusionError::Plan( + "WHEN MATCHED AND is not yet supported".to_string(), + )); + } + match &clause.action { + MergeAction::Update(update_expr) => { + let mut columns = Vec::new(); + let mut exprs = Vec::new(); + for assignment in &update_expr.assignments { + let col_name = match &assignment.target { + AssignmentTarget::ColumnName(name) => name + .0 + .last() + .and_then(|p| p.as_ident()) + .map(|id| id.value.clone()) + .ok_or_else(|| { + DataFusionError::Plan(format!( + "Invalid column name in SET: {name}" + )) + })?, + AssignmentTarget::Tuple(_) => { + return Err(DataFusionError::Plan( + "Tuple assignment in MERGE INTO SET is not supported" + .to_string(), + )); + } + }; + columns.push(col_name); + exprs.push(assignment.value.to_string()); + } + update = Some(MergeUpdateClause { columns, exprs }); + } + MergeAction::Delete { .. 
} => { + return Err(DataFusionError::Plan( + "WHEN MATCHED THEN DELETE is not supported for data evolution tables" + .to_string(), + )); + } + MergeAction::Insert(_) => { + return Err(DataFusionError::Plan( + "WHEN MATCHED THEN INSERT is not valid SQL".to_string(), + )); + } + } + } + MergeClauseKind::NotMatched | MergeClauseKind::NotMatchedByTarget => { + match &clause.action { + MergeAction::Insert(insert_expr) => { + let columns: Vec = + insert_expr.columns.iter().map(|c| c.to_string()).collect(); + + let value_exprs = match &insert_expr.kind { + MergeInsertKind::Values(values) => { + if values.rows.is_empty() { + return Err(DataFusionError::Plan( + "INSERT VALUES must have at least one row".to_string(), + )); + } + values.rows[0].iter().map(|e| e.to_string()).collect() + } + MergeInsertKind::Row => { + // INSERT ROW — BigQuery syntax, treat as INSERT * + Vec::new() + } + }; + + let predicate = clause.predicate.as_ref().map(|p| p.to_string()); + + inserts.push(MergeInsertClause { + columns, + value_exprs, + predicate, + }); + } + _ => { + return Err(DataFusionError::Plan( + "WHEN NOT MATCHED only supports INSERT".to_string(), + )); + } + } + } + MergeClauseKind::NotMatchedBySource => { + return Err(DataFusionError::Plan( + "WHEN NOT MATCHED BY SOURCE is not yet supported for data evolution MERGE INTO" + .to_string(), + )); + } + } + } + + if update.is_none() && inserts.is_empty() { + return Err(DataFusionError::Plan( + "MERGE INTO requires at least one WHEN MATCHED or WHEN NOT MATCHED clause".to_string(), + )); + } + + Ok(ParsedMergeClauses { update, inserts }) +} + +/// Extract table name and optional alias from a TableFactor. +fn extract_table_ref(table: &TableFactor) -> DFResult<(String, Option)> { + match table { + TableFactor::Table { name, alias, .. 
} => { + let table_name = name.to_string(); + let alias_name = alias.as_ref().map(|a| a.name.value.clone()); + Ok((table_name, alias_name)) + } + other => Err(DataFusionError::Plan(format!( + "Unsupported table reference in MERGE INTO: {other}" + ))), + } +} + +/// Extract source reference (table or subquery) from a TableFactor. +fn extract_source_ref(source: &TableFactor) -> DFResult<(String, Option)> { + match source { + TableFactor::Table { name, alias, .. } => { + let table_name = name.to_string(); + let alias_name = alias.as_ref().map(|a| a.name.value.clone()); + Ok((table_name, alias_name)) + } + TableFactor::Derived { + subquery, alias, .. + } => { + let subquery_sql = format!("({subquery})"); + let alias_name = alias.as_ref().map(|a| a.name.value.clone()); + if alias_name.is_none() { + return Err(DataFusionError::Plan( + "Subquery source in MERGE INTO must have an alias".to_string(), + )); + } + Ok((subquery_sql, alias_name)) + } + other => Err(DataFusionError::Plan(format!( + "Unsupported source in MERGE INTO: {other}" + ))), + } +} + +/// Return a DataFrame with a single "count" column. 
+pub(crate) fn ok_result(ctx: &SessionContext, count: u64) -> DFResult { + let schema = Arc::new(Schema::new(vec![Field::new( + "count", + ArrowDataType::UInt64, + false, + )])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(UInt64Array::from(vec![count]))], + )?; + ctx.read_batch(batch) +} + +#[cfg(test)] +mod tests { + use super::*; + use datafusion::prelude::SessionContext; + use datafusion::sql::sqlparser::dialect::GenericDialect; + use datafusion::sql::sqlparser::parser::Parser; + use paimon::catalog::Identifier; + use paimon::io::FileIOBuilder; + use paimon::spec::{DataType, IntType, Schema as PaimonSchema, TableSchema, VarCharType}; + + use crate::PaimonTableProvider; + + async fn setup_data_evolution_table() -> (SessionContext, Table) { + let file_io = FileIOBuilder::new("memory").build().unwrap(); + let table_path = "memory:/test_merge_into"; + file_io + .mkdirs(&format!("{table_path}/snapshot/")) + .await + .unwrap(); + file_io + .mkdirs(&format!("{table_path}/manifest/")) + .await + .unwrap(); + + let schema = PaimonSchema::builder() + .column("id", DataType::Int(IntType::new())) + .column("name", DataType::VarChar(VarCharType::string_type())) + .column("value", DataType::Int(IntType::new())) + .option("data-evolution.enabled", "true") + .option("row-tracking.enabled", "true") + .build() + .unwrap(); + let table_schema = TableSchema::new(0, &schema); + let table = Table::new( + file_io, + Identifier::new("default", "target"), + table_path.to_string(), + table_schema, + None, + ); + + let provider = PaimonTableProvider::try_new(table.clone()).unwrap(); + let ctx = SessionContext::new(); + ctx.register_table("target", Arc::new(provider)).unwrap(); + + (ctx, table) + } + + fn parse_merge(sql: &str) -> Merge { + let dialect = GenericDialect {}; + let stmts = Parser::parse_sql(&dialect, sql).unwrap(); + match stmts.into_iter().next().unwrap() { + datafusion::sql::sqlparser::ast::Statement::Merge(m) => m, + _ => panic!("Expected MERGE 
statement"), + } + } + + #[tokio::test] + async fn test_merge_into_updates_matched_rows() { + let (ctx, table) = setup_data_evolution_table().await; + + // Insert initial data + ctx.sql("INSERT INTO target (id, name, value) VALUES (1, 'alice', 10), (2, 'bob', 20), (3, 'charlie', 30)") + .await + .unwrap() + .collect() + .await + .unwrap(); + + // Create source table with updates + ctx.sql( + "CREATE TABLE source (id INT, name VARCHAR) AS VALUES (1, 'ALICE'), (3, 'CHARLIE')", + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + + // Execute MERGE INTO + let merge = parse_merge( + "MERGE INTO target t USING source s ON t.id = s.id \ + WHEN MATCHED THEN UPDATE SET name = s.name", + ); + execute_merge_into(&ctx, &merge, table).await.unwrap(); + + let batches = ctx + .sql("SELECT id, name, value FROM target ORDER BY id") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let mut rows = Vec::new(); + for batch in &batches { + let ids = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let names = batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + let values = batch + .column(2) + .as_any() + .downcast_ref::() + .unwrap(); + for i in 0..batch.num_rows() { + rows.push((ids.value(i), names.value(i).to_string(), values.value(i))); + } + } + + assert_eq!( + rows, + vec![ + (1, "ALICE".to_string(), 10), + (2, "bob".to_string(), 20), + (3, "CHARLIE".to_string(), 30), + ] + ); + } + + #[tokio::test] + async fn test_merge_into_no_matches() { + let (ctx, table) = setup_data_evolution_table().await; + + ctx.sql("INSERT INTO target (id, name, value) VALUES (1, 'alice', 10)") + .await + .unwrap() + .collect() + .await + .unwrap(); + + ctx.sql("CREATE TABLE source (id INT, name VARCHAR) AS VALUES (99, 'nobody')") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let merge = parse_merge( + "MERGE INTO target t USING source s ON t.id = s.id \ + WHEN MATCHED THEN UPDATE SET name = s.name", + ); + let result = 
execute_merge_into(&ctx, &merge, table).await.unwrap(); + let batches = result.collect().await.unwrap(); + let count = batches[0] + .column(0) + .as_any() + .downcast_ref::() + .unwrap() + .value(0); + assert_eq!(count, 0); + } + + #[tokio::test] + async fn test_merge_into_rejects_non_data_evolution_table() { + let file_io = FileIOBuilder::new("memory").build().unwrap(); + let table_path = "memory:/test_merge_reject"; + file_io + .mkdirs(&format!("{table_path}/snapshot/")) + .await + .unwrap(); + file_io + .mkdirs(&format!("{table_path}/manifest/")) + .await + .unwrap(); + + let schema = PaimonSchema::builder() + .column("id", DataType::Int(IntType::new())) + .build() + .unwrap(); + let table_schema = TableSchema::new(0, &schema); + let table = Table::new( + file_io, + Identifier::new("default", "t"), + table_path.to_string(), + table_schema, + None, + ); + + let ctx = SessionContext::new(); + let merge = parse_merge( + "MERGE INTO t USING s ON t.id = s.id \ + WHEN MATCHED THEN UPDATE SET id = s.id", + ); + let result = execute_merge_into(&ctx, &merge, table).await; + assert!(result.is_err()); + assert!(result + .unwrap_err() + .to_string() + .contains("data-evolution.enabled")); + } +} diff --git a/crates/integrations/datafusion/src/ddl.rs b/crates/integrations/datafusion/src/sql_handler.rs similarity index 94% rename from crates/integrations/datafusion/src/ddl.rs rename to crates/integrations/datafusion/src/sql_handler.rs index feace72..78f64bb 100644 --- a/crates/integrations/datafusion/src/ddl.rs +++ b/crates/integrations/datafusion/src/sql_handler.rs @@ -15,13 +15,13 @@ // specific language governing permissions and limitations // under the License. -//! DDL support for Paimon tables. +//! SQL support for Paimon tables. //! -//! DataFusion does not natively support all DDL statements needed by Paimon. -//! This module provides [`PaimonDdlHandler`] which intercepts CREATE TABLE and -//! 
ALTER TABLE SQL, translates them to Paimon catalog operations, and delegates -//! everything else (SELECT, CREATE/DROP SCHEMA, DROP TABLE, etc.) to the -//! underlying [`SessionContext`]. +//! DataFusion does not natively support all SQL statements needed by Paimon. +//! This module provides [`PaimonSqlHandler`] which intercepts CREATE TABLE, +//! ALTER TABLE, MERGE INTO, UPDATE and other SQL, translates them to Paimon +//! catalog operations, and delegates everything else (SELECT, CREATE/DROP +//! SCHEMA, DROP TABLE, etc.) to the underlying [`SessionContext`]. //! //! Supported DDL: //! - `CREATE TABLE db.t (col TYPE, ..., PRIMARY KEY (col, ...)) [PARTITIONED BY (col TYPE, ...)] [WITH ('key' = 'val')]` @@ -38,8 +38,8 @@ use datafusion::arrow::record_batch::RecordBatch; use datafusion::error::{DataFusionError, Result as DFResult}; use datafusion::prelude::{DataFrame, SessionContext}; use datafusion::sql::sqlparser::ast::{ - AlterTableOperation, ColumnDef, CreateTable, CreateTableOptions, HiveDistributionStyle, - ObjectName, RenameTableNameKind, SqlOption, Statement, + AlterTableOperation, ColumnDef, CreateTable, CreateTableOptions, HiveDistributionStyle, Merge, + ObjectName, RenameTableNameKind, SqlOption, Statement, TableFactor, Update, }; use datafusion::sql::sqlparser::dialect::GenericDialect; use datafusion::sql::sqlparser::parser::Parser; @@ -56,17 +56,17 @@ use paimon::arrow::arrow_to_paimon_type; /// /// # Example /// ```ignore -/// let handler = PaimonDdlHandler::new(ctx, catalog); +/// let handler = PaimonSqlHandler::new(ctx, catalog); /// let df = handler.sql("ALTER TABLE paimon.db.t ADD COLUMN age INT").await?; /// ``` -pub struct PaimonDdlHandler { +pub struct PaimonSqlHandler { ctx: SessionContext, catalog: Arc, /// The catalog name registered in the SessionContext (used to strip the catalog prefix). 
catalog_name: String, } -impl PaimonDdlHandler { +impl PaimonSqlHandler { pub fn new( ctx: SessionContext, catalog: Arc, @@ -107,6 +107,8 @@ impl PaimonDdlHandler { ) .await } + Statement::Merge(merge) => self.handle_merge_into(merge).await, + Statement::Update(update) => self.handle_update(update).await, _ => self.ctx.sql(sql).await, } } @@ -248,6 +250,48 @@ impl PaimonDdlHandler { ok_result(&self.ctx) } + async fn handle_merge_into(&self, merge: &Merge) -> DFResult { + // Resolve the target table name from the MERGE INTO clause + let table_name = match &merge.table { + TableFactor::Table { name, .. } => name.clone(), + other => { + return Err(DataFusionError::Plan(format!( + "Unsupported target table in MERGE INTO: {other}" + ))) + } + }; + let identifier = self.resolve_table_name(&table_name)?; + + // Load the Paimon table from the catalog + let table = self + .catalog + .get_table(&identifier) + .await + .map_err(to_datafusion_error)?; + + crate::merge_into::execute_merge_into(&self.ctx, merge, table).await + } + + async fn handle_update(&self, update: &Update) -> DFResult { + let table_name = match &update.table.relation { + TableFactor::Table { name, .. } => name.clone(), + other => { + return Err(DataFusionError::Plan(format!( + "Unsupported target table in UPDATE: {other}" + ))) + } + }; + let identifier = self.resolve_table_name(&table_name)?; + + let table = self + .catalog + .get_table(&identifier) + .await + .map_err(to_datafusion_error)?; + + crate::update::execute_update(&self.ctx, update, table).await + } + /// Resolve an ObjectName like `paimon.db.table` or `db.table` to a Paimon Identifier. 
fn resolve_table_name(&self, name: &ObjectName) -> DFResult { let parts: Vec = name @@ -571,8 +615,8 @@ mod tests { } } - fn make_handler(catalog: Arc) -> PaimonDdlHandler { - PaimonDdlHandler::new(SessionContext::new(), catalog, "paimon") + fn make_handler(catalog: Arc) -> PaimonSqlHandler { + PaimonSqlHandler::new(SessionContext::new(), catalog, "paimon") } // ==================== sql_data_type_to_arrow tests ==================== @@ -905,7 +949,7 @@ mod tests { } } - // ==================== PaimonDdlHandler::sql integration tests ==================== + // ==================== PaimonSqlHandler::sql integration tests ==================== #[tokio::test] async fn test_create_table_basic() { diff --git a/crates/integrations/datafusion/src/update.rs b/crates/integrations/datafusion/src/update.rs new file mode 100644 index 0000000..c530bc7 --- /dev/null +++ b/crates/integrations/datafusion/src/update.rs @@ -0,0 +1,410 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! UPDATE execution for data evolution tables. +//! +//! This module provides the DataFusion-specific SQL parsing and execution layer for +//! `UPDATE ... SET ... WHERE ...` statements. The engine-agnostic update logic +//! 
(file metadata lookup, row grouping, reading originals, applying updates, +//! writing partial files, committing) lives in [`paimon::table::DataEvolutionWriter`]. + +use datafusion::error::{DataFusionError, Result as DFResult}; +use datafusion::prelude::{DataFrame, SessionContext}; +use datafusion::sql::sqlparser::ast::{AssignmentTarget, Update}; + +use paimon::table::{DataEvolutionWriter, Table}; + +use crate::error::to_datafusion_error; +use crate::merge_into::{is_row_id_conflict, ok_result, project_update_columns}; + +/// Maximum number of retries when UPDATE conflicts with concurrent compaction. +const UPDATE_MAX_RETRIES: u32 = 5; + +/// Execute an UPDATE statement on a data evolution Paimon table. +/// +/// When a concurrent compaction rewrites files that this UPDATE references, +/// the commit will fail with a row ID conflict. In that case, the entire operation +/// is retried from scratch against the new file layout. +pub(crate) async fn execute_update( + ctx: &SessionContext, + update: &Update, + table: Table, +) -> DFResult { + let mut last_err = None; + for _ in 0..UPDATE_MAX_RETRIES { + match execute_update_once(ctx, update, &table).await { + Ok(df) => return Ok(df), + Err(e) if is_row_id_conflict(&e) => { + last_err = Some(e); + continue; + } + Err(e) => return Err(e), + } + } + Err(DataFusionError::External(Box::new(std::io::Error::other( + format!( + "UPDATE failed after {} retries due to concurrent compaction: {}", + UPDATE_MAX_RETRIES, + last_err.unwrap() + ), + )))) +} + +/// Single attempt of UPDATE execution. +async fn execute_update_once( + ctx: &SessionContext, + update: &Update, + table: &Table, +) -> DFResult { + // 1. 
Extract SET assignments + let mut columns = Vec::new(); + let mut exprs = Vec::new(); + for assignment in &update.assignments { + let col_name = match &assignment.target { + AssignmentTarget::ColumnName(name) => name + .0 + .last() + .and_then(|p| p.as_ident()) + .map(|id| id.value.clone()) + .ok_or_else(|| { + DataFusionError::Plan(format!("Invalid column name in SET: {name}")) + })?, + AssignmentTarget::Tuple(_) => { + return Err(DataFusionError::Plan( + "Tuple assignment in UPDATE SET is not supported".to_string(), + )); + } + }; + columns.push(col_name); + exprs.push(assignment.value.to_string()); + } + + // 2. Create DataEvolutionWriter (validates preconditions) + let mut writer = + DataEvolutionWriter::new(table, columns.clone()).map_err(to_datafusion_error)?; + + // 3. Query the target table directly with WHERE filter. + let table_ref = update.table.to_string(); + + let select_parts: Vec = std::iter::once("\"_ROW_ID\"".to_string()) + .chain( + columns + .iter() + .zip(exprs.iter()) + .map(|(col, expr)| format!("{expr} AS \"__upd_{col}\"")), + ) + .collect(); + + let select_clause = select_parts.join(", "); + let where_clause = match &update.selection { + Some(expr) => format!(" WHERE {expr}"), + None => String::new(), + }; + + let query_sql = format!("SELECT {select_clause} FROM {table_ref}{where_clause}"); + let batches = ctx.sql(&query_sql).await?.collect().await?; + + // 4. Project update columns (rename __upd_X → X) + let total_count: u64 = batches.iter().map(|b| b.num_rows() as u64).sum(); + if total_count == 0 { + return ok_result(ctx, 0); + } + + let update_batches = project_update_columns(&batches, &columns)?; + for batch in update_batches { + writer + .add_matched_batch(batch) + .map_err(to_datafusion_error)?; + } + + // 5. 
Commit + let messages = writer.prepare_commit().await.map_err(to_datafusion_error)?; + if !messages.is_empty() { + let commit = table.new_write_builder().new_commit(); + commit.commit(messages).await.map_err(to_datafusion_error)?; + } + + ok_result(ctx, total_count) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::Arc; + + use datafusion::arrow::array::{Int32Array, StringArray, UInt64Array}; + use datafusion::prelude::SessionContext; + use datafusion::sql::sqlparser::dialect::GenericDialect; + use datafusion::sql::sqlparser::parser::Parser; + use paimon::catalog::Identifier; + use paimon::io::FileIOBuilder; + use paimon::spec::{DataType, IntType, Schema as PaimonSchema, TableSchema, VarCharType}; + + use crate::PaimonTableProvider; + + async fn setup_data_evolution_table(name: &str) -> (SessionContext, Table) { + let file_io = FileIOBuilder::new("memory").build().unwrap(); + let table_path = format!("memory:/test_update_{name}"); + file_io + .mkdirs(&format!("{table_path}/snapshot/")) + .await + .unwrap(); + file_io + .mkdirs(&format!("{table_path}/manifest/")) + .await + .unwrap(); + + let schema = PaimonSchema::builder() + .column("id", DataType::Int(IntType::new())) + .column("name", DataType::VarChar(VarCharType::string_type())) + .column("value", DataType::Int(IntType::new())) + .option("data-evolution.enabled", "true") + .option("row-tracking.enabled", "true") + .build() + .unwrap(); + let table_schema = TableSchema::new(0, &schema); + let table = Table::new( + file_io, + Identifier::new("default", "target"), + table_path, + table_schema, + None, + ); + + let provider = PaimonTableProvider::try_new(table.clone()).unwrap(); + let ctx = SessionContext::new(); + ctx.register_table("target", Arc::new(provider)).unwrap(); + + (ctx, table) + } + + fn parse_update(sql: &str) -> Update { + let dialect = GenericDialect {}; + let stmts = Parser::parse_sql(&dialect, sql).unwrap(); + match stmts.into_iter().next().unwrap() { + 
datafusion::sql::sqlparser::ast::Statement::Update(u) => u, + _ => panic!("Expected UPDATE statement"), + } + } + + fn collect_rows(batches: &[datafusion::arrow::array::RecordBatch]) -> Vec<(i32, String, i32)> { + let mut rows = Vec::new(); + for batch in batches { + let ids = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let names = batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + let values = batch + .column(2) + .as_any() + .downcast_ref::() + .unwrap(); + for i in 0..batch.num_rows() { + rows.push((ids.value(i), names.value(i).to_string(), values.value(i))); + } + } + rows + } + + #[tokio::test] + async fn test_update_with_where() { + let (ctx, table) = setup_data_evolution_table("with_where").await; + + ctx.sql("INSERT INTO target (id, name, value) VALUES (1, 'alice', 10), (2, 'bob', 20), (3, 'charlie', 30)") + .await.unwrap().collect().await.unwrap(); + + let update = parse_update("UPDATE target SET name = 'ALICE' WHERE id = 1"); + execute_update(&ctx, &update, table).await.unwrap(); + + let batches = ctx + .sql("SELECT id, name, value FROM target ORDER BY id") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let rows = collect_rows(&batches); + assert_eq!( + rows, + vec![ + (1, "ALICE".to_string(), 10), + (2, "bob".to_string(), 20), + (3, "charlie".to_string(), 30), + ] + ); + } + + #[tokio::test] + async fn test_update_without_where() { + let (ctx, table) = setup_data_evolution_table("without_where").await; + + ctx.sql("INSERT INTO target (id, name, value) VALUES (1, 'alice', 10), (2, 'bob', 20)") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let update = parse_update("UPDATE target SET value = 99"); + execute_update(&ctx, &update, table).await.unwrap(); + + let batches = ctx + .sql("SELECT id, name, value FROM target ORDER BY id") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let rows = collect_rows(&batches); + assert_eq!( + rows, + vec![(1, "alice".to_string(), 99), (2, 
"bob".to_string(), 99),] + ); + } + + #[tokio::test] + async fn test_update_multiple_columns() { + let (ctx, table) = setup_data_evolution_table("multi_col").await; + + ctx.sql("INSERT INTO target (id, name, value) VALUES (1, 'alice', 10), (2, 'bob', 20)") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let update = parse_update("UPDATE target SET name = 'updated', value = 0 WHERE id = 2"); + execute_update(&ctx, &update, table).await.unwrap(); + + let batches = ctx + .sql("SELECT id, name, value FROM target ORDER BY id") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let rows = collect_rows(&batches); + assert_eq!( + rows, + vec![(1, "alice".to_string(), 10), (2, "updated".to_string(), 0),] + ); + } + + #[tokio::test] + async fn test_update_no_matching_rows() { + let (ctx, table) = setup_data_evolution_table("no_match").await; + + ctx.sql("INSERT INTO target (id, name, value) VALUES (1, 'alice', 10)") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let update = parse_update("UPDATE target SET name = 'nobody' WHERE id = 99"); + let result = execute_update(&ctx, &update, table).await.unwrap(); + let batches = result.collect().await.unwrap(); + let count = batches[0] + .column(0) + .as_any() + .downcast_ref::() + .unwrap() + .value(0); + assert_eq!(count, 0); + } + + #[tokio::test] + async fn test_update_row_id_stability() { + let (ctx, table) = setup_data_evolution_table("row_id").await; + + ctx.sql("INSERT INTO target (id, name, value) VALUES (1, 'alice', 10), (2, 'bob', 20)") + .await + .unwrap() + .collect() + .await + .unwrap(); + + // Get row IDs before update + let before = ctx + .sql("SELECT id, \"_ROW_ID\" FROM target ORDER BY id") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let update = parse_update("UPDATE target SET name = 'ALICE' WHERE id = 1"); + execute_update(&ctx, &update, table).await.unwrap(); + + // Get row IDs after update + let after = ctx + .sql("SELECT id, \"_ROW_ID\" FROM target ORDER BY 
id") + .await + .unwrap() + .collect() + .await + .unwrap(); + + // Row IDs should remain the same + assert_eq!(before, after); + } + + #[tokio::test] + async fn test_update_rejects_non_data_evolution_table() { + let file_io = FileIOBuilder::new("memory").build().unwrap(); + let table_path = "memory:/test_update_reject"; + file_io + .mkdirs(&format!("{table_path}/snapshot/")) + .await + .unwrap(); + file_io + .mkdirs(&format!("{table_path}/manifest/")) + .await + .unwrap(); + + let schema = PaimonSchema::builder() + .column("id", DataType::Int(IntType::new())) + .build() + .unwrap(); + let table_schema = TableSchema::new(0, &schema); + let table = Table::new( + file_io, + Identifier::new("default", "t"), + table_path.to_string(), + table_schema, + None, + ); + + let ctx = SessionContext::new(); + let update = parse_update("UPDATE t SET id = 1"); + let result = execute_update(&ctx, &update, table).await; + assert!(result.is_err()); + assert!(result + .unwrap_err() + .to_string() + .contains("data-evolution.enabled")); + } +} diff --git a/crates/integrations/datafusion/tests/merge_into_tests.rs b/crates/integrations/datafusion/tests/merge_into_tests.rs new file mode 100644 index 0000000..4a4386e --- /dev/null +++ b/crates/integrations/datafusion/tests/merge_into_tests.rs @@ -0,0 +1,1212 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! MERGE INTO integration tests for data evolution tables. +//! +//! Covers row tracking, `_ROW_ID` stability, multiple merges, self-merge, +//! join on `_ROW_ID`, and error path validation. +//! Reference: Java Paimon's `RowTrackingTestBase`. + +use std::sync::Arc; + +use arrow_array::{Int32Array, Int64Array, StringArray}; +use datafusion::prelude::SessionContext; +use paimon::catalog::Identifier; +use paimon::table::SnapshotManager; +use paimon::{Catalog, CatalogOptions, FileSystemCatalog, Options}; +use paimon_datafusion::{PaimonCatalogProvider, PaimonRelationPlanner, PaimonSqlHandler}; +use tempfile::TempDir; + +// ======================= Helpers ======================= + +fn create_test_env() -> (TempDir, Arc) { + let temp_dir = TempDir::new().expect("Failed to create temp dir"); + let warehouse = format!("file://{}", temp_dir.path().display()); + let mut options = Options::new(); + options.set(CatalogOptions::WAREHOUSE, warehouse); + let catalog = FileSystemCatalog::new(options).expect("Failed to create catalog"); + (temp_dir, Arc::new(catalog)) +} + +fn create_handler(catalog: Arc) -> PaimonSqlHandler { + let ctx = SessionContext::new(); + ctx.register_catalog( + "paimon", + Arc::new(PaimonCatalogProvider::new(catalog.clone())), + ); + ctx.register_relation_planner(Arc::new(PaimonRelationPlanner::new())) + .expect("Failed to register relation planner"); + PaimonSqlHandler::new(ctx, catalog, "paimon") +} + +async fn setup_data_evolution_table(handler: &PaimonSqlHandler) { + handler + .sql("CREATE SCHEMA paimon.test_db") + .await + 
.expect("CREATE SCHEMA failed"); + handler + .sql( + "CREATE TABLE paimon.test_db.target (\ + id INT NOT NULL, name STRING, value INT\ + ) WITH (\ + 'data-evolution.enabled' = 'true', \ + 'row-tracking.enabled' = 'true'\ + )", + ) + .await + .expect("CREATE TABLE failed"); +} + +async fn collect_rows_3col(handler: &PaimonSqlHandler, sql: &str) -> Vec<(i32, String, i32)> { + let batches = handler.sql(sql).await.unwrap().collect().await.unwrap(); + let mut rows = Vec::new(); + for batch in &batches { + let ids = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let names = batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + let values = batch + .column(2) + .as_any() + .downcast_ref::() + .unwrap(); + for i in 0..batch.num_rows() { + rows.push((ids.value(i), names.value(i).to_string(), values.value(i))); + } + } + rows +} + +async fn collect_row_ids(handler: &PaimonSqlHandler, sql: &str) -> Vec<(i64, i32)> { + let batches = handler.sql(sql).await.unwrap().collect().await.unwrap(); + let mut rows = Vec::new(); + for batch in &batches { + let row_ids = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let ids = batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + for i in 0..batch.num_rows() { + rows.push((row_ids.value(i), ids.value(i))); + } + } + rows +} + +async fn assert_merge_error(handler: &PaimonSqlHandler, sql: &str, expected_substring: &str) { + let result = handler.sql(sql).await; + assert!( + result.is_err(), + "Expected error containing '{expected_substring}', but got Ok" + ); + let err_msg = result.unwrap_err().to_string(); + assert!( + err_msg.contains(expected_substring), + "Error message '{err_msg}' does not contain '{expected_substring}'" + ); +} + +async fn register_source(handler: &PaimonSqlHandler, sql: &str) { + handler + .ctx() + .sql(sql) + .await + .unwrap() + .collect() + .await + .unwrap(); +} + +// ======================= Functional Tests ======================= + +#[tokio::test] 
+async fn test_row_id_values_after_insert() { + let (_tmp, catalog) = create_test_env(); + let handler = create_handler(catalog); + setup_data_evolution_table(&handler).await; + + handler + .sql("INSERT INTO paimon.test_db.target (id, name, value) VALUES (1, 'alice', 10), (2, 'bob', 20), (3, 'charlie', 30)") + .await.unwrap().collect().await.unwrap(); + + let batches = handler + .sql("SELECT \"_ROW_ID\" FROM paimon.test_db.target ORDER BY \"_ROW_ID\"") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let mut row_ids = Vec::new(); + for batch in &batches { + let col = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + for i in 0..batch.num_rows() { + row_ids.push(col.value(i)); + } + } + + assert_eq!( + row_ids, + vec![0, 1, 2], + "_ROW_ID should be 0-based sequential" + ); + // Verify uniqueness + let mut deduped = row_ids.clone(); + deduped.sort(); + deduped.dedup(); + assert_eq!( + deduped.len(), + row_ids.len(), + "_ROW_ID values must be unique" + ); +} + +#[tokio::test] +async fn test_row_id_stability_after_merge_into() { + let (_tmp, catalog) = create_test_env(); + let handler = create_handler(catalog); + setup_data_evolution_table(&handler).await; + + handler + .sql("INSERT INTO paimon.test_db.target (id, name, value) VALUES (1, 'alice', 10), (2, 'bob', 20), (3, 'charlie', 30)") + .await.unwrap().collect().await.unwrap(); + + // Capture _ROW_ID -> id mapping before merge + let before = collect_row_ids( + &handler, + "SELECT \"_ROW_ID\", id FROM paimon.test_db.target ORDER BY id", + ) + .await; + + // Register source and execute MERGE INTO + register_source( + &handler, + "CREATE TABLE source1 (id INT, name VARCHAR) AS VALUES (1, 'ALICE'), (3, 'CHARLIE')", + ) + .await; + + handler + .sql( + "MERGE INTO paimon.test_db.target t USING source1 s ON t.id = s.id \ + WHEN MATCHED THEN UPDATE SET name = s.name", + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + + // Capture _ROW_ID -> id mapping after merge + let after = 
collect_row_ids( + &handler, + "SELECT \"_ROW_ID\", id FROM paimon.test_db.target ORDER BY id", + ) + .await; + + // _ROW_ID must be identical for all rows + assert_eq!( + before, after, + "_ROW_ID values must not change after MERGE INTO" + ); + + // Verify data correctness + let rows = collect_rows_3col( + &handler, + "SELECT id, name, value FROM paimon.test_db.target ORDER BY id", + ) + .await; + assert_eq!( + rows, + vec![ + (1, "ALICE".to_string(), 10), + (2, "bob".to_string(), 20), + (3, "CHARLIE".to_string(), 30), + ] + ); +} + +#[tokio::test] +async fn test_multiple_merge_into_different_columns() { + let (_tmp, catalog) = create_test_env(); + let handler = create_handler(catalog); + setup_data_evolution_table(&handler).await; + + handler + .sql("INSERT INTO paimon.test_db.target (id, name, value) VALUES (1, 'alice', 10), (2, 'bob', 20)") + .await.unwrap().collect().await.unwrap(); + + // First MERGE: update name for id=1 + register_source( + &handler, + "CREATE TABLE src_name (id INT, name VARCHAR) AS VALUES (1, 'ALICE')", + ) + .await; + handler + .sql( + "MERGE INTO paimon.test_db.target t USING src_name s ON t.id = s.id \ + WHEN MATCHED THEN UPDATE SET name = s.name", + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + + // Second MERGE: update value for id=2 + register_source( + &handler, + "CREATE TABLE src_value (id INT, value INT) AS VALUES (2, 200)", + ) + .await; + handler + .sql( + "MERGE INTO paimon.test_db.target t USING src_value s ON t.id = s.id \ + WHEN MATCHED THEN UPDATE SET value = s.value", + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + + let rows = collect_rows_3col( + &handler, + "SELECT id, name, value FROM paimon.test_db.target ORDER BY id", + ) + .await; + assert_eq!( + rows, + vec![(1, "ALICE".to_string(), 10), (2, "bob".to_string(), 200),] + ); +} + +#[tokio::test] +async fn test_merge_into_with_non_paimon_source() { + let (_tmp, catalog) = create_test_env(); + let handler = create_handler(catalog); + 
setup_data_evolution_table(&handler).await; + + handler + .sql("INSERT INTO paimon.test_db.target (id, name, value) VALUES (1, 'alice', 10), (2, 'bob', 20)") + .await.unwrap().collect().await.unwrap(); + + // Source is a plain DataFusion in-memory table, not Paimon + register_source( + &handler, + "CREATE TABLE df_source (id INT, name VARCHAR) AS VALUES (2, 'BOB_UPDATED')", + ) + .await; + + handler + .sql( + "MERGE INTO paimon.test_db.target t USING df_source s ON t.id = s.id \ + WHEN MATCHED THEN UPDATE SET name = s.name", + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + + let rows = collect_rows_3col( + &handler, + "SELECT id, name, value FROM paimon.test_db.target ORDER BY id", + ) + .await; + assert_eq!( + rows, + vec![ + (1, "alice".to_string(), 10), + (2, "BOB_UPDATED".to_string(), 20), + ] + ); +} + +#[tokio::test] +async fn test_merge_into_join_on_row_id() { + let (_tmp, catalog) = create_test_env(); + let handler = create_handler(catalog); + setup_data_evolution_table(&handler).await; + + handler + .sql("INSERT INTO paimon.test_db.target (id, name, value) VALUES (1, 'alice', 10), (2, 'bob', 20), (3, 'charlie', 30)") + .await.unwrap().collect().await.unwrap(); + + // Get _ROW_ID for id=2 + let row_id_map = collect_row_ids( + &handler, + "SELECT \"_ROW_ID\", id FROM paimon.test_db.target ORDER BY id", + ) + .await; + let row_id_of_2 = row_id_map.iter().find(|(_, id)| *id == 2).unwrap().0; + + // Create source with that row_id + register_source( + &handler, + &format!( + "CREATE TABLE rid_source (row_id BIGINT, name VARCHAR) AS VALUES ({row_id_of_2}, 'BOB')" + ), + ) + .await; + + handler + .sql( + "MERGE INTO paimon.test_db.target t USING rid_source s ON t.\"_ROW_ID\" = s.row_id \ + WHEN MATCHED THEN UPDATE SET name = s.name", + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + + let rows = collect_rows_3col( + &handler, + "SELECT id, name, value FROM paimon.test_db.target ORDER BY id", + ) + .await; + assert_eq!( + rows, + vec![ + 
(1, "alice".to_string(), 10), + (2, "BOB".to_string(), 20), + (3, "charlie".to_string(), 30), + ] + ); +} + +#[tokio::test] +async fn test_self_merge() { + let (_tmp, catalog) = create_test_env(); + let handler = create_handler(catalog); + setup_data_evolution_table(&handler).await; + + handler + .sql("INSERT INTO paimon.test_db.target (id, name, value) VALUES (1, 'alice', 10), (2, 'bob', 20)") + .await.unwrap().collect().await.unwrap(); + + // Self-merge: target used as both target and source + handler + .sql( + "MERGE INTO paimon.test_db.target t USING paimon.test_db.target s \ + ON t.\"_ROW_ID\" = s.\"_ROW_ID\" \ + WHEN MATCHED THEN UPDATE SET name = UPPER(s.name)", + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + + let rows = collect_rows_3col( + &handler, + "SELECT id, name, value FROM paimon.test_db.target ORDER BY id", + ) + .await; + assert_eq!( + rows, + vec![(1, "ALICE".to_string(), 10), (2, "BOB".to_string(), 20),] + ); +} + +#[tokio::test] +async fn test_row_count_after_merge() { + let (_tmp, catalog) = create_test_env(); + let handler = create_handler(catalog.clone()); + setup_data_evolution_table(&handler).await; + + handler + .sql("INSERT INTO paimon.test_db.target (id, name, value) VALUES (1, 'alice', 10), (2, 'bob', 20), (3, 'charlie', 30)") + .await.unwrap().collect().await.unwrap(); + + // Snapshot 1: 3 rows inserted + let table = catalog + .get_table(&Identifier::new("test_db", "target")) + .await + .unwrap(); + let snap_mgr = SnapshotManager::new(table.file_io().clone(), table.location().to_string()); + let snap1 = snap_mgr.get_latest_snapshot().await.unwrap().unwrap(); + assert_eq!(snap1.next_row_id(), Some(3)); + + // MERGE INTO: update 1 row + register_source( + &handler, + "CREATE TABLE src_count (id INT, name VARCHAR) AS VALUES (1, 'ALICE')", + ) + .await; + handler + .sql( + "MERGE INTO paimon.test_db.target t USING src_count s ON t.id = s.id \ + WHEN MATCHED THEN UPDATE SET name = s.name", + ) + .await + .unwrap() + 
.collect() + .await + .unwrap(); + + // Snapshot 2: MERGE INTO should NOT allocate new row IDs + let snap2 = snap_mgr.get_latest_snapshot().await.unwrap().unwrap(); + assert_eq!( + snap2.next_row_id(), + Some(3), + "MERGE INTO should not allocate new row IDs" + ); +} + +// ======================= Error Path Tests ======================= + +// ======================= WHEN NOT MATCHED THEN INSERT Tests ======================= + +#[tokio::test] +async fn test_merge_into_update_and_insert() { + let (_tmp, catalog) = create_test_env(); + let handler = create_handler(catalog); + setup_data_evolution_table(&handler).await; + + handler + .sql("INSERT INTO paimon.test_db.target (id, name, value) VALUES (2, 'bob', 20), (3, 'charlie', 30)") + .await.unwrap().collect().await.unwrap(); + + register_source( + &handler, + "CREATE TABLE src_ui (id INT, name VARCHAR, value INT) AS VALUES (1, 'alice', 11), (2, 'BOB', 22)", + ) + .await; + + handler + .sql( + "MERGE INTO paimon.test_db.target t USING src_ui s ON t.id = s.id \ + WHEN MATCHED THEN UPDATE SET name = s.name \ + WHEN NOT MATCHED THEN INSERT (id, name, value) VALUES (s.id, s.name, s.value)", + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + + let rows = collect_rows_3col( + &handler, + "SELECT id, name, value FROM paimon.test_db.target ORDER BY id", + ) + .await; + assert_eq!( + rows, + vec![ + (1, "alice".to_string(), 11), // inserted + (2, "BOB".to_string(), 20), // updated (name only) + (3, "charlie".to_string(), 30), // untouched + ] + ); +} + +#[tokio::test] +async fn test_merge_into_insert_only() { + let (_tmp, catalog) = create_test_env(); + let handler = create_handler(catalog); + setup_data_evolution_table(&handler).await; + + handler + .sql("INSERT INTO paimon.test_db.target (id, name, value) VALUES (2, 'bob', 20), (3, 'charlie', 30)") + .await.unwrap().collect().await.unwrap(); + + register_source( + &handler, + "CREATE TABLE src_io (id INT, name VARCHAR, value INT) AS VALUES (1, 'alice', 11), (2, 
'BOB', 22)", + ) + .await; + + // Only INSERT, no MATCHED clause — matched row id=2 should be untouched + handler + .sql( + "MERGE INTO paimon.test_db.target t USING src_io s ON t.id = s.id \ + WHEN NOT MATCHED THEN INSERT (id, name, value) VALUES (s.id, s.name, s.value)", + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + + let rows = collect_rows_3col( + &handler, + "SELECT id, name, value FROM paimon.test_db.target ORDER BY id", + ) + .await; + assert_eq!( + rows, + vec![ + (1, "alice".to_string(), 11), // inserted + (2, "bob".to_string(), 20), // untouched + (3, "charlie".to_string(), 30), // untouched + ] + ); +} + +#[tokio::test] +async fn test_merge_into_insert_all_columns() { + let (_tmp, catalog) = create_test_env(); + let handler = create_handler(catalog); + setup_data_evolution_table(&handler).await; + + handler + .sql("INSERT INTO paimon.test_db.target (id, name, value) VALUES (2, 'bob', 20)") + .await + .unwrap() + .collect() + .await + .unwrap(); + + // Source schema matches target: (id, name, value) + register_source( + &handler, + "CREATE TABLE src_star (id INT, name VARCHAR, value INT) AS VALUES (1, 'alice', 10), (2, 'BOB', 22)", + ) + .await; + + handler + .sql( + "MERGE INTO paimon.test_db.target t USING src_star s ON t.id = s.id \ + WHEN MATCHED THEN UPDATE SET name = s.name \ + WHEN NOT MATCHED THEN INSERT (id, name, value) VALUES (s.id, s.name, s.value)", + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + + let rows = collect_rows_3col( + &handler, + "SELECT id, name, value FROM paimon.test_db.target ORDER BY id", + ) + .await; + assert_eq!( + rows, + vec![ + (1, "alice".to_string(), 10), // inserted + (2, "BOB".to_string(), 20), // updated name + ] + ); +} + +#[tokio::test] +async fn test_merge_into_insert_partial_columns() { + let (_tmp, catalog) = create_test_env(); + let handler = create_handler(catalog); + setup_data_evolution_table(&handler).await; + + handler + .sql("INSERT INTO paimon.test_db.target (id, name, 
value) VALUES (2, 'bob', 20)") + .await + .unwrap() + .collect() + .await + .unwrap(); + + register_source( + &handler, + "CREATE TABLE src_partial (id INT, name VARCHAR) AS VALUES (1, 'alice'), (2, 'BOB')", + ) + .await; + + // INSERT only id and name, value should be NULL (but our schema has INT, so this tests partial insert) + handler + .sql( + "MERGE INTO paimon.test_db.target t USING src_partial s ON t.id = s.id \ + WHEN NOT MATCHED THEN INSERT (id, name, value) VALUES (s.id, s.name, 0)", + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + + let rows = collect_rows_3col( + &handler, + "SELECT id, name, value FROM paimon.test_db.target ORDER BY id", + ) + .await; + assert_eq!( + rows, + vec![ + (1, "alice".to_string(), 0), // inserted with value=0 + (2, "bob".to_string(), 20), // untouched + ] + ); +} + +#[tokio::test] +async fn test_merge_into_insert_with_predicate() { + let (_tmp, catalog) = create_test_env(); + let handler = create_handler(catalog); + setup_data_evolution_table(&handler).await; + + handler + .sql("INSERT INTO paimon.test_db.target (id, name, value) VALUES (1, 'alice', 10)") + .await + .unwrap() + .collect() + .await + .unwrap(); + + // Source has 3 rows, only id=1 matches target + register_source( + &handler, + "CREATE TABLE src_pred (id INT, name VARCHAR, value INT) AS VALUES (1, 'ALICE', 11), (2, 'bob', 20), (3, 'charlie', 30)", + ) + .await; + + // Only insert when value > 25 + handler + .sql( + "MERGE INTO paimon.test_db.target t USING src_pred s ON t.id = s.id \ + WHEN NOT MATCHED AND s.value > 25 THEN INSERT (id, name, value) VALUES (s.id, s.name, s.value)", + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + + let rows = collect_rows_3col( + &handler, + "SELECT id, name, value FROM paimon.test_db.target ORDER BY id", + ) + .await; + assert_eq!( + rows, + vec![ + (1, "alice".to_string(), 10), // untouched (matched but no UPDATE clause) + (3, "charlie".to_string(), 30), // inserted (value=30 > 25) + // id=2 not 
inserted (value=20 <= 25) + ] + ); +} + +#[tokio::test] +async fn test_merge_into_row_id_for_inserted_rows() { + let (_tmp, catalog) = create_test_env(); + let handler = create_handler(catalog.clone()); + setup_data_evolution_table(&handler).await; + + handler + .sql("INSERT INTO paimon.test_db.target (id, name, value) VALUES (2, 'bob', 20), (3, 'charlie', 30)") + .await.unwrap().collect().await.unwrap(); + + // Before merge: _ROW_ID should be 0, 1 + let before = collect_row_ids( + &handler, + "SELECT \"_ROW_ID\", id FROM paimon.test_db.target ORDER BY id", + ) + .await; + assert_eq!(before, vec![(0, 2), (1, 3)]); + + register_source( + &handler, + "CREATE TABLE src_rid (id INT, name VARCHAR, value INT) AS VALUES (1, 'alice', 11), (2, 'BOB', 22)", + ) + .await; + + handler + .sql( + "MERGE INTO paimon.test_db.target t USING src_rid s ON t.id = s.id \ + WHEN MATCHED THEN UPDATE SET name = s.name \ + WHEN NOT MATCHED THEN INSERT (id, name, value) VALUES (s.id, s.name, s.value)", + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + + // After merge: existing rows keep their _ROW_ID, new row gets next available + let after = collect_row_ids( + &handler, + "SELECT \"_ROW_ID\", id FROM paimon.test_db.target ORDER BY id", + ) + .await; + + // id=1 is new → _ROW_ID=2 (next after 0,1) + // id=2 updated → _ROW_ID=0 (preserved) + // id=3 untouched → _ROW_ID=1 (preserved) + assert_eq!(after, vec![(2, 1), (0, 2), (1, 3)]); + + // Verify next_row_id in snapshot + let table = catalog + .get_table(&Identifier::new("test_db", "target")) + .await + .unwrap(); + let snap_mgr = SnapshotManager::new(table.file_io().clone(), table.location().to_string()); + let snap = snap_mgr.get_latest_snapshot().await.unwrap().unwrap(); + assert_eq!( + snap.next_row_id(), + Some(3), + "next_row_id should be 3 after inserting 1 new row" + ); +} + +#[tokio::test] +async fn test_rejects_when_matched_delete() { + let (_tmp, catalog) = create_test_env(); + let handler = create_handler(catalog); + 
setup_data_evolution_table(&handler).await; + + handler + .sql("INSERT INTO paimon.test_db.target (id, name, value) VALUES (1, 'alice', 10)") + .await + .unwrap() + .collect() + .await + .unwrap(); + + register_source(&handler, "CREATE TABLE src_del (id INT) AS VALUES (1)").await; + + assert_merge_error( + &handler, + "MERGE INTO paimon.test_db.target t USING src_del s ON t.id = s.id \ + WHEN MATCHED THEN DELETE", + "WHEN MATCHED THEN DELETE is not supported", + ) + .await; +} + +#[tokio::test] +async fn test_rejects_multiple_when_matched() { + let (_tmp, catalog) = create_test_env(); + let handler = create_handler(catalog); + setup_data_evolution_table(&handler).await; + + handler + .sql("INSERT INTO paimon.test_db.target (id, name, value) VALUES (1, 'alice', 10)") + .await + .unwrap() + .collect() + .await + .unwrap(); + + register_source( + &handler, + "CREATE TABLE src_multi (id INT, name VARCHAR) AS VALUES (1, 'ALICE')", + ) + .await; + + assert_merge_error( + &handler, + "MERGE INTO paimon.test_db.target t USING src_multi s ON t.id = s.id \ + WHEN MATCHED AND t.id = 1 THEN UPDATE SET name = s.name \ + WHEN MATCHED THEN UPDATE SET name = 'default'", + "WHEN MATCHED AND is not yet supported", + ) + .await; +} + +#[tokio::test] +async fn test_rejects_partition_column_in_set() { + let (_tmp, catalog) = create_test_env(); + let handler = create_handler(catalog); + + handler.sql("CREATE SCHEMA paimon.test_db").await.unwrap(); + handler + .sql( + "CREATE TABLE paimon.test_db.part_target (\ + pt STRING, id INT NOT NULL, name STRING\ + ) PARTITIONED BY (pt STRING) WITH (\ + 'data-evolution.enabled' = 'true', \ + 'row-tracking.enabled' = 'true'\ + )", + ) + .await + .unwrap(); + + handler + .sql("INSERT INTO paimon.test_db.part_target (pt, id, name) VALUES ('a', 1, 'alice')") + .await + .unwrap() + .collect() + .await + .unwrap(); + + register_source( + &handler, + "CREATE TABLE src_pt (id INT, pt VARCHAR) AS VALUES (1, 'b')", + ) + .await; + + assert_merge_error( + 
&handler, + "MERGE INTO paimon.test_db.part_target t USING src_pt s ON t.id = s.id \ + WHEN MATCHED THEN UPDATE SET pt = s.pt", + "Cannot update partition column", + ) + .await; +} + +#[tokio::test] +async fn test_rejects_table_without_row_tracking() { + let (_tmp, catalog) = create_test_env(); + let handler = create_handler(catalog); + + handler.sql("CREATE SCHEMA paimon.test_db").await.unwrap(); + handler + .sql( + "CREATE TABLE paimon.test_db.no_tracking (\ + id INT NOT NULL, name STRING\ + ) WITH (\ + 'data-evolution.enabled' = 'true'\ + )", + ) + .await + .unwrap(); + + handler + .sql("INSERT INTO paimon.test_db.no_tracking (id, name) VALUES (1, 'alice')") + .await + .unwrap() + .collect() + .await + .unwrap(); + + register_source( + &handler, + "CREATE TABLE src_nrt (id INT, name VARCHAR) AS VALUES (1, 'ALICE')", + ) + .await; + + assert_merge_error( + &handler, + "MERGE INTO paimon.test_db.no_tracking t USING src_nrt s ON t.id = s.id \ + WHEN MATCHED THEN UPDATE SET name = s.name", + "row-tracking.enabled", + ) + .await; +} + +#[tokio::test] +async fn test_successive_merges_read_file_group() { + // Verifies that a second MERGE INTO correctly reads columns from the file group + // (base file + partial-column files created by the first merge), not just a single file. 
+ let (_tmp, catalog) = create_test_env(); + let handler = create_handler(catalog); + setup_data_evolution_table(&handler).await; + + handler + .sql("INSERT INTO paimon.test_db.target (id, name, value) VALUES (1, 'alice', 10), (2, 'bob', 20)") + .await.unwrap().collect().await.unwrap(); + + // First MERGE: update 'name' column → creates a partial-column file for 'name' + register_source( + &handler, + "CREATE TABLE src_m1 (id INT, name VARCHAR) AS VALUES (1, 'ALICE'), (2, 'BOB')", + ) + .await; + handler + .sql( + "MERGE INTO paimon.test_db.target t USING src_m1 s ON t.id = s.id \ + WHEN MATCHED THEN UPDATE SET name = s.name", + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + + // Verify first merge result + let rows = collect_rows_3col( + &handler, + "SELECT id, name, value FROM paimon.test_db.target ORDER BY id", + ) + .await; + assert_eq!( + rows, + vec![(1, "ALICE".to_string(), 10), (2, "BOB".to_string(), 20),] + ); + + // Second MERGE: update 'name' again → must read the merged 'name' from file group + // (base file has original 'name', partial file has updated 'name' from first merge) + register_source( + &handler, + "CREATE TABLE src_m2 (id INT, name VARCHAR) AS VALUES (1, 'Alice_v2')", + ) + .await; + handler + .sql( + "MERGE INTO paimon.test_db.target t USING src_m2 s ON t.id = s.id \ + WHEN MATCHED THEN UPDATE SET name = s.name", + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + + let rows = collect_rows_3col( + &handler, + "SELECT id, name, value FROM paimon.test_db.target ORDER BY id", + ) + .await; + assert_eq!( + rows, + vec![(1, "Alice_v2".to_string(), 10), (2, "BOB".to_string(), 20),] + ); +} + +#[tokio::test] +async fn test_successive_merges_different_columns_read_file_group() { + // First merge updates 'name', second merge updates 'value'. + // The second merge must correctly read 'value' from the file group + // even though a partial-column file for 'name' now exists. 
+ let (_tmp, catalog) = create_test_env(); + let handler = create_handler(catalog); + setup_data_evolution_table(&handler).await; + + handler + .sql("INSERT INTO paimon.test_db.target (id, name, value) VALUES (1, 'alice', 10), (2, 'bob', 20)") + .await.unwrap().collect().await.unwrap(); + + // First MERGE: update 'name' + register_source( + &handler, + "CREATE TABLE src_dc1 (id INT, name VARCHAR) AS VALUES (1, 'ALICE')", + ) + .await; + handler + .sql( + "MERGE INTO paimon.test_db.target t USING src_dc1 s ON t.id = s.id \ + WHEN MATCHED THEN UPDATE SET name = s.name", + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + + // Second MERGE: update 'value' — reads from file group (base + name-partial) + register_source( + &handler, + "CREATE TABLE src_dc2 (id INT, value INT) AS VALUES (1, 100), (2, 200)", + ) + .await; + handler + .sql( + "MERGE INTO paimon.test_db.target t USING src_dc2 s ON t.id = s.id \ + WHEN MATCHED THEN UPDATE SET value = s.value", + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + + let rows = collect_rows_3col( + &handler, + "SELECT id, name, value FROM paimon.test_db.target ORDER BY id", + ) + .await; + assert_eq!( + rows, + vec![(1, "ALICE".to_string(), 100), (2, "bob".to_string(), 200),] + ); +} + +#[tokio::test] +async fn test_merge_insert_reordered_columns() { + // Verifies that INSERT with columns in a different order than the table schema + // still maps data correctly (columns matched by name, not position). 
+ // Table schema: (id INT, name STRING, value INT) + // INSERT specifies: (value, name, id) — reversed order + let (_tmp, catalog) = create_test_env(); + let handler = create_handler(catalog); + setup_data_evolution_table(&handler).await; + + handler + .sql("INSERT INTO paimon.test_db.target (id, name, value) VALUES (1, 'alice', 10)") + .await + .unwrap() + .collect() + .await + .unwrap(); + + register_source( + &handler, + "CREATE TABLE src_reorder (id INT, name VARCHAR, value INT) AS VALUES (2, 'bob', 20), (1, 'ALICE', 11)", + ) + .await; + + // INSERT columns in reversed order: (value, name, id) + handler + .sql( + "MERGE INTO paimon.test_db.target t USING src_reorder s ON t.id = s.id \ + WHEN NOT MATCHED THEN INSERT (value, name, id) VALUES (s.value, s.name, s.id)", + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + + let rows = collect_rows_3col( + &handler, + "SELECT id, name, value FROM paimon.test_db.target ORDER BY id", + ) + .await; + assert_eq!( + rows, + vec![ + (1, "alice".to_string(), 10), // untouched (matched, no UPDATE clause) + (2, "bob".to_string(), 20), // inserted — columns must be correctly mapped + ] + ); +} + +#[tokio::test] +async fn test_merge_insert_reordered_columns_on_partitioned_table() { + // Verifies column reordering on a partitioned table where mis-mapping + // would cause data to land in the wrong partition. 
+ let (_tmp, catalog) = create_test_env(); + let handler = create_handler(catalog); + + handler.sql("CREATE SCHEMA paimon.test_db").await.unwrap(); + handler + .sql( + "CREATE TABLE paimon.test_db.part_tbl (\ + dt STRING, id INT NOT NULL, name STRING\ + ) PARTITIONED BY (dt STRING) WITH (\ + 'data-evolution.enabled' = 'true', \ + 'row-tracking.enabled' = 'true'\ + )", + ) + .await + .unwrap(); + + handler + .sql("INSERT INTO paimon.test_db.part_tbl (dt, id, name) VALUES ('2024-01-01', 1, 'alice')") + .await + .unwrap() + .collect() + .await + .unwrap(); + + register_source( + &handler, + "CREATE TABLE src_pt_reorder (id INT, name VARCHAR, dt VARCHAR) AS VALUES (2, 'bob', '2024-02-01'), (1, 'ALICE', '2024-01-01')", + ) + .await; + + // INSERT with columns in different order than table schema: (name, id, dt) vs table (dt, id, name) + handler + .sql( + "MERGE INTO paimon.test_db.part_tbl t USING src_pt_reorder s ON t.id = s.id \ + WHEN NOT MATCHED THEN INSERT (name, id, dt) VALUES (s.name, s.id, s.dt)", + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + + let batches = handler + .sql("SELECT dt, id, name FROM paimon.test_db.part_tbl ORDER BY id") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let mut rows = Vec::new(); + for batch in &batches { + let dts = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let ids = batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + let names = batch + .column(2) + .as_any() + .downcast_ref::() + .unwrap(); + for i in 0..batch.num_rows() { + rows.push(( + dts.value(i).to_string(), + ids.value(i), + names.value(i).to_string(), + )); + } + } + + assert_eq!( + rows, + vec![ + ("2024-01-01".to_string(), 1, "alice".to_string()), // untouched + ("2024-02-01".to_string(), 2, "bob".to_string()), // inserted — dt must be partition, not name + ] + ); +} + +#[tokio::test] +async fn test_rejects_table_with_primary_keys() { + let (_tmp, catalog) = create_test_env(); + let handler = 
create_handler(catalog); + + handler.sql("CREATE SCHEMA paimon.test_db").await.unwrap(); + handler + .sql( + "CREATE TABLE paimon.test_db.pk_target (\ + id INT NOT NULL, name STRING, PRIMARY KEY (id)\ + ) WITH (\ + 'data-evolution.enabled' = 'true', \ + 'row-tracking.enabled' = 'true'\ + )", + ) + .await + .unwrap(); + + register_source( + &handler, + "CREATE TABLE src_pk (id INT, name VARCHAR) AS VALUES (1, 'ALICE')", + ) + .await; + + assert_merge_error( + &handler, + "MERGE INTO paimon.test_db.pk_target t USING src_pk s ON t.id = s.id \ + WHEN MATCHED THEN UPDATE SET name = s.name", + "does not support primary keys", + ) + .await; +} diff --git a/crates/integrations/datafusion/tests/ddl_tests.rs b/crates/integrations/datafusion/tests/sql_handler_tests.rs similarity index 98% rename from crates/integrations/datafusion/tests/ddl_tests.rs rename to crates/integrations/datafusion/tests/sql_handler_tests.rs index c520f18..c3608b3 100644 --- a/crates/integrations/datafusion/tests/ddl_tests.rs +++ b/crates/integrations/datafusion/tests/sql_handler_tests.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! DDL integration tests for paimon-datafusion. +//! SQL handler integration tests for paimon-datafusion. 
use std::sync::Arc; @@ -24,7 +24,7 @@ use datafusion::prelude::SessionContext; use paimon::catalog::Identifier; use paimon::spec::{ArrayType, DataType, IntType, MapType, VarCharType}; use paimon::{Catalog, CatalogOptions, FileSystemCatalog, Options}; -use paimon_datafusion::{PaimonCatalogProvider, PaimonDdlHandler, PaimonRelationPlanner}; +use paimon_datafusion::{PaimonCatalogProvider, PaimonRelationPlanner, PaimonSqlHandler}; use tempfile::TempDir; fn create_test_env() -> (TempDir, Arc) { @@ -36,7 +36,7 @@ fn create_test_env() -> (TempDir, Arc) { (temp_dir, Arc::new(catalog)) } -fn create_handler(catalog: Arc) -> PaimonDdlHandler { +fn create_handler(catalog: Arc) -> PaimonSqlHandler { let ctx = SessionContext::new(); ctx.register_catalog( "paimon", @@ -44,7 +44,7 @@ fn create_handler(catalog: Arc) -> PaimonDdlHandler { ); ctx.register_relation_planner(Arc::new(PaimonRelationPlanner::new())) .expect("Failed to register relation planner"); - PaimonDdlHandler::new(ctx, catalog, "paimon") + PaimonSqlHandler::new(ctx, catalog, "paimon") } // ======================= CREATE / DROP SCHEMA ======================= diff --git a/crates/paimon/src/lib.rs b/crates/paimon/src/lib.rs index b322347..5aabe25 100644 --- a/crates/paimon/src/lib.rs +++ b/crates/paimon/src/lib.rs @@ -42,8 +42,8 @@ pub use catalog::CatalogFactory; pub use catalog::FileSystemCatalog; pub use table::{ - CommitMessage, DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket, Plan, RESTEnv, - RESTSnapshotCommit, ReadBuilder, RenamingSnapshotCommit, RowRange, SnapshotCommit, - SnapshotManager, Table, TableCommit, TableRead, TableScan, TableWrite, TagManager, - WriteBuilder, + CommitMessage, DataEvolutionWriter, DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket, + Plan, RESTEnv, RESTSnapshotCommit, ReadBuilder, RenamingSnapshotCommit, RowRange, + SnapshotCommit, SnapshotManager, Table, TableCommit, TableRead, TableScan, TableWrite, + TagManager, WriteBuilder, }; diff --git 
a/crates/paimon/src/spec/binary_row.rs b/crates/paimon/src/spec/binary_row.rs index 6599484..b7f9e94 100644 --- a/crates/paimon/src/spec/binary_row.rs +++ b/crates/paimon/src/spec/binary_row.rs @@ -95,6 +95,14 @@ impl BinaryRow { Ok(Self::from_bytes(arity, data[4..].to_vec())) } + /// Serialize this BinaryRow to bytes (arity prefix + data), the inverse of `from_serialized_bytes`. + pub fn to_serialized_bytes(&self) -> Vec { + let mut buf = Vec::with_capacity(4 + self.data.len()); + buf.extend_from_slice(&self.arity.to_be_bytes()); + buf.extend_from_slice(&self.data); + buf + } + pub fn arity(&self) -> i32 { self.arity } @@ -623,11 +631,15 @@ pub fn extract_datum_from_arrow( Datum::Double(arr.value(row_idx)) } DataType::Char(_) | DataType::VarChar(_) => { - let arr = col - .as_any() - .downcast_ref::() - .ok_or_else(|| type_mismatch_err("String", col_idx))?; - Datum::String(arr.value(row_idx).to_string()) + if let Some(arr) = col.as_any().downcast_ref::() { + Datum::String(arr.value(row_idx).to_string()) + } else if let Some(arr) = col.as_any().downcast_ref::() { + Datum::String(arr.value(row_idx).to_string()) + } else if let Some(arr) = col.as_any().downcast_ref::() { + Datum::String(arr.value(row_idx).to_string()) + } else { + return Err(type_mismatch_err("String", col_idx)); + } } DataType::Date(_) => { let arr = col diff --git a/crates/paimon/src/table/data_evolution_writer.rs b/crates/paimon/src/table/data_evolution_writer.rs new file mode 100644 index 0000000..9e2ffbb --- /dev/null +++ b/crates/paimon/src/table/data_evolution_writer.rs @@ -0,0 +1,862 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Row-ID-based update writer for data evolution tables. +//! +//! [`DataEvolutionWriter`] accepts rows to update (identified by `_ROW_ID`) along with +//! new column values, then handles file metadata lookup, row grouping, +//! reading original columns, applying updates, and writing partial-column files. +//! +//! The writer does NOT commit — it returns `Vec<CommitMessage>` that the caller +//! passes to [`TableCommit`](super::TableCommit). +//! This separation allows callers to compose multiple operations into a single commit, +//! similar to Iceberg's Transaction/Action pattern. + +use crate::io::FileIO; +use crate::spec::{BinaryRow, CoreOptions, DataFileMeta, PartitionComputer}; +use crate::table::commit_message::CommitMessage; +use crate::table::data_file_writer::DataFileWriter; +use crate::table::stats_filter::group_by_overlapping_row_id; +use crate::table::DataSplitBuilder; +use crate::table::Table; +use crate::Result; +use arrow_array::{Array, ArrayRef, Int64Array, RecordBatch}; +use arrow_select::concat::concat_batches; +use arrow_select::interleave::interleave; +use futures::TryStreamExt; +use std::collections::HashMap; + +/// Engine-agnostic writer for partial-column updates via `_ROW_ID`. +/// +/// Usage: +/// 1. Create via [`DataEvolutionWriter::new`] (validates preconditions). +/// 2. Feed matched rows via [`add_matched_batch`](Self::add_matched_batch). +/// Each batch must contain a `_ROW_ID` (Int64) column plus the update columns. +/// 3. Call [`prepare_commit`](Self::prepare_commit) to produce `CommitMessage`s. +/// 4. 
Commit via [`TableCommit`](super::TableCommit) (caller's responsibility). +/// +/// The query engine (DataFusion, custom, etc.) is responsible for: +/// - Parsing the MERGE INTO SQL +/// - Executing the JOIN to find matched rows +/// - Computing new column values +/// - Passing the results as `RecordBatch`es with `_ROW_ID` + update columns +#[must_use = "writer must be used to call prepare_commit()"] +pub struct DataEvolutionWriter { + table: Table, + update_columns: Vec, + matched_batches: Vec, +} + +impl DataEvolutionWriter { + /// Create a new writer for the given table and update columns. + /// + /// Validates: + /// - `data-evolution.enabled = true` + /// - `row-tracking.enabled = true` + /// - No primary keys + /// - Update columns don't include partition keys + pub fn new(table: &Table, update_columns: Vec) -> Result { + let schema = table.schema(); + let core_options = CoreOptions::new(schema.options()); + + if !core_options.data_evolution_enabled() { + return Err(crate::Error::Unsupported { + message: + "MERGE INTO is only supported for tables with 'data-evolution.enabled' = 'true'" + .to_string(), + }); + } + if !core_options.row_tracking_enabled() { + return Err(crate::Error::Unsupported { + message: "MERGE INTO requires 'row-tracking.enabled' = 'true'".to_string(), + }); + } + if !schema.primary_keys().is_empty() { + return Err(crate::Error::Unsupported { + message: "MERGE INTO on data evolution tables does not support primary keys" + .to_string(), + }); + } + + let partition_keys = schema.partition_keys(); + for col in &update_columns { + if partition_keys.contains(col) { + return Err(crate::Error::Unsupported { + message: format!("Cannot update partition column '{col}' in MERGE INTO"), + }); + } + } + + Ok(Self { + table: table.clone(), + update_columns, + matched_batches: Vec::new(), + }) + } + + /// Add a batch of matched rows. 
+ /// + /// The batch must contain: + /// - A `_ROW_ID` column (Int64) identifying which rows to update + /// - One column for each entry in `update_columns` with the new values + pub fn add_matched_batch(&mut self, batch: RecordBatch) -> Result<()> { + if batch.num_rows() == 0 { + return Ok(()); + } + + // Validate _ROW_ID column exists + if batch.column_by_name("_ROW_ID").is_none() { + return Err(crate::Error::DataInvalid { + message: "Matched batch must contain a '_ROW_ID' column".to_string(), + source: None, + }); + } + + self.matched_batches.push(batch); + Ok(()) + } + + /// Scan file metadata, group matched rows by file, read originals, + /// apply updates, and write partial-column files. + /// + /// Returns `CommitMessage`s for the caller to commit via [`TableCommit`](super::TableCommit). + #[must_use = "commit messages must be passed to TableCommit"] + pub async fn prepare_commit(self) -> Result> { + let total_matched: usize = self.matched_batches.iter().map(|b| b.num_rows()).sum(); + if total_matched == 0 { + return Ok(Vec::new()); + } + + // 1. Scan file metadata and build row_id -> file group index. + // In data-evolution tables, multiple files can share the same first_row_id + // (base file + partial-column files). We must group them so the reader + // can merge columns correctly. + let scan = self.table.new_read_builder().new_scan(); + let plan = scan.plan().await?; + + let mut file_index: Vec = Vec::new(); + for split in plan.splits() { + let partition_bytes = split.partition().to_serialized_bytes(); + let bucket = split.bucket(); + let bucket_path = split.bucket_path().to_string(); + let snapshot_id = split.snapshot_id(); + let total_buckets = split.total_buckets(); + + let all_files: Vec = split + .data_files() + .iter() + .filter(|f| f.first_row_id.is_some()) + .cloned() + .collect(); + + let groups = group_by_overlapping_row_id(all_files); + for group in groups { + // Compute the overall row_id range for this group. 
+ // The base file has the widest range; partial-column files share it. + let first_row_id = group.iter().filter_map(|f| f.first_row_id).min().unwrap(); + let last_row_id = group + .iter() + .filter_map(|f| f.row_id_range().map(|(_, end)| end)) + .max() + .unwrap(); + // The actual row count is the max among the group (base file's count). + let row_count = group.iter().map(|f| f.row_count).max().unwrap(); + + file_index.push(FileRowRange { + first_row_id, + last_row_id, + row_count, + partition: partition_bytes.clone(), + bucket, + bucket_path: bucket_path.clone(), + snapshot_id, + total_buckets, + files: group, + }); + } + } + file_index.sort_by_key(|f| f.first_row_id); + + if file_index.is_empty() { + return Err(crate::Error::DataInvalid { + message: "No files with row tracking found in target table".to_string(), + source: None, + }); + } + + // 2. Group matched rows by their owning file + let mut file_matches: HashMap> = HashMap::new(); + + for (batch_idx, batch) in self.matched_batches.iter().enumerate() { + let row_id_col = batch + .column_by_name("_ROW_ID") + .unwrap() + .as_any() + .downcast_ref::() + .ok_or_else(|| crate::Error::DataInvalid { + message: "_ROW_ID column must be Int64".to_string(), + source: None, + })?; + + for row_idx in 0..batch.num_rows() { + let row_id = row_id_col.value(row_idx); + let (file_pos, file_range) = + find_owning_file(&file_index, row_id).ok_or_else(|| { + crate::Error::DataInvalid { + message: format!("No file found for _ROW_ID {row_id}"), + source: None, + } + })?; + + let offset = (row_id - file_range.first_row_id) as usize; + file_matches.entry(file_pos).or_default().push(MatchedRow { + offset, + batch_idx, + row_idx, + }); + } + } + + // 3. 
For each affected file: read original columns, apply updates, write partial files + let mut writer = DataEvolutionPartialWriter::new(&self.table, self.update_columns.clone())?; + + for (&file_pos, matched_rows) in &file_matches { + let file_range = &file_index[file_pos]; + let first_row_id = file_range.first_row_id; + let row_count = file_range.row_count as usize; + + // Read original columns from the entire file group (base + partial-column files). + let col_refs: Vec<&str> = self.update_columns.iter().map(|s| s.as_str()).collect(); + let mut rb = self.table.new_read_builder(); + rb.with_projection(&col_refs); + let read = rb.new_read()?; + + let raw_convertible = file_range.files.len() == 1; + let split = DataSplitBuilder::new() + .with_snapshot(file_range.snapshot_id) + .with_partition(BinaryRow::from_serialized_bytes(&file_range.partition)?) + .with_bucket(file_range.bucket) + .with_bucket_path(file_range.bucket_path.clone()) + .with_total_buckets(file_range.total_buckets) + .with_data_files(file_range.files.clone()) + .with_raw_convertible(raw_convertible) + .build()?; + + let stream = read.to_arrow(&[split])?; + let original_batches: Vec = stream.try_collect().await?; + + let original_batch = if original_batches.is_empty() { + continue; + } else if original_batches.len() == 1 { + original_batches.into_iter().next().unwrap() + } else { + concat_batches(&original_batches[0].schema(), &original_batches).map_err(|e| { + crate::Error::DataInvalid { + message: format!("Failed to concat batches: {e}"), + source: None, + } + })? + }; + + if original_batch.num_rows() != row_count { + return Err(crate::Error::DataInvalid { + message: format!( + "Expected {} rows from file, got {}", + row_count, + original_batch.num_rows() + ), + source: None, + }); + } + + // Apply updates using 2-array interleave: [original_col, updates_col]. + // Matched rows are gathered into a single contiguous update array first, + // avoiding O(N) array clones for every row in the file. 
+ let mut new_columns: Vec = Vec::with_capacity(self.update_columns.len()); + + // Sort matched rows by offset for contiguous iteration + let mut sorted_matches: Vec<(usize, usize, usize)> = matched_rows + .iter() + .map(|m| (m.offset, m.batch_idx, m.row_idx)) + .collect(); + sorted_matches.sort_by_key(|(offset, _, _)| *offset); + + for (col_idx, col_name) in self.update_columns.iter().enumerate() { + let original_col = original_batch.column(col_idx); + let original_dtype = original_col.data_type(); + + let join_col_idx = self.matched_batches[0] + .schema() + .index_of(col_name) + .map_err(|e| crate::Error::DataInvalid { + message: format!("Column {col_name} not found in matched batch: {e}"), + source: None, + })?; + + // Gather update values into a single array (one entry per matched row, in offset order) + let update_indices: Vec<(usize, usize)> = sorted_matches + .iter() + .map(|&(_, batch_idx, row_idx)| (batch_idx, row_idx)) + .collect(); + + // Collect unique batch arrays, cast if needed + let mut batch_arrays: Vec = Vec::new(); + let mut batch_id_map: HashMap = HashMap::new(); + let mut interleave_src: Vec<(usize, usize)> = + Vec::with_capacity(update_indices.len()); + + for &(batch_idx, row_idx) in &update_indices { + let arr_idx = match batch_id_map.get(&batch_idx) { + Some(&idx) => idx, + None => { + let src_col = self.matched_batches[batch_idx].column(join_col_idx); + let casted = if src_col.data_type() != original_dtype { + arrow_cast::cast(src_col, original_dtype).map_err(|e| { + crate::Error::DataInvalid { + message: format!("Failed to cast column {col_name}: {e}"), + source: None, + } + })? 
+ } else { + src_col.clone() + }; + let idx = batch_arrays.len(); + batch_arrays.push(casted); + batch_id_map.insert(batch_idx, idx); + idx + } + }; + interleave_src.push((arr_idx, row_idx)); + } + + let update_col = if batch_arrays.len() == 1 && interleave_src.len() == 1 { + // Single update value — just slice + let (_, row_idx) = interleave_src[0]; + batch_arrays[0].slice(row_idx, 1) + } else { + let refs: Vec<&dyn Array> = batch_arrays.iter().map(|a| a.as_ref()).collect(); + interleave(&refs, &interleave_src).map_err(|e| crate::Error::DataInvalid { + message: format!("Failed to gather update values for {col_name}: {e}"), + source: None, + })? + }; + + // Build final indices: 2 sources — [0] = original, [1] = update_col + let mut indices: Vec<(usize, usize)> = Vec::with_capacity(row_count); + let mut match_pos = 0; + for row in 0..row_count { + if match_pos < sorted_matches.len() && sorted_matches[match_pos].0 == row { + indices.push((1, match_pos)); + match_pos += 1; + } else { + indices.push((0, row)); + } + } + + let sources: [&dyn Array; 2] = [original_col.as_ref(), update_col.as_ref()]; + let new_col = + interleave(&sources, &indices).map_err(|e| crate::Error::DataInvalid { + message: format!("Failed to interleave column {col_name}: {e}"), + source: None, + })?; + new_columns.push(new_col); + } + + let updated_batch = RecordBatch::try_new(original_batch.schema(), new_columns) + .map_err(|e| crate::Error::DataInvalid { + message: format!("Failed to create updated batch: {e}"), + source: None, + })?; + + writer + .write_partial_batch( + file_range.partition.clone(), + file_range.bucket, + first_row_id, + updated_batch, + ) + .await?; + } + + // 4. Collect commit messages (caller is responsible for committing) + writer.prepare_commit().await + } +} + +/// Binary search for the file that owns a given row_id. 
+fn find_owning_file(file_index: &[FileRowRange], row_id: i64) -> Option<(usize, &FileRowRange)> { + let pos = file_index.partition_point(|f| f.first_row_id <= row_id); + if pos == 0 { + return None; + } + let idx = pos - 1; + let candidate = &file_index[idx]; + if row_id <= candidate.last_row_id { + Some((idx, candidate)) + } else { + None + } +} + +struct FileRowRange { + first_row_id: i64, + last_row_id: i64, + row_count: i64, + partition: Vec, + bucket: i32, + bucket_path: String, + snapshot_id: i64, + total_buckets: i32, + /// All files in this row-id group (base file + partial-column files). + files: Vec, +} + +struct MatchedRow { + offset: usize, + batch_idx: usize, + row_idx: usize, +} + +// --------------------------------------------------------------------------- +// DataEvolutionPartialWriter — writes partial-column parquet files for data evolution +// --------------------------------------------------------------------------- + +/// Key: (partition_bytes, bucket, first_row_id) +type WriterKey = (Vec, i32, i64); + +/// Writer for data evolution partial-column files. +/// +/// Unlike [`TableWrite`](super::TableWrite) which writes full-row files for append-only tables, +/// `DataEvolutionPartialWriter` writes partial-column files used by MERGE INTO on data evolution tables. +/// Each output file contains only the updated columns and shares the same `first_row_id` range +/// as the original file, allowing the reader to merge columns at read time. +/// +/// Produces parquet files containing only the specified `write_columns`, with +/// `file_source = APPEND (0)`, caller-supplied `first_row_id`, and `write_cols`. 
pub(crate) struct DataEvolutionPartialWriter {
    file_io: FileIO,
    table_location: String,
    partition_computer: PartitionComputer,
    partition_keys: Vec<String>,
    schema_id: i64,
    target_file_size: i64,
    file_compression: String,
    file_compression_zstd_level: i32,
    write_buffer_size: i64,
    /// Table columns covered by every file this writer produces.
    write_columns: Vec<String>,
    /// Writers keyed by (partition_bytes, bucket, first_row_id).
    writers: HashMap<WriterKey, DataFileWriter>,
}

impl DataEvolutionPartialWriter {
    /// Create a new writer for partial-column data evolution files.
    ///
    /// `write_columns` specifies which table columns this write covers (the SET targets).
    ///
    /// # Errors
    ///
    /// Returns [`crate::Error::Unsupported`] when `data-evolution.enabled` is not
    /// `true`, and propagates any error from building the partition computer.
    pub fn new(table: &Table, write_columns: Vec<String>) -> Result<Self> {
        let schema = table.schema();
        let core_options = CoreOptions::new(schema.options());

        if !core_options.data_evolution_enabled() {
            return Err(crate::Error::Unsupported {
                message: "DataEvolutionPartialWriter requires data-evolution.enabled = true"
                    .to_string(),
            });
        }

        let partition_keys: Vec<String> = schema.partition_keys().to_vec();
        let fields = schema.fields();
        let partition_computer = PartitionComputer::new(
            &partition_keys,
            fields,
            core_options.partition_default_name(),
            core_options.legacy_partition_name(),
        )?;

        Ok(Self {
            file_io: table.file_io().clone(),
            table_location: table.location().to_string(),
            partition_computer,
            partition_keys,
            schema_id: schema.id(),
            target_file_size: core_options.target_file_size(),
            file_compression: core_options.file_compression().to_string(),
            file_compression_zstd_level: core_options.file_compression_zstd_level(),
            write_buffer_size: core_options.write_parquet_buffer_size(),
            write_columns,
            writers: HashMap::new(),
        })
    }

    /// Write a partial-column batch for a specific partition, bucket, and row ID range.
    ///
    /// The `batch` must contain only the columns specified in `write_columns`.
    /// `first_row_id` must match the original file's `first_row_id` for the affected rows.
    /// Empty batches are ignored and do not create a writer.
    pub async fn write_partial_batch(
        &mut self,
        partition_bytes: Vec<u8>,
        bucket: i32,
        first_row_id: i64,
        batch: RecordBatch,
    ) -> Result<()> {
        if batch.num_rows() == 0 {
            return Ok(());
        }

        // One lookup: reuse the writer for this key or create it on first use.
        let writer = match self.writers.entry((partition_bytes, bucket, first_row_id)) {
            std::collections::hash_map::Entry::Occupied(slot) => slot.into_mut(),
            std::collections::hash_map::Entry::Vacant(slot) => {
                let partition_path = if self.partition_keys.is_empty() {
                    String::new()
                } else {
                    let row = BinaryRow::from_serialized_bytes(&slot.key().0)?;
                    self.partition_computer.generate_partition_path(&row)?
                };

                slot.insert(DataFileWriter::new(
                    self.file_io.clone(),
                    self.table_location.clone(),
                    partition_path,
                    bucket,
                    self.schema_id,
                    self.target_file_size,
                    self.file_compression.clone(),
                    self.file_compression_zstd_level,
                    self.write_buffer_size,
                    Some(0), // file_source: APPEND
                    Some(first_row_id),
                    Some(self.write_columns.clone()),
                ))
            }
        };

        writer.write(&batch).await
    }

    /// Close all writers and collect CommitMessages for use with TableCommit.
    ///
    /// Drains the internal writer map. One message is produced per
    /// (partition, bucket) that has at least one new file.
    pub async fn prepare_commit(&mut self) -> Result<Vec<CommitMessage>> {
        let closing: Vec<_> = self
            .writers
            .drain()
            .map(
                |((partition_bytes, bucket, _first_row_id), mut writer)| async move {
                    let files = writer.prepare_commit().await?;
                    Ok::<_, crate::Error>((partition_bytes, bucket, files))
                },
            )
            .collect();

        let results = futures::future::try_join_all(closing).await?;

        // Group files by (partition, bucket) since multiple first_row_ids may share the same partition/bucket
        let mut grouped: HashMap<(Vec<u8>, i32), Vec<DataFileMeta>> = HashMap::new();
        for (partition_bytes, bucket, files) in results {
            grouped
                .entry((partition_bytes, bucket))
                .or_default()
                .extend(files);
        }

        Ok(grouped
            .into_iter()
            .filter(|(_, files)| !files.is_empty())
            .map(|((partition_bytes, bucket), files)| {
                CommitMessage::new(partition_bytes, bucket, files)
            })
            .collect())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::catalog::Identifier;
    use crate::io::FileIOBuilder;
    use crate::spec::{DataType, IntType, Schema, TableSchema, VarCharType};
    use arrow_array::StringArray;
    use arrow_schema::{DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema};
    use std::sync::Arc;

    fn test_file_io() -> FileIO {
        FileIOBuilder::new("memory").build().unwrap()
    }

    fn make_test_file_meta(
        file_name: &str,
        row_count: i64,
        first_row_id: Option<i64>,
        max_seq: i64,
        write_cols: Option<Vec<String>>,
    ) -> DataFileMeta {
        use crate::spec::stats::BinaryTableStats;
        let empty_stats = BinaryTableStats::new(vec![], vec![], vec![]);
        DataFileMeta {
            file_name: file_name.to_string(),
            file_size: 0,
            row_count,
            min_key: vec![],
            max_key: vec![],
            key_stats: empty_stats.clone(),
            value_stats: empty_stats,
            min_sequence_number: 0,
            max_sequence_number: max_seq,
            schema_id: 0,
            level: 0,
            extra_files: vec![],
            creation_time: None,
            delete_row_count: None,
embedded_index: None, + file_source: Some(0), + value_stats_cols: None, + external_path: None, + first_row_id, + write_cols, + } + } + + fn test_data_evolution_schema() -> TableSchema { + let schema = Schema::builder() + .column("id", DataType::Int(IntType::new())) + .column("name", DataType::VarChar(VarCharType::string_type())) + .column("value", DataType::Int(IntType::new())) + .option("data-evolution.enabled", "true") + .option("row-tracking.enabled", "true") + .build() + .unwrap(); + TableSchema::new(0, &schema) + } + + fn test_table(file_io: &FileIO, table_path: &str) -> Table { + Table::new( + file_io.clone(), + Identifier::new("default", "test_de_table"), + table_path.to_string(), + test_data_evolution_schema(), + None, + ) + } + + async fn setup_dirs(file_io: &FileIO, table_path: &str) { + file_io + .mkdirs(&format!("{table_path}/snapshot/")) + .await + .unwrap(); + file_io + .mkdirs(&format!("{table_path}/manifest/")) + .await + .unwrap(); + } + + fn make_partial_batch(names: Vec<&str>) -> RecordBatch { + let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "name", + ArrowDataType::Utf8, + true, + )])); + RecordBatch::try_new(schema, vec![Arc::new(StringArray::from(names))]).unwrap() + } + + #[tokio::test] + async fn test_write_partial_column_file() { + let file_io = test_file_io(); + let table_path = "memory:/test_de_write"; + setup_dirs(&file_io, table_path).await; + + let table = test_table(&file_io, table_path); + let mut writer = DataEvolutionPartialWriter::new(&table, vec!["name".to_string()]).unwrap(); + + let batch = make_partial_batch(vec!["alice", "bob", "charlie"]); + writer + .write_partial_batch(vec![], 0, 0, batch) + .await + .unwrap(); + + let messages = writer.prepare_commit().await.unwrap(); + assert_eq!(messages.len(), 1); + assert_eq!(messages[0].new_files.len(), 1); + + let meta = &messages[0].new_files[0]; + assert_eq!(meta.row_count, 3); + assert_eq!(meta.first_row_id, Some(0)); + assert_eq!(meta.write_cols, 
Some(vec!["name".to_string()])); + assert_eq!(meta.file_source, Some(0)); + } + + #[tokio::test] + async fn test_different_first_row_id_creates_separate_files() { + let file_io = test_file_io(); + let table_path = "memory:/test_de_write_multi"; + setup_dirs(&file_io, table_path).await; + + let table = test_table(&file_io, table_path); + let mut writer = DataEvolutionPartialWriter::new(&table, vec!["name".to_string()]).unwrap(); + + // Two batches with different first_row_id should produce two files + let batch1 = make_partial_batch(vec!["alice", "bob"]); + writer + .write_partial_batch(vec![], 0, 0, batch1) + .await + .unwrap(); + + let batch2 = make_partial_batch(vec!["charlie"]); + writer + .write_partial_batch(vec![], 0, 100, batch2) + .await + .unwrap(); + + let messages = writer.prepare_commit().await.unwrap(); + assert_eq!(messages.len(), 1); + assert_eq!(messages[0].new_files.len(), 2); + + let mut files = messages[0].new_files.clone(); + files.sort_by_key(|f| f.first_row_id); + assert_eq!(files[0].first_row_id, Some(0)); + assert_eq!(files[0].row_count, 2); + assert_eq!(files[1].first_row_id, Some(100)); + assert_eq!(files[1].row_count, 1); + } + + #[test] + fn test_find_owning_file_with_grouped_ranges() { + // Simulate a file group: base file (3 cols, 100 rows) + partial file (1 col, 100 rows) + // sharing the same first_row_id range [0, 99]. 
+ let base_file = make_test_file_meta("base-0.parquet", 100, Some(0), 1, None); + let partial_file = make_test_file_meta( + "partial-0.parquet", + 100, + Some(0), + 2, + Some(vec!["name".to_string()]), + ); + + let file_index = vec![ + FileRowRange { + first_row_id: 0, + last_row_id: 99, + row_count: 100, + partition: vec![], + bucket: 0, + bucket_path: String::new(), + snapshot_id: 1, + total_buckets: 1, + files: vec![base_file, partial_file], + }, + FileRowRange { + first_row_id: 100, + last_row_id: 149, + row_count: 50, + partition: vec![], + bucket: 0, + bucket_path: String::new(), + snapshot_id: 1, + total_buckets: 1, + files: vec![make_test_file_meta( + "base-1.parquet", + 50, + Some(100), + 1, + None, + )], + }, + ]; + + // row_id 0 -> first group (2 files) + let (pos, range) = find_owning_file(&file_index, 0).unwrap(); + assert_eq!(pos, 0); + assert_eq!(range.files.len(), 2); + + // row_id 50 -> still first group + let (pos, range) = find_owning_file(&file_index, 50).unwrap(); + assert_eq!(pos, 0); + assert_eq!(range.row_count, 100); + + // row_id 99 -> last row of first group + let (pos, _) = find_owning_file(&file_index, 99).unwrap(); + assert_eq!(pos, 0); + + // row_id 100 -> second group (1 file) + let (pos, range) = find_owning_file(&file_index, 100).unwrap(); + assert_eq!(pos, 1); + assert_eq!(range.files.len(), 1); + + // row_id 200 -> not found + assert!(find_owning_file(&file_index, 200).is_none()); + } + + #[test] + fn test_file_group_construction_from_overlapping_files() { + // Verify that group_by_overlapping_row_id correctly groups base + partial files, + // and that we can build FileRowRange from the result. 
+ let base = make_test_file_meta("base.parquet", 100, Some(0), 1, None); + let partial1 = make_test_file_meta( + "partial1.parquet", + 100, + Some(0), + 2, + Some(vec!["name".to_string()]), + ); + let partial2 = make_test_file_meta( + "partial2.parquet", + 100, + Some(0), + 3, + Some(vec!["value".to_string()]), + ); + let separate = make_test_file_meta("separate.parquet", 50, Some(200), 1, None); + + let groups = group_by_overlapping_row_id(vec![base, partial1, partial2, separate]); + + // Should produce 2 groups: [base, partial1, partial2] and [separate] + assert_eq!(groups.len(), 2); + assert_eq!(groups[0].len(), 3); + assert_eq!(groups[1].len(), 1); + + // Build FileRowRange from first group + let group = &groups[0]; + let first_row_id = group.iter().filter_map(|f| f.first_row_id).min().unwrap(); + let last_row_id = group + .iter() + .filter_map(|f| f.row_id_range().map(|(_, end)| end)) + .max() + .unwrap(); + let row_count = group.iter().map(|f| f.row_count).max().unwrap(); + + assert_eq!(first_row_id, 0); + assert_eq!(last_row_id, 99); + assert_eq!(row_count, 100); + } + + #[tokio::test] + async fn test_rejects_non_data_evolution_table() { + let file_io = test_file_io(); + let schema = Schema::builder() + .column("id", DataType::Int(IntType::new())) + .build() + .unwrap(); + let table_schema = TableSchema::new(0, &schema); + let table = Table::new( + file_io, + Identifier::new("default", "test"), + "memory:/test".to_string(), + table_schema, + None, + ); + + let result = DataEvolutionPartialWriter::new(&table, vec!["id".to_string()]); + assert!(result.is_err()); + } +} diff --git a/crates/paimon/src/table/data_file_writer.rs b/crates/paimon/src/table/data_file_writer.rs new file mode 100644 index 0000000..cfbc55a --- /dev/null +++ b/crates/paimon/src/table/data_file_writer.rs @@ -0,0 +1,266 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Low-level data file writer shared by [`TableWrite`](super::TableWrite) and +//! [`DataEvolutionPartialWriter`](super::data_evolution_writer::DataEvolutionPartialWriter). +//! +//! `DataFileWriter` streams Arrow `RecordBatch`es to Parquet files on storage, +//! handles file rolling when `target_file_size` is reached, and collects +//! [`DataFileMeta`] for the commit path. + +use crate::arrow::format::{create_format_writer, FormatFileWriter}; +use crate::io::FileIO; +use crate::spec::stats::BinaryTableStats; +use crate::spec::{DataFileMeta, EMPTY_SERIALIZED_ROW}; +use crate::Result; +use arrow_array::RecordBatch; +use chrono::Utc; +use tokio::task::JoinSet; + +/// Low-level writer that produces Parquet data files for a single (partition, bucket). +/// +/// Batches are accumulated into a single `FormatFileWriter` that streams directly +/// to storage. When `target_file_size` is reached the current file is rolled +/// (closed in the background) and a new one is opened on the next write. +/// +/// Call [`prepare_commit`](Self::prepare_commit) to finalize and collect file metadata. 
+pub(crate) struct DataFileWriter {
+    file_io: FileIO,
+    table_location: String,
+    partition_path: String,
+    bucket: i32,
+    schema_id: i64,
+    /// Size threshold (bytes) at which the current file is rolled.
+    target_file_size: i64,
+    file_compression: String,
+    file_compression_zstd_level: i32,
+    /// In-progress buffer size (bytes) above which the format writer is flushed.
+    write_buffer_size: i64,
+    /// Copied unchanged into the `file_source` of every produced file.
+    file_source: Option<i32>,
+    /// Pre-assigned first row ID; `None` leaves the assignment to the commit. When
+    /// set, the writer never rolls, so its single file covers the whole row range.
+    first_row_id: Option<i64>,
+    /// Columns contained in the produced files; `None` means a full-row write.
+    write_cols: Option<Vec<String>>,
+    /// Metadata of files that have been fully closed.
+    written_files: Vec<DataFileMeta>,
+    /// Background file close tasks spawned during rolling.
+    in_flight_closes: JoinSet<Result<DataFileMeta>>,
+    /// Current open format writer, lazily created on first write.
+    current_writer: Option<Box<dyn FormatFileWriter>>,
+    current_file_name: Option<String>,
+    /// Rows written to the currently open file.
+    current_row_count: i64,
+}
+
+impl DataFileWriter {
+    /// Create a writer for one (partition, bucket) directory.
+    ///
+    /// No file is opened until the first non-empty batch is written. `file_source`,
+    /// `first_row_id` and `write_cols` are copied unchanged into the metadata of
+    /// every file this writer produces.
+    // One argument per table write option plus the per-file metadata overrides.
+    #[allow(clippy::too_many_arguments)]
+    pub(crate) fn new(
+        file_io: FileIO,
+        table_location: String,
+        partition_path: String,
+        bucket: i32,
+        schema_id: i64,
+        target_file_size: i64,
+        file_compression: String,
+        file_compression_zstd_level: i32,
+        write_buffer_size: i64,
+        file_source: Option<i32>,
+        first_row_id: Option<i64>,
+        write_cols: Option<Vec<String>>,
+    ) -> Self {
+        Self {
+            file_io,
+            table_location,
+            partition_path,
+            bucket,
+            schema_id,
+            target_file_size,
+            file_compression,
+            file_compression_zstd_level,
+            write_buffer_size,
+            file_source,
+            first_row_id,
+            write_cols,
+            written_files: Vec::new(),
+            in_flight_closes: JoinSet::new(),
+            current_writer: None,
+            current_file_name: None,
+            current_row_count: 0,
+        }
+    }
+
+    /// Write a RecordBatch. Rolls to a new file when target size is reached.
+    ///
+    /// Empty batches are ignored. A writer with a pinned `first_row_id` never rolls:
+    /// every rolled file would carry the same `first_row_id` with only part of the
+    /// rows, which the commit-time row ID alignment check rejects.
+    ///
+    /// # Errors
+    ///
+    /// Propagates failures from opening the file, writing the batch, or flushing.
+    pub(crate) async fn write(&mut self, batch: &RecordBatch) -> Result<()> {
+        if batch.num_rows() == 0 {
+            return Ok(());
+        }
+
+        if self.current_writer.is_none() {
+            self.open_new_file(batch.schema()).await?;
+        }
+
+        self.current_row_count += batch.num_rows() as i64;
+        self.current_writer.as_mut().unwrap().write(batch).await?;
+
+        // Roll to a new file if target size is reached — close in background.
+        // Skipped when row IDs are pinned so the range stays in exactly one file.
+        let may_roll = self.first_row_id.is_none();
+        if may_roll
+            && self.current_writer.as_ref().unwrap().num_bytes() as i64 >= self.target_file_size
+        {
+            self.roll_file();
+        }
+
+        // Flush row group if in-progress buffer exceeds write_buffer_size
+        if let Some(w) = self.current_writer.as_mut() {
+            if w.in_progress_size() as i64 >= self.write_buffer_size {
+                w.flush().await?;
+            }
+        }
+
+        Ok(())
+    }
+
+    /// Create the bucket directory if needed and open a fresh, uniquely named
+    /// Parquet file in it, resetting the per-file row counter.
+    async fn open_new_file(&mut self, schema: arrow_schema::SchemaRef) -> Result<()> {
+        // The UUID makes the name unique; the numeric suffix only counts files that
+        // are already closed, not closes still running in the background.
+        let file_name = format!(
+            "data-{}-{}.parquet",
+            uuid::Uuid::new_v4(),
+            self.written_files.len()
+        );
+
+        let bucket_dir = if self.partition_path.is_empty() {
+            format!("{}/bucket-{}", self.table_location, self.bucket)
+        } else {
+            format!(
+                "{}/{}/bucket-{}",
+                self.table_location, self.partition_path, self.bucket
+            )
+        };
+        self.file_io.mkdirs(&format!("{bucket_dir}/")).await?;
+
+        let file_path = format!("{bucket_dir}/{file_name}");
+        let output = self.file_io.new_output(&file_path)?;
+        let writer = create_format_writer(
+            &output,
+            schema,
+            &self.file_compression,
+            self.file_compression_zstd_level,
+        )
+        .await?;
+        self.current_writer = Some(writer);
+        self.current_file_name = Some(file_name);
+        self.current_row_count = 0;
+        Ok(())
+    }
+
+    /// Close the current file writer and record the file metadata.
+    ///
+    /// Does nothing when no file is open.
+    pub(crate) async fn close_current_file(&mut self) -> Result<()> {
+        let writer = match self.current_writer.take() {
+            Some(w) => w,
+            None => return Ok(()),
+        };
+        // Set together with `current_writer` in `open_new_file`, so it is present here.
+        let file_name = self.current_file_name.take().unwrap();
+
+        let row_count = self.current_row_count;
+        self.current_row_count = 0;
+        let file_size = writer.close().await? as i64;
+
+        let meta = Self::build_meta(
+            file_name,
+            file_size,
+            row_count,
+            self.schema_id,
+            self.file_source,
+            self.first_row_id,
+            self.write_cols.clone(),
+        );
+        self.written_files.push(meta);
+        Ok(())
+    }
+
+    /// Spawn the current writer's close in the background for non-blocking rolling.
+    ///
+    /// The resulting metadata is collected later by `prepare_commit`.
+    fn roll_file(&mut self) {
+        let writer = match self.current_writer.take() {
+            Some(w) => w,
+            None => return,
+        };
+        let file_name = self.current_file_name.take().unwrap();
+        let row_count = self.current_row_count;
+        self.current_row_count = 0;
+        // Copy what the task needs so it does not borrow `self`.
+        let schema_id = self.schema_id;
+        let file_source = self.file_source;
+        let first_row_id = self.first_row_id;
+        let write_cols = self.write_cols.clone();
+
+        self.in_flight_closes.spawn(async move {
+            let file_size = writer.close().await? as i64;
+            Ok(Self::build_meta(
+                file_name,
+                file_size,
+                row_count,
+                schema_id,
+                file_source,
+                first_row_id,
+                write_cols,
+            ))
+        });
+    }
+
+    /// Close the current writer and return all written file metadata.
+    ///
+    /// Waits for every background close started by rolling. The internal list of
+    /// written files is emptied, so a second call only returns files written since.
+    ///
+    /// # Errors
+    ///
+    /// Fails if closing any file fails or if a background close task did not finish.
+    pub(crate) async fn prepare_commit(&mut self) -> Result<Vec<DataFileMeta>> {
+        self.close_current_file().await?;
+        while let Some(result) = self.in_flight_closes.join_next().await {
+            // Outer error: the task itself failed to join; inner error: the close failed.
+            let meta = result.map_err(|e| crate::Error::DataInvalid {
+                message: format!("Background file close task panicked: {e}"),
+                source: None,
+            })??;
+            self.written_files.push(meta);
+        }
+        Ok(std::mem::take(&mut self.written_files))
+    }
+
+    /// Build the metadata entry for one closed file.
+    ///
+    /// Keys and statistics are left empty, sequence numbers are zero, the level is 0
+    /// and the creation time is the current UTC time.
+    fn build_meta(
+        file_name: String,
+        file_size: i64,
+        row_count: i64,
+        schema_id: i64,
+        file_source: Option<i32>,
+        first_row_id: Option<i64>,
+        write_cols: Option<Vec<String>>,
+    ) -> DataFileMeta {
+        DataFileMeta {
+            file_name,
+            file_size,
+            row_count,
+            min_key: EMPTY_SERIALIZED_ROW.clone(),
+            max_key: EMPTY_SERIALIZED_ROW.clone(),
+            key_stats: BinaryTableStats::new(
+                EMPTY_SERIALIZED_ROW.clone(),
+                EMPTY_SERIALIZED_ROW.clone(),
+                vec![],
+            ),
+            value_stats: BinaryTableStats::new(
+                EMPTY_SERIALIZED_ROW.clone(),
+                EMPTY_SERIALIZED_ROW.clone(),
+                vec![],
+            ),
+            min_sequence_number: 0,
+            max_sequence_number: 0,
+            schema_id,
+            level: 0,
+            extra_files: vec![],
+            creation_time: Some(Utc::now()),
+            delete_row_count: Some(0),
+            embedded_index: None,
+            file_source,
+            value_stats_cols: Some(vec![]),
+            external_path: None,
+            first_row_id,
+            write_cols,
+        }
+    }
+}
diff --git a/crates/paimon/src/table/mod.rs b/crates/paimon/src/table/mod.rs
index 2fbd0a8..431857f 100644
--- a/crates/paimon/src/table/mod.rs
+++ b/crates/paimon/src/table/mod.rs
@@ -20,6 +20,8 @@
 pub(crate) mod bin_pack;
 mod bucket_filter;
 mod commit_message;
+pub mod data_evolution_writer;
+mod data_file_writer;
 #[cfg(feature = "fulltext")]
 mod full_text_search_builder;
 pub(crate) mod global_index_scanner;
@@ -40,6 +42,7 @@ mod write_builder;
 use crate::Result;
 use arrow_array::RecordBatch;
 pub use commit_message::CommitMessage;
+pub use data_evolution_writer::DataEvolutionWriter;
 #[cfg(feature = "fulltext")]
 pub use full_text_search_builder::FullTextSearchBuilder;
 use futures::stream::BoxStream;
diff --git
a/crates/paimon/src/table/stats_filter.rs b/crates/paimon/src/table/stats_filter.rs
index 9796bf4..40ce43a 100644
--- a/crates/paimon/src/table/stats_filter.rs
+++ b/crates/paimon/src/table/stats_filter.rs
@@ -295,14 +295,15 @@ pub(super) fn data_evolution_group_matches_predicates(
     sorted_files.sort_by(|a, b| b.max_sequence_number.cmp(&a.max_sequence_number));
 
     // For each table field, find which file (index in sorted_files) provides it.
-    // The field index remains a table-field index so FileStatsRows can resolve
-    // it through its own schema-to-stats mapping.
+    // Use file_data_columns (based on write_cols) to determine which file contains
+    // the field. value_stats_cols is not consulted here: it only indicates which
+    // columns have stats, not which columns the file stores.
     let field_sources: Vec<Option<(usize, usize)>> = table_fields
         .iter()
         .enumerate()
         .map(|(field_idx, field)| {
             for (file_idx, file) in sorted_files.iter().enumerate() {
-                let file_columns = file_stats_columns(file, table_fields);
+                let file_columns = file_data_columns(file, table_fields);
                 for col_name in &file_columns {
                     if *col_name == field.name() {
                         return Some((file_idx, field_idx));
@@ -334,13 +335,10 @@ pub(super) fn data_evolution_group_matches_predicates(
     })
 }
 
-/// Resolve which columns a file's value stats cover.
-/// If `value_stats_cols` is set, those are the stats columns. Otherwise, the file's stats
-/// cover all table fields (or `write_cols` if present).
-fn file_stats_columns<'a>(file: &'a DataFileMeta, table_fields: &'a [DataField]) -> Vec<&'a str> {
-    if let Some(cols) = &file.value_stats_cols {
-        return cols.iter().map(|s| s.as_str()).collect();
-    }
+/// Resolve which columns a file actually contains (for field source resolution).
+/// Uses `write_cols` if present, otherwise assumes all table fields.
+/// `value_stats_cols` is deliberately ignored: it describes stats coverage only.
+fn file_data_columns<'a>(file: &'a DataFileMeta, table_fields: &'a [DataField]) -> Vec<&'a str> {
     match &file.write_cols {
         Some(cols) => cols.iter().map(|s| s.as_str()).collect(),
         None => table_fields.iter().map(|f| f.name()).collect(),
diff --git a/crates/paimon/src/table/table_commit.rs b/crates/paimon/src/table/table_commit.rs
index e14a4b7..25b1d94 100644
--- a/crates/paimon/src/table/table_commit.rs
+++ b/crates/paimon/src/table/table_commit.rs
@@ -32,7 +32,7 @@ use crate::table::commit_message::CommitMessage;
 use crate::table::snapshot_commit::SnapshotCommit;
 use crate::table::{SnapshotManager, Table, TableScan};
 use crate::Result;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 use std::time::{SystemTime, UNIX_EPOCH};
 
@@ -292,6 +292,12 @@ impl TableCommit {
         let mut next_row_id: Option<i64> = None;
         if self.row_tracking_enabled {
             commit_entries = self.assign_snapshot_id(new_snapshot_id, commit_entries);
+
+            // Validate that files with pre-assigned first_row_id (from MERGE INTO)
+            // still align with the current snapshot's file layout.
+            self.validate_row_id_alignment(&commit_entries, latest_snapshot)
+                .await?;
+
             let first_row_id_start = latest_snapshot
                 .as_ref()
                 .and_then(|s| s.next_row_id())
@@ -522,6 +528,90 @@ impl TableCommit {
         (result, start)
     }
 
+    /// Validate that files with pre-assigned `first_row_id` (e.g. partial-column
+    /// files from MERGE INTO) still match existing files in the current snapshot.
+    ///
+    /// When MERGE INTO and COMPACT run concurrently, compaction may rewrite the
+    /// original files that partial-column files reference. If the original file's
+    /// row ID range no longer exists, the partial-column files become invalid and
+    /// the commit must be rejected with a `DataInvalid` "Row ID conflict" error.
+    async fn validate_row_id_alignment(
+        &self,
+        commit_entries: &[ManifestEntry],
+        latest_snapshot: &Option<Snapshot>,
+    ) -> Result<()> {
+        // Only added files whose first_row_id was pre-set by the writer need checking.
+        let files_to_check: Vec<_> = commit_entries
+            .iter()
+            .filter(|e| *e.kind() == FileKind::Add && e.file().first_row_id.is_some())
+            .collect();
+
+        if files_to_check.is_empty() {
+            return Ok(());
+        }
+
+        let snap = match latest_snapshot {
+            Some(s) => s,
+            None => {
+                // No existing snapshot means no existing files — any pre-assigned
+                // first_row_id cannot match anything.
+                let entry = &files_to_check[0];
+                return Err(crate::Error::DataInvalid {
+                    message: format!(
+                        "Row ID conflict: file '{}' has pre-assigned first_row_id={} \
+                         but no snapshot exists. The referenced files may have been removed \
+                         by a concurrent compaction.",
+                        entry.file().file_name,
+                        entry.file().first_row_id.unwrap(),
+                    ),
+                    source: None,
+                });
+            }
+        };
+
+        // Read all current files from the latest snapshot.
+        let scan = TableScan::new(&self.table, None, vec![], None, None, None);
+        let existing_entries = scan.plan_manifest_entries(snap).await?;
+
+        // Index existing files by (partition, bucket, first_row_id, row_count).
+        let existing_index: HashSet<(&[u8], i32, i64, i64)> = existing_entries
+            .iter()
+            .filter_map(|e| {
+                e.file()
+                    .first_row_id
+                    .map(|fid| (e.partition(), e.bucket(), fid, e.file().row_count))
+            })
+            .collect();
+
+        for entry in &files_to_check {
+            let fid = entry.file().first_row_id.unwrap();
+            let key = (
+                entry.partition(),
+                entry.bucket(),
+                fid,
+                entry.file().row_count,
+            );
+            if !existing_index.contains(&key) {
+                return Err(crate::Error::DataInvalid {
+                    message: format!(
+                        "Row ID conflict: file '{}' references first_row_id={}, row_count={} \
+                         in partition/bucket ({:?}, {}), but no matching file exists in the \
+                         current snapshot. The referenced file may have been rewritten by a \
+                         concurrent compaction.",
+                        entry.file().file_name,
+                        fid,
+                        entry.file().row_count,
+                        entry.partition(),
+                        entry.bucket(),
+                    ),
+                    source: None,
+                });
+            }
+        }
+
+        Ok(())
+    }
+
     /// Exponential backoff with jitter.
async fn commit_retry_wait(&self, retry_count: u32) { let base_wait = self @@ -1044,6 +1134,135 @@ mod tests { builder.build_serialized() } + fn test_row_tracking_schema() -> TableSchema { + use crate::spec::{DataType, IntType, Schema, VarCharType}; + let schema = Schema::builder() + .column("id", DataType::Int(IntType::new())) + .column("name", DataType::VarChar(VarCharType::string_type())) + .option("row-tracking.enabled", "true") + .build() + .unwrap(); + TableSchema::new(0, &schema) + } + + fn test_row_tracking_table(file_io: &FileIO, table_path: &str) -> Table { + Table::new( + file_io.clone(), + Identifier::new("default", "test_table"), + table_path.to_string(), + test_row_tracking_schema(), + None, + ) + } + + fn setup_row_tracking_commit(file_io: &FileIO, table_path: &str) -> TableCommit { + let table = test_row_tracking_table(file_io, table_path); + TableCommit::new(table, "test-user".to_string()) + } + + #[tokio::test] + async fn test_row_id_conflict_rejects_stale_partial_file() { + // Simulate: initial commit creates a file with row IDs 0-99, + // then a "partial-column" commit references row IDs 0-49 (wrong range) + // which should be rejected. 
+ let file_io = test_file_io(); + let table_path = "memory:/test_row_id_conflict"; + setup_dirs(&file_io, table_path).await; + + let commit = setup_row_tracking_commit(&file_io, table_path); + + // Step 1: Commit an initial file (row_count=100, first_row_id will be assigned as 0) + let mut initial_file = test_data_file("data-0.parquet", 100); + initial_file.file_source = Some(0); // APPEND + commit + .commit(vec![CommitMessage::new(vec![], 0, vec![initial_file])]) + .await + .unwrap(); + + // Verify snapshot has next_row_id = 100 + let snap_manager = SnapshotManager::new(file_io.clone(), table_path.to_string()); + let snapshot = snap_manager.get_latest_snapshot().await.unwrap().unwrap(); + assert_eq!(snapshot.next_row_id(), Some(100)); + + // Step 2: Try to commit a partial-column file referencing row IDs 0-49 + // (wrong row_count — original file has 100 rows, not 50) + let mut partial_file = test_data_file("partial-0.parquet", 50); + partial_file.first_row_id = Some(0); + partial_file.file_source = Some(0); + partial_file.write_cols = Some(vec!["name".to_string()]); + + let result = commit + .commit(vec![CommitMessage::new(vec![], 0, vec![partial_file])]) + .await; + + assert!(result.is_err()); + let err_msg = result.unwrap_err().to_string(); + assert!( + err_msg.contains("Row ID conflict"), + "Expected 'Row ID conflict' error, got: {err_msg}" + ); + } + + #[tokio::test] + async fn test_row_id_conflict_accepts_matching_partial_file() { + // Partial-column file with matching (first_row_id, row_count) should succeed. 
+ let file_io = test_file_io(); + let table_path = "memory:/test_row_id_match"; + setup_dirs(&file_io, table_path).await; + + let commit = setup_row_tracking_commit(&file_io, table_path); + + // Step 1: Commit initial file (100 rows, will get first_row_id=0) + let mut initial_file = test_data_file("data-0.parquet", 100); + initial_file.file_source = Some(0); + commit + .commit(vec![CommitMessage::new(vec![], 0, vec![initial_file])]) + .await + .unwrap(); + + // Step 2: Commit a partial-column file with matching range (0, 100) + let mut partial_file = test_data_file("partial-0.parquet", 100); + partial_file.first_row_id = Some(0); + partial_file.file_source = Some(0); + partial_file.write_cols = Some(vec!["name".to_string()]); + + commit + .commit(vec![CommitMessage::new(vec![], 0, vec![partial_file])]) + .await + .unwrap(); + + let snap_manager = SnapshotManager::new(file_io.clone(), table_path.to_string()); + let snapshot = snap_manager.get_latest_snapshot().await.unwrap().unwrap(); + assert_eq!(snapshot.id(), 2); + } + + #[tokio::test] + async fn test_row_id_conflict_no_snapshot_rejects() { + // Committing a file with pre-assigned first_row_id when no snapshot exists + // should be rejected. 
+ let file_io = test_file_io(); + let table_path = "memory:/test_row_id_no_snap"; + setup_dirs(&file_io, table_path).await; + + let commit = setup_row_tracking_commit(&file_io, table_path); + + let mut partial_file = test_data_file("partial-0.parquet", 100); + partial_file.first_row_id = Some(0); + partial_file.file_source = Some(0); + partial_file.write_cols = Some(vec!["name".to_string()]); + + let result = commit + .commit(vec![CommitMessage::new(vec![], 0, vec![partial_file])]) + .await; + + assert!(result.is_err()); + let err_msg = result.unwrap_err().to_string(); + assert!( + err_msg.contains("Row ID conflict"), + "Expected 'Row ID conflict' error, got: {err_msg}" + ); + } + #[tokio::test] async fn test_overwrite_null_partition() { let file_io = test_file_io(); diff --git a/crates/paimon/src/table/table_write.rs b/crates/paimon/src/table/table_write.rs index 7d5252e..be0bcb5 100644 --- a/crates/paimon/src/table/table_write.rs +++ b/crates/paimon/src/table/table_write.rs @@ -20,22 +20,18 @@ //! Reference: [pypaimon TableWrite](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/write/table_write.py) //! 
and [pypaimon FileStoreWrite](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/write/file_store_write.py) -use crate::arrow::format::{create_format_writer, FormatFileWriter}; -use crate::io::FileIO; -use crate::spec::stats::BinaryTableStats; use crate::spec::PartitionComputer; use crate::spec::{ - extract_datum_from_arrow, BinaryRow, BinaryRowBuilder, CoreOptions, DataField, DataFileMeta, - DataType, Datum, EMPTY_SERIALIZED_ROW, + extract_datum_from_arrow, BinaryRow, BinaryRowBuilder, CoreOptions, DataField, DataType, Datum, + EMPTY_SERIALIZED_ROW, }; use crate::table::commit_message::CommitMessage; +use crate::table::data_file_writer::DataFileWriter; use crate::table::Table; use crate::Result; use arrow_array::RecordBatch; -use chrono::Utc; use std::collections::HashMap; use std::sync::Arc; -use tokio::task::JoinSet; type PartitionBucketKey = (Vec, i32); @@ -73,11 +69,6 @@ impl TableWrite { message: "TableWrite does not support tables with primary keys".to_string(), }); } - if core_options.data_evolution_enabled() { - return Err(crate::Error::Unsupported { - message: "TableWrite does not support data-evolution.enabled mode".to_string(), - }); - } let total_buckets = core_options.bucket(); if total_buckets != -1 && core_options.bucket_key().is_none() { @@ -300,6 +291,9 @@ impl TableWrite { self.file_compression.clone(), self.file_compression_zstd_level, self.write_buffer_size, + Some(0), // file_source: APPEND + None, // first_row_id: assigned by commit + None, // write_cols: full-row write ); self.partition_writers @@ -308,211 +302,11 @@ impl TableWrite { } } -/// Internal writer that produces parquet data files for a single (partition, bucket). -/// -/// Batches are accumulated into a single `FormatFileWriter` that streams directly -/// to storage. Call `prepare_commit()` to finalize and collect file metadata. 
-struct DataFileWriter { - file_io: FileIO, - table_location: String, - partition_path: String, - bucket: i32, - schema_id: i64, - target_file_size: i64, - file_compression: String, - file_compression_zstd_level: i32, - write_buffer_size: i64, - written_files: Vec, - /// Background file close tasks spawned during rolling. - in_flight_closes: JoinSet>, - /// Current open format writer, lazily created on first write. - current_writer: Option>, - current_file_name: Option, - current_row_count: i64, -} - -impl DataFileWriter { - #[allow(clippy::too_many_arguments)] - fn new( - file_io: FileIO, - table_location: String, - partition_path: String, - bucket: i32, - schema_id: i64, - target_file_size: i64, - file_compression: String, - file_compression_zstd_level: i32, - write_buffer_size: i64, - ) -> Self { - Self { - file_io, - table_location, - partition_path, - bucket, - schema_id, - target_file_size, - file_compression, - file_compression_zstd_level, - write_buffer_size, - written_files: Vec::new(), - in_flight_closes: JoinSet::new(), - current_writer: None, - current_file_name: None, - current_row_count: 0, - } - } - - /// Write a RecordBatch. Rolls to a new file when target size is reached. 
- async fn write(&mut self, batch: &RecordBatch) -> Result<()> { - if batch.num_rows() == 0 { - return Ok(()); - } - - if self.current_writer.is_none() { - self.open_new_file(batch.schema()).await?; - } - - self.current_row_count += batch.num_rows() as i64; - self.current_writer.as_mut().unwrap().write(batch).await?; - - // Roll to a new file if target size is reached — close in background - if self.current_writer.as_ref().unwrap().num_bytes() as i64 >= self.target_file_size { - self.roll_file(); - } - - // Flush row group if in-progress buffer exceeds write_buffer_size - if let Some(w) = self.current_writer.as_mut() { - if w.in_progress_size() as i64 >= self.write_buffer_size { - w.flush().await?; - } - } - - Ok(()) - } - - async fn open_new_file(&mut self, schema: arrow_schema::SchemaRef) -> Result<()> { - let file_name = format!( - "data-{}-{}.parquet", - uuid::Uuid::new_v4(), - self.written_files.len() - ); - - let bucket_dir = if self.partition_path.is_empty() { - format!("{}/bucket-{}", self.table_location, self.bucket) - } else { - format!( - "{}/{}/bucket-{}", - self.table_location, self.partition_path, self.bucket - ) - }; - self.file_io.mkdirs(&format!("{bucket_dir}/")).await?; - - let file_path = format!("{}/{}", bucket_dir, file_name); - let output = self.file_io.new_output(&file_path)?; - let writer = create_format_writer( - &output, - schema, - &self.file_compression, - self.file_compression_zstd_level, - ) - .await?; - self.current_writer = Some(writer); - self.current_file_name = Some(file_name); - self.current_row_count = 0; - Ok(()) - } - - /// Close the current file writer and record the file metadata. - async fn close_current_file(&mut self) -> Result<()> { - let writer = match self.current_writer.take() { - Some(w) => w, - None => return Ok(()), - }; - let file_name = self.current_file_name.take().unwrap(); - - let row_count = self.current_row_count; - self.current_row_count = 0; - let file_size = writer.close().await? 
as i64; - - let meta = Self::build_meta(file_name, file_size, row_count, self.schema_id); - self.written_files.push(meta); - Ok(()) - } - - /// Spawn the current writer's close in the background for non-blocking rolling. - fn roll_file(&mut self) { - let writer = match self.current_writer.take() { - Some(w) => w, - None => return, - }; - let file_name = self.current_file_name.take().unwrap(); - let row_count = self.current_row_count; - self.current_row_count = 0; - let schema_id = self.schema_id; - - self.in_flight_closes.spawn(async move { - let file_size = writer.close().await? as i64; - Ok(Self::build_meta(file_name, file_size, row_count, schema_id)) - }); - } - - /// Close the current writer and return all written file metadata. - async fn prepare_commit(&mut self) -> Result> { - self.close_current_file().await?; - while let Some(result) = self.in_flight_closes.join_next().await { - let meta = result.map_err(|e| crate::Error::DataInvalid { - message: format!("Background file close task panicked: {e}"), - source: None, - })??; - self.written_files.push(meta); - } - Ok(std::mem::take(&mut self.written_files)) - } - - fn build_meta( - file_name: String, - file_size: i64, - row_count: i64, - schema_id: i64, - ) -> DataFileMeta { - DataFileMeta { - file_name, - file_size, - row_count, - min_key: EMPTY_SERIALIZED_ROW.clone(), - max_key: EMPTY_SERIALIZED_ROW.clone(), - key_stats: BinaryTableStats::new( - EMPTY_SERIALIZED_ROW.clone(), - EMPTY_SERIALIZED_ROW.clone(), - vec![], - ), - value_stats: BinaryTableStats::new( - EMPTY_SERIALIZED_ROW.clone(), - EMPTY_SERIALIZED_ROW.clone(), - vec![], - ), - min_sequence_number: 0, - max_sequence_number: 0, - schema_id, - level: 0, - extra_files: vec![], - creation_time: Some(Utc::now()), - delete_row_count: Some(0), - embedded_index: None, - file_source: Some(0), // APPEND - value_stats_cols: Some(vec![]), - external_path: None, - first_row_id: None, - write_cols: None, - } - } -} - #[cfg(test)] mod tests { use super::*; use 
crate::catalog::Identifier; - use crate::io::FileIOBuilder; + use crate::io::{FileIO, FileIOBuilder}; use crate::spec::{ DataType, DecimalType, IntType, LocalZonedTimestampType, Schema, TableSchema, TimestampType, VarCharType,