feat(lance): Implement columnar batch reading for Lance (COW only) #18403
Open
wombatu-kun wants to merge 2 commits into apache:master
Codecov Report ❌ — coverage diff and impacted files:

```diff
@@             Coverage Diff              @@
##             master   #18403      +/-  ##
============================================
- Coverage     68.18%   68.18%    -0.01%
- Complexity    27670    27709       +39
============================================
  Files          2439     2440        +1
  Lines        134099   134285      +186
  Branches      16158    16206       +48
============================================
+ Hits          91437    91559      +122
- Misses        35565    35601       +36
- Partials       7097     7125       +28
```
Describe the issue this Pull Request addresses
Closes #17736
Summary and Changelog
Spark queries on COPY_ON_WRITE Lance tables now use columnar batch (vectorized) reading. Instead of decomposing Arrow batches into individual rows, Spark receives entire ColumnarBatch objects backed by zero-copy LanceArrowColumnVector wrappers. This eliminates per-row materialization overhead on the read path for COW base-file-only scans.
MOR tables and incremental queries are unaffected — they continue to use the row-based path, which is an existing Hudi constraint for all file formats (Parquet, ORC, Lance).
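To illustrate the difference between the two read paths, here is a minimal, dependency-free sketch (plain Java, no Spark or Arrow types; all names are hypothetical, with `int[]` standing in for an Arrow batch). The row path materializes one object per value, while the batch path hands entire batches through untouched:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of the two read paths; int[] stands in for an Arrow batch.
class ReadPaths {
  // Batch path: the consumer receives whole batches; nothing is copied per row.
  static Iterator<int[]> batchPath(List<int[]> arrowBatches) {
    return arrowBatches.iterator();
  }

  // Row path: each batch is decomposed into individual rows,
  // paying the per-row materialization cost the batch path avoids.
  static Iterator<Integer> rowPath(List<int[]> arrowBatches) {
    List<Integer> rows = new ArrayList<>();
    for (int[] batch : arrowBatches) {
      for (int value : batch) {
        rows.add(value); // one boxed object per row
      }
    }
    return rows.iterator();
  }
}
```

With vectorized reading enabled, Spark consumes the `ColumnarBatch` objects directly rather than the per-row objects, which is where the overhead saving comes from.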
Limitations
Vectorized reading is gated by `supportVectorizedRead = !isIncremental && !isBootstrap && supportBatch`, so incremental and bootstrap queries keep the row-based path.
Detailed changelog
`LanceBatchIterator` — NEW (hudi-client/hudi-spark-client)
- An `Iterator<ColumnarBatch>` + `Closeable` that reads Arrow batches from a Lance file and wraps each batch's field vectors in `LanceArrowColumnVector[]` to produce a `ColumnarBatch` (no per-row copies out of the `VectorSchemaRoot`).
- Owns the `BufferAllocator`, `LanceFileReader`, and `ArrowReader`; closes them in order on `close()`.
- The batch-oriented counterpart of the existing row-based `LanceRecordIterator`.

`SparkLanceReaderBase` — MODIFIED (hudi-spark-datasource/hudi-spark-common)
- The `read()` method now branches into `readBatch()` (columnar) and `readRows()` (row-based) based on `enableVectorizedReader` and whether implicit type changes exist.
- `readBatch()`:
  - Uses `LanceBatchIterator` for zero-copy batch iteration.
  - Absent columns (schema evolution) are null-padded with all-null vectors wrapped in `LanceArrowColumnVector`. This satisfies Spark's `vectorTypes()` contract, which expects all data columns to be `LanceArrowColumnVector`. A dedicated child allocator (`nullAllocator`) manages these vectors and is closed before the data allocator.
  - `OnHeapColumnVector` arrays are filled with constant partition values via `populatePartitionVectors()`, which handles all Spark primitive types, strings, decimals, and binary. Vectors are reused across batches and re-populated only when the batch size changes.
  - When `implicitTypeChangeInfo` is non-empty (e.g., the file has FLOAT but the query requires DOUBLE), it falls back to `readRows()`, which applies cast projections at the row level. Batch-level type casting is deferred to a follow-up.
- `readRows()`: the original row-based logic, extracted into its own method. Behavior unchanged.
- `populatePartitionVectors()`: new private helper; supports Boolean, Byte, Short, Int/Date, Long/Timestamp, Float, Double, String, Decimal (int/long/big), and Binary. Unsupported types fall back to nulls.

`HoodieFileGroupReaderBasedFileFormat` — MODIFIED (hudi-spark-datasource/hudi-spark-common)
- `supportBatch()`: changed `val lanceBatchSupported = false` to `val lanceBatchSupported = true`.
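The `populatePartitionVectors()` idea — broadcasting one constant partition value into a fixed-size column, with a null fallback for unsupported types — can be sketched without any Spark dependency. `PartitionColumn` here is a hypothetical stand-in for Spark's `OnHeapColumnVector`, not the actual class used in the PR:

```java
import java.math.BigDecimal;
import java.util.Arrays;

// Hypothetical stand-in for Spark's OnHeapColumnVector.
class PartitionColumn {
  final Object[] values;
  PartitionColumn(int batchSize) { values = new Object[batchSize]; }
}

class PartitionVectors {
  // Broadcast one constant partition value across the whole batch.
  // Unsupported types fall back to nulls, as described in the changelog.
  static PartitionColumn populate(int batchSize, Object constant) {
    boolean supported = constant instanceof Boolean || constant instanceof Byte
        || constant instanceof Short || constant instanceof Integer   // Int/Date
        || constant instanceof Long || constant instanceof Float      // Long/Timestamp
        || constant instanceof Double || constant instanceof String
        || constant instanceof BigDecimal || constant instanceof byte[];
    PartitionColumn col = new PartitionColumn(batchSize);
    Arrays.fill(col.values, supported ? constant : null);
    return col;
  }
}
```

Because the value is constant for a given file split, the filled vector can be reused across batches and only re-filled when the batch size changes.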
- The existing guards `supportVectorizedRead = !isIncremental && !isBootstrap && supportBatch` and `supportReturningBatch = !isMOR && supportVectorizedRead` remain unchanged and apply to all formats.
- `vectorTypes()`: added a branch for the LANCE format (when not in multi-format mode) returning the `LanceArrowColumnVector` class name for all data columns and `OnHeapColumnVector` for partition columns. The existing Parquet/ORC logic is wrapped in the else branch, unchanged.
- `readBaseFile` call sites (`case _` branches): introduced `baseFileOnlyReader` to select the correct reader for direct base-file reads (no log merging). Parquet's vectorized reader can return `InternalRow` when `returningBatch=false` (MOR), so it continues to use `baseFileReader` for performance. Lance's vectorized reader always returns `ColumnarBatch`, which causes a `ClassCastException` on MOR tables (where `supportReturningBatch=false` and Spark expects `InternalRow`), so it falls back to `fileGroupBaseFileReader` (non-vectorized). For COW tables, `baseFileOnlyReader == baseFileReader`, since `fileGroupBaseFileReader` is not created separately.

`TestLanceColumnarBatch` — NEW (hudi-spark-datasource/hudi-spark/src/test)
- Tests `SparkLanceReaderBase` directly:
  - `testRowPathReturnsInternalRows` — verifies `enableVectorizedReader=false` returns `InternalRow`, never `ColumnarBatch`
  - `testColumnarPathReturnsBatches` — verifies `enableVectorizedReader=true` returns `ColumnarBatch` with correct data
  - `testColumnarPathNullPadsAbsentColumns` — schema evolution: a missing column is null-padded in batch mode
  - `testColumnarPathAppendsPartitionVectors` — partition values appended as constant columns to each batch
  - `testTypeChangeFallsBackToRowPath` — implicit type change (FLOAT→DOUBLE) forces the row path with correct cast values
- Tests through the Spark DataFrame/SQL API:
  - `testCOWTableDataFrameRead` — COW round-trip with vectorized reads active
  - `testCOWTableSchemaEvolutionNullPadding` — two bulk_inserts with schema widening; old files null-padded
  - `testCOWTableSparkSqlQuery` — `SELECT ... WHERE` predicate evaluation on columnar batches

Impact
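As an aside on the changelog mechanics above: the guard expressions and the `baseFileOnlyReader` choice can be sketched dependency-free. This is an illustrative model only — the reader values are stand-in strings, not Hudi's actual reader types:

```java
// Hypothetical sketch of the gating and reader-selection rules quoted above.
class ReaderSelection {
  static boolean supportVectorizedRead(boolean isIncremental, boolean isBootstrap, boolean supportBatch) {
    return !isIncremental && !isBootstrap && supportBatch;
  }

  static boolean supportReturningBatch(boolean isMOR, boolean vectorized) {
    return !isMOR && vectorized;
  }

  // Lance's vectorized reader always yields ColumnarBatch, so when Spark
  // expects InternalRow (returningBatch == false, e.g. MOR) it must fall
  // back to the non-vectorized file-group reader. Parquet's vectorized
  // reader can emit InternalRow itself, so it keeps the vectorized reader.
  static String baseFileOnlyReader(String format, boolean returningBatch,
                                   String baseFileReader, String fileGroupBaseFileReader) {
    if ("LANCE".equals(format) && !returningBatch) {
      return fileGroupBaseFileReader;
    }
    return baseFileReader;
  }
}
```

For COW tables `returningBatch` is true whenever vectorized reads are enabled, which is why `baseFileOnlyReader` coincides with `baseFileReader` there.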
Public API / user-facing changes
Performance impact
Risk Level
low
Documentation Update
none
Contributor's checklist