From 49d92a805b0cc1650393e384fc2492f42bf0e0d2 Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Thu, 28 May 2026 23:04:28 +0800 Subject: [PATCH 1/3] [python] Rename DataBlobWriter to DedicatedFormatWriter and support blob+vector splitting DataBlobWriter now handles normal + blob + vector columns (matching Java's DedicatedFormatRollingFileWriter), so rename it to DedicatedFormatWriter. Also add data evolution format tests covering parquet, blob, and vector paths. Co-Authored-By: Claude Opus 4.6 --- paimon-python/pypaimon/read/table_read.py | 4 +- .../pypaimon/tests/blob_table_test.py | 104 +- .../tests/data_evolution_formats_test.py | 998 ++++++++++++++++++ .../pypaimon/write/file_store_write.py | 4 +- ...b_writer.py => dedicated_format_writer.py} | 137 ++- 5 files changed, 1135 insertions(+), 112 deletions(-) create mode 100644 paimon-python/pypaimon/tests/data_evolution_formats_test.py rename paimon-python/pypaimon/write/writer/{data_blob_writer.py => dedicated_format_writer.py} (78%) diff --git a/paimon-python/pypaimon/read/table_read.py b/paimon-python/pypaimon/read/table_read.py index 52a4eaaa7f1a..7f867e4f519a 100644 --- a/paimon-python/pypaimon/read/table_read.py +++ b/paimon-python/pypaimon/read/table_read.py @@ -135,7 +135,7 @@ def _add_row_kind_to_schema(schema: pyarrow.Schema) -> pyarrow.Schema: @staticmethod def _try_to_pad_batch_by_schema(batch: pyarrow.RecordBatch, target_schema): - if batch.schema.names == target_schema.names: + if batch.schema.equals(target_schema): return batch columns = [] @@ -144,6 +144,8 @@ def _try_to_pad_batch_by_schema(batch: pyarrow.RecordBatch, target_schema): for field in target_schema: if field.name in batch.schema.names: col = batch.column(field.name) + if not col.type.equals(field.type): + col = col.cast(field.type) else: col = pyarrow.nulls(num_rows, type=field.type) columns.append(col) diff --git a/paimon-python/pypaimon/tests/blob_table_test.py b/paimon-python/pypaimon/tests/blob_table_test.py index 7261503450f2..042c47699a79 100755 --- a/paimon-python/pypaimon/tests/blob_table_test.py +++ b/paimon-python/pypaimon/tests/blob_table_test.py @@ -29,8 +29,8 @@ from pypaimon.write.commit_message import CommitMessage -class DataBlobWriterTest(unittest.TestCase): - """Tests for DataBlobWriter functionality with paimon table operations.""" +class DedicatedFormatWriterTest(unittest.TestCase): + """Tests for DedicatedFormatWriter functionality with paimon table operations.""" @classmethod def setUpClass(cls): @@ -51,8 +51,8 @@ def tearDownClass(cls): except OSError: pass - def test_data_blob_writer_basic_functionality(self): - """Test basic DataBlobWriter functionality with paimon table.""" + def test_dedicated_format_writer_basic_functionality(self): + """Test basic DedicatedFormatWriter functionality with paimon table.""" from pypaimon import Schema # Create schema with normal and blob columns @@ -82,7 +82,7 @@ def test_data_blob_writer_basic_functionality(self): 'blob_data': [b'blob_data_1', b'blob_data_2', b'blob_data_3'] }, schema=pa_schema) - # Test DataBlobWriter initialization using proper table API + # Test DedicatedFormatWriter initialization using proper table API # Use proper table API to create writer write_builder = table.new_batch_write_builder() blob_writer = write_builder.new_write() @@ -108,8 +108,8 @@ def test_data_blob_writer_basic_functionality(self): blob_writer.close() - def test_data_blob_writer_schema_detection(self): - """Test that DataBlobWriter correctly detects blob columns from schema.""" + def test_dedicated_format_writer_schema_detection(self): + """Test that DedicatedFormatWriter correctly detects blob columns from schema.""" from pypaimon import Schema # Test schema with blob column @@ -132,7 +132,7 @@ def test_data_blob_writer_schema_detection(self): write_builder = table.new_batch_write_builder() blob_writer = write_builder.new_write() - # Test that DataBlobWriter was created internally + # Test that DedicatedFormatWriter was created internally # We can verify this by checking the internal data writers test_data = pa.Table.from_pydict({ 'id': [1, 2, 3], @@ -142,19 +142,19 @@ def test_data_blob_writer_schema_detection(self): # Write data to trigger writer creation blob_writer.write_arrow(test_data) - # Verify that a DataBlobWriter was created internally + # Verify that a DedicatedFormatWriter was created internally data_writers = blob_writer.file_store_write.data_writers self.assertGreater(len(data_writers), 0) - # Check that the writer is a DataBlobWriter + # Check that the writer is a DedicatedFormatWriter for writer in data_writers.values(): - from pypaimon.write.writer.data_blob_writer import DataBlobWriter - self.assertIsInstance(writer, DataBlobWriter) + from pypaimon.write.writer.dedicated_format_writer import DedicatedFormatWriter + self.assertIsInstance(writer, DedicatedFormatWriter) blob_writer.close() - def test_data_blob_writer_no_blob_column(self): - """Test that DataBlobWriter raises error when no blob column is found.""" + def test_dedicated_format_writer_no_blob_column(self): + """Test that DedicatedFormatWriter raises error when no blob column is found.""" from pypaimon import Schema # Test schema without blob column @@ -177,7 +177,7 @@ def test_data_blob_writer_no_blob_column(self): write_builder = table.new_batch_write_builder() writer = write_builder.new_write() - # Test that a regular writer (not DataBlobWriter) was created + # Test that a regular writer (not DedicatedFormatWriter) was created test_data = pa.Table.from_pydict({ 'id': [1, 2, 3], 'name': ['Alice', 'Bob', 'Charlie'] @@ -186,19 +186,19 @@ def test_data_blob_writer_no_blob_column(self): # Write data to trigger writer creation writer.write_arrow(test_data) - # Verify that a regular writer was created (not DataBlobWriter) + # Verify that a regular writer was created (not DedicatedFormatWriter) data_writers = writer.file_store_write.data_writers self.assertGreater(len(data_writers), 0) - # Check that the writer is NOT a DataBlobWriter + # Check that the writer is NOT a DedicatedFormatWriter for writer_instance in data_writers.values(): - from pypaimon.write.writer.data_blob_writer import DataBlobWriter - self.assertNotIsInstance(writer_instance, DataBlobWriter) + from pypaimon.write.writer.dedicated_format_writer import DedicatedFormatWriter + self.assertNotIsInstance(writer_instance, DedicatedFormatWriter) writer.close() - def test_data_blob_writer_multiple_blob_columns(self): - """Test that DataBlobWriter supports multiple blob columns.""" + def test_dedicated_format_writer_multiple_blob_columns(self): + """Test that DedicatedFormatWriter supports multiple blob columns.""" from pypaimon import Schema # Test schema with multiple blob columns @@ -242,7 +242,7 @@ def test_data_blob_writer_multiple_blob_columns(self): result = table.new_read_builder().new_read().to_arrow(table.new_read_builder().new_scan().plan().splits()) self.assertEqual(result.num_rows, 3) - def test_data_blob_writer_partial_write_with_write_type(self): + def test_dedicated_format_writer_partial_write_with_write_type(self): """Partial write (normal + blob subset) via with_write_type: split must match batch columns.""" from pypaimon import Schema @@ -292,7 +292,7 @@ def test_data_blob_writer_partial_write_with_write_type(self): self.assertEqual(out.column('blob_data').to_pylist(), [b'a', b'b']) self.assertEqual(out.column('name').to_pylist(), [None, None]) - def test_data_blob_writer_partial_write_normal_only_with_write_type(self): + def test_dedicated_format_writer_partial_write_normal_only_with_write_type(self): """Partial write without blob columns in write_cols must not touch blob split paths.""" from pypaimon import Schema @@ -333,7 +333,7 @@ def test_data_blob_writer_partial_write_normal_only_with_write_type(self): self.assertEqual(out.column('name').to_pylist(), ['n']) self.assertEqual(out.column('blob_data').to_pylist(), [None]) - def test_data_blob_writer_partial_write_single_blob_of_two_with_write_type(self): + def test_dedicated_format_writer_partial_write_single_blob_of_two_with_write_type(self): """with_write_type lists only one blob column: only that column gets .blob files.""" from pypaimon import Schema @@ -369,8 +369,8 @@ def test_data_blob_writer_partial_write_single_blob_of_two_with_write_type(self) write_builder.new_commit().commit(commit_messages) writer.close() - def test_data_blob_writer_write_operations(self): - """Test DataBlobWriter write operations with real data.""" + def test_dedicated_format_writer_write_operations(self): + """Test DedicatedFormatWriter write operations with real data.""" from pypaimon import Schema # Create schema with blob column @@ -411,8 +411,8 @@ def test_data_blob_writer_write_operations(self): blob_writer.close() - def test_data_blob_writer_write_large_blob(self): - """Test DataBlobWriter with large blob data (5MB per item) in 10 batches.""" + def test_dedicated_format_writer_write_large_blob(self): + """Test DedicatedFormatWriter with very large blob data (50MB per item) in 10 batches.""" from pypaimon import Schema # Create schema with blob column @@ -467,7 +467,7 @@ def test_data_blob_writer_write_large_blob(self): # Log progress for large data processing print(f"Completed batch {batch_num + 1}/10 with {batch.num_rows} rows") - # Record count is tracked internally by DataBlobWriter + # Record count is tracked internally by DedicatedFormatWriter # Test prepare commit commit_messages: CommitMessage = blob_writer.prepare_commit() @@ -511,8 +511,8 @@ def test_data_blob_writer_write_large_blob(self): blob_writer.close() - def test_data_blob_writer_abort_functionality(self): - """Test DataBlobWriter abort functionality.""" + def test_dedicated_format_writer_abort_functionality(self): + """Test DedicatedFormatWriter abort functionality.""" from pypaimon import Schema # Create schema with blob column @@ -546,12 +546,12 @@ def test_data_blob_writer_abort_functionality(self): blob_writer.write_arrow_batch(batch) # Test abort - BatchTableWrite doesn't have abort method - # The abort functionality is handled internally by DataBlobWriter + # The abort functionality is handled internally by DedicatedFormatWriter blob_writer.close() - def test_data_blob_writer_multiple_batches(self): - """Test DataBlobWriter with multiple batches and verify results.""" + def test_dedicated_format_writer_multiple_batches(self): + """Test DedicatedFormatWriter with multiple batches and verify results.""" from pypaimon import Schema # Create schema with blob column @@ -608,7 +608,7 @@ def test_data_blob_writer_multiple_batches(self): blob_writer.write_arrow_batch(batch) total_rows += batch.num_rows - # Record count is tracked internally by DataBlobWriter + # Record count is tracked internally by DedicatedFormatWriter # Test prepare commit commit_messages = blob_writer.prepare_commit() @@ -619,8 +619,8 @@ def test_data_blob_writer_multiple_batches(self): blob_writer.close() - def test_data_blob_writer_large_batches(self): - """Test DataBlobWriter with large batches to test rolling behavior.""" + def test_dedicated_format_writer_large_batches(self): + """Test DedicatedFormatWriter with large batches to test rolling behavior.""" from pypaimon import Schema # Create schema with blob column @@ -671,7 +671,7 @@ def test_data_blob_writer_large_batches(self): blob_writer.write_arrow_batch(batch) total_rows += batch.num_rows - # Record count is tracked internally by DataBlobWriter + # Record count is tracked internally by DedicatedFormatWriter # Test prepare commit commit_messages = blob_writer.prepare_commit() @@ -682,8 +682,8 @@ def test_data_blob_writer_large_batches(self): blob_writer.close() - def test_data_blob_writer_mixed_data_types(self): - """Test DataBlobWriter with mixed data types in blob column.""" + def test_dedicated_format_writer_mixed_data_types(self): + """Test DedicatedFormatWriter with mixed data types in blob column.""" from pypaimon import Schema # Create schema with blob column @@ -726,7 +726,7 @@ def test_data_blob_writer_mixed_data_types(self): blob_writer.write_arrow_batch(batch) total_rows += batch.num_rows - # Record count is tracked internally by DataBlobWriter + # Record count is tracked internally by DedicatedFormatWriter # Test prepare commit commit_messages = blob_writer.prepare_commit() @@ -786,8 +786,8 @@ def test_data_blob_writer_mixed_data_types(self): self.assertEqual(result_type, original_type, f"Row {i + 1}: Type should match") self.assertEqual(result_data, original_data, f"Row {i + 1}: Blob data should match") - def test_data_blob_writer_empty_batches(self): - """Test DataBlobWriter with empty batches.""" + def test_dedicated_format_writer_empty_batches(self): + """Test DedicatedFormatWriter with empty batches.""" from pypaimon import Schema # Create schema with blob column @@ -842,8 +842,8 @@ def test_data_blob_writer_empty_batches(self): total_rows += batch.num_rows # Verify record count (empty batch should not affect count) - # Record count is tracked internally by DataBlobWriter - # Record count is tracked internally by DataBlobWriter + # Record count is tracked internally by DedicatedFormatWriter + # Record count is tracked internally by DedicatedFormatWriter # Test prepare commit commit_messages = blob_writer.prepare_commit() @@ -851,8 +851,8 @@ def test_data_blob_writer_empty_batches(self): blob_writer.close() - def test_data_blob_writer_rolling_behavior(self): - """Test DataBlobWriter rolling behavior with multiple commits.""" + def test_dedicated_format_writer_rolling_behavior(self): + """Test DedicatedFormatWriter rolling behavior with multiple commits.""" from pypaimon import Schema # Create schema with blob column @@ -892,7 +892,7 @@ def test_data_blob_writer_rolling_behavior(self): blob_writer.write_arrow_batch(batch) # Verify total record count - # Record count is tracked internally by DataBlobWriter + # Record count is tracked internally by DedicatedFormatWriter # Test prepare commit commit_messages = blob_writer.prepare_commit() @@ -2714,8 +2714,8 @@ def test_blob_large_data_volume_with_shard(self): actual = pa.concat_tables([actual1, actual2, actual3]).sort_by('id') self.assertEqual(actual, expected) - def test_data_blob_writer_with_slice(self): - """Test DataBlobWriter with mixed data types in blob column.""" + def test_dedicated_format_writer_with_slice(self): + """Test DedicatedFormatWriter with mixed data types in blob column.""" # Create schema with blob column pa_schema = pa.schema([ @@ -2776,8 +2776,8 @@ def test_data_blob_writer_with_slice(self): self.assertEqual(result.num_columns, 3, "Should have 3 columns") self.assertEqual(result["id"].unique().to_pylist(), [2, 3], "Get incorrect column ID") - def test_data_blob_writer_with_shard(self): - """Test DataBlobWriter with mixed data types in blob column.""" + def test_dedicated_format_writer_with_shard(self): + """Test DedicatedFormatWriter with mixed data types in blob column.""" # Create schema with blob column pa_schema = pa.schema([ diff --git a/paimon-python/pypaimon/tests/data_evolution_formats_test.py b/paimon-python/pypaimon/tests/data_evolution_formats_test.py new file mode 100644 index 000000000000..406a8feedddd --- /dev/null +++ b/paimon-python/pypaimon/tests/data_evolution_formats_test.py @@ -0,0 +1,998 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Data evolution tests covering parquet + blob + vector (vortex) formats. + +Each test writes data using different file format combinations and reads it +back, verifying correctness of the data evolution merge path across formats. +""" + +import os +import shutil +import sys +import tempfile +import unittest + +import pyarrow as pa + +from pypaimon import CatalogFactory, Schema +from pypaimon.manifest.schema.data_file_meta import DataFileMeta + + +class DataEvolutionFormatsTest(unittest.TestCase): + + @classmethod + def setUpClass(cls): + cls.tempdir = tempfile.mkdtemp() + cls.warehouse = os.path.join(cls.tempdir, 'warehouse') + cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse}) + cls.catalog.create_database('default', False) + + @classmethod + def tearDownClass(cls): + shutil.rmtree(cls.tempdir, ignore_errors=True) + + # ------------------------------------------------------------------ + # Parquet-format data evolution + # ------------------------------------------------------------------ + + def test_parquet_column_subset_write_and_merge_read(self): + """Write disjoint column subsets as parquet, merge-read via data evolution.""" + pa_schema = pa.schema([ + ('id', pa.int32()), + ('name', pa.string()), + ('score', pa.float64()), + ]) + schema = Schema.from_pyarrow_schema(pa_schema, options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true', + 'file.format': 'parquet', + }) + self.catalog.create_table('default.fmt_parquet_subset', schema, False) + table = self.catalog.get_table('default.fmt_parquet_subset') + wb = table.new_batch_write_builder() + + # commit 1: write id + name + w0 = wb.new_write().with_write_type(['id', 'name']) + w1 = wb.new_write().with_write_type(['score']) + c = wb.new_commit() + w0.write_arrow(pa.Table.from_pydict( + {'id': [1, 2, 3], 'name': ['a', 'b', 'c']}, + schema=pa.schema([('id', pa.int32()), ('name', pa.string())]))) + w1.write_arrow(pa.Table.from_pydict( + {'score': [1.1, 2.2, 3.3]}, + schema=pa.schema([('score', pa.float64())]))) + cmts = w0.prepare_commit() + w1.prepare_commit() + for m in cmts: + for nf in m.new_files: + nf.first_row_id = 0 + c.commit(cmts) + w0.close() + w1.close() + c.close() + + # verify file format + all_files = [nf for m in cmts for nf in m.new_files] + for f in all_files: + self.assertTrue(f.file_name.endswith('.parquet'), + f"Expected parquet file, got {f.file_name}") + + # read back + rb = table.new_read_builder() + actual = rb.new_read().to_arrow(rb.new_scan().plan().splits()) + expect = pa.Table.from_pydict( + {'id': [1, 2, 3], 'name': ['a', 'b', 'c'], 'score': [1.1, 2.2, 3.3]}, + schema=pa_schema) + self.assertEqual(actual, expect) + + def test_parquet_overwrite_column(self): + """Write all columns, then overwrite one column via a second commit.""" + pa_schema = pa.schema([ + ('k', pa.int64()), + ('v', pa.string()), + ]) + schema = Schema.from_pyarrow_schema(pa_schema, options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true', + 'file.format': 'parquet', + }) + self.catalog.create_table('default.fmt_parquet_overwrite', schema, False) + table = self.catalog.get_table('default.fmt_parquet_overwrite') + wb = table.new_batch_write_builder() + + # commit 1: full row + tw = wb.new_write() + tc = wb.new_commit() + tw.write_arrow(pa.Table.from_pydict( + {'k': [10, 20], 'v': ['old1', 'old2']}, schema=pa_schema)) + tc.commit(tw.prepare_commit()) + tw.close() + tc.close() + + # commit 2: overwrite v only (first_row_id=0) + tw = wb.new_write().with_write_type(['v']) + tc = wb.new_commit() + tw.write_arrow(pa.Table.from_pydict( + {'v': ['new1', 'new2']}, schema=pa.schema([('v', pa.string())]))) + cmts = tw.prepare_commit() + for m in cmts: + for nf in m.new_files: + nf.first_row_id = 0 + tc.commit(cmts) + tw.close() + tc.close() + + rb = table.new_read_builder() + actual = rb.new_read().to_arrow(rb.new_scan().plan().splits()) + expect = pa.Table.from_pydict( + {'k': [10, 20], 'v': ['new1', 'new2']}, schema=pa_schema) + self.assertEqual(actual, expect) + + def test_parquet_append_new_rows(self): + """Append new rows (new first_row_id) with column subsets, merge-read all.""" + pa_schema = pa.schema([ + ('a', pa.int32()), + ('b', pa.string()), + ('c', pa.float32()), + ]) + schema = Schema.from_pyarrow_schema(pa_schema, options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true', + 'file.format': 'parquet', + }) + self.catalog.create_table('default.fmt_parquet_append', schema, False) + table = self.catalog.get_table('default.fmt_parquet_append') + wb = table.new_batch_write_builder() + + # commit 1: 2 full rows + tw = wb.new_write() + tc = wb.new_commit() + tw.write_arrow(pa.Table.from_pydict( + {'a': [1, 2], 'b': ['x', 'y'], 'c': [0.1, 0.2]}, schema=pa_schema)) + tc.commit(tw.prepare_commit()) + tw.close() + tc.close() + + # commit 2: append 2 new rows with column subsets, first_row_id=2 + w_ab = wb.new_write().with_write_type(['a', 'b']) + w_c = wb.new_write().with_write_type(['c']) + tc = wb.new_commit() + w_ab.write_arrow(pa.Table.from_pydict( + {'a': [3, 4], 'b': ['z', 'w']}, + schema=pa.schema([('a', pa.int32()), ('b', pa.string())]))) + w_c.write_arrow(pa.Table.from_pydict( + {'c': [0.3, 0.4]}, + schema=pa.schema([('c', pa.float32())]))) + cmts = w_ab.prepare_commit() + w_c.prepare_commit() + for m in cmts: + for nf in m.new_files: + nf.first_row_id = 2 + tc.commit(cmts) + w_ab.close() + w_c.close() + tc.close() + + rb = table.new_read_builder() + actual = rb.new_read().to_arrow(rb.new_scan().plan().splits()) + self.assertEqual(actual.num_rows, 4) + expect = pa.Table.from_pydict( + {'a': [1, 2, 3, 4], 'b': ['x', 'y', 'z', 'w'], + 'c': [0.1, 0.2, 0.3, 0.4]}, + schema=pa_schema) + self.assertEqual(actual, expect) + + # ------------------------------------------------------------------ + # Blob-format data evolution + # ------------------------------------------------------------------ + + def test_blob_write_and_read(self): + """Write a table with normal + blob columns, read back and verify.""" + pa_schema = pa.schema([ + ('id', pa.int32()), + ('payload', pa.large_binary()), + ]) + schema = Schema.from_pyarrow_schema(pa_schema, options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true', + }) + self.catalog.create_table('default.fmt_blob_basic', schema, False) + table = self.catalog.get_table('default.fmt_blob_basic') + wb = table.new_batch_write_builder() + + blobs = [b'hello world', b'\x00\x01\x02\xff', b'paimon blob'] + tw = wb.new_write() + tc = wb.new_commit() + tw.write_arrow(pa.Table.from_pydict( + {'id': [1, 2, 3], 'payload': blobs}, schema=pa_schema)) + cmts = tw.prepare_commit() + tc.commit(cmts) + tw.close() + tc.close() + + # verify we produced both parquet and blob files + all_files = [nf for m in cmts for nf in m.new_files] + parquet_files = [f for f in all_files if f.file_name.endswith('.parquet')] + blob_files = [f for f in all_files if f.file_name.endswith('.blob')] + self.assertGreater(len(parquet_files), 0) + self.assertGreater(len(blob_files), 0) + + rb = table.new_read_builder() + actual = rb.new_read().to_arrow(rb.new_scan().plan().splits()) + self.assertEqual(actual.num_rows, 3) + self.assertEqual(actual.column('id').to_pylist(), [1, 2, 3]) + self.assertEqual(actual.column('payload').to_pylist(), blobs) + + def test_blob_column_subset_evolution(self): + """Write normal+blob cols in one commit, overwrite normal col in another, merge-read. + + Note: writing blob-only subsets (with_write_type containing only blob columns + and no normal columns) is not supported by DedicatedFormatWriter. This test writes + blob alongside a normal column, then uses data evolution to overwrite the + normal column separately. + """ + pa_schema = pa.schema([ + ('id', pa.int32()), + ('name', pa.string()), + ('doc', pa.large_binary()), + ]) + schema = Schema.from_pyarrow_schema(pa_schema, options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true', + }) + self.catalog.create_table('default.fmt_blob_evolution', schema, False) + table = self.catalog.get_table('default.fmt_blob_evolution') + wb = table.new_batch_write_builder() + + # commit 1: write id + doc (normal + blob together) + tw = wb.new_write().with_write_type(['id', 'doc']) + tc = wb.new_commit() + tw.write_arrow(pa.Table.from_pydict( + {'id': [1, 2], 'doc': [b'doc_alice', b'doc_bob']}, + schema=pa.schema([('id', pa.int32()), ('doc', pa.large_binary())]))) + tc.commit(tw.prepare_commit()) + tw.close() + tc.close() + + # commit 2: write name for the same rows (first_row_id=0) + tw = wb.new_write().with_write_type(['name']) + tc = wb.new_commit() + tw.write_arrow(pa.Table.from_pydict( + {'name': ['Alice', 'Bob']}, + schema=pa.schema([('name', pa.string())]))) + cmts = tw.prepare_commit() + for m in cmts: + for nf in m.new_files: + nf.first_row_id = 0 + tc.commit(cmts) + tw.close() + tc.close() + + rb = table.new_read_builder() + actual = rb.new_read().to_arrow(rb.new_scan().plan().splits()) + self.assertEqual(actual.num_rows, 2) + self.assertEqual(actual.column('id').to_pylist(), [1, 2]) + self.assertEqual(actual.column('name').to_pylist(), ['Alice', 'Bob']) + self.assertEqual(actual.column('doc').to_pylist(), [b'doc_alice', b'doc_bob']) + + def test_blob_append_with_subset_evolution(self): + """Write normal+blob subset in first commit, add remaining col via evolution.""" + pa_schema = pa.schema([ + ('id', pa.int32()), + ('tag', pa.string()), + ('picture', pa.large_binary()), + ]) + schema = Schema.from_pyarrow_schema(pa_schema, options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true', + }) + self.catalog.create_table('default.fmt_blob_append_evo', schema, False) + table = self.catalog.get_table('default.fmt_blob_append_evo') + wb = table.new_batch_write_builder() + + # commit 1: id + picture (normal + blob) + tw = wb.new_write().with_write_type(['id', 'picture']) + tc = wb.new_commit() + tw.write_arrow(pa.Table.from_pydict( + {'id': [1, 2], 'picture': [b'pic1', b'pic2']}, + schema=pa.schema([('id', pa.int32()), ('picture', pa.large_binary())]))) + tc.commit(tw.prepare_commit()) + tw.close() + tc.close() + + # commit 2: add tag for the same rows + tw = wb.new_write().with_write_type(['tag']) + tc = wb.new_commit() + tw.write_arrow(pa.Table.from_pydict( + {'tag': ['t1', 't2']}, + schema=pa.schema([('tag', pa.string())]))) + cmts = tw.prepare_commit() + for m in cmts: + for nf in m.new_files: + nf.first_row_id = 0 + tc.commit(cmts) + tw.close() + tc.close() + + rb = table.new_read_builder() + actual = rb.new_read().to_arrow(rb.new_scan().plan().splits()) + self.assertEqual(actual.num_rows, 2) + self.assertEqual(actual.column('id').to_pylist(), [1, 2]) + self.assertEqual(actual.column('tag').to_pylist(), ['t1', 't2']) + self.assertEqual(actual.column('picture').to_pylist(), [b'pic1', b'pic2']) + + def test_blob_multiple_blob_columns(self): + """Table with two blob columns, write and read both.""" + pa_schema = pa.schema([ + ('id', pa.int32()), + ('audio', pa.large_binary()), + ('video', pa.large_binary()), + ]) + schema = Schema.from_pyarrow_schema(pa_schema, options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true', + }) + self.catalog.create_table('default.fmt_blob_multi', schema, False) + table = self.catalog.get_table('default.fmt_blob_multi') + wb = table.new_batch_write_builder() + + tw = wb.new_write() + tc = wb.new_commit() + tw.write_arrow(pa.Table.from_pydict({ + 'id': [1, 2], + 'audio': [b'audio_1', b'audio_2'], + 'video': [b'video_1', b'video_2'], + }, schema=pa_schema)) + cmts = tw.prepare_commit() + tc.commit(cmts) + tw.close() + tc.close() + + # verify blob files were produced + all_files = [nf for m in cmts for nf in m.new_files] + blob_files = [f for f in all_files if f.file_name.endswith('.blob')] + self.assertGreaterEqual(len(blob_files), 2) + + rb = table.new_read_builder() + actual = rb.new_read().to_arrow(rb.new_scan().plan().splits()) + self.assertEqual(actual.num_rows, 2) + self.assertEqual(actual.column('audio').to_pylist(), [b'audio_1', b'audio_2']) + self.assertEqual(actual.column('video').to_pylist(), [b'video_1', b'video_2']) + + # ------------------------------------------------------------------ + # Vortex-format data evolution + # ------------------------------------------------------------------ + + @unittest.skipIf(sys.version_info < (3, 11), "vortex-data requires Python >= 3.11") + @unittest.skipUnless( + __import__('importlib').util.find_spec('vortex') is not None, + "vortex not installed") + def test_vortex_column_subset_write_and_merge_read(self): + """Write disjoint column subsets as vortex, merge-read via data evolution.""" + pa_schema = pa.schema([ + ('id', pa.int32()), + ('tag', pa.string()), + ('val', pa.float64()), + ]) + schema = Schema.from_pyarrow_schema(pa_schema, options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true', + 'file.format': 'vortex', + }) + self.catalog.create_table('default.fmt_vortex_subset', schema, False) + table = self.catalog.get_table('default.fmt_vortex_subset') + wb = table.new_batch_write_builder() + + w0 = wb.new_write().with_write_type(['id', 'tag']) + w1 = wb.new_write().with_write_type(['val']) + c = wb.new_commit() + w0.write_arrow(pa.Table.from_pydict( + {'id': [10, 20, 30], 'tag': ['p', 'q', 'r']}, + schema=pa.schema([('id', pa.int32()), ('tag', pa.string())]))) + w1.write_arrow(pa.Table.from_pydict( + {'val': [1.5, 2.5, 3.5]}, + schema=pa.schema([('val', pa.float64())]))) + cmts = w0.prepare_commit() + w1.prepare_commit() + for m in cmts: + for nf in m.new_files: + nf.first_row_id = 0 + c.commit(cmts) + w0.close() + w1.close() + c.close() + + # verify vortex files + all_files = [nf for m in cmts for nf in m.new_files] + for f in all_files: + self.assertTrue(f.file_name.endswith('.vortex'), + f"Expected vortex file, got {f.file_name}") + + rb = table.new_read_builder() + actual = rb.new_read().to_arrow(rb.new_scan().plan().splits()) + expect = pa.Table.from_pydict( + {'id': [10, 20, 30], 'tag': ['p', 'q', 'r'], 'val': [1.5, 2.5, 3.5]}, + schema=pa_schema) + self.assertEqual(actual, expect) + + @unittest.skipIf(sys.version_info < (3, 11), "vortex-data requires Python >= 3.11") + @unittest.skipUnless( + __import__('importlib').util.find_spec('vortex') is not None, + "vortex not installed") + def test_vortex_overwrite_column(self): + """Full row write then overwrite one column, all in vortex format.""" + pa_schema = pa.schema([ + ('k', pa.int64()), + ('v', pa.string()), + ]) + schema = Schema.from_pyarrow_schema(pa_schema, options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true', + 'file.format': 'vortex', + }) + self.catalog.create_table('default.fmt_vortex_overwrite', schema, False) + table = self.catalog.get_table('default.fmt_vortex_overwrite') + wb = table.new_batch_write_builder() + + tw = wb.new_write() + tc = wb.new_commit() + tw.write_arrow(pa.Table.from_pydict( + {'k': [100, 200], 'v': ['old', 'old']}, schema=pa_schema)) + tc.commit(tw.prepare_commit()) + tw.close() + tc.close() + + tw = wb.new_write().with_write_type(['v']) + tc = wb.new_commit() + tw.write_arrow(pa.Table.from_pydict( + {'v': ['new', 'new']}, schema=pa.schema([('v', pa.string())]))) + cmts = tw.prepare_commit() + for m in cmts: + for nf in m.new_files: + nf.first_row_id = 0 + tc.commit(cmts) + tw.close() + tc.close() + + rb = table.new_read_builder() + actual = rb.new_read().to_arrow(rb.new_scan().plan().splits()) + self.assertEqual(actual, pa.Table.from_pydict( + {'k': [100, 200], 'v': ['new', 'new']}, schema=pa_schema)) + + @unittest.skipIf(sys.version_info < (3, 11), "vortex-data requires Python >= 3.11") + @unittest.skipUnless( + __import__('importlib').util.find_spec('vortex') is not None, + "vortex not installed") + def test_vortex_append_new_rows(self): + """Append new rows with column subsets in vortex format.""" + pa_schema = pa.schema([ + ('x', pa.int32()), + ('y', pa.string()), + ]) + schema = Schema.from_pyarrow_schema(pa_schema, options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true', + 'file.format': 'vortex', + }) + self.catalog.create_table('default.fmt_vortex_append', schema, False) + table = self.catalog.get_table('default.fmt_vortex_append') + wb = table.new_batch_write_builder() + + # commit 1 + tw = wb.new_write() + tc = wb.new_commit() + tw.write_arrow(pa.Table.from_pydict( + {'x': [1, 2], 'y': ['a', 'b']}, schema=pa_schema)) + tc.commit(tw.prepare_commit()) + tw.close() + tc.close() + + # commit 2: append with subsets, first_row_id=2 + w_x = wb.new_write().with_write_type(['x']) + w_y = wb.new_write().with_write_type(['y']) + tc = wb.new_commit() + w_x.write_arrow(pa.Table.from_pydict( + {'x': [3]}, schema=pa.schema([('x', pa.int32())]))) + w_y.write_arrow(pa.Table.from_pydict( + {'y': ['c']}, schema=pa.schema([('y', pa.string())]))) + cmts = w_x.prepare_commit() + w_y.prepare_commit() + for m in cmts: + for nf in m.new_files: + nf.first_row_id = 2 + tc.commit(cmts) + w_x.close() + w_y.close() + tc.close() + + rb = table.new_read_builder() + actual = rb.new_read().to_arrow(rb.new_scan().plan().splits()) + self.assertEqual(actual.num_rows, 3) + expect = pa.Table.from_pydict( + {'x': [1, 2, 3], 'y': ['a', 'b', 'c']}, schema=pa_schema) + self.assertEqual(actual, expect) + + @unittest.skipIf(sys.version_info < (3, 11), "vortex-data requires Python >= 3.11") + @unittest.skipUnless( + __import__('importlib').util.find_spec('vortex') is not None, + "vortex not installed") + def test_vortex_with_row_id_and_filter(self): + """Write vortex data, read with _ROW_ID projection and filter.""" + pa_schema = pa.schema([ + ('id', pa.int32()), + ('val', pa.string()), + ]) + schema = Schema.from_pyarrow_schema(pa_schema, options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true', + 'file.format': 'vortex', + }) + self.catalog.create_table('default.fmt_vortex_rowid_filter', schema, False) + table = self.catalog.get_table('default.fmt_vortex_rowid_filter') + wb = table.new_batch_write_builder() + + tw = wb.new_write() + tc = wb.new_commit() + tw.write_arrow(pa.Table.from_pydict( + {'id': list(range(10)), 'val': [f'v{i}' for i in range(10)]}, + schema=pa_schema)) + tc.commit(tw.prepare_commit()) + tw.close() + tc.close() + + # full read + rb = table.new_read_builder() + full = rb.new_read().to_arrow(rb.new_scan().plan().splits()) + self.assertEqual(full.num_rows, 10) + + # filter by _ROW_ID + rb_rid = table.new_read_builder().with_projection(['id', 'val', '_ROW_ID']) + pb = rb_rid.new_predicate_builder() + rb_f = table.new_read_builder().with_filter(pb.equal('_ROW_ID', 5)) + actual = rb_f.new_read().to_arrow(rb_f.new_scan().plan().splits()) + self.assertEqual(actual.num_rows, 1) + self.assertEqual(actual.column('id')[0].as_py(), 5) + self.assertEqual(actual.column('val')[0].as_py(), 'v5') + + # ------------------------------------------------------------------ + # Vector (vortex) file format for embedding columns + # ------------------------------------------------------------------ + + @unittest.skipIf(sys.version_info < (3, 11), "vortex-data requires Python >= 3.11") + @unittest.skipUnless( + __import__('importlib').util.find_spec('vortex') is not None, + "vortex not installed") + def test_vector_vortex_write_and_read(self): + """Write table with normal + vector columns using vortex vector format.""" + pa_schema = pa.schema([ + ('id', pa.int64()), + ('embed', pa.list_(pa.float32(), 4)), + ]) + schema = Schema.from_pyarrow_schema(pa_schema, options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true', + 'file.format': 'vortex', + 'vector.file.format': 'vortex', + }) + self.catalog.create_table('default.fmt_vec_vortex', schema, False) + table = self.catalog.get_table('default.fmt_vec_vortex') + + embeddings = [1.0, 0.0, 0.0, 0.0, + 0.0, 1.0, 0.0, 0.0, + 0.0, 0.0, 1.0, 0.0] + test_data = pa.table({ + 'id': pa.array([1, 2, 3], type=pa.int64()), + 'embed': pa.FixedSizeListArray.from_arrays( + pa.array(embeddings, type=pa.float32()), 4), + }) + + wb = table.new_batch_write_builder() + tw = wb.new_write() + tw.write_arrow(test_data) + cmts = tw.prepare_commit() + + # should produce both normal and vector files + all_files = [nf for m in cmts for nf in m.new_files] + normal_files = [f for f in all_files if not DataFileMeta.is_vector_file(f.file_name)] + vector_files = [f for f in all_files if DataFileMeta.is_vector_file(f.file_name)] + self.assertGreater(len(normal_files), 0) + self.assertGreater(len(vector_files), 0) + for vf in vector_files: + self.assertIn('.vector.vortex', vf.file_name) + + wb.new_commit().commit(cmts) + tw.close() + + rb = table.new_read_builder() + actual = rb.new_read().to_arrow(rb.new_scan().plan().splits()) + self.assertEqual(actual.num_rows, 3) + self.assertEqual(actual.column('id').to_pylist(), [1, 2, 3]) + embed_col = actual.column('embed') + self.assertEqual(embed_col[0].as_py(), [1.0, 0.0, 0.0, 0.0]) + self.assertEqual(embed_col[1].as_py(), [0.0, 1.0, 0.0, 0.0]) + self.assertEqual(embed_col[2].as_py(), [0.0, 0.0, 1.0, 0.0]) + + @unittest.skipIf(sys.version_info < (3, 11), "vortex-data requires Python >= 3.11") + @unittest.skipUnless( + __import__('importlib').util.find_spec('vortex') is not None, + "vortex not installed") + def test_vector_vortex_multiple_appends(self): + """Append multiple batches of normal+vector data and read all back.""" + pa_schema = pa.schema([ + ('id', pa.int64()), + ('label', pa.string()), + ('embed', pa.list_(pa.float32(), 3)), + ]) + schema = Schema.from_pyarrow_schema(pa_schema, options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true', + 'file.format': 'vortex', + 'vector.file.format': 'vortex', + }) + self.catalog.create_table('default.fmt_vec_vortex_append', schema, False) + table = self.catalog.get_table('default.fmt_vec_vortex_append') + wb = table.new_batch_write_builder() + + # commit 1 + tw = wb.new_write() + tc = wb.new_commit() + tw.write_arrow(pa.table({ + 'id': pa.array([1, 2], type=pa.int64()), + 'label': pa.array(['cat', 'dog']), + 'embed': pa.FixedSizeListArray.from_arrays( + pa.array([0.1, 0.2, 0.3, 0.4, 0.5, 0.6], type=pa.float32()), 3), + })) + tc.commit(tw.prepare_commit()) + tw.close() + tc.close() + + # commit 2: append + tw = wb.new_write() + tc = wb.new_commit() + tw.write_arrow(pa.table({ + 'id': pa.array([3], type=pa.int64()), + 'label': pa.array(['bird']), + 'embed': pa.FixedSizeListArray.from_arrays( + pa.array([0.7, 0.8, 0.9], type=pa.float32()), 3), + })) + tc.commit(tw.prepare_commit()) + tw.close() + tc.close() + + rb = table.new_read_builder() + actual = rb.new_read().to_arrow(rb.new_scan().plan().splits()) + self.assertEqual(actual.num_rows, 3) + self.assertEqual(actual.column('id').to_pylist(), [1, 2, 3]) + self.assertEqual(actual.column('label').to_pylist(), ['cat', 'dog', 'bird']) + embed_col = actual.column('embed') + self.assertAlmostEqual(embed_col[0].as_py()[0], 0.1, places=5) + self.assertAlmostEqual(embed_col[2].as_py()[2], 0.9, places=5) + + # ------------------------------------------------------------------ + # Mixed formats: parquet + blob + vector in one table + # ------------------------------------------------------------------ + + def test_parquet_and_blob_mixed_append(self): + """Table with normal parquet cols + blob col, append new rows.""" + pa_schema = pa.schema([ + ('id', pa.int32()), + ('name', pa.string()), + ('image', pa.large_binary()), + ]) + schema = Schema.from_pyarrow_schema(pa_schema, options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true', + }) + self.catalog.create_table('default.fmt_mixed_parquet_blob', schema, False) + table = self.catalog.get_table('default.fmt_mixed_parquet_blob') + wb = table.new_batch_write_builder() + + # commit 1: first batch + tw = wb.new_write() + tc = wb.new_commit() + tw.write_arrow(pa.Table.from_pydict({ + 'id': [1, 2], + 'name': ['a', 'b'], + 'image': [b'img1', b'img2'], + }, schema=pa_schema)) + tc.commit(tw.prepare_commit()) + tw.close() + tc.close() + + # commit 2: append more rows + tw = wb.new_write() + tc = wb.new_commit() + tw.write_arrow(pa.Table.from_pydict({ + 'id': [3, 4], + 'name': ['c', 'd'], + 'image': [b'img3', b'img4'], + }, schema=pa_schema)) + tc.commit(tw.prepare_commit()) + tw.close() + tc.close() + + rb = table.new_read_builder() + actual = rb.new_read().to_arrow(rb.new_scan().plan().splits()) + self.assertEqual(actual.num_rows, 4) + self.assertEqual(actual.column('id').to_pylist(), [1, 2, 3, 4]) + self.assertEqual(actual.column('name').to_pylist(), ['a', 'b', 'c', 'd']) + self.assertEqual(actual.column('image').to_pylist(), + [b'img1', b'img2', b'img3', b'img4']) + + @unittest.skipIf(sys.version_info < (3, 11), "vortex-data requires Python >= 3.11") + @unittest.skipUnless( + __import__('importlib').util.find_spec('vortex') is not None, + "vortex not installed") + def test_vortex_and_vector_vortex_mixed(self): + """Table with normal (vortex) + vector (vortex) columns, write and read. + + Verifies that the writer produces separate .vortex and .vector.vortex files, + and the data evolution merge reader stitches them back together. + """ + pa_schema = pa.schema([ + ('id', pa.int64()), + ('name', pa.string()), + ('embed', pa.list_(pa.float32(), 3)), + ]) + schema = Schema.from_pyarrow_schema(pa_schema, options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true', + 'file.format': 'vortex', + 'vector.file.format': 'vortex', + }) + self.catalog.create_table('default.fmt_vortex_vector', schema, False) + table = self.catalog.get_table('default.fmt_vortex_vector') + wb = table.new_batch_write_builder() + + tw = wb.new_write() + tc = wb.new_commit() + tw.write_arrow(pa.table({ + 'id': pa.array([1, 2, 3], type=pa.int64()), + 'name': pa.array(['cat', 'dog', 'bird']), + 'embed': pa.FixedSizeListArray.from_arrays( + pa.array([0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9], + type=pa.float32()), 3), + })) + cmts = tw.prepare_commit() + tc.commit(cmts) + tw.close() + tc.close() + + # verify two file types: .vortex + .vector.vortex + all_files = [nf for m in cmts for nf in m.new_files] + normal_files = [f for f in all_files if not DataFileMeta.is_vector_file(f.file_name)] + vector_files = [f for f in all_files if DataFileMeta.is_vector_file(f.file_name)] + self.assertGreater(len(normal_files), 0, "should produce normal vortex files") + self.assertGreater(len(vector_files), 0, "should produce vector files") + for nf in normal_files: + self.assertTrue(nf.file_name.endswith('.vortex')) + for vf in vector_files: + self.assertIn('.vector.vortex', vf.file_name) + + rb = table.new_read_builder() + actual = rb.new_read().to_arrow(rb.new_scan().plan().splits()) + self.assertEqual(actual.num_rows, 3) + self.assertEqual(actual.column('id').to_pylist(), [1, 2, 3]) + self.assertEqual(actual.column('name').to_pylist(), ['cat', 'dog', 'bird']) + embed = actual.column('embed') + self.assertAlmostEqual(embed[0].as_py()[0], 0.1, places=5) + self.assertAlmostEqual(embed[2].as_py()[2], 0.9, places=5) + + def test_blob_and_vector_inline_mixed(self): + """Table with normal + blob + vector(inline) columns, write and read. + + When blob columns are present, vector columns are stored inline in the + parquet file (not as separate .vector files). This test verifies the + blob+inline-vector path works correctly. + """ + pa_schema = pa.schema([ + ('id', pa.int64()), + ('doc', pa.large_binary()), + ('embed', pa.list_(pa.float32(), 3)), + ]) + schema = Schema.from_pyarrow_schema(pa_schema, options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true', + }) + self.catalog.create_table('default.fmt_blob_vector_inline', schema, False) + table = self.catalog.get_table('default.fmt_blob_vector_inline') + wb = table.new_batch_write_builder() + + tw = wb.new_write() + tc = wb.new_commit() + tw.write_arrow(pa.table({ + 'id': pa.array([1, 2], type=pa.int64()), + 'doc': pa.array([b'doc1', b'doc2'], type=pa.large_binary()), + 'embed': pa.FixedSizeListArray.from_arrays( + pa.array([0.1, 0.2, 0.3, 0.4, 0.5, 0.6], type=pa.float32()), 3), + })) + cmts = tw.prepare_commit() + tc.commit(cmts) + tw.close() + tc.close() + + # verify parquet + blob files + all_files = [nf for m in cmts for nf in m.new_files] + parquet_files = [f for f in all_files if f.file_name.endswith('.parquet')] + blob_files = [f for f in all_files if f.file_name.endswith('.blob')] + self.assertGreater(len(parquet_files), 0, "should produce parquet files") + self.assertGreater(len(blob_files), 0, "should produce blob files") + + rb = table.new_read_builder() + actual = rb.new_read().to_arrow(rb.new_scan().plan().splits()) + self.assertEqual(actual.num_rows, 2) + self.assertEqual(actual.column('id').to_pylist(), [1, 2]) + self.assertEqual(actual.column('doc').to_pylist(), [b'doc1', b'doc2']) + embed = actual.column('embed') + self.assertAlmostEqual(embed[0].as_py()[0], 0.1, places=5) + self.assertAlmostEqual(embed[1].as_py()[2], 0.6, places=5) + + def test_blob_and_vector_with_vector_file_format(self): + """Table with blob + vector columns and explicit vector.file.format. + + DedicatedFormatWriter splits data three ways: normal columns to .parquet, + blob columns to .blob, and vector columns to .vector. files. + """ + pa_schema = pa.schema([ + ('id', pa.int64()), + ('doc', pa.large_binary()), + ('embed', pa.list_(pa.float32(), 3)), + ]) + schema = Schema.from_pyarrow_schema(pa_schema, options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true', + 'vector.file.format': 'parquet', + }) + self.catalog.create_table('default.fmt_blob_vec_format', schema, False) + table = self.catalog.get_table('default.fmt_blob_vec_format') + wb = table.new_batch_write_builder() + + # commit 1: write all columns + tw = wb.new_write() + tc = wb.new_commit() + tw.write_arrow(pa.table({ + 'id': pa.array([1, 2, 3], type=pa.int64()), + 'doc': pa.array([b'aaa', b'bbb', b'ccc'], type=pa.large_binary()), + 'embed': pa.FixedSizeListArray.from_arrays( + pa.array([1.0, 0.0, 0.0, + 0.0, 1.0, 0.0, + 0.0, 0.0, 1.0], type=pa.float32()), 3), + })) + cmts = tw.prepare_commit() + tc.commit(cmts) + tw.close() + tc.close() + + # DedicatedFormatWriter produces parquet + blob + vector files + all_files = [nf for m in cmts for nf in m.new_files] + parquet_files = [f for f in all_files + if f.file_name.endswith('.parquet') + and not DataFileMeta.is_vector_file(f.file_name)] + blob_files = [f for f in all_files if f.file_name.endswith('.blob')] + vector_files = [f for f in all_files if DataFileMeta.is_vector_file(f.file_name)] + self.assertGreater(len(parquet_files), 0, "should produce normal parquet files") + self.assertGreater(len(blob_files), 0, "should produce blob files") + self.assertGreater(len(vector_files), 0, "should produce vector files") + for vf in vector_files: + self.assertIn('.vector.parquet', vf.file_name) + + rb = table.new_read_builder() + actual = rb.new_read().to_arrow(rb.new_scan().plan().splits()) + self.assertEqual(actual.num_rows, 3) + self.assertEqual(actual.column('id').to_pylist(), [1, 2, 3]) + self.assertEqual(actual.column('doc').to_pylist(), [b'aaa', b'bbb', b'ccc']) + self.assertEqual(actual.column('embed')[0].as_py(), [1.0, 0.0, 0.0]) + self.assertEqual(actual.column('embed')[1].as_py(), [0.0, 1.0, 0.0]) + self.assertEqual(actual.column('embed')[2].as_py(), [0.0, 0.0, 1.0]) + + # commit 2: append more rows + tw = wb.new_write() + tc = wb.new_commit() + tw.write_arrow(pa.table({ + 'id': pa.array([4, 5], type=pa.int64()), + 'doc': pa.array([b'ddd', b'eee'], type=pa.large_binary()), + 'embed': pa.FixedSizeListArray.from_arrays( + pa.array([0.5, 0.5, 0.0, + 0.0, 0.5, 0.5], type=pa.float32()), 3), + })) + tc.commit(tw.prepare_commit()) + tw.close() + tc.close() + + rb = table.new_read_builder() + actual2 = rb.new_read().to_arrow(rb.new_scan().plan().splits()) + self.assertEqual(actual2.num_rows, 5) + self.assertEqual(actual2.column('id').to_pylist(), [1, 2, 3, 4, 5]) + self.assertEqual(actual2.column('doc').to_pylist(), + [b'aaa', b'bbb', b'ccc', b'ddd', b'eee']) + + # ------------------------------------------------------------------ + # Projection and _ROW_ID across formats + # ------------------------------------------------------------------ + + def test_blob_with_row_id_projection(self): + """Read blob table with _ROW_ID projection.""" + pa_schema = pa.schema([ + ('id', pa.int32()), + ('data', pa.large_binary()), + ]) + schema = Schema.from_pyarrow_schema(pa_schema, options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true', + }) + self.catalog.create_table('default.fmt_blob_rowid', schema, False) + table = self.catalog.get_table('default.fmt_blob_rowid') + wb = table.new_batch_write_builder() + + tw = wb.new_write() + tc = wb.new_commit() + tw.write_arrow(pa.Table.from_pydict( + {'id': [10, 20], 'data': [b'aa', b'bb']}, schema=pa_schema)) + tc.commit(tw.prepare_commit()) + tw.close() + tc.close() + + rb = table.new_read_builder() + rb.with_projection(['id', 'data', '_ROW_ID']) + actual = rb.new_read().to_arrow(rb.new_scan().plan().splits()) + self.assertEqual(actual.num_rows, 2) + self.assertEqual(actual.column('_ROW_ID').to_pylist(), [0, 1]) + self.assertEqual(actual.column('id').to_pylist(), [10, 20]) + self.assertEqual(actual.column('data').to_pylist(), [b'aa', b'bb']) + + def test_parquet_large_data_evolution(self): + """Larger dataset: 1000 rows, column-subset write+merge.""" + pa_schema = pa.schema([ + ('id', pa.int32()), + ('col_a', pa.string()), + ('col_b', pa.float64()), + ]) + schema = Schema.from_pyarrow_schema(pa_schema, options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true', + 'file.format': 'parquet', + }) + self.catalog.create_table('default.fmt_parquet_large', schema, False) + table = self.catalog.get_table('default.fmt_parquet_large') + wb = table.new_batch_write_builder() + + n = 1000 + w0 = wb.new_write().with_write_type(['id', 'col_a']) + w1 = wb.new_write().with_write_type(['col_b']) + c = wb.new_commit() + w0.write_arrow(pa.Table.from_pydict( + {'id': list(range(n)), 'col_a': [f's{i}' for i in range(n)]}, + schema=pa.schema([('id', pa.int32()), ('col_a', pa.string())]))) + w1.write_arrow(pa.Table.from_pydict( + {'col_b': [float(i) for i in range(n)]}, + schema=pa.schema([('col_b', pa.float64())]))) + cmts = w0.prepare_commit() + w1.prepare_commit() + for m in cmts: + for nf in m.new_files: + nf.first_row_id = 0 + c.commit(cmts) + w0.close() + w1.close() + c.close() + + rb = table.new_read_builder() + actual = rb.new_read().to_arrow(rb.new_scan().plan().splits()) + self.assertEqual(actual.num_rows, n) + self.assertEqual(actual.column('id').to_pylist(), list(range(n))) + self.assertEqual(actual.column('col_a').to_pylist(), [f's{i}' for i in range(n)]) + self.assertEqual(actual.column('col_b').to_pylist(), [float(i) for i in range(n)]) + + +if __name__ == '__main__': + unittest.main() diff --git a/paimon-python/pypaimon/write/file_store_write.py b/paimon-python/pypaimon/write/file_store_write.py index c31a9f8a91f3..6a20708a14fd 100644 --- a/paimon-python/pypaimon/write/file_store_write.py +++ b/paimon-python/pypaimon/write/file_store_write.py @@ -23,7 +23,7 @@ from pypaimon.common.options.core_options import CoreOptions from pypaimon.write.commit_message import CommitMessage from pypaimon.write.writer.append_only_data_writer import AppendOnlyDataWriter -from pypaimon.write.writer.data_blob_writer import DataBlobWriter +from pypaimon.write.writer.dedicated_format_writer import DedicatedFormatWriter from pypaimon.write.writer.data_vector_writer import DataVectorWriter from pypaimon.write.writer.data_writer import DataWriter from pypaimon.write.writer.key_value_data_writer import KeyValueDataWriter @@ -65,7 +65,7 @@ def max_seq_number(): # Check if table has blob columns if self._has_blob_columns(): - return DataBlobWriter( + return DedicatedFormatWriter( table=self.table, partition=partition, bucket=bucket, diff --git a/paimon-python/pypaimon/write/writer/data_blob_writer.py b/paimon-python/pypaimon/write/writer/dedicated_format_writer.py similarity index 78% rename from paimon-python/pypaimon/write/writer/data_blob_writer.py rename to paimon-python/pypaimon/write/writer/dedicated_format_writer.py index cb131f9a54b1..5ddc26f9b804 100644 --- a/paimon-python/pypaimon/write/writer/data_blob_writer.py +++ b/paimon-python/pypaimon/write/writer/dedicated_format_writer.py @@ -25,53 +25,25 @@ from pypaimon.data.timestamp import Timestamp from pypaimon.manifest.schema.data_file_meta import DataFileMeta from pypaimon.manifest.schema.simple_stats import SimpleStats +from pypaimon.schema.data_types import VectorType from pypaimon.table.row.generic_row import GenericRow from pypaimon.write.writer.data_writer import DataWriter logger = logging.getLogger(__name__) -class DataBlobWriter(DataWriter): - """ - A rolling file writer that handles both normal data and blob data. This writer creates separate - files for normal columns and blob columns, managing their lifecycle independently. - - For example, given a table schema with normal columns (id INT, name STRING) and blob columns - (pic1 BLOB, pic2 BLOB), this writer will create separate files for normal columns and each - blob-file column. - - Key features: - - Blob data can roll independently when normal data doesn't need rolling - - When normal data rolls, blob data MUST also be closed (Java behavior) - - Blob data uses more aggressive rolling (smaller target size) to prevent memory issues - - One normal data file may correspond to multiple blob data files - - Blob data is written immediately to disk to prevent memory corruption - - Blob file metadata is stored as separate DataFileMeta objects after normal file metadata - - When TableWrite.with_write_type narrows columns, incoming batches only carry that subset; - column lists are narrowed accordingly so splitting never selects missing columns. - - Rolling behavior: - - Normal data rolls: Both normal and blob writers are closed together, blob metadata added after normal metadata - - Blob data rolls independently: Only blob writer is closed, blob metadata is cached until normal data rolls - - Metadata organization: - - Normal file metadata is added first to committed_files - - Blob file metadata is added after normal file metadata in committed_files - - When blob rolls independently, metadata is cached until normal data rolls - - Result: [normal_meta, blob_meta1, blob_meta2, blob_meta3, ...] - - Example file organization: - committed_files = [ - normal_file1_meta, # f1.parquet metadata - blob_file1_meta, # b1.blob metadata - blob_file2_meta, # b2.blob metadata - blob_file3_meta, # b3.blob metadata - normal_file2_meta, # f1-2.parquet metadata - blob_file4_meta, # b4.blob metadata - blob_file5_meta, # b5.blob metadata - ] - - This matches the Java RollingBlobFileWriter behavior exactly. +class DedicatedFormatWriter(DataWriter): + """A rolling file writer that writes normal, blob, and vector columns to dedicated files. + + Splits incoming data three ways: + - Normal columns → standard data files (.parquet / .orc / .vortex / …) + - Blob columns (large_binary) → .blob files + - Vector columns (when vector.file.format is configured) → .vector. files + + This mirrors Java's DedicatedFormatRollingFileWriter. + + Metadata order in committed_files: + [normal_meta, blob_meta1, …, vector_meta1, …] """ # Constant for checking rolling condition periodically @@ -101,6 +73,13 @@ def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: int, op full_blob_file_set = set(full_blob_file_column_names) all_column_names = self.table.field_names + # Detect vector columns that should be written to dedicated files. + full_vector_column_names = self._get_vector_columns_from_schema() + full_vector_set = set(full_vector_column_names) + # Only split vector columns when vector.file.format is configured. + has_dedicated_vector = bool(full_vector_column_names) and options.with_vector_format() + dedicated_set = full_blob_file_set | (full_vector_set if has_dedicated_vector else set()) + # Narrow columns when TableWrite.with_write_type(...) supplies a partial column list. # Incoming RecordBatches only contain those columns; selecting full normal/blob lists # would raise KeyError. @@ -109,13 +88,17 @@ def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: int, op self.blob_file_column_names = [ col for col in full_blob_file_column_names if col in write_col_set ] + self.vector_write_columns = [ + col for col in full_vector_column_names if col in write_col_set + ] if has_dedicated_vector else [] self.normal_column_names = [ - col for col in write_cols if col not in full_blob_file_set + col for col in write_cols if col not in dedicated_set ] else: self.blob_file_column_names = list(full_blob_file_column_names) + self.vector_write_columns = list(full_vector_column_names) if has_dedicated_vector else [] self.normal_column_names = [ - col for col in all_column_names if col not in full_blob_file_set + col for col in all_column_names if col not in dedicated_set ] normal_name_set = set(self.normal_column_names) self.normal_columns = [ @@ -143,6 +126,20 @@ def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: int, op options=options ) + # Initialize vector writer when vector.file.format is configured. + from pypaimon.write.writer.vector_writer import VectorWriter + self.vector_writer: Optional[VectorWriter] = None + if self.vector_write_columns: + self.vector_writer = VectorWriter( + table=self.table, + partition=self.partition, + bucket=self.bucket, + max_seq_number=max_seq_number, + vector_columns=self.vector_write_columns, + vector_file_format=options.vector_file_format(), + options=options, + ) + # Initialize ExternalStorageBlobWriter if configured self._external_storage_writer = None external_storage_fields = self.options.blob_external_storage_fields() @@ -159,10 +156,11 @@ def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: int, op ) logger.info( - "Initialized DataBlobWriter with blob columns: %s, blob file columns: %s, descriptor " - "stored columns: %s, external storage fields: %s", + "Initialized DedicatedFormatWriter with blob columns: %s, blob file columns: %s, " + "vector columns: %s, descriptor stored columns: %s, external storage fields: %s", self.blob_column_names, self.blob_file_column_names, + self.vector_write_columns, sorted(self.blob_descriptor_fields), sorted(external_storage_fields) if external_storage_fields else [], ) @@ -178,8 +176,14 @@ def _get_blob_columns_from_schema(self) -> List[str]: raise ValueError("No blob field found in table schema.") return blob_columns + def _get_vector_columns_from_schema(self) -> List[str]: + return [ + field.name for field in self.table.table_schema.fields + if isinstance(field.type, VectorType) + ] + def _process_data(self, data: pa.RecordBatch) -> pa.RecordBatch: - normal_data, _ = self._split_data(data) + normal_data, _, _ = self._split_data(data) return normal_data def _merge_data(self, existing_data: pa.Table, new_data: pa.Table) -> pa.Table: @@ -192,8 +196,8 @@ def write(self, data: pa.RecordBatch): if self._external_storage_writer: data = self._external_storage_writer.transform_batch(data) - # Split data into normal and blob parts - normal_data, blob_data_map = self._split_data(data) + # Split data into normal, blob, and vector parts + normal_data, blob_data_map, vector_data = self._split_data(data) self._validate_descriptor_stored_fields_input(data) # Process and accumulate normal data @@ -208,6 +212,10 @@ def write(self, data: pa.RecordBatch): if blob_data is not None and blob_data.num_rows > 0: self.blob_writers[blob_column].write(blob_data) + # Write vector columns to dedicated vector writer. + if self.vector_writer is not None and vector_data is not None and vector_data.num_rows > 0: + self.vector_writer.write(vector_data) + self.record_count += data.num_rows # Check if normal data rolling is needed @@ -246,18 +254,27 @@ def abort(self): """Abort all writers and clean up resources.""" for blob_writer in self.blob_writers.values(): blob_writer.abort() + if self.vector_writer is not None: + self.vector_writer.abort() if self._external_storage_writer: self._external_storage_writer.abort() self.pending_normal_data = None self.committed_files.clear() - def _split_data(self, data: pa.RecordBatch) -> Tuple[pa.RecordBatch, Dict[str, pa.RecordBatch]]: - """Split data into normal and blob parts based on column names.""" + def _split_data(self, data: pa.RecordBatch) -> Tuple[ + pa.RecordBatch, Dict[str, pa.RecordBatch], Optional[pa.RecordBatch]]: + """Split data into normal, blob, and vector parts based on column names.""" normal_data = data.select(self.normal_column_names) if self.normal_column_names else None blob_data_map = { blob_column: data.select([blob_column]) for blob_column in self.blob_file_column_names } - return normal_data, blob_data_map + vector_data = ( + pa.RecordBatch.from_arrays( + [data.column(name) for name in self.vector_write_columns], + names=self.vector_write_columns, + ) if self.vector_write_columns else None + ) + return normal_data, blob_data_map, vector_data def _validate_descriptor_stored_fields_input(self, data: pa.RecordBatch): if not self.blob_descriptor_fields: @@ -316,7 +333,7 @@ def _should_roll_normal(self) -> bool: return current_size > self.target_file_size def _close_current_writers(self): - """Close both normal and blob writers and add blob metadata after normal metadata (Java behavior).""" + """Close normal, blob, and vector writers; add metadata in order: normal, blob, vector.""" if self.pending_normal_data is None or self.pending_normal_data.num_rows == 0: return @@ -329,17 +346,23 @@ def _close_current_writers(self): self._validate_consistency(normal_meta, writer_metas, blob_column) blob_metas.extend(writer_metas) - # Add normal file metadata first - self.committed_files.append(normal_meta) + vector_metas = [] + if self.vector_writer is not None: + vector_metas = self.vector_writer.prepare_commit() + self.vector_writer.committed_files.clear() + if vector_metas: + self._validate_consistency(normal_meta, vector_metas, 'vector') - # Add blob file metadata after normal metadata + # Add metadata in order: normal, blob, vector + self.committed_files.append(normal_meta) self.committed_files.extend(blob_metas) + self.committed_files.extend(vector_metas) # Reset pending data self.pending_normal_data = None - logger.info(f"Closed both writers - normal: {normal_meta.file_name}, " - f"added {len(blob_metas)} blob file metadata after normal metadata") + logger.info(f"Closed writers - normal: {normal_meta.file_name}, " + f"{len(blob_metas)} blob metas, {len(vector_metas)} vector metas") def _write_normal_data_to_file(self, data: pa.Table) -> Optional[DataFileMeta]: if data.num_rows == 0: From dc179767a4a0e8755150fd604b049ad088a18f2c Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Fri, 29 May 2026 10:11:05 +0800 Subject: [PATCH 2/3] [python] Fix vortex reader to cast all string_view/binary_view columns Previously _cast_view_types only cast predicate-referenced fields, leaving other string_view columns unconverted. This caused schema mismatch errors when combining batches from multiple vortex files. Co-Authored-By: Claude Opus 4.6 --- .../read/reader/format_vortex_reader.py | 28 ++++++++----------- 1 file changed, 11 insertions(+), 17 deletions(-) diff --git a/paimon-python/pypaimon/read/reader/format_vortex_reader.py b/paimon-python/pypaimon/read/reader/format_vortex_reader.py index ac48ca1abe91..fcd50a24f005 100644 --- a/paimon-python/pypaimon/read/reader/format_vortex_reader.py +++ b/paimon-python/pypaimon/read/reader/format_vortex_reader.py @@ -86,29 +86,23 @@ def __init__(self, file_io: FileIO, file_path: str, read_fields: List[DataField] PyarrowFieldParser.from_paimon_schema(read_fields) if read_fields else None ) - # Collect predicate-referenced fields for targeted view type casting - self._cast_fields = predicate_fields if predicate_fields and vortex_expr is not None else set() - @staticmethod - def _cast_view_types(batch: RecordBatch, target_fields: Set[str]) -> RecordBatch: - """Cast string_view/binary_view columns to string/binary, only for target fields.""" - if not target_fields: - return batch + def _cast_view_types(batch: RecordBatch) -> RecordBatch: + """Cast all string_view/binary_view columns to string/binary.""" columns = [] fields = [] changed = False for i in range(batch.num_columns): col = batch.column(i) field = batch.schema.field(i) - if field.name in target_fields: - if col.type == pa.string_view(): - col = col.cast(pa.utf8()) - field = field.with_type(pa.utf8()) - changed = True - elif col.type == pa.binary_view(): - col = col.cast(pa.binary()) - field = field.with_type(pa.binary()) - changed = True + if col.type == pa.string_view(): + col = col.cast(pa.utf8()) + field = field.with_type(pa.utf8()) + changed = True + elif col.type == pa.binary_view(): + col = col.cast(pa.binary()) + field = field.with_type(pa.binary()) + changed = True columns.append(col) fields.append(field) if changed: @@ -118,7 +112,7 @@ def _cast_view_types(batch: RecordBatch, target_fields: Set[str]) -> RecordBatch def read_arrow_batch(self) -> Optional[RecordBatch]: try: batch = next(self.record_batch_reader) - batch = self._cast_view_types(batch, self._cast_fields) + batch = self._cast_view_types(batch) if not self.missing_fields: return batch From 3ff1ce266552e474bc0f4ec5024838ccbc049802 Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Fri, 29 May 2026 10:53:15 +0800 Subject: [PATCH 3/3] fix comments --- paimon-python/pypaimon/read/table_read.py | 4 +- .../tests/data_evolution_formats_test.py | 79 +++++++++++++++++-- .../write/writer/dedicated_format_writer.py | 42 +++++----- 3 files changed, 94 insertions(+), 31 deletions(-) diff --git a/paimon-python/pypaimon/read/table_read.py b/paimon-python/pypaimon/read/table_read.py index 7f867e4f519a..52a4eaaa7f1a 100644 --- a/paimon-python/pypaimon/read/table_read.py +++ b/paimon-python/pypaimon/read/table_read.py @@ -135,7 +135,7 @@ def _add_row_kind_to_schema(schema: pyarrow.Schema) -> pyarrow.Schema: @staticmethod def _try_to_pad_batch_by_schema(batch: pyarrow.RecordBatch, target_schema): - if batch.schema.equals(target_schema): + if batch.schema.names == target_schema.names: return batch columns = [] @@ -144,8 +144,6 @@ def _try_to_pad_batch_by_schema(batch: pyarrow.RecordBatch, target_schema): for field in target_schema: if field.name in batch.schema.names: col = batch.column(field.name) - if not col.type.equals(field.type): - col = col.cast(field.type) else: col = pyarrow.nulls(num_rows, type=field.type) columns.append(col) diff --git a/paimon-python/pypaimon/tests/data_evolution_formats_test.py b/paimon-python/pypaimon/tests/data_evolution_formats_test.py index 406a8feedddd..f89f3290f199 100644 --- a/paimon-python/pypaimon/tests/data_evolution_formats_test.py +++ b/paimon-python/pypaimon/tests/data_evolution_formats_test.py @@ -237,13 +237,7 @@ def test_blob_write_and_read(self): self.assertEqual(actual.column('payload').to_pylist(), blobs) def test_blob_column_subset_evolution(self): - """Write normal+blob cols in one commit, overwrite normal col in another, merge-read. - - Note: writing blob-only subsets (with_write_type containing only blob columns - and no normal columns) is not supported by DedicatedFormatWriter. This test writes - blob alongside a normal column, then uses data evolution to overwrite the - normal column separately. - """ + """Write normal+blob cols in one commit, overwrite normal col in another, merge-read.""" pa_schema = pa.schema([ ('id', pa.int32()), ('name', pa.string()), @@ -917,6 +911,77 @@ def test_blob_and_vector_with_vector_file_format(self): self.assertEqual(actual2.column('doc').to_pylist(), [b'aaa', b'bbb', b'ccc', b'ddd', b'eee']) + def test_blob_vector_partial_write_vector_only(self): + """Blob+vector table with with_write_type(['embed']) — vector-only partial write. + + When normal_column_names is empty, the writer must still flush vector + metadata without crashing on an empty normal data path. + """ + pa_schema = pa.schema([ + ('id', pa.int64()), + ('doc', pa.large_binary()), + ('embed', pa.list_(pa.float32(), 3)), + ]) + schema = Schema.from_pyarrow_schema(pa_schema, options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true', + 'vector.file.format': 'parquet', + }) + self.catalog.create_table('default.fmt_blob_vec_partial', schema, False) + table = self.catalog.get_table('default.fmt_blob_vec_partial') + wb = table.new_batch_write_builder() + + # commit 1: write all columns + tw = wb.new_write() + tc = wb.new_commit() + tw.write_arrow(pa.table({ + 'id': pa.array([1, 2, 3], type=pa.int64()), + 'doc': pa.array([b'aaa', b'bbb', b'ccc'], type=pa.large_binary()), + 'embed': pa.FixedSizeListArray.from_arrays( + pa.array([1.0, 0.0, 0.0, + 0.0, 1.0, 0.0, + 0.0, 0.0, 1.0], type=pa.float32()), 3), + })) + tc.commit(tw.prepare_commit()) + tw.close() + tc.close() + + # commit 2: write only vector column — no normal columns + tw = wb.new_write().with_write_type(['embed']) + tc = wb.new_commit() + tw.write_arrow(pa.table({ + 'embed': pa.FixedSizeListArray.from_arrays( + pa.array([0.5, 0.5, 0.0, + 0.0, 0.5, 0.5, + 0.5, 0.0, 0.5], type=pa.float32()), 3), + })) + cmts = tw.prepare_commit() + + # should produce only vector files, no normal or blob files + all_files = [nf for m in cmts for nf in m.new_files] + self.assertGreater(len(all_files), 0, "should produce vector files") + for f in all_files: + self.assertTrue(DataFileMeta.is_vector_file(f.file_name), + f"Expected vector file, got {f.file_name}") + + for m in cmts: + for nf in m.new_files: + nf.first_row_id = 0 + tc.commit(cmts) + tw.close() + tc.close() + + # read back and verify the vector column was updated + rb = table.new_read_builder() + actual = rb.new_read().to_arrow(rb.new_scan().plan().splits()) + self.assertEqual(actual.num_rows, 3) + self.assertEqual(actual.column('id').to_pylist(), [1, 2, 3]) + self.assertEqual(actual.column('doc').to_pylist(), [b'aaa', b'bbb', b'ccc']) + embed = actual.column('embed') + self.assertEqual(embed[0].as_py(), [0.5, 0.5, 0.0]) + self.assertEqual(embed[1].as_py(), [0.0, 0.5, 0.5]) + self.assertEqual(embed[2].as_py(), [0.5, 0.0, 0.5]) + # ------------------------------------------------------------------ # Projection and _ROW_ID across formats # ------------------------------------------------------------------ diff --git a/paimon-python/pypaimon/write/writer/dedicated_format_writer.py b/paimon-python/pypaimon/write/writer/dedicated_format_writer.py index 5ddc26f9b804..6e7ea5778148 100644 --- a/paimon-python/pypaimon/write/writer/dedicated_format_writer.py +++ b/paimon-python/pypaimon/write/writer/dedicated_format_writer.py @@ -200,12 +200,13 @@ def write(self, data: pa.RecordBatch): normal_data, blob_data_map, vector_data = self._split_data(data) self._validate_descriptor_stored_fields_input(data) - # Process and accumulate normal data + # Process and accumulate normal data (may be None for partial writes) processed_normal = self._process_normal_data(normal_data) - if self.pending_normal_data is None: - self.pending_normal_data = processed_normal - else: - self.pending_normal_data = self._merge_normal_data(self.pending_normal_data, processed_normal) + if processed_normal is not None: + if self.pending_normal_data is None: + self.pending_normal_data = processed_normal + else: + self.pending_normal_data = self._merge_normal_data(self.pending_normal_data, processed_normal) # Write blob-file columns to dedicated blob writers. for blob_column, blob_data in blob_data_map.items(): @@ -239,8 +240,7 @@ def close(self): return try: - if self.pending_normal_data is not None and self.pending_normal_data.num_rows > 0: - self._close_current_writers() + self._close_current_writers() if self._external_storage_writer: self._external_storage_writer.close() except Exception as e: @@ -310,10 +310,10 @@ def _validate_descriptor_stored_fields_input(self, data: pa.RecordBatch): ) from e @staticmethod - def _process_normal_data(data: pa.RecordBatch) -> pa.Table: + def _process_normal_data(data: pa.RecordBatch) -> Optional[pa.Table]: """Process normal data (similar to base DataWriter).""" if data is None or data.num_rows == 0: - return pa.Table.from_batches([]) + return None return pa.Table.from_batches([data]) @staticmethod @@ -334,35 +334,35 @@ def _should_roll_normal(self) -> bool: def _close_current_writers(self): """Close normal, blob, and vector writers; add metadata in order: normal, blob, vector.""" - if self.pending_normal_data is None or self.pending_normal_data.num_rows == 0: - return - - # Close normal writer and get metadata - normal_meta = self._write_normal_data_to_file(self.pending_normal_data) + normal_meta = None + if self.pending_normal_data is not None and self.pending_normal_data.num_rows > 0: + normal_meta = self._write_normal_data_to_file(self.pending_normal_data) blob_metas = [] for blob_column in self.blob_file_column_names: writer_metas = self.blob_writers[blob_column].prepare_commit() - self._validate_consistency(normal_meta, writer_metas, blob_column) + if normal_meta is not None: + self._validate_consistency(normal_meta, writer_metas, blob_column) blob_metas.extend(writer_metas) vector_metas = [] if self.vector_writer is not None: vector_metas = self.vector_writer.prepare_commit() self.vector_writer.committed_files.clear() - if vector_metas: + if vector_metas and normal_meta is not None: self._validate_consistency(normal_meta, vector_metas, 'vector') - # Add metadata in order: normal, blob, vector - self.committed_files.append(normal_meta) + if normal_meta is not None: + self.committed_files.append(normal_meta) self.committed_files.extend(blob_metas) self.committed_files.extend(vector_metas) - # Reset pending data self.pending_normal_data = None - logger.info(f"Closed writers - normal: {normal_meta.file_name}, " - f"{len(blob_metas)} blob metas, {len(vector_metas)} vector metas") + if normal_meta is not None or blob_metas or vector_metas: + normal_name = normal_meta.file_name if normal_meta is not None else '' + logger.info(f"Closed writers - normal: {normal_name}, " + f"{len(blob_metas)} blob metas, {len(vector_metas)} vector metas") def _write_normal_data_to_file(self, data: pa.Table) -> Optional[DataFileMeta]: if data.num_rows == 0: