From 3e9340414eeca4f9c480ec26b96d0ad9cc5b2a41 Mon Sep 17 00:00:00 2001 From: umi Date: Wed, 3 Jun 2026 15:05:14 +0800 Subject: [PATCH 1/6] proto --- .../read/reader/limited_record_reader.py | 14 +++ paimon-python/pypaimon/read/split_read.py | 33 ++++++- paimon-python/pypaimon/read/table_read.py | 2 + .../tests/test_limited_record_reader.py | 98 +++++++++++++++++++ 4 files changed, 144 insertions(+), 3 deletions(-) diff --git a/paimon-python/pypaimon/read/reader/limited_record_reader.py b/paimon-python/pypaimon/read/reader/limited_record_reader.py index 74f2612ebdc0..948207807def 100644 --- a/paimon-python/pypaimon/read/reader/limited_record_reader.py +++ b/paimon-python/pypaimon/read/reader/limited_record_reader.py @@ -26,6 +26,8 @@ from typing import Optional +from pyarrow import RecordBatch + from pypaimon.read.reader.iface.record_iterator import RecordIterator from pypaimon.read.reader.iface.record_reader import RecordReader @@ -50,6 +52,18 @@ def read_batch(self) -> Optional[RecordIterator]: return None return _LimitedRecordIterator(batch, self) + def read_arrow_batch(self) -> Optional[RecordBatch]: + if self.count >= self._limit: + return None + batch = self._inner.read_arrow_batch() + if batch is None: + return None + remaining = self._limit - self.count + if batch.num_rows > remaining: + batch = batch.slice(0, remaining) + self.count += batch.num_rows + return batch + def close(self) -> None: self._inner.close() diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py index ddb349e0cab3..87b8ec2dccd7 100644 --- a/paimon-python/pypaimon/read/split_read.py +++ b/paimon-python/pypaimon/read/split_read.py @@ -572,6 +572,21 @@ def _genarate_deletion_file_readers(self): class RawFileSplitRead(SplitRead): + def __init__( + self, + table, + predicate: Optional[Predicate], + read_type: List[DataField], + split: Split, + row_tracking_enabled: bool, + nested_name_paths: Optional[List[List[str]]] = None, + limit: Optional[int] = None): + super().__init__( + table, predicate, read_type, split, row_tracking_enabled, + nested_name_paths=nested_name_paths, + ) + self.limit = limit + def raw_reader_supplier(self, file: DataFileMeta, dv_factory: Optional[Callable] = None) -> Optional[RecordReader]: read_fields = self._get_final_read_data_fields() # Check if this is a SlicedSplit to get shard_file_idx_map @@ -620,9 +635,14 @@ def create_reader(self) -> RecordReader: vector_field_indices=_vector_field_indices(self.read_fields)) # if the table is appendonly table, we don't need extra filter, all predicates has pushed down if self.table.is_primary_key_table and self.predicate_for_reader: - return FilterRecordReader(concat_reader, self.predicate_for_reader) + reader = FilterRecordReader(concat_reader, self.predicate_for_reader) else: - return concat_reader + reader = concat_reader + if self.limit is not None: + from pypaimon.read.reader.limited_record_reader import \ + LimitedRecordReader + reader = LimitedRecordReader(reader, self.limit) + return reader def _get_all_data_fields(self): if self.row_tracking_enabled: @@ -775,7 +795,8 @@ def __init__( read_type: List[DataField], split: Split, row_tracking_enabled: bool, - nested_name_paths: Optional[List[List[str]]] = None): + nested_name_paths: Optional[List[List[str]]] = None, + limit: Optional[int] = None): self.row_ranges = None actual_split = split if isinstance(split, IndexedSplit): @@ -785,6 +806,7 @@ def __init__( table, predicate, read_type, actual_split, row_tracking_enabled, nested_name_paths=nested_name_paths, ) + self.limit = limit def _push_down_predicate(self) -> Optional[Predicate]: # Data evolution: files may have different schemas, so we don't push predicate @@ -827,6 +849,11 @@ def create_reader(self) -> RecordReader: and CoreOptions.blob_descriptor_fields(self.table.options)): reader = BlobDescriptorConvertReader(reader, self.table) + if self.limit is not None: + from pypaimon.read.reader.limited_record_reader import \ + LimitedRecordReader + reader = LimitedRecordReader(reader, self.limit) + return reader def _split_by_row_id(self, files: List[DataFileMeta]) -> List[List[DataFileMeta]]: diff --git a/paimon-python/pypaimon/read/table_read.py b/paimon-python/pypaimon/read/table_read.py index 7c717df24d93..826b2b4024a1 100644 --- a/paimon-python/pypaimon/read/table_read.py +++ b/paimon-python/pypaimon/read/table_read.py @@ -580,6 +580,7 @@ def _create_split_read(self, split: Split) -> SplitRead: split=split, row_tracking_enabled=True, nested_name_paths=self.nested_name_paths, + limit=self.limit, ) else: return RawFileSplitRead( @@ -589,6 +590,7 @@ def _create_split_read(self, split: Split) -> SplitRead: split=split, row_tracking_enabled=self.table.options.row_tracking_enabled(), nested_name_paths=self.nested_name_paths, + limit=self.limit, ) def _widen_to_top_level_for_merge(self) -> List[DataField]: diff --git a/paimon-python/pypaimon/tests/test_limited_record_reader.py b/paimon-python/pypaimon/tests/test_limited_record_reader.py index edbc1b75f433..76d4885d7efb 100644 --- a/paimon-python/pypaimon/tests/test_limited_record_reader.py +++ b/paimon-python/pypaimon/tests/test_limited_record_reader.py @@ -19,6 +19,9 @@ import unittest from typing import List, Optional +import pyarrow as pa + +from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader from pypaimon.read.reader.iface.record_iterator import RecordIterator from pypaimon.read.reader.iface.record_reader import RecordReader from pypaimon.read.reader.limited_record_reader import LimitedRecordReader @@ -137,5 +140,100 @@ def test_does_not_drain_inner_when_limit_met_within_first_batch(self): self.assertEqual(inner.read_batch_calls, 1) +class _StaticBatchReader(RecordBatchReader): + """Hands back arrow batches one at a time for testing + ``read_arrow_batch`` on ``LimitedRecordReader``.""" + + def __init__(self, batches: List[pa.RecordBatch]): + self._batches = batches + self._idx = 0 + self.closed = False + self.read_arrow_batch_calls = 0 + + def read_arrow_batch(self) -> Optional[pa.RecordBatch]: + self.read_arrow_batch_calls += 1 + if self._idx >= len(self._batches): + return None + batch = self._batches[self._idx] + self._idx += 1 + return batch + + def close(self): + self.closed = True + + +def _make_batch(values: List[int]) -> pa.RecordBatch: + return pa.RecordBatch.from_arrays( + [pa.array(values, type=pa.int64())], names=["v"]) + + +def _drain_arrow(reader) -> List[int]: + out = [] + while True: + batch = reader.read_arrow_batch() + if batch is None: + break + out.extend(batch.column("v").to_pylist()) + return out + + +class LimitedRecordReaderArrowBatchTest(unittest.TestCase): + + def test_arrow_batch_limit_within_single_batch(self): + inner = _StaticBatchReader([_make_batch([1, 2, 3, 4, 5])]) + reader = LimitedRecordReader(inner, limit=3) + self.assertEqual(_drain_arrow(reader), [1, 2, 3]) + + def test_arrow_batch_limit_spans_multiple_batches(self): + inner = _StaticBatchReader([ + _make_batch([1, 2]), + _make_batch([3, 4]), + _make_batch([5, 6]), + ]) + reader = LimitedRecordReader(inner, limit=5) + self.assertEqual(_drain_arrow(reader), [1, 2, 3, 4, 5]) + + def test_arrow_batch_limit_larger_than_total(self): + inner = _StaticBatchReader([_make_batch([1, 2, 3])]) + reader = LimitedRecordReader(inner, limit=999) + self.assertEqual(_drain_arrow(reader), [1, 2, 3]) + + def test_arrow_batch_limit_zero(self): + inner = _StaticBatchReader([_make_batch([1, 2, 3])]) + reader = LimitedRecordReader(inner, limit=0) + self.assertEqual(_drain_arrow(reader), []) + self.assertIsNone(reader.read_arrow_batch()) + + def test_arrow_batch_and_read_batch_share_count(self): + """Verify that consuming rows via ``read_arrow_batch`` advances + the shared ``count`` so ``read_batch`` respects the limit too.""" + batches = [_make_batch([10, 20, 30])] + row_batches = [[100, 200]] + inner = _StaticBatchReader(batches) + # Patch read_batch onto the batch reader so we can test the + # shared counter across both paths. + inner_row_batches = list(row_batches) + original_read_batch = inner.read_batch + + def patched_read_batch(): + if not inner_row_batches: + return None + items = inner_row_batches.pop(0) + return _ListIterator(items) + + inner.read_batch = patched_read_batch + + reader = LimitedRecordReader(inner, limit=4) + # Consume 3 rows via arrow batch + batch = reader.read_arrow_batch() + self.assertEqual(batch.column("v").to_pylist(), [10, 20, 30]) + self.assertEqual(reader.count, 3) + # Only 1 more row allowed via read_batch + it = reader.read_batch() + self.assertIsNotNone(it) + self.assertEqual(it.next(), 100) + self.assertIsNone(it.next()) # limit reached + + if __name__ == '__main__': unittest.main() From d9c823e186154dd1c3bf89477503cd9bacb982bb Mon Sep 17 00:00:00 2001 From: umi Date: Wed, 3 Jun 2026 15:42:00 +0800 Subject: [PATCH 2/6] addTest --- paimon-python/pypaimon/read/split_read.py | 23 +---- .../pypaimon/tests/test_limit_pushdown.py | 45 +++++++++ .../tests/test_limited_record_reader.py | 95 ------------------- 3 files changed, 50 insertions(+), 113 deletions(-) diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py index 87b8ec2dccd7..31189bd00fd8 100644 --- a/paimon-python/pypaimon/read/split_read.py +++ b/paimon-python/pypaimon/read/split_read.py @@ -109,7 +109,8 @@ def __init__( read_type: List[DataField], split: Split, row_tracking_enabled: bool, - nested_name_paths: Optional[List[List[str]]] = None): + nested_name_paths: Optional[List[List[str]]] = None, + limit: Optional[int] = None): from pypaimon.table.file_store_table import FileStoreTable self.table: FileStoreTable = table @@ -119,6 +120,7 @@ def __init__( self.row_tracking_enabled = row_tracking_enabled self.value_arity = len(read_type) self.nested_name_paths = nested_name_paths + self.limit = limit # Snapshot the raw value-side schema before _create_key_value_fields # wraps it, so MergeFileSplitRead can hand per-value-field nullable # flags to merge functions that mirror Java's NOT-NULL check. @@ -572,21 +574,6 @@ def _genarate_deletion_file_readers(self): class RawFileSplitRead(SplitRead): - def __init__( - self, - table, - predicate: Optional[Predicate], - read_type: List[DataField], - split: Split, - row_tracking_enabled: bool, - nested_name_paths: Optional[List[List[str]]] = None, - limit: Optional[int] = None): - super().__init__( - table, predicate, read_type, split, row_tracking_enabled, - nested_name_paths=nested_name_paths, - ) - self.limit = limit - def raw_reader_supplier(self, file: DataFileMeta, dv_factory: Optional[Callable] = None) -> Optional[RecordReader]: read_fields = self._get_final_read_data_fields() # Check if this is a SlicedSplit to get shard_file_idx_map @@ -670,9 +657,9 @@ def __init__( split=split, row_tracking_enabled=row_tracking_enabled, nested_name_paths=None, + limit=limit, ) self.outer_extract_name_paths = outer_extract_name_paths - self.limit = limit # Built once per split-read (value_fields and options are constant # for the object's life), not per section. ``None`` when # ``sequence.field`` is unset, in which case the heap falls back to @@ -805,8 +792,8 @@ def __init__( super().__init__( table, predicate, read_type, actual_split, row_tracking_enabled, nested_name_paths=nested_name_paths, + limit=limit, ) - self.limit = limit def _push_down_predicate(self) -> Optional[Predicate]: # Data evolution: files may have different schemas, so we don't push predicate diff --git a/paimon-python/pypaimon/tests/test_limit_pushdown.py b/paimon-python/pypaimon/tests/test_limit_pushdown.py index 2e717c28c7e1..97107a0af8e5 100644 --- a/paimon-python/pypaimon/tests/test_limit_pushdown.py +++ b/paimon-python/pypaimon/tests/test_limit_pushdown.py @@ -207,6 +207,51 @@ def test_to_iterator_limit_short_circuits(self): rows = list(it) self.assertEqual(len(rows), 7) + # ---- SplitRead-level limit pushdown verification --------------------- + + def test_append_only_split_read_creates_limited_reader(self): + """Verify that RawFileSplitRead.create_reader() returns a + LimitedRecordReader when limit is set, proving SplitRead-level + limit pushdown is in effect.""" + from pypaimon.read.reader.limited_record_reader import LimitedRecordReader + + table = self._create_ao_table('limit_ao_split_read') + self._write_ao_partitions(table, [('p1', list(range(10)))]) + rb = table.new_read_builder().with_limit(3) + table_read = rb.new_read() + splits = rb.new_scan().plan().splits() + self.assertGreater(len(splits), 0) + for split in splits: + split_read = table_read._create_split_read(split) + self.assertEqual(split_read.limit, 3) + reader = split_read.create_reader() + self.assertIsInstance(reader, LimitedRecordReader, + "RawFileSplitRead.create_reader() should wrap with LimitedRecordReader") + reader.close() + + def test_append_only_split_read_limit_truncates_within_split(self): + """Directly read from a single split's reader with limit and verify + the reader itself stops at the limit boundary, not relying on + TableRead-level truncation.""" + table = self._create_ao_table('limit_ao_split_truncate') + self._write_ao_partitions(table, [('p1', list(range(20)))]) + rb = table.new_read_builder().with_limit(5) + table_read = rb.new_read() + splits = rb.new_scan().plan().splits() + self.assertEqual(len(splits), 1) + split_read = table_read._create_split_read(splits[0]) + reader = split_read.create_reader() + # Drain the reader directly, bypassing TableRead-level control + total_rows = 0 + while True: + batch = reader.read_arrow_batch() + if batch is None: + break + total_rows += batch.num_rows + reader.close() + self.assertEqual(total_rows, 5, + "SplitRead-level reader should stop at limit=5, got %d" % total_rows) + if __name__ == '__main__': unittest.main() diff --git a/paimon-python/pypaimon/tests/test_limited_record_reader.py b/paimon-python/pypaimon/tests/test_limited_record_reader.py index 76d4885d7efb..ea7df9947b9b 100644 --- a/paimon-python/pypaimon/tests/test_limited_record_reader.py +++ b/paimon-python/pypaimon/tests/test_limited_record_reader.py @@ -140,100 +140,5 @@ def test_does_not_drain_inner_when_limit_met_within_first_batch(self): self.assertEqual(inner.read_batch_calls, 1) -class _StaticBatchReader(RecordBatchReader): - """Hands back arrow batches one at a time for testing - ``read_arrow_batch`` on ``LimitedRecordReader``.""" - - def __init__(self, batches: List[pa.RecordBatch]): - self._batches = batches - self._idx = 0 - self.closed = False - self.read_arrow_batch_calls = 0 - - def read_arrow_batch(self) -> Optional[pa.RecordBatch]: - self.read_arrow_batch_calls += 1 - if self._idx >= len(self._batches): - return None - batch = self._batches[self._idx] - self._idx += 1 - return batch - - def close(self): - self.closed = True - - -def _make_batch(values: List[int]) -> pa.RecordBatch: - return pa.RecordBatch.from_arrays( - [pa.array(values, type=pa.int64())], names=["v"]) - - -def _drain_arrow(reader) -> List[int]: - out = [] - while True: - batch = reader.read_arrow_batch() - if batch is None: - break - out.extend(batch.column("v").to_pylist()) - return out - - -class LimitedRecordReaderArrowBatchTest(unittest.TestCase): - - def test_arrow_batch_limit_within_single_batch(self): - inner = _StaticBatchReader([_make_batch([1, 2, 3, 4, 5])]) - reader = LimitedRecordReader(inner, limit=3) - self.assertEqual(_drain_arrow(reader), [1, 2, 3]) - - def test_arrow_batch_limit_spans_multiple_batches(self): - inner = _StaticBatchReader([ - _make_batch([1, 2]), - _make_batch([3, 4]), - _make_batch([5, 6]), - ]) - reader = LimitedRecordReader(inner, limit=5) - self.assertEqual(_drain_arrow(reader), [1, 2, 3, 4, 5]) - - def test_arrow_batch_limit_larger_than_total(self): - inner = _StaticBatchReader([_make_batch([1, 2, 3])]) - reader = LimitedRecordReader(inner, limit=999) - self.assertEqual(_drain_arrow(reader), [1, 2, 3]) - - def test_arrow_batch_limit_zero(self): - inner = _StaticBatchReader([_make_batch([1, 2, 3])]) - reader = LimitedRecordReader(inner, limit=0) - self.assertEqual(_drain_arrow(reader), []) - self.assertIsNone(reader.read_arrow_batch()) - - def test_arrow_batch_and_read_batch_share_count(self): - """Verify that consuming rows via ``read_arrow_batch`` advances - the shared ``count`` so ``read_batch`` respects the limit too.""" - batches = [_make_batch([10, 20, 30])] - row_batches = [[100, 200]] - inner = _StaticBatchReader(batches) - # Patch read_batch onto the batch reader so we can test the - # shared counter across both paths. - inner_row_batches = list(row_batches) - original_read_batch = inner.read_batch - - def patched_read_batch(): - if not inner_row_batches: - return None - items = inner_row_batches.pop(0) - return _ListIterator(items) - - inner.read_batch = patched_read_batch - - reader = LimitedRecordReader(inner, limit=4) - # Consume 3 rows via arrow batch - batch = reader.read_arrow_batch() - self.assertEqual(batch.column("v").to_pylist(), [10, 20, 30]) - self.assertEqual(reader.count, 3) - # Only 1 more row allowed via read_batch - it = reader.read_batch() - self.assertIsNotNone(it) - self.assertEqual(it.next(), 100) - self.assertIsNone(it.next()) # limit reached - - if __name__ == '__main__': unittest.main() From 1898c7e9806f55510669922c07cb9655057eac18 Mon Sep 17 00:00:00 2001 From: umi Date: Wed, 3 Jun 2026 16:02:00 +0800 Subject: [PATCH 3/6] fix --- .../read/reader/limited_record_reader.py | 33 +++++++++++++++++++ paimon-python/pypaimon/read/split_read.py | 21 +++++------- .../pypaimon/tests/test_limit_pushdown.py | 19 ++++++----- 3 files changed, 53 insertions(+), 20 deletions(-) diff --git a/paimon-python/pypaimon/read/reader/limited_record_reader.py b/paimon-python/pypaimon/read/reader/limited_record_reader.py index 948207807def..db0de6446f5f 100644 --- a/paimon-python/pypaimon/read/reader/limited_record_reader.py +++ b/paimon-python/pypaimon/read/reader/limited_record_reader.py @@ -28,6 +28,7 @@ from pyarrow import RecordBatch +from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader from pypaimon.read.reader.iface.record_iterator import RecordIterator from pypaimon.read.reader.iface.record_reader import RecordReader @@ -82,3 +83,35 @@ def next(self): return None self._limiter.count += 1 return row + + +class LimitedRecordBatchReader(RecordBatchReader): + """Stop emitting rows once ``limit`` rows have been delivered. + + Unlike ``LimitedRecordReader`` (which inherits ``RecordReader``), + this class inherits ``RecordBatchReader`` so that the + ``isinstance(..., RecordBatchReader)`` gate in TableRead picks the + arrow-batch code path. + """ + + def __init__(self, inner: RecordBatchReader, limit: int): + if limit < 0: + raise ValueError("limit must be non-negative, got %d" % limit) + self._inner = inner + self._limit = limit + self.count = 0 + + def read_arrow_batch(self) -> Optional[RecordBatch]: + if self.count >= self._limit: + return None + batch = self._inner.read_arrow_batch() + if batch is None: + return None + remaining = self._limit - self.count + if batch.num_rows > remaining: + batch = batch.slice(0, remaining) + self.count += batch.num_rows + return batch + + def close(self) -> None: + self._inner.close() diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py index 31189bd00fd8..37a47e6d927e 100644 --- a/paimon-python/pypaimon/read/split_read.py +++ b/paimon-python/pypaimon/read/split_read.py @@ -42,6 +42,7 @@ from pypaimon.read.reader.format_avro_reader import FormatAvroReader from pypaimon.read.reader.blob_descriptor_convert_reader import BlobDescriptorConvertReader from pypaimon.read.reader.filter_record_batch_reader import FilterRecordBatchReader +from pypaimon.read.reader.limited_record_reader import LimitedRecordBatchReader, LimitedRecordReader from pypaimon.read.reader.row_range_filter_record_reader import RowIdFilterRecordBatchReader from pypaimon.read.reader.format_blob_reader import FormatBlobReader from pypaimon.read.reader.format_lance_reader import FormatLanceReader @@ -146,8 +147,8 @@ def __init__( # the space FilterRecordReader actually evaluates against. read_type_names = {f.name for f in read_type} if ( - self.predicate is not None - and _get_all_fields(self.predicate).issubset(read_type_names) + self.predicate is not None + and _get_all_fields(self.predicate).issubset(read_type_names) ): self.predicate_for_reader = rewrite_predicate_indices( self.predicate, read_type @@ -325,7 +326,7 @@ def file_reader_supplier(self, file: DataFileMeta, for_merge_read: bool, effective_first_row_id = file.first_row_id if (shard_range is not None and file.first_row_id is not None and file_format in ( - CoreOptions.FILE_FORMAT_VORTEX, CoreOptions.FILE_FORMAT_LANCE)): + CoreOptions.FILE_FORMAT_VORTEX, CoreOptions.FILE_FORMAT_LANCE)): effective_first_row_id = file.first_row_id + shard_range[0] if for_merge_read: @@ -623,12 +624,12 @@ def create_reader(self) -> RecordReader: # if the table is appendonly table, we don't need extra filter, all predicates has pushed down if self.table.is_primary_key_table and self.predicate_for_reader: reader = FilterRecordReader(concat_reader, self.predicate_for_reader) + if self.limit is not None: + reader = LimitedRecordReader(reader, self.limit) else: reader = concat_reader - if self.limit is not None: - from pypaimon.read.reader.limited_record_reader import \ - LimitedRecordReader - reader = LimitedRecordReader(reader, self.limit) + if self.limit is not None: + reader = LimitedRecordBatchReader(reader, self.limit) return reader def _get_all_data_fields(self): @@ -764,8 +765,6 @@ def create_reader(self) -> RecordReader: blob_field_indices=_blob_field_indices(inner_value_fields), vector_field_indices=_vector_field_indices(inner_value_fields)) if self.limit is not None: - from pypaimon.read.reader.limited_record_reader import \ - LimitedRecordReader reader = LimitedRecordReader(reader, self.limit) return reader @@ -837,9 +836,7 @@ def create_reader(self) -> RecordReader: reader = BlobDescriptorConvertReader(reader, self.table) if self.limit is not None: - from pypaimon.read.reader.limited_record_reader import \ - LimitedRecordReader - reader = LimitedRecordReader(reader, self.limit) + reader = LimitedRecordBatchReader(reader, self.limit) return reader diff --git a/paimon-python/pypaimon/tests/test_limit_pushdown.py b/paimon-python/pypaimon/tests/test_limit_pushdown.py index 97107a0af8e5..a4b51ee3c00c 100644 --- a/paimon-python/pypaimon/tests/test_limit_pushdown.py +++ b/paimon-python/pypaimon/tests/test_limit_pushdown.py @@ -110,8 +110,8 @@ def test_append_only_limit_stops_within_first_split(self): exactly 3 rows — even though each partition split has 5 rows.""" table = self._create_ao_table('limit_ao_within_split') self._write_ao_partitions(table, [ - ('p1', list(range(5))), # 5 rows - ('p2', list(range(5, 10))), # 5 rows + ('p1', list(range(5))), # 5 rows + ('p2', list(range(5, 10))), # 5 rows ]) rb = table.new_read_builder().with_limit(3) result = rb.new_read().to_arrow(rb.new_scan().plan().splits()) @@ -209,11 +209,12 @@ def test_to_iterator_limit_short_circuits(self): # ---- SplitRead-level limit pushdown verification --------------------- - def test_append_only_split_read_creates_limited_reader(self): + def test_append_only_split_read_creates_limited_batch_reader(self): """Verify that RawFileSplitRead.create_reader() returns a - LimitedRecordReader when limit is set, proving SplitRead-level - limit pushdown is in effect.""" - from pypaimon.read.reader.limited_record_reader import LimitedRecordReader + LimitedRecordBatchReader (inherits RecordBatchReader) when limit + is set, so the arrow-batch read path is taken.""" + from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader + from pypaimon.read.reader.limited_record_reader import LimitedRecordBatchReader table = self._create_ao_table('limit_ao_split_read') self._write_ao_partitions(table, [('p1', list(range(10)))]) @@ -225,8 +226,10 @@ def test_append_only_split_read_creates_limited_reader(self): split_read = table_read._create_split_read(split) self.assertEqual(split_read.limit, 3) reader = split_read.create_reader() - self.assertIsInstance(reader, LimitedRecordReader, - "RawFileSplitRead.create_reader() should wrap with LimitedRecordReader") + self.assertIsInstance(reader, LimitedRecordBatchReader, + "RawFileSplitRead.create_reader() should wrap with LimitedRecordBatchReader") + self.assertIsInstance(reader, RecordBatchReader, + "LimitedRecordBatchReader should be a RecordBatchReader") reader.close() def test_append_only_split_read_limit_truncates_within_split(self): From cd3f4c5d12e0557548a2be4adc1279f305744816 Mon Sep 17 00:00:00 2001 From: umi Date: Wed, 3 Jun 2026 16:05:57 +0800 Subject: [PATCH 4/6] fix --- .../pypaimon/read/reader/limited_record_reader.py | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/paimon-python/pypaimon/read/reader/limited_record_reader.py b/paimon-python/pypaimon/read/reader/limited_record_reader.py index db0de6446f5f..afcf4427699a 100644 --- a/paimon-python/pypaimon/read/reader/limited_record_reader.py +++ b/paimon-python/pypaimon/read/reader/limited_record_reader.py @@ -53,18 +53,6 @@ def read_batch(self) -> Optional[RecordIterator]: return None return _LimitedRecordIterator(batch, self) - def read_arrow_batch(self) -> Optional[RecordBatch]: - if self.count >= self._limit: - return None - batch = self._inner.read_arrow_batch() - if batch is None: - return None - remaining = self._limit - self.count - if batch.num_rows > remaining: - batch = batch.slice(0, remaining) - self.count += batch.num_rows - return batch - def close(self) -> None: self._inner.close() From 7372dbe6f8b04bdb3b983ddaa01f16f808c3b114 Mon Sep 17 00:00:00 2001 From: umi Date: Wed, 3 Jun 2026 16:25:45 +0800 Subject: [PATCH 5/6] fmt --- paimon-python/pypaimon/read/split_read.py | 2 +- paimon-python/pypaimon/tests/test_limited_record_reader.py | 3 --- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py index 37a47e6d927e..4e9b7644cd20 100644 --- a/paimon-python/pypaimon/read/split_read.py +++ b/paimon-python/pypaimon/read/split_read.py @@ -326,7 +326,7 @@ def file_reader_supplier(self, file: DataFileMeta, for_merge_read: bool, effective_first_row_id = file.first_row_id if (shard_range is not None and file.first_row_id is not None and file_format in ( - CoreOptions.FILE_FORMAT_VORTEX, CoreOptions.FILE_FORMAT_LANCE)): + CoreOptions.FILE_FORMAT_VORTEX, CoreOptions.FILE_FORMAT_LANCE)): effective_first_row_id = file.first_row_id + shard_range[0] if for_merge_read: diff --git a/paimon-python/pypaimon/tests/test_limited_record_reader.py b/paimon-python/pypaimon/tests/test_limited_record_reader.py index ea7df9947b9b..edbc1b75f433 100644 --- a/paimon-python/pypaimon/tests/test_limited_record_reader.py +++ b/paimon-python/pypaimon/tests/test_limited_record_reader.py @@ -19,9 +19,6 @@ import unittest from typing import List, Optional -import pyarrow as pa - -from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader from pypaimon.read.reader.iface.record_iterator import RecordIterator from pypaimon.read.reader.iface.record_reader import RecordReader from pypaimon.read.reader.limited_record_reader import LimitedRecordReader From 9643e5b09ddc0e460b843cda546c22823ee7f9c6 Mon Sep 17 00:00:00 2001 From: umi Date: Wed, 3 Jun 2026 19:24:17 +0800 Subject: [PATCH 6/6] blobRowAccessWithLimit --- .../read/reader/limited_record_reader.py | 3 +++ paimon-python/pypaimon/tests/blob_table_test.py | 16 ++++++++++++++++ 2 files changed, 19 insertions(+) diff --git a/paimon-python/pypaimon/read/reader/limited_record_reader.py b/paimon-python/pypaimon/read/reader/limited_record_reader.py index afcf4427699a..f78221d3f87b 100644 --- a/paimon-python/pypaimon/read/reader/limited_record_reader.py +++ b/paimon-python/pypaimon/read/reader/limited_record_reader.py @@ -88,6 +88,9 @@ def __init__(self, inner: RecordBatchReader, limit: int): self._inner = inner self._limit = limit self.count = 0 + self.file_io = inner.file_io + self.blob_field_indices = inner.blob_field_indices + self.vector_field_indices = inner.vector_field_indices def read_arrow_batch(self) -> Optional[RecordBatch]: if self.count >= self._limit: diff --git a/paimon-python/pypaimon/tests/blob_table_test.py b/paimon-python/pypaimon/tests/blob_table_test.py index ca9161bd1424..fbacdb892033 100755 --- a/paimon-python/pypaimon/tests/blob_table_test.py +++ b/paimon-python/pypaimon/tests/blob_table_test.py @@ -3369,6 +3369,22 @@ def test_get_blob_access(self): self.assertEqual(results[1], (2, b'img_data_2')) self.assertEqual(results[2], (3, b'img_data_3')) + def test_get_blob_access_with_limit(self): + read_builder = self.table.new_read_builder().with_limit(2) + splits = read_builder.new_scan().plan().splits() + read = read_builder.new_read() + + results = [] + for row in read.to_iterator(splits): + blob = row.get_blob(2) + self.assertIsNotNone(blob) + results.append((row.get_field(0), blob.to_data())) + + self.assertEqual(len(results), 2) + for row_id, data in results: + self.assertIn(row_id, (1, 2, 3)) + self.assertIn(data, (b'img_data_1', b'img_data_2', b'img_data_3')) + def test_get_blob_streaming(self): read_builder = self.table.new_read_builder() splits = read_builder.new_scan().plan().splits()