From 3ea0a15cc88681d198be90787b98d232039b5f90 Mon Sep 17 00:00:00 2001 From: Paul Mathew Date: Thu, 7 May 2026 15:23:43 -0400 Subject: [PATCH] Implement rolling writer --- mkdocs/docs/api.md | 2 +- pyiceberg/io/pyarrow.py | 218 ++++++++++++++----- pyiceberg/table/__init__.py | 36 +-- tests/catalog/test_catalog_behaviors.py | 122 ++++++++++- tests/integration/test_writes/test_writes.py | 20 +- tests/io/test_pyarrow.py | 46 ---- 6 files changed, 309 insertions(+), 135 deletions(-) diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md index ca6995a727..4c246805de 100644 --- a/mkdocs/docs/api.md +++ b/mkdocs/docs/api.md @@ -367,7 +367,7 @@ for buf in tbl.scan().to_arrow_batch_reader(): ### Streaming writes from a `RecordBatchReader` -`tbl.append()` and `tbl.overwrite()` also accept a `pyarrow.RecordBatchReader` directly, which lets you write datasets that don't fit in memory without materialising them as a `pa.Table` first. PyIceberg consumes the reader once and microbatches it into Parquet files of approximately `write.target-file-size-bytes` (default 512 MiB), keeping memory usage bounded by the target size. All files are committed in a single snapshot. +`tbl.append()` and `tbl.overwrite()` also accept a `pyarrow.RecordBatchReader` directly, which lets you write datasets that don't fit in memory without materialising them as a `pa.Table` first. PyIceberg consumes the reader once, writing batches through a rolling Parquet writer that starts a new file once the current file's on-disk size reaches `write.target-file-size-bytes` (default 512 MiB). Each input `RecordBatch` is written as its own Parquet row group; a batch with more than `write.parquet.row-group-limit` rows (default 1M) is split into several row groups, while smaller batches are never combined, so the input batch size determines row group size up to that cap. All files are committed in a single snapshot. 
```python reader = pa.RecordBatchReader.from_batches(schema, batch_iter) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index db9a035bdc..0e500c23aa 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -2674,18 +2674,158 @@ def write_parquet(task: WriteTask) -> DataFile: return iter(data_files) +def _record_batches_to_data_files( + table_metadata: TableMetadata, + reader: pa.RecordBatchReader, + io: FileIO, + write_uuid: uuid.UUID | None = None, + counter: itertools.count[int] | None = None, +) -> Iterator[DataFile]: + """Stream a ``pa.RecordBatchReader`` into Parquet data files via a rolling ``pq.ParquetWriter``. + + Each input ``RecordBatch`` is written directly via + ``writer.write_batch``. File rollover is driven by ``OutputStream.tell()`` + (#2998): after each batch, if ``tell() >= write.target-file-size-bytes`` + the current writer is closed (footer written) and a new file is opened. + The threshold is measured in compressed on-disk bytes — matching the + spec interpretation of ``write.target-file-size-bytes``. + + Row groups are capped at ``write.parquet.row-group-limit`` rows (default + 1M) via the ``row_group_size`` argument to ``write_batch``: a batch + larger than the cap is split into multiple row groups, each ≤ the cap; + a batch smaller than the cap becomes a single row group of its own + size. Callers control the lower bound of row group size by their + choice of input batch size; pyiceberg enforces the upper bound. This + matches the materialised ``pa.Table`` write path's treatment of the + same property. + + Memory per writer is bounded by one input ``RecordBatch`` plus the + ``ParquetWriter``'s internal page buffer (~1 MiB by default). The + materialised ``pa.Table`` write path (``write_file``) keeps its + existing ``executor.map``-based file-level parallelism; streaming + writes are sequential — one rolling file at a time, with concurrency + provided by the underlying multipart upload pool. 
+ + Streaming writes to partitioned tables are not yet supported — see + https://github.com/apache/iceberg-python/issues/2152. + """ + from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, TableProperties + + if not table_metadata.spec().is_unpartitioned(): + raise NotImplementedError( + "Writing a pa.RecordBatchReader to a partitioned table is not yet supported. " + "Materialise the reader as a pa.Table first, or follow " + "https://github.com/apache/iceberg-python/issues/2152 for partitioned streaming support." + ) + + counter = counter or itertools.count(0) + write_uuid = write_uuid or uuid.uuid4() + target_file_size: int = property_as_int( # type: ignore # The property is set with non-None value. + properties=table_metadata.properties, + property_name=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES, + default=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT, + ) + name_mapping = table_metadata.schema().name_mapping + downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False + task_schema = pyarrow_to_schema( + reader.schema, + name_mapping=name_mapping, + downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, + format_version=table_metadata.format_version, + ) + table_schema = table_metadata.schema() + if (sanitized_schema := sanitize_column_names(table_schema)) != table_schema: + file_schema = sanitized_schema + else: + file_schema = table_schema + + parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties) + row_group_size = property_as_int( + properties=table_metadata.properties, + property_name=TableProperties.PARQUET_ROW_GROUP_LIMIT, + default=TableProperties.PARQUET_ROW_GROUP_LIMIT_DEFAULT, + ) + location_provider = load_location_provider(table_location=table_metadata.location, table_properties=table_metadata.properties) + stats_columns = compute_statistics_plan(file_schema, table_metadata.properties) + column_mapping = parquet_path_to_id_mapping(file_schema) + + def _transform(batch: 
pa.RecordBatch) -> pa.RecordBatch: + return _to_requested_schema( + requested_schema=file_schema, + file_schema=task_schema, + batch=batch, + downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, + include_field_ids=True, + ) + + def _new_data_file_path() -> str: + # Mirrors WriteTask.generate_data_file_filename to keep file names compatible + # with the materialised write path. + filename = f"00000-{next(counter)}-{write_uuid}.parquet" + return location_provider.new_data_location(data_file_name=filename) + + def _build_data_file(file_path: str, output_file: OutputFile, parquet_metadata: Any) -> DataFile: + statistics = data_file_statistics_from_parquet_metadata( + parquet_metadata=parquet_metadata, + stats_columns=stats_columns, + parquet_column_mapping=column_mapping, + ) + return DataFile.from_args( + content=DataFileContent.DATA, + file_path=file_path, + file_format=FileFormat.PARQUET, + partition=Record(), + file_size_in_bytes=len(output_file), + sort_order_id=None, + spec_id=table_metadata.default_spec_id, + equality_ids=None, + key_metadata=None, + **statistics.to_serialized_dict(), + ) + + batches = iter(reader) + while True: + # Pull the next batch up front. If the reader is exhausted (either at the + # very start or between rolled files), we're done — yield nothing further. + try: + first_batch = next(batches) + except StopIteration: + return + + transformed_first = _transform(first_batch) + file_path = _new_data_file_path() + output_file = io.new_output(file_path) + with output_file.create(overwrite=True) as fos: + with pq.ParquetWriter( + fos, + schema=transformed_first.schema, + store_decimal_as_integer=True, + **parquet_writer_kwargs, + ) as writer: + writer.write_batch(transformed_first, row_group_size=row_group_size) + # Keep writing into this file until the on-disk byte threshold + # is crossed. 
``tell()`` advances as ``write_batch`` flushes + # encoded pages to the stream — files end up close to but + # slightly above ``target_file_size`` (lag bounded by one + # Parquet data page, ~1 MiB by default). + while fos.tell() < target_file_size: + try: + batch = next(batches) + except StopIteration: + break + writer.write_batch(_transform(batch), row_group_size=row_group_size) + # writer is closed (footer written) and the OutputStream is flushed. + # writer.writer.metadata is still readable post-close — same access + # pattern used by write_file(). + yield _build_data_file(file_path, output_file, writer.writer.metadata) + + def bin_pack_arrow_table(tbl: pa.Table, target_file_size: int) -> Iterator[list[pa.RecordBatch]]: - """Bin-pack ``tbl`` into groups of RecordBatches, each ~``target_file_size``. - - Note: - ``target_file_size`` is measured in **uncompressed in-memory** Arrow bytes - (``Table.nbytes`` / ``RecordBatch.nbytes``), not compressed on-disk Parquet - bytes. The resulting Parquet file after compression (zstd by default, - plus dictionary/RLE encoding) is typically 3-10× smaller than - ``target_file_size``. This is a coarse proxy for the spec-defined - ``write.target-file-size-bytes`` and will be tightened to true on-disk - bytes once the writer is switched to a rolling-``ParquetWriter`` with - ``OutputStream.tell()`` (#2998). + """Bin-pack ``tbl`` into groups of RecordBatches, each ~``target_file_size`` uncompressed Arrow bytes. + + Used by the materialised ``pa.Table`` write path (``_dataframe_to_data_files`` + + ``write_file``) to split a fully in-memory table into multiple Parquet + files written in parallel. 
""" from pyiceberg.utils.bin_packing import PackingIterator @@ -2702,41 +2842,6 @@ def bin_pack_arrow_table(tbl: pa.Table, target_file_size: int) -> Iterator[list[ return bin_packed_record_batches -def bin_pack_record_batches(batches: Iterable[pa.RecordBatch], target_file_size: int) -> Iterator[list[pa.RecordBatch]]: - """Microbatch a single-pass stream of RecordBatches into target-sized groups. - - Unlike :func:`bin_pack_arrow_table`, this consumes ``batches`` lazily and - holds at most one in-flight buffer in memory, bounded by ``target_file_size``. - Suitable for streaming inputs (``pa.RecordBatchReader``, - ``Iterator[pa.RecordBatch]``) where the total size is unknown up front and - the caller cannot afford to materialise the full dataset. - - Each yielded list of batches is intended to be written as a single Parquet - data file. Because this is single-pass FIFO accumulation (no lookback), the - last bin may be smaller than ``target_file_size``. - - Note: - ``target_file_size`` is measured in **uncompressed in-memory** Arrow - bytes (``RecordBatch.nbytes``), not compressed on-disk Parquet bytes. - The resulting Parquet file after compression is typically 3-10× - smaller than ``target_file_size``. Matches the existing - :func:`bin_pack_arrow_table` semantics; both will be tightened to true - on-disk bytes once the writer is switched to a rolling- - ``ParquetWriter`` with ``OutputStream.tell()`` (#2998). 
- """ - buffer: list[pa.RecordBatch] = [] - buffer_bytes = 0 - for batch in batches: - buffer.append(batch) - buffer_bytes += batch.nbytes - if buffer_bytes >= target_file_size: - yield buffer - buffer = [] - buffer_bytes = 0 - if buffer: - yield buffer - - def _check_pyarrow_schema_compatible( requested_schema: Schema, provided_schema: pa.Schema, @@ -2894,19 +2999,16 @@ def _dataframe_to_data_files( ) if isinstance(df, pa.RecordBatchReader): - if not table_metadata.spec().is_unpartitioned(): - raise NotImplementedError( - "Writing a pa.RecordBatchReader to a partitioned table is not yet supported. " - "Materialise the reader as a pa.Table first, or follow " - "https://github.com/apache/iceberg-python/issues/2152 for partitioned streaming support." - ) - yield from write_file( - io=io, + # Streaming path: rolling ParquetWriter driven by OutputStream.tell() + # for constant-memory writes and on-disk-accurate file sizes. + # Partitioned-table support is the responsibility of + # _record_batches_to_data_files; the NotImplementedError is raised there. + yield from _record_batches_to_data_files( table_metadata=table_metadata, - tasks=( - WriteTask(write_uuid=write_uuid, task_id=next(counter), record_batches=batches, schema=task_schema) - for batches in bin_pack_record_batches(df, target_file_size) - ), + reader=df, + io=io, + write_uuid=write_uuid, + counter=counter, ) return diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 64ad10050d..2a4dbf7ca6 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -460,8 +460,9 @@ def append( Shorthand API for appending PyArrow data to a table transaction. Accepts either a fully materialised ``pa.Table`` or a streaming - ``pa.RecordBatchReader``. Streaming is microbatched by - ``write.target-file-size-bytes`` so memory stays bounded; the reader is + ``pa.RecordBatchReader``. 
For a reader, batches are written through a + rolling ``pq.ParquetWriter`` and a new file is rolled each time the + on-disk file size hits ``write.target-file-size-bytes``. The reader is consumed once and cannot be reused. Streaming writes are currently only supported on unpartitioned tables; @@ -487,13 +488,12 @@ def append( in storage that are not referenced by any snapshot. Clean these up with expire/orphan-file maintenance jobs. - ``write.target-file-size-bytes`` is currently interpreted as - uncompressed in-memory Arrow bytes (the bin-packing weight) rather - than compressed on-disk Parquet bytes. The resulting files are - typically 3-10× smaller than the property suggests after - compression. This matches the existing ``pa.Table`` write path and - will be tightened once the writer is switched to a - rolling-``ParquetWriter`` with ``OutputStream.tell()`` (#2998). + For streaming inputs (``pa.RecordBatchReader``) each input + ``RecordBatch`` becomes one Parquet row group. The + ``write.parquet.row-group-limit`` property (rows, default 1M) + caps row group size — batches larger than the cap are split, + smaller batches are not combined. Caller batch size sets the + lower bound; pyiceberg enforces the upper bound. Args: df: An Arrow Table or a RecordBatchReader of records to append. @@ -608,8 +608,9 @@ def overwrite( Shorthand for adding a table overwrite with a PyArrow table or RecordBatchReader to the transaction. Accepts either a fully materialised ``pa.Table`` or a streaming - ``pa.RecordBatchReader``. Streaming is microbatched by - ``write.target-file-size-bytes`` so memory stays bounded; the reader is + ``pa.RecordBatchReader``. For a reader, batches are written through a + rolling ``pq.ParquetWriter`` and a new file is rolled each time the + on-disk file size hits ``write.target-file-size-bytes``. The reader is consumed once and cannot be reused. 
Streaming writes are currently only supported on unpartitioned tables; @@ -635,13 +636,12 @@ def overwrite( in storage that are not referenced by any snapshot. Clean these up with expire/orphan-file maintenance jobs. - ``write.target-file-size-bytes`` is currently interpreted as - uncompressed in-memory Arrow bytes (the bin-packing weight) rather - than compressed on-disk Parquet bytes. The resulting files are - typically 3-10× smaller than the property suggests after - compression. This matches the existing ``pa.Table`` write path and - will be tightened once the writer is switched to a - rolling-``ParquetWriter`` with ``OutputStream.tell()`` (#2998). + For streaming inputs (``pa.RecordBatchReader``) each input + ``RecordBatch`` becomes one Parquet row group. The + ``write.parquet.row-group-limit`` property (rows, default 1M) + caps row group size — batches larger than the cap are split, + smaller batches are not combined. Caller batch size sets the + lower bound; pyiceberg enforces the upper bound. An overwrite may produce zero or more snapshots based on the operation: diff --git a/tests/catalog/test_catalog_behaviors.py b/tests/catalog/test_catalog_behaviors.py index b859e2d541..57594f2ef0 100644 --- a/tests/catalog/test_catalog_behaviors.py +++ b/tests/catalog/test_catalog_behaviors.py @@ -1215,8 +1215,10 @@ def test_drop_namespace_raises_error_when_namespace_not_empty( # RecordBatchReader streaming append/overwrite tests # -# Streaming writes accept a pa.RecordBatchReader and microbatch it into target-sized -# Parquet files instead of materialising the full Arrow Table in memory. Tracks +# Streaming writes accept a pa.RecordBatchReader and write it through a rolling +# Parquet writer (row groups flushed at write.parquet.row-group-limit, files +# rolled at write.target-file-size-bytes via OutputStream.tell()) instead of +# materialising the full Arrow Table in memory. Tracks # https://github.com/apache/iceberg-python/issues/2152. 
@@ -1248,11 +1250,15 @@ def test_append_record_batch_reader(catalog: Catalog) -> None: def test_append_record_batch_reader_microbatched(catalog: Catalog) -> None: """A reader bigger than the per-file target produces multiple Parquet files - in a single snapshot — verifying the byte-budget microbatching path.""" + in a single snapshot — verifies file rollover via ``OutputStream.tell()``. + + Sets a tiny ``target-file-size-bytes`` so each batch's flush rolls a new + file. Each input ``RecordBatch`` is its own row group, so ``tell()`` + advances after every ``write_batch``. + """ catalog.create_namespace("default") identifier = f"default.append_record_batch_reader_microbatch_{catalog.name}" reader, total_rows = _simple_record_batch_reader(num_batches=8) - # Force every batch to roll a new file by setting an absurdly small target size. tbl = catalog.create_table( identifier=identifier, schema=reader.schema, @@ -1269,6 +1275,114 @@ def test_append_record_batch_reader_microbatched(catalog: Catalog) -> None: assert len(tbl.scan().to_arrow()) == total_rows +def test_append_record_batch_reader_row_group_limit_is_cap(catalog: Catalog) -> None: + """``write.parquet.row-group-limit`` caps the maximum rows per Parquet + row group. A single input batch larger than the cap is split into + multiple row groups, each ≤ the cap. The streaming path enforces the + upper bound; callers control the lower bound by their choice of input + batch size. + """ + import pyarrow.parquet as pq + + catalog.create_namespace("default") + identifier = f"default.append_record_batch_reader_row_group_limit_cap_{catalog.name}" + + row_group_cap = 250 + total_rows = 1000 # 4× the cap + schema = pa.schema([("id", pa.int64())]) + # One big batch — pyiceberg should split it into ⌈1000 / 250⌉ = 4 row groups + # of exactly 250 rows each. 
+ big_batch = pa.RecordBatch.from_pylist( + [{"id": i} for i in range(total_rows)], + schema=schema, + ) + reader = pa.RecordBatchReader.from_batches(schema, iter([big_batch])) + + tbl = catalog.create_table( + identifier=identifier, + schema=schema, + properties={ + TableProperties.PARQUET_ROW_GROUP_LIMIT: str(row_group_cap), + # Big enough that everything fits in one file; we're testing row + # group size, not file rollover. + TableProperties.WRITE_TARGET_FILE_SIZE_BYTES: str(64 * 1024 * 1024), + }, + ) + tbl.append(reader) + + assert len(tbl.scan().to_arrow()) == total_rows + + files = tbl.inspect.files().select(["file_path"]).to_pylist() + assert len(files) == 1, files + + # Read the parquet footer and check row group sizes + file_path = files[0]["file_path"] + metadata = pq.read_metadata(tbl.io.new_input(file_path).open()) + row_group_sizes = [metadata.row_group(i).num_rows for i in range(metadata.num_row_groups)] + + # Expect 4 row groups of exactly row_group_cap rows each. Without the cap + # passed to write_batch, the whole 1000-row batch would become one row + # group — the test would fail loudly. + assert metadata.num_row_groups == total_rows // row_group_cap, row_group_sizes + for rg_size in row_group_sizes: + assert rg_size == row_group_cap, row_group_sizes + + +def test_append_record_batch_reader_target_file_size_is_on_disk_bytes(catalog: Catalog) -> None: + """The streaming write path interprets ``write.target-file-size-bytes`` as + actual on-disk compressed Parquet bytes (via ``OutputStream.tell()``), not + uncompressed in-memory Arrow bytes. This test sets a small file target, + streams several batches, and asserts each rolled file is close to the + target size — proving the spec-correct semantics. 
+ """ + catalog.create_namespace("default") + identifier = f"default.append_record_batch_reader_target_size_{catalog.name}" + + target_bytes = 32 * 1024 # 32 KiB target — small so we get multiple files quickly + schema = pa.schema([("id", pa.int64()), ("payload", pa.large_string())]) + # ~80 bytes per row uncompressed; with zstd ~10x compression we expect + # approximately 4000 rows per ~32 KiB file. + rows_per_batch = 1000 + total_batches = 12 + batches = [ + pa.RecordBatch.from_pylist( + [{"id": i * rows_per_batch + j, "payload": f"row_{i * rows_per_batch + j:08d}"} for j in range(rows_per_batch)], + schema=schema, + ) + for i in range(total_batches) + ] + reader = pa.RecordBatchReader.from_batches(schema, iter(batches)) + expected_rows = total_batches * rows_per_batch + + tbl = catalog.create_table( + identifier=identifier, + schema=schema, + properties={TableProperties.WRITE_TARGET_FILE_SIZE_BYTES: str(target_bytes)}, + ) + tbl.append(reader) + + # All rows landed + assert len(tbl.scan().to_arrow()) == expected_rows + + # Multiple files were rolled + snapshot = tbl.metadata.current_snapshot() + assert snapshot is not None and snapshot.summary is not None + added_files = int(snapshot.summary["added-data-files"]) # type: ignore[arg-type] + assert added_files >= 2, snapshot.summary + + # Per-file size: every rolled file (i.e. all but possibly the last) should be + # *close to* target_bytes. The lag between tell() and write_batch is bounded + # by one Parquet data page (~1 MiB by default), so files end up slightly + # above target. We assert each rolled file is between 0.5x and 5x the + # target — a loose bound that catches the old uncompressed-Arrow-bytes + # behaviour (where files would be ~3-10x SMALLER than target). 
+ files = tbl.inspect.files().select(["file_path", "file_size_in_bytes"]).to_pylist() + rolled_files = files[:-1] if len(files) > 1 else files + for f in rolled_files: + size = f["file_size_in_bytes"] + assert target_bytes // 2 <= size <= target_bytes * 5, f"{f['file_path']}: {size} bytes (target {target_bytes})" + + def test_append_record_batch_reader_empty(catalog: Catalog) -> None: catalog.create_namespace("default") identifier = f"default.append_record_batch_reader_empty_{catalog.name}" diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index 609c1863bc..a2b61995d1 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -2575,10 +2575,11 @@ def test_v3_write_and_read_row_lineage(spark: SparkSession, session_catalog: Cat # RecordBatchReader streaming append/overwrite — see https://github.com/apache/iceberg-python/issues/2152 # -# These integration tests prove Spark can read tables written via the new streaming -# path. Equivalent in-process scan coverage lives in tests/catalog/test_catalog_behaviors.py -# but only Spark exercises the manifest stats + Parquet metadata produced by the -# write_file → fast_append pipeline against an external reader. +# These integration tests prove Spark can read tables written via the new +# streaming path (rolling pq.ParquetWriter + fast_append commit). Equivalent +# in-process scan coverage lives in tests/catalog/test_catalog_behaviors.py; +# only Spark exercises the resulting manifest stats and Parquet metadata +# against an external reader. 
@pytest.mark.integration @@ -2631,14 +2632,17 @@ def test_append_record_batch_reader_multifile( spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int ) -> None: """Forcing a tiny target file size should produce >1 data file in a single - snapshot, proving the byte-budget rollover in bin_pack_record_batches fires - end-to-end and the resulting files are valid Iceberg data files (Spark reads - them all).""" + snapshot, proving the rolling ParquetWriter's tell()-based rollover fires + end-to-end and the resulting files are valid Iceberg data files (Spark + reads them all).""" identifier = f"default.streaming_append_multifile_v{format_version}" tbl = _create_table( session_catalog, identifier, - {"format-version": str(format_version), TableProperties.WRITE_TARGET_FILE_SIZE_BYTES: "1"}, + { + "format-version": str(format_version), + TableProperties.WRITE_TARGET_FILE_SIZE_BYTES: "1", + }, ) batches = arrow_table_with_null.to_batches(max_chunksize=1) * 4 diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index a05b295fc1..2170741bdd 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -20,7 +20,6 @@ import tempfile import uuid import warnings -from collections.abc import Iterator from datetime import date, datetime, timezone from pathlib import Path from typing import Any @@ -77,7 +76,6 @@ _task_to_record_batches, _to_requested_schema, bin_pack_arrow_table, - bin_pack_record_batches, compute_statistics_plan, data_file_statistics_from_parquet_metadata, expression_to_pyarrow, @@ -2366,50 +2364,6 @@ def test_bin_pack_arrow_table_target_size_smaller_than_row(arrow_table_with_null assert sum(batch.num_rows for bin_ in bin_packed for batch in bin_) == arrow_table_with_null.num_rows -def test_bin_pack_record_batches_single_bin(arrow_table_with_null: pa.Table) -> None: - batches = arrow_table_with_null.to_batches() - bins = list(bin_pack_record_batches(iter(batches), 
target_file_size=arrow_table_with_null.nbytes * 10)) - # everything fits in one bin - assert len(bins) == 1 - assert sum(b.num_rows for b in bins[0]) == arrow_table_with_null.num_rows - - -def test_bin_pack_record_batches_microbatched(arrow_table_with_null: pa.Table) -> None: - # repeat the per-row batches so we have many small inputs to pack - batches = list(arrow_table_with_null.to_batches(max_chunksize=1)) * 5 - bin_size = arrow_table_with_null.nbytes // 2 # forces multiple bins - bins = list(bin_pack_record_batches(iter(batches), target_file_size=bin_size)) - assert len(bins) > 1 - assert sum(b.num_rows for bin_ in bins for b in bin_) == arrow_table_with_null.num_rows * 5 - # All but the last bin should have crossed the size threshold. - for bin_ in bins[:-1]: - assert sum(b.nbytes for b in bin_) >= bin_size - - -def test_bin_pack_record_batches_empty() -> None: - assert list(bin_pack_record_batches(iter([]), target_file_size=1024)) == [] - - -def test_bin_pack_record_batches_is_lazy(arrow_table_with_null: pa.Table) -> None: - # Streams are single-pass: confirm the helper consumes its input batch-by-batch - # rather than materialising the whole iterator before yielding the first bin. - consumed: list[int] = [] - - def tracking_iter() -> Iterator[pa.RecordBatch]: - for i, batch in enumerate(arrow_table_with_null.to_batches(max_chunksize=1)): - consumed.append(i) - yield batch - - target = max(1, arrow_table_with_null.nbytes // 4) - bins_iter = bin_pack_record_batches(tracking_iter(), target_file_size=target) - first_bin = next(bins_iter) - assert len(first_bin) >= 1 - # Generator should not have walked the entire input upon yielding the first bin - assert len(consumed) < arrow_table_with_null.num_rows - list(bins_iter) - assert len(consumed) == arrow_table_with_null.num_rows - - def test_schema_mismatch_type(table_schema_simple: Schema) -> None: other_schema = pa.schema( (