
feat(lance): Implement columnar batch reading for Lance (COW only)#18403

Open
wombatu-kun wants to merge 2 commits into apache:master from wombatu-kun:lance-read-columnar-batch-2

Conversation


@wombatu-kun wombatu-kun commented Mar 27, 2026

Describe the issue this Pull Request addresses

Closes #17736

Summary and Changelog

Spark queries on COPY_ON_WRITE Lance tables now use columnar batch (vectorized) reading. Instead of decomposing Arrow batches into individual rows, Spark receives entire ColumnarBatch objects backed by zero-copy LanceArrowColumnVector wrappers. This eliminates per-row materialization overhead on the read path for COW base-file-only scans.

MOR tables and incremental queries are unaffected: they continue to use the row-based path, which is an existing Hudi constraint for all file formats (Parquet, ORC, Lance).

Limitations

  1. COW only. Vectorized batch reading is enabled only for COPY_ON_WRITE tables in the base-file-only read path. MOR tables always use the row-based path — this is an existing Hudi-wide constraint (supportReturningBatch = !isMOR), not Lance-specific. Even MOR file groups with no log files go through HoodieFileGroupReader, which merges at the row level.
  2. Type-change fallback. When the file schema and the query schema have implicit type differences (e.g., INT→LONG, FLOAT→DOUBLE), the reader falls back to the row-based path for that file. Batch-level type casting is deferred to a follow-up.
  3. No filter pushdown in batch mode. Lance filter pushdown is not yet implemented; the filters parameter is passed as null to lanceReader.readAll(). Spark applies filters on top of the returned batches. This is unchanged from the row-based path.
  4. Multi-format tables. When isMultipleBaseFileFormatsEnabled is true (table has mixed Parquet/ORC/Lance base files), Lance batch reading is disabled to avoid vector type conflicts between formats. The vectorTypes() method returns the Parquet/ORC vector types in that case.
  5. Incremental and bootstrap queries. These disable vectorized reading for all formats (supportVectorizedRead = !isIncremental && !isBootstrap && supportBatch).
  6. Fixed batch size. The Lance batch size is hardcoded at 512 rows (DEFAULT_BATCH_SIZE). It is not configurable via Spark session settings.
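The gating predicates in limitations 1 and 5 can be sketched as plain boolean logic. This is a simplified sketch: the class and method names below are hypothetical, and the real flags live in HoodieFileGroupReaderBasedFileFormat (Scala).

```java
// Hypothetical sketch of the batch-support guards described above.
final class BatchGuards {
  // Vectorized reading is disabled for incremental and bootstrap queries,
  // regardless of format-level batch support.
  static boolean supportVectorizedRead(boolean isIncremental, boolean isBootstrap,
                                       boolean supportBatch) {
    return !isIncremental && !isBootstrap && supportBatch;
  }

  // Even with vectorized reading enabled, whole batches are only returned
  // to Spark for non-MOR (i.e. COW) tables.
  static boolean supportReturningBatch(boolean isMOR, boolean supportVectorizedRead) {
    return !isMOR && supportVectorizedRead;
  }
}
```

Under these guards, a Lance COW snapshot query gets batches, while a MOR file group (even one with no log files) stays on the row path.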

Detailed changelog
LanceBatchIterator — NEW (hudi-client/hudi-spark-client)

  • Iterator<ColumnarBatch> + Closeable that reads Arrow batches from a Lance file and wraps each batch's field vectors in LanceArrowColumnVector[] to produce a ColumnarBatch.
  • Column vector wrappers are created once and reused across batches (Arrow's ArrowReader reuses the same VectorSchemaRoot).
  • Owns and manages the lifecycle of BufferAllocator, LanceFileReader, and ArrowReader; closes them in order on close().
  • Follows the same lifecycle pattern as the existing LanceRecordIterator.
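The ownership pattern above can be modeled in a minimal sketch with plain-Java stand-ins for the Arrow/Lance types (all names here are hypothetical; the reused VectorSchemaRoot is modeled as a single shared buffer, and resources are closed LIFO as one common convention):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

// Hypothetical stand-in for LanceBatchIterator: models only the
// Iterator + Closeable lifecycle, not the real Arrow API.
class SketchBatchIterator implements Iterator<long[]>, AutoCloseable {
  private final Deque<AutoCloseable> resources = new ArrayDeque<>();
  private final long[] sharedRoot;   // models the reused VectorSchemaRoot
  private int remainingBatches;

  SketchBatchIterator(int batches, int batchSize) {
    // Column wrappers over the root would be created once, up front,
    // and reused for every batch.
    this.sharedRoot = new long[batchSize];
    this.remainingBatches = batches;
    // Resources pushed in acquisition order (allocator, then reader).
    resources.push(() -> { /* allocator.close() in the real class */ });
    resources.push(() -> { /* reader.close() in the real class */ });
  }

  @Override public boolean hasNext() { return remainingBatches > 0; }

  @Override public long[] next() {
    remainingBatches--;
    // The reader loads the next batch into the same root; pre-built
    // wrappers keep pointing at it, so there is no per-batch allocation.
    java.util.Arrays.fill(sharedRoot, remainingBatches);
    return sharedRoot;
  }

  @Override public void close() {
    while (!resources.isEmpty()) {
      try { resources.pop().close(); }
      catch (Exception e) { throw new RuntimeException(e); }
    }
  }
}
```

The same shape applies to the real class: callers must close the iterator so the allocator and reader are released deterministically.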

SparkLanceReaderBase — MODIFIED (hudi-spark-datasource/hudi-spark-common)

  • The single read() method now branches into readBatch() (columnar) and readRows() (row-based) based on enableVectorizedReader and whether implicit type changes exist.
  • readBatch():
    • Creates a LanceBatchIterator for zero-copy batch iteration.
    • Computes a column mapping (requiredSchema → requestSchema) to reorder columns and identify columns missing from the file.
    • Schema evolution (column addition): missing columns are backed by all-null Arrow FieldVector instances (allocated via LanceArrowUtils.toArrowField → Field.createVector), wrapped in LanceArrowColumnVector. This satisfies Spark's vectorTypes() contract which expects all data columns to be LanceArrowColumnVector. A dedicated child allocator (nullAllocator) manages these vectors and is closed before the data allocator.
    • Partition columns: pre-created OnHeapColumnVector arrays are filled with constant partition values via populatePartitionVectors(), which handles all Spark primitive types, strings, decimals, and binary. Vectors are reused across batches; re-populated only when batch size changes.
    • A mappedIterator (implementing Iterator[ColumnarBatch] with Closeable) assembles the final batch per iteration and is registered with TaskContext for cleanup.
    • Type-change fallback: when implicitTypeChangeInfo is non-empty (e.g., file has FLOAT, query requires DOUBLE), falls back to readRows() which applies cast projections at the row level. Batch-level type casting is deferred to a follow-up.
  • readRows(): the original row-based logic, extracted into its own method. Behavior unchanged.
  • populatePartitionVectors(): new private helper, supports Boolean, Byte, Short, Int/Date, Long/Timestamp, Float, Double, String, Decimal (int/long/big), Binary. Unsupported types fall back to nulls.
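The column-mapping step in readBatch() can be illustrated with a toy version (a hypothetical helper: the real code maps Spark StructType fields rather than plain strings):

```java
import java.util.List;

// Toy version of the readBatch() column-mapping step. For each required
// column, record its index in the file schema; -1 marks a column absent
// from the file, which the batch path backs with an all-null vector
// (schema evolution / column addition).
final class ColumnMappingSketch {
  static int[] compute(List<String> requiredSchema, List<String> fileSchema) {
    int[] mapping = new int[requiredSchema.size()];
    for (int i = 0; i < requiredSchema.size(); i++) {
      mapping[i] = fileSchema.indexOf(requiredSchema.get(i));
    }
    return mapping;
  }
}
```

For example, a required schema (id, name, added_col) against a file containing only (name, id) maps to indices (1, 0, -1), with the -1 entry null-padded.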

HoodieFileGroupReaderBasedFileFormat — MODIFIED (hudi-spark-datasource/hudi-spark-common)

  • supportBatch(): changed val lanceBatchSupported = false to val lanceBatchSupported = true. The existing guards supportVectorizedRead = !isIncremental && !isBootstrap && supportBatch and supportReturningBatch = !isMOR && supportVectorizedRead remain unchanged and apply to all formats.
  • vectorTypes(): added a branch for LANCE format (when not in multi-format mode) returning LanceArrowColumnVector class name for all data columns and OnHeapColumnVector for partition columns. The existing Parquet/ORC logic is wrapped in the else branch, unchanged.
  • Two fallback readBaseFile call sites (case _ branches): introduced baseFileOnlyReader to select the correct reader for direct base-file reads (no log merging). Parquet's vectorized reader can return InternalRow when returningBatch=false (MOR), so it continues to use baseFileReader for performance. Lance's vectorized reader always returns ColumnarBatch, which causes a ClassCastException on MOR tables (where supportReturningBatch=false and Spark expects InternalRow), so it falls back to fileGroupBaseFileReader (non-vectorized). For COW tables, baseFileOnlyReader == baseFileReader since fileGroupBaseFileReader is not created separately.
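The reader choice at those two fallback call sites reduces to a small predicate. The sketch below uses hypothetical names; the real code selects between the Scala closures baseFileReader and fileGroupBaseFileReader.

```java
// Hypothetical sketch of the baseFileOnlyReader selection described above.
enum BaseFormat { PARQUET, ORC, LANCE }

final class ReaderChoice {
  // true  -> vectorized baseFileReader
  // false -> non-vectorized fileGroupBaseFileReader
  static boolean useVectorizedReader(BaseFormat format, boolean returningBatch) {
    if (format == BaseFormat.LANCE) {
      // Lance's vectorized reader always emits ColumnarBatch; when Spark
      // expects InternalRow (MOR, returningBatch == false) that would be a
      // ClassCastException, so fall back to the row-based reader.
      return returningBatch;
    }
    // Parquet/ORC vectorized readers can emit InternalRow when
    // returningBatch is false, so they stay vectorized for performance.
    return true;
  }
}
```

On COW tables both choices coincide, which matches the note that baseFileOnlyReader == baseFileReader there.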

TestLanceColumnarBatch — NEW (hudi-spark-datasource/hudi-spark/src/test)

  • 5 unit tests invoking SparkLanceReaderBase directly:
    • testRowPathReturnsInternalRows — verifies enableVectorizedReader=false returns InternalRow, never ColumnarBatch
    • testColumnarPathReturnsBatches — verifies enableVectorizedReader=true returns ColumnarBatch with correct data
    • testColumnarPathNullPadsAbsentColumns — schema evolution: missing column is null-padded in batch mode
    • testColumnarPathAppendsPartitionVectors — partition values appended as constant columns to each batch
    • testTypeChangeFallsBackToRowPath — implicit type change (FLOAT→DOUBLE) forces row path with correct cast values
  • 3 integration tests via Spark DataFrame/SQL API:
    • testCOWTableDataFrameRead — COW round-trip with vectorized reads active
    • testCOWTableSchemaEvolutionNullPadding — two bulk_inserts with schema widening; old files null-padded
    • testCOWTableSparkSqlQuery — SELECT ... WHERE predicate evaluation on columnar batches

Impact

Public API / user-facing changes

  • No new configuration options. Vectorized reading activates automatically for Lance COW tables. There is no feature flag to toggle it.
  • No API changes. All changes are internal to the Spark datasource read path. The SparkLanceReaderBase constructor already accepted enableVectorizedReader: Boolean; the new batch path is selected when this is true.
  • Behavioral change: queries on Lance COW tables that previously returned InternalRow one-by-one now return ColumnarBatch. From the user's perspective, query results are identical; only the internal execution changes.

Performance impact

  • Read throughput improvement for COW Lance tables. Eliminates per-row UnsafeProjection + .copy() overhead. Arrow vectors are wrapped zero-copy in LanceArrowColumnVector and passed directly to Spark's columnar execution engine.
  • No regression for MOR tables. MOR continues to use the row-based path through fileGroupBaseFileReader, which is explicitly non-vectorized.
  • Minimal memory overhead for schema evolution. Null-padding columns allocate a lightweight Arrow FieldVector (validity buffer only, no data buffer) via a dedicated child allocator. The allocator is released when the iterator is closed.

Risk Level

low

Documentation Update

none

Contributor's checklist

  • Read through contributor's guide
  • Enough context is provided in the sections above
  • Adequate tests were added if applicable

@wombatu-kun wombatu-kun requested a review from rahil-c March 27, 2026 09:37
@github-actions github-actions bot added the size:XL PR with lines of changes > 1000 label Mar 27, 2026
@wombatu-kun wombatu-kun marked this pull request as draft March 27, 2026 12:44
@wombatu-kun wombatu-kun force-pushed the lance-read-columnar-batch-2 branch from 341ca2e to 859b4c9 Compare March 27, 2026 15:46
@wombatu-kun wombatu-kun marked this pull request as ready for review March 27, 2026 15:46
@hudi-bot
Collaborator

CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

@codecov-commenter

Codecov Report

❌ Patch coverage is 66.81034% with 77 lines in your changes missing coverage. Please review.
✅ Project coverage is 68.18%. Comparing base (bb5abb6) to head (859b4c9).
⚠️ Report is 1 commit behind head on master.

Files with missing lines Patch % Lines
...ution/datasources/lance/SparkLanceReaderBase.scala 64.51% 40 Missing and 15 partials ⚠️
...org/apache/hudi/io/storage/LanceBatchIterator.java 66.07% 10 Missing and 9 partials ⚠️
...parquet/HoodieFileGroupReaderBasedFileFormat.scala 85.71% 0 Missing and 3 partials ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #18403      +/-   ##
============================================
- Coverage     68.18%   68.18%   -0.01%     
- Complexity    27670    27709      +39     
============================================
  Files          2439     2440       +1     
  Lines        134099   134285     +186     
  Branches      16158    16206      +48     
============================================
+ Hits          91437    91559     +122     
- Misses        35565    35601      +36     
- Partials       7097     7125      +28     
Flag Coverage Δ
common-and-other-modules 44.19% <2.16%> (-0.07%) ⬇️
hadoop-mr-java-client 45.08% <ø> (+<0.01%) ⬆️
spark-client-hadoop-common 48.37% <0.00%> (-0.04%) ⬇️
spark-java-tests 48.77% <66.81%> (-0.01%) ⬇️
spark-scala-tests 45.22% <6.03%> (-0.08%) ⬇️
utilities 38.36% <5.19%> (-0.07%) ⬇️

Flags with carried forward coverage won't be shown.

Files with missing lines Coverage Δ
...parquet/HoodieFileGroupReaderBasedFileFormat.scala 85.34% <85.71%> (-0.44%) ⬇️
...org/apache/hudi/io/storage/LanceBatchIterator.java 66.07% <66.07%> (ø)
...ution/datasources/lance/SparkLanceReaderBase.scala 66.47% <64.51%> (-19.53%) ⬇️

... and 7 files with indirect coverage changes


@rahil-c rahil-c requested a review from voonhous March 31, 2026 04:15


Development

Successfully merging this pull request may close these issues.

Implement columnar batch reading for Lance

3 participants