diff --git a/crates/paimon/src/spec/avro/manifest_entry_decode.rs b/crates/paimon/src/spec/avro/manifest_entry_decode.rs index 1d3ffcfa..df251bd3 100644 --- a/crates/paimon/src/spec/avro/manifest_entry_decode.rs +++ b/crates/paimon/src/spec/avro/manifest_entry_decode.rs @@ -270,8 +270,8 @@ fn decode_data_file_meta( row_count: row_count.unwrap_or(0), min_key: min_key.unwrap_or_default(), max_key: max_key.unwrap_or_default(), - key_stats: key_stats.unwrap_or_else(|| BinaryTableStats::new(vec![], vec![], vec![])), - value_stats: value_stats.unwrap_or_else(|| BinaryTableStats::new(vec![], vec![], vec![])), + key_stats: key_stats.unwrap_or_else(BinaryTableStats::empty), + value_stats: value_stats.unwrap_or_else(BinaryTableStats::empty), min_sequence_number: min_sequence_number.unwrap_or(0), max_sequence_number: max_sequence_number.unwrap_or(0), schema_id: schema_id.unwrap_or(0), @@ -391,8 +391,8 @@ fn default_data_file_meta() -> DataFileMeta { row_count: 0, min_key: vec![], max_key: vec![], - key_stats: BinaryTableStats::new(vec![], vec![], vec![]), - value_stats: BinaryTableStats::new(vec![], vec![], vec![]), + key_stats: BinaryTableStats::empty(), + value_stats: BinaryTableStats::empty(), min_sequence_number: 0, max_sequence_number: 0, schema_id: 0, diff --git a/crates/paimon/src/spec/avro/manifest_file_meta_decode.rs b/crates/paimon/src/spec/avro/manifest_file_meta_decode.rs index 7b30c52c..fa5cd09e 100644 --- a/crates/paimon/src/spec/avro/manifest_file_meta_decode.rs +++ b/crates/paimon/src/spec/avro/manifest_file_meta_decode.rs @@ -72,7 +72,7 @@ impl AvroRecordDecode for ManifestFileMeta { file_size.unwrap_or(0), num_added_files.unwrap_or(0), num_deleted_files.unwrap_or(0), - partition_stats.unwrap_or_else(|| BinaryTableStats::new(vec![], vec![], vec![])), + partition_stats.unwrap_or_else(BinaryTableStats::empty), schema_id.unwrap_or(0), min_bucket, max_bucket, diff --git a/crates/paimon/src/spec/manifest.rs b/crates/paimon/src/spec/manifest.rs index 
cd16133b..bebd09e7 100644 --- a/crates/paimon/src/spec/manifest.rs +++ b/crates/paimon/src/spec/manifest.rs @@ -125,7 +125,7 @@ mod tests { use crate::spec::ManifestEntry; fn entry(kind: FileKind, file_name: &str, level: i32) -> ManifestEntry { - let stats = BinaryTableStats::new(vec![], vec![], vec![]); + let stats = BinaryTableStats::empty(); let file = DataFileMeta { file_name: file_name.to_string(), file_size: 100, diff --git a/crates/paimon/src/spec/stats.rs b/crates/paimon/src/spec/stats.rs index 2c389a39..4ae9b243 100644 --- a/crates/paimon/src/spec/stats.rs +++ b/crates/paimon/src/spec/stats.rs @@ -18,7 +18,7 @@ use serde::{Deserialize, Deserializer, Serialize, Serializer}; use std::fmt::{Display, Formatter}; -use super::{extract_datum_from_arrow, BinaryRowBuilder, DataType, Datum}; +use super::{extract_datum_from_arrow, BinaryRowBuilder, DataType, Datum, EMPTY_SERIALIZED_ROW}; use arrow_array::RecordBatch; /// Deserialize `_NULL_COUNTS` which in Avro is `["null", {"type":"array","items":["null","long"]}]`. @@ -94,6 +94,21 @@ impl BinaryTableStats { null_counts, } } + + /// Stats with empty (arity=0) BinaryRow bytes for min/max and no null counts. + /// + /// Use this whenever there are no columns to collect stats for (e.g. a non-partitioned + /// table's `partition_stats`, or a writer producing no key/value stats columns). Writing + /// `Vec::new()` here breaks the Java reader: `SerializationUtils.deserializeBinaryRow` + /// requires at least the 4-byte BE arity prefix and throws `BufferUnderflowException` on + /// zero-length input. 
+ pub fn empty() -> BinaryTableStats { + Self { + min_values: EMPTY_SERIALIZED_ROW.clone(), + max_values: EMPTY_SERIALIZED_ROW.clone(), + null_counts: Vec::new(), + } + } } impl Display for BinaryTableStats { @@ -154,3 +169,38 @@ pub fn compute_column_stats( null_counts, )) } + +#[cfg(test)] +mod tests { + use super::*; + use crate::spec::BinaryRow; + + /// Empty stats must produce min/max bytes that the Java side's + /// `SerializationUtils.deserializeBinaryRow` accepts: at minimum a 4-byte BE + /// arity prefix. A bare `Vec::new()` would trigger `BufferUnderflowException` + /// when Spark/Flink read manifests written for a non-partitioned table. + #[test] + fn empty_stats_carries_arity_prefix_parseable_by_reader() { + let stats = BinaryTableStats::empty(); + assert!( + stats.min_values().len() >= 4, + "min_values must contain at least the 4-byte arity prefix" + ); + assert!( + stats.max_values().len() >= 4, + "max_values must contain at least the 4-byte arity prefix" + ); + assert!( + stats.null_counts().is_empty(), + "null_counts stays empty so the Java reader short-circuits to EMPTY_STATS" + ); + + // Round-trip through the same parser the Java reader uses (4-byte BE arity). 
+ let min_row = BinaryRow::from_serialized_bytes(stats.min_values()) + .expect("min_values must decode as a BinaryRow"); + let max_row = BinaryRow::from_serialized_bytes(stats.max_values()) + .expect("max_values must decode as a BinaryRow"); + assert_eq!(min_row.arity(), 0); + assert_eq!(max_row.arity(), 0); + } +} diff --git a/crates/paimon/src/table/data_evolution_writer.rs b/crates/paimon/src/table/data_evolution_writer.rs index fe9e2348..f63ac24e 100644 --- a/crates/paimon/src/table/data_evolution_writer.rs +++ b/crates/paimon/src/table/data_evolution_writer.rs @@ -670,7 +670,7 @@ mod tests { write_cols: Option>, ) -> DataFileMeta { use crate::spec::stats::BinaryTableStats; - let empty_stats = BinaryTableStats::new(vec![], vec![], vec![]); + let empty_stats = BinaryTableStats::empty(); DataFileMeta { file_name: file_name.to_string(), file_size: 0, diff --git a/crates/paimon/src/table/referenced_files.rs b/crates/paimon/src/table/referenced_files.rs index a170b3c8..755b00f0 100644 --- a/crates/paimon/src/table/referenced_files.rs +++ b/crates/paimon/src/table/referenced_files.rs @@ -1063,8 +1063,8 @@ mod tests { row_count: 100, min_key: vec![], max_key: vec![], - key_stats: BinaryTableStats::new(vec![], vec![], vec![]), - value_stats: BinaryTableStats::new(vec![], vec![], vec![]), + key_stats: BinaryTableStats::empty(), + value_stats: BinaryTableStats::empty(), min_sequence_number: 0, max_sequence_number: 0, schema_id: 0, diff --git a/crates/paimon/src/table/table_commit.rs b/crates/paimon/src/table/table_commit.rs index 274dd963..27ade425 100644 --- a/crates/paimon/src/table/table_commit.rs +++ b/crates/paimon/src/table/table_commit.rs @@ -24,9 +24,9 @@ use crate::io::FileIO; use crate::spec::stats::BinaryTableStats; use crate::spec::FileKind; use crate::spec::{ - datums_to_binary_row, extract_datum, BinaryRow, CommitKind, CoreOptions, DataType, Datum, - IndexManifest, IndexManifestEntry, Manifest, ManifestEntry, ManifestFileMeta, ManifestList, - 
PartitionStatistics, Snapshot, + datums_to_binary_row, extract_datum, BinaryRow, BinaryRowBuilder, CommitKind, CoreOptions, + DataType, Datum, IndexManifest, IndexManifestEntry, Manifest, ManifestEntry, ManifestFileMeta, + ManifestList, PartitionStatistics, Snapshot, }; use crate::table::commit_message::CommitMessage; use crate::table::partition_filter::PartitionFilter; @@ -932,7 +932,7 @@ impl TableCommit { let num_fields = partition_fields.len(); if num_fields == 0 || entries.is_empty() { - return Ok(BinaryTableStats::new(vec![], vec![], vec![])); + return Ok(BinaryTableStats::empty()); } let data_types: Vec<_> = partition_fields @@ -970,11 +970,8 @@ impl TableCommit { } } - let min_datums: Vec<_> = mins.iter().zip(data_types.iter()).collect(); - let max_datums: Vec<_> = maxs.iter().zip(data_types.iter()).collect(); - - let min_bytes = datums_to_binary_row(&min_datums); - let max_bytes = datums_to_binary_row(&max_datums); + let min_bytes = build_partition_stats_row(&mins, &data_types); + let max_bytes = build_partition_stats_row(&maxs, &data_types); let null_counts = null_counts.into_iter().map(Some).collect(); Ok(BinaryTableStats::new(min_bytes, max_bytes, null_counts)) @@ -1127,6 +1124,20 @@ impl TableCommit { } } +/// Serialized BinaryRow for partition stats; unlike `datums_to_binary_row`, returns a +/// valid arity-N row even when every datum is `None` (the all-null case must still +/// decode on the Java side). +fn build_partition_stats_row(datums: &[Option<Datum>], data_types: &[DataType]) -> Vec<u8> { + let mut builder = BinaryRowBuilder::new(datums.len() as i32); + for (pos, (datum_opt, data_type)) in datums.iter().zip(data_types.iter()).enumerate() { + match datum_opt { + Some(d) => builder.write_datum(pos, d, data_type), + None => builder.set_null_at(pos), + } + } + builder.build_serialized() +} + /// Plan for resolving commit entries. enum CommitEntriesPlan { /// Caller-provided entries. 
May contain `FileKind::Delete` entries from CoW @@ -1234,8 +1245,8 @@ mod tests { row_count, min_key: vec![], max_key: vec![], - key_stats: BinaryTableStats::new(vec![], vec![], vec![]), - value_stats: BinaryTableStats::new(vec![], vec![], vec![]), + key_stats: BinaryTableStats::empty(), + value_stats: BinaryTableStats::empty(), min_sequence_number: 0, max_sequence_number: 0, schema_id: 0, @@ -1837,6 +1848,75 @@ mod tests { ); } + /// Regression: a non-partitioned table (e.g. `CREATE TABLE test_pk (... PRIMARY KEY ...)`) + /// must still emit `_PARTITION_STATS._MIN_VALUES`/`_MAX_VALUES` carrying the 4-byte BE + /// arity prefix; otherwise Java readers like Spark/Flink hit + /// `BufferUnderflowException` inside `SerializationUtils.deserializeBinaryRow`. + #[test] + fn compute_partition_stats_no_partition_fields_returns_decodable_empty() { + let file_io = test_file_io(); + let commit = setup_commit(&file_io, "memory:/test_no_partition_stats"); + + let entry = ManifestEntry::new( + FileKind::Add, + vec![], + 0, + 1, + test_data_file("data-0.parquet", 1), + 2, + ); + + let stats = commit.compute_partition_stats(&[entry]).unwrap(); + BinaryRow::from_serialized_bytes(stats.min_values()) + .expect("min_values must decode via the same protocol as Java's deserializeBinaryRow"); + BinaryRow::from_serialized_bytes(stats.max_values()) + .expect("max_values must decode via the same protocol as Java's deserializeBinaryRow"); + assert!(stats.null_counts().is_empty()); + } + + /// Regression: when there are no entries at all, the empty stats we return must also + /// satisfy the protocol — same Java reader path runs on it. 
+ #[test] + fn compute_partition_stats_empty_entries_returns_decodable_empty() { + let file_io = test_file_io(); + let commit = setup_partitioned_commit(&file_io, "memory:/test_no_entries_stats"); + + let stats = commit.compute_partition_stats(&[]).unwrap(); + BinaryRow::from_serialized_bytes(stats.min_values()).unwrap(); + BinaryRow::from_serialized_bytes(stats.max_values()).unwrap(); + assert!(stats.null_counts().is_empty()); + } + + /// Regression: partitioned table with an all-null partition row must still emit + /// decodable min/max bytes (otherwise Java hits `BufferUnderflowException`). + #[test] + fn compute_partition_stats_all_null_partition_values_returns_decodable_bytes() { + let file_io = test_file_io(); + let commit = setup_partitioned_commit(&file_io, "memory:/test_all_null_partition_stats"); + + let mut builder = BinaryRowBuilder::new(1); + builder.set_null_at(0); + let null_partition = builder.build_serialized(); + + let entry = ManifestEntry::new( + FileKind::Add, + null_partition, + 0, + 1, + test_data_file("data-null-pt.parquet", 1), + 2, + ); + + let stats = commit.compute_partition_stats(&[entry]).unwrap(); + let min_row = BinaryRow::from_serialized_bytes(stats.min_values()).unwrap(); + let max_row = BinaryRow::from_serialized_bytes(stats.max_values()).unwrap(); + assert_eq!(min_row.arity(), 1); + assert_eq!(max_row.arity(), 1); + assert!(min_row.is_null_at(0)); + assert!(max_row.is_null_at(0)); + assert_eq!(stats.null_counts(), &vec![Some(1)]); + } + /// `write_manifest_file` must aggregate min/max bucket and level across entries so the /// Java reader can prune manifests by bucket / level (see apache/paimon#5345). This /// drives a real commit so all the call-site plumbing is exercised end to end.