Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion paimon-python/pypaimon/read/table_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ def _add_row_kind_to_schema(schema: pyarrow.Schema) -> pyarrow.Schema:

@staticmethod
def _try_to_pad_batch_by_schema(batch: pyarrow.RecordBatch, target_schema):
if batch.schema.names == target_schema.names:
if batch.schema.equals(target_schema):
return batch

columns = []
Expand All @@ -144,6 +144,8 @@ def _try_to_pad_batch_by_schema(batch: pyarrow.RecordBatch, target_schema):
for field in target_schema:
if field.name in batch.schema.names:
col = batch.column(field.name)
if not col.type.equals(field.type):
col = col.cast(field.type)
else:
col = pyarrow.nulls(num_rows, type=field.type)
columns.append(col)
Expand Down
104 changes: 52 additions & 52 deletions paimon-python/pypaimon/tests/blob_table_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
from pypaimon.write.commit_message import CommitMessage


class DataBlobWriterTest(unittest.TestCase):
"""Tests for DataBlobWriter functionality with paimon table operations."""
class DedicatedFormatWriterTest(unittest.TestCase):
"""Tests for DedicatedFormatWriter functionality with paimon table operations."""

@classmethod
def setUpClass(cls):
Expand All @@ -51,8 +51,8 @@ def tearDownClass(cls):
except OSError:
pass

def test_data_blob_writer_basic_functionality(self):
"""Test basic DataBlobWriter functionality with paimon table."""
def test_dedicated_format_writer_basic_functionality(self):
"""Test basic DedicatedFormatWriter functionality with paimon table."""
from pypaimon import Schema

# Create schema with normal and blob columns
Expand Down Expand Up @@ -82,7 +82,7 @@ def test_data_blob_writer_basic_functionality(self):
'blob_data': [b'blob_data_1', b'blob_data_2', b'blob_data_3']
}, schema=pa_schema)

# Test DataBlobWriter initialization using proper table API
# Test DedicatedFormatWriter initialization using proper table API
# Use proper table API to create writer
write_builder = table.new_batch_write_builder()
blob_writer = write_builder.new_write()
Expand All @@ -108,8 +108,8 @@ def test_data_blob_writer_basic_functionality(self):

blob_writer.close()

def test_data_blob_writer_schema_detection(self):
"""Test that DataBlobWriter correctly detects blob columns from schema."""
def test_dedicated_format_writer_schema_detection(self):
"""Test that DedicatedFormatWriter correctly detects blob columns from schema."""
from pypaimon import Schema

# Test schema with blob column
Expand All @@ -132,7 +132,7 @@ def test_data_blob_writer_schema_detection(self):
write_builder = table.new_batch_write_builder()
blob_writer = write_builder.new_write()

# Test that DataBlobWriter was created internally
# Test that DedicatedFormatWriter was created internally
# We can verify this by checking the internal data writers
test_data = pa.Table.from_pydict({
'id': [1, 2, 3],
Expand All @@ -142,19 +142,19 @@ def test_data_blob_writer_schema_detection(self):
# Write data to trigger writer creation
blob_writer.write_arrow(test_data)

# Verify that a DataBlobWriter was created internally
# Verify that a DedicatedFormatWriter was created internally
data_writers = blob_writer.file_store_write.data_writers
self.assertGreater(len(data_writers), 0)

# Check that the writer is a DataBlobWriter
# Check that the writer is a DedicatedFormatWriter
for writer in data_writers.values():
from pypaimon.write.writer.data_blob_writer import DataBlobWriter
self.assertIsInstance(writer, DataBlobWriter)
from pypaimon.write.writer.dedicated_format_writer import DedicatedFormatWriter
self.assertIsInstance(writer, DedicatedFormatWriter)

blob_writer.close()

def test_data_blob_writer_no_blob_column(self):
"""Test that DataBlobWriter raises error when no blob column is found."""
def test_dedicated_format_writer_no_blob_column(self):
"""Test that DedicatedFormatWriter raises error when no blob column is found."""
from pypaimon import Schema

# Test schema without blob column
Expand All @@ -177,7 +177,7 @@ def test_data_blob_writer_no_blob_column(self):
write_builder = table.new_batch_write_builder()
writer = write_builder.new_write()

# Test that a regular writer (not DataBlobWriter) was created
# Test that a regular writer (not DedicatedFormatWriter) was created
test_data = pa.Table.from_pydict({
'id': [1, 2, 3],
'name': ['Alice', 'Bob', 'Charlie']
Expand All @@ -186,19 +186,19 @@ def test_data_blob_writer_no_blob_column(self):
# Write data to trigger writer creation
writer.write_arrow(test_data)

# Verify that a regular writer was created (not DataBlobWriter)
# Verify that a regular writer was created (not DedicatedFormatWriter)
data_writers = writer.file_store_write.data_writers
self.assertGreater(len(data_writers), 0)

# Check that the writer is NOT a DataBlobWriter
# Check that the writer is NOT a DedicatedFormatWriter
for writer_instance in data_writers.values():
from pypaimon.write.writer.data_blob_writer import DataBlobWriter
self.assertNotIsInstance(writer_instance, DataBlobWriter)
from pypaimon.write.writer.dedicated_format_writer import DedicatedFormatWriter
self.assertNotIsInstance(writer_instance, DedicatedFormatWriter)

writer.close()

def test_data_blob_writer_multiple_blob_columns(self):
"""Test that DataBlobWriter supports multiple blob columns."""
def test_dedicated_format_writer_multiple_blob_columns(self):
"""Test that DedicatedFormatWriter supports multiple blob columns."""
from pypaimon import Schema

# Test schema with multiple blob columns
Expand Down Expand Up @@ -242,7 +242,7 @@ def test_data_blob_writer_multiple_blob_columns(self):
result = table.new_read_builder().new_read().to_arrow(table.new_read_builder().new_scan().plan().splits())
self.assertEqual(result.num_rows, 3)

def test_data_blob_writer_partial_write_with_write_type(self):
def test_dedicated_format_writer_partial_write_with_write_type(self):
"""Partial write (normal + blob subset) via with_write_type: split must match batch columns."""
from pypaimon import Schema

Expand Down Expand Up @@ -292,7 +292,7 @@ def test_data_blob_writer_partial_write_with_write_type(self):
self.assertEqual(out.column('blob_data').to_pylist(), [b'a', b'b'])
self.assertEqual(out.column('name').to_pylist(), [None, None])

def test_data_blob_writer_partial_write_normal_only_with_write_type(self):
def test_dedicated_format_writer_partial_write_normal_only_with_write_type(self):
"""Partial write without blob columns in write_cols must not touch blob split paths."""
from pypaimon import Schema

Expand Down Expand Up @@ -333,7 +333,7 @@ def test_data_blob_writer_partial_write_normal_only_with_write_type(self):
self.assertEqual(out.column('name').to_pylist(), ['n'])
self.assertEqual(out.column('blob_data').to_pylist(), [None])

def test_data_blob_writer_partial_write_single_blob_of_two_with_write_type(self):
def test_dedicated_format_writer_partial_write_single_blob_of_two_with_write_type(self):
"""with_write_type lists only one blob column: only that column gets .blob files."""
from pypaimon import Schema

Expand Down Expand Up @@ -369,8 +369,8 @@ def test_data_blob_writer_partial_write_single_blob_of_two_with_write_type(self)
write_builder.new_commit().commit(commit_messages)
writer.close()

def test_data_blob_writer_write_operations(self):
"""Test DataBlobWriter write operations with real data."""
def test_dedicated_format_writer_write_operations(self):
"""Test DedicatedFormatWriter write operations with real data."""
from pypaimon import Schema

# Create schema with blob column
Expand Down Expand Up @@ -411,8 +411,8 @@ def test_data_blob_writer_write_operations(self):

blob_writer.close()

def test_data_blob_writer_write_large_blob(self):
"""Test DataBlobWriter with very large blob data (50MB per item) in 10 batches."""
def test_dedicated_format_writer_write_large_blob(self):
"""Test DedicatedFormatWriter with very large blob data (50MB per item) in 10 batches."""
from pypaimon import Schema

# Create schema with blob column
Expand Down Expand Up @@ -468,7 +468,7 @@ def test_data_blob_writer_write_large_blob(self):
# Log progress for large data processing
print(f"Completed batch {batch_num + 1}/10 with {batch.num_rows} rows")

# Record count is tracked internally by DataBlobWriter
# Record count is tracked internally by DedicatedFormatWriter

# Test prepare commit
commit_messages: CommitMessage = blob_writer.prepare_commit()
Expand Down Expand Up @@ -512,8 +512,8 @@ def test_data_blob_writer_write_large_blob(self):

blob_writer.close()

def test_data_blob_writer_abort_functionality(self):
"""Test DataBlobWriter abort functionality."""
def test_dedicated_format_writer_abort_functionality(self):
"""Test DedicatedFormatWriter abort functionality."""
from pypaimon import Schema

# Create schema with blob column
Expand Down Expand Up @@ -547,12 +547,12 @@ def test_data_blob_writer_abort_functionality(self):
blob_writer.write_arrow_batch(batch)

# Test abort - BatchTableWrite doesn't have abort method
# The abort functionality is handled internally by DataBlobWriter
# The abort functionality is handled internally by DedicatedFormatWriter

blob_writer.close()

def test_data_blob_writer_multiple_batches(self):
"""Test DataBlobWriter with multiple batches and verify results."""
def test_dedicated_format_writer_multiple_batches(self):
"""Test DedicatedFormatWriter with multiple batches and verify results."""
from pypaimon import Schema

# Create schema with blob column
Expand Down Expand Up @@ -609,7 +609,7 @@ def test_data_blob_writer_multiple_batches(self):
blob_writer.write_arrow_batch(batch)
total_rows += batch.num_rows

# Record count is tracked internally by DataBlobWriter
# Record count is tracked internally by DedicatedFormatWriter

# Test prepare commit
commit_messages = blob_writer.prepare_commit()
Expand All @@ -620,8 +620,8 @@ def test_data_blob_writer_multiple_batches(self):

blob_writer.close()

def test_data_blob_writer_large_batches(self):
"""Test DataBlobWriter with large batches to test rolling behavior."""
def test_dedicated_format_writer_large_batches(self):
"""Test DedicatedFormatWriter with large batches to test rolling behavior."""
from pypaimon import Schema

# Create schema with blob column
Expand Down Expand Up @@ -672,7 +672,7 @@ def test_data_blob_writer_large_batches(self):
blob_writer.write_arrow_batch(batch)
total_rows += batch.num_rows

# Record count is tracked internally by DataBlobWriter
# Record count is tracked internally by DedicatedFormatWriter

# Test prepare commit
commit_messages = blob_writer.prepare_commit()
Expand All @@ -683,8 +683,8 @@ def test_data_blob_writer_large_batches(self):

blob_writer.close()

def test_data_blob_writer_mixed_data_types(self):
"""Test DataBlobWriter with mixed data types in blob column."""
def test_dedicated_format_writer_mixed_data_types(self):
"""Test DedicatedFormatWriter with mixed data types in blob column."""
from pypaimon import Schema

# Create schema with blob column
Expand Down Expand Up @@ -727,7 +727,7 @@ def test_data_blob_writer_mixed_data_types(self):
blob_writer.write_arrow_batch(batch)
total_rows += batch.num_rows

# Record count is tracked internally by DataBlobWriter
# Record count is tracked internally by DedicatedFormatWriter

# Test prepare commit
commit_messages = blob_writer.prepare_commit()
Expand Down Expand Up @@ -787,8 +787,8 @@ def test_data_blob_writer_mixed_data_types(self):
self.assertEqual(result_type, original_type, f"Row {i + 1}: Type should match")
self.assertEqual(result_data, original_data, f"Row {i + 1}: Blob data should match")

def test_data_blob_writer_empty_batches(self):
"""Test DataBlobWriter with empty batches."""
def test_dedicated_format_writer_empty_batches(self):
"""Test DedicatedFormatWriter with empty batches."""
from pypaimon import Schema

# Create schema with blob column
Expand Down Expand Up @@ -843,17 +843,17 @@ def test_data_blob_writer_empty_batches(self):
total_rows += batch.num_rows

# Verify record count (empty batch should not affect count)
# Record count is tracked internally by DataBlobWriter
# Record count is tracked internally by DataBlobWriter
# Record count is tracked internally by DedicatedFormatWriter
# Record count is tracked internally by DedicatedFormatWriter

# Test prepare commit
commit_messages = blob_writer.prepare_commit()
self.assertIsInstance(commit_messages, list)

blob_writer.close()

def test_data_blob_writer_rolling_behavior(self):
"""Test DataBlobWriter rolling behavior with multiple commits."""
def test_dedicated_format_writer_rolling_behavior(self):
"""Test DedicatedFormatWriter rolling behavior with multiple commits."""
from pypaimon import Schema

# Create schema with blob column
Expand Down Expand Up @@ -893,7 +893,7 @@ def test_data_blob_writer_rolling_behavior(self):
blob_writer.write_arrow_batch(batch)

# Verify total record count
# Record count is tracked internally by DataBlobWriter
# Record count is tracked internally by DedicatedFormatWriter

# Test prepare commit
commit_messages = blob_writer.prepare_commit()
Expand Down Expand Up @@ -2715,8 +2715,8 @@ def test_blob_large_data_volume_with_shard(self):
actual = pa.concat_tables([actual1, actual2, actual3]).sort_by('id')
self.assertEqual(actual, expected)

def test_data_blob_writer_with_slice(self):
"""Test DataBlobWriter with mixed data types in blob column."""
def test_dedicated_format_writer_with_slice(self):
"""Test DedicatedFormatWriter with mixed data types in blob column."""

# Create schema with blob column
pa_schema = pa.schema([
Expand Down Expand Up @@ -2777,8 +2777,8 @@ def test_data_blob_writer_with_slice(self):
self.assertEqual(result.num_columns, 3, "Should have 3 columns")
self.assertEqual(result["id"].unique().to_pylist(), [2, 3], "Get incorrect column ID")

def test_data_blob_writer_with_shard(self):
"""Test DataBlobWriter with mixed data types in blob column."""
def test_dedicated_format_writer_with_shard(self):
"""Test DedicatedFormatWriter with mixed data types in blob column."""

# Create schema with blob column
pa_schema = pa.schema([
Expand Down
Loading
Loading