feat: Added IcebergArrowInputSourceReader #19510
Conversation
|
Benchmark : |
|
Looking into benchmark module which will be used throughout the arrow and datafusion integration to quickly check the improvements. |
6297e2f to
9da5049
Compare
| private final boolean useArrowReader; | ||
|
|
||
| @JsonProperty | ||
| private final int arrowBatchSize; |
There was a problem hiding this comment.
With a batch of 1024 values in a contiguous buffer -> emit SIMD instructions that process 8-16 rows per CPU cycle.
There was a problem hiding this comment.
Decompression amortization -> batch-read helps in decompress once and consume many row, since parquet compressed pages .
|
ingestion time at 100k × 5 ReadArrow implementation : 17ms Index + Persist (= total − read)~330ms total timeArrow implementation: 347 ms Index+persist does substantially more work per row than read. So even though Arrow makes read ~3x faster, that gain is dwarfed when amortized over the much-larger indexing cost. Benchmark added https://github.com/Shekharrajak/druid/pull/1/changes |
|
looks like network error https://github.com/apache/druid/actions/runs/26333466916/job/77523245546?pr=19510 - please trigger the CI check again. |
FrankChen021
left a comment
There was a problem hiding this comment.
| Severity | Findings |
|---|---|
| P0 | 0 |
| P1 | 2 |
| P2 | 1 |
| P3 | 0 |
| Total | 3 |
Reviewed 7 of 7 changed files.
This is an automated review by Codex GPT-5.5
…st constructor arity
…t line breaking, DateTimes.utc, Maps.newHashMapWithExpectedSize)
…Allocation init failure
8808ec4 to
f151f5b
Compare
| public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) throws IOException | ||
| { | ||
| if (useArrowReader) { | ||
| return 1; |
There was a problem hiding this comment.
Future PR: Enable parallel ingestion (maxNumConcurrentSubTasks > 1) when useArrowReader: true by adding split-coordination plumbing to IcebergInputSource. Currently Arrow mode forces a single subtask to guarantee correctness;
FrankChen021
left a comment
There was a problem hiding this comment.
I reviewed the follow-up changes. The projection issue and Arrow split handling thread look resolved; I left a separate inline reply on the remaining residual-filter snapshot-time parity gap.
Reviewed 7 of 7 changed files.
This is an automated review by Codex GPT-5.5
…lready opens java.nio
…end failure on JDK 25
…wnstream resolvers
|
@FrankChen021 @jtuglu1 - please have a look. CI checks are green. |
| scan.splitLookback(), | ||
| scan.splitOpenFileCost() | ||
| ); | ||
| final ArrowReader arrowReader = new ArrowReader(scan, batchSize, true); |
There was a problem hiding this comment.
Reads Iceberg data files as Arrow columnar batches, then iterates rows lazily as InputRow objects.
| { | ||
| // Pin Arrow to Unsafe allocator: Netty backend fails on JDK 25 (EmptyByteBuf.memoryAddress UnsupportedOperationException). | ||
| static { | ||
| if (System.getProperty("arrow.allocation.manager.type") == null) { |
There was a problem hiding this comment.
NettyAllocationManager. throws UnsupportedOperationException on JDK 25 (incompatible with newer JDK module encapsulation). Pinning to "Unsafe" uses sun.misc.Unsafe which works on all supported JDKs (21, 25).
CI checks were failing .
| * Column projection and predicate push-down are applied at scan planning time so only requested | ||
| * columns and matching files are read from storage. | ||
| * | ||
| * Note: iceberg-arrow currently supports Parquet data files only. ORC and Avro files will throw |
There was a problem hiding this comment.
Supports only parquet data file
| private final ResidualFilterMode residualFilterMode; | ||
|
|
||
| @JsonProperty | ||
| private final boolean useArrowReader; |
There was a problem hiding this comment.
Adds useArrowReader opt-in flag
FrankChen021
left a comment
There was a problem hiding this comment.
I reviewed the follow-up changes for correctness, edge cases, concurrency, and integration risks; no new issues found.
Reviewed 7 of 7 changed files.
This is an automated review by Codex GPT-5.5
|
Hi @jtuglu1 @clintropolis @FrankChen021 - Please have a look, I have benchmark PR drafted which we will use across all features to see the improvements we are getting through vectorization & arrow implementations. |
cecemei
left a comment
There was a problem hiding this comment.
The benchmark results look good! I don't see any similar benchmark coverage in this area, but it's definitely worth adding.
There was a problem hiding this comment.
The arrow based approach is so different from the file catalog based, it also doesn't really implement SplittableInputSource any more, have you considered adding it as a separate InputSource? maybe we can extract some shared logic into a separate abstract class.
| } | ||
|
|
||
| @Override | ||
| public CloseableIterator<InputRow> read(final InputStats inputStats) throws IOException |
There was a problem hiding this comment.
InputStats is nullable, since the default read() just pass in null.
Fixes #19498
Description
Release note
Apache Iceberg ingestion now supports an opt-in vectorized reader path backed by iceberg-arrow. Enable by setting "useArrowReader": true on the iceberg input source. The Arrow path automatically applies V2 delete files (positional and equality), handles schema evolution, pushes column projection and predicates into the scan planner, and is 2x–3x faster than the existing path.
Future Iceberg spec features (V3 deletion vectors, row lineage, V4+) become available on Iceberg version bumps with no Druid code changes. Default remains the existing path; both coexist.
This PR has: