Skip to content

feat: Added IcebergArrowInputSourceReader #19510

Open
Shekharrajak wants to merge 19 commits into
apache:masterfrom
Shekharrajak:feature/iceberg-arrow-reader
Open

feat: Added IcebergArrowInputSourceReader #19510
Shekharrajak wants to merge 19 commits into
apache:masterfrom
Shekharrajak:feature/iceberg-arrow-reader

Conversation

@Shekharrajak
Copy link
Copy Markdown
Contributor

@Shekharrajak Shekharrajak commented May 23, 2026

Fixes #19498

Description

  • Added IcebergArrowInputSourceReader using iceberg-arrow vectorized API
  • Returns the live Table object so the Arrow reader can drive scan planning
  • Two new optional JSON properties on the input spec. useArrowReader=false keeps existing behaviour byte-for-byte; useArrowReader=true switches to the Arrow path. arrowBatchSize defaults to 1024.

Release note

Apache Iceberg ingestion now supports an opt-in vectorized reader path backed by iceberg-arrow. Enable by setting "useArrowReader": true on the iceberg input source. The Arrow path automatically applies V2 delete files (positional and equality), handles schema evolution, pushes column projection and predicates into the scan planner, and is 2x–3x faster than the existing path.
Future Iceberg spec features (V3 deletion vectors, row lineage, V4+) become available on Iceberg version bumps with no Druid code changes. Default remains the existing path; both coexist.

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@Shekharrajak
Copy link
Copy Markdown
Contributor Author

Benchmark :

| numRows  | numCols | icebergArrowInputSourceReader    | icebergInputSourceReader         | Speedup |
|         |         |  ms/op         throughput        |  ms/op         throughput        | (vs baseline) |
+---------+---------+----------------------------------+----------------------------------+---------+
| 100,000 |   5     |   17.70   5,651,074 rows/s       |   51.35   1,947,542 rows/s       |  2.90x  |
| 100,000 |  15     |   44.11   2,266,881 rows/s       |   95.81   1,043,680 rows/s       |  2.17x  |
| 500,000 |   5     |   74.92   6,673,450 rows/s       |  187.73   2,663,420 rows/s       |  2.51x  |
| 500,000 |  15     |  197.97   2,525,637 rows/s       |  420.94   1,187,817 rows/s       |  2.13x  |
================================================================================
Summary:
  Best speedup:    2.90x  (numRows=100,000, numCols=5)
  Worst speedup:   2.13x  (numRows=500,000, numCols=15)
  Geomean speedup: 2.41x

@Shekharrajak
Copy link
Copy Markdown
Contributor Author

Looking into benchmark module which will be used throughout the arrow and datafusion integration to quickly check the improvements.

private final boolean useArrowReader;

@JsonProperty
private final int arrowBatchSize;
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With a batch of 1024 values in a contiguous buffer -> emit SIMD instructions that process 8-16 rows per CPU cycle.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Decompression amortization -> batch-read helps in decompress once and consume many row, since parquet compressed pages .

@Shekharrajak
Copy link
Copy Markdown
Contributor Author

ingestion time at 100k × 5

Read

Arrow implementation : 17ms
Current implementaiton : 53ms

Index + Persist (= total − read)

~330ms
Similar time on both implementation

total time

Arrow implementation: 347 ms
current implementation: 410 ms

Index+persist does substantially more work per row than read. So even though Arrow makes read ~3x faster, that gain is dwarfed when amortized over the much-larger indexing cost.

Benchmark added https://github.com/Shekharrajak/druid/pull/1/changes

@Shekharrajak
Copy link
Copy Markdown
Contributor Author

looks like network error https://github.com/apache/druid/actions/runs/26333466916/job/77523245546?pr=19510 - please trigger the CI check again.

Copy link
Copy Markdown
Member

@FrankChen021 FrankChen021 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Severity Findings
P0 0
P1 2
P2 1
P3 0
Total 3

Reviewed 7 of 7 changed files.


This is an automated review by Codex GPT-5.5

@Shekharrajak Shekharrajak force-pushed the feature/iceberg-arrow-reader branch from 8808ec4 to f151f5b Compare May 24, 2026 14:59
public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) throws IOException
{
if (useArrowReader) {
return 1;
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Future PR: Enable parallel ingestion (maxNumConcurrentSubTasks > 1) when useArrowReader: true by adding split-coordination plumbing to IcebergInputSource. Currently Arrow mode forces a single subtask to guarantee correctness;

Copy link
Copy Markdown
Member

@FrankChen021 FrankChen021 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I reviewed the follow-up changes. The projection issue and Arrow split handling thread look resolved; I left a separate inline reply on the remaining residual-filter snapshot-time parity gap.

Reviewed 7 of 7 changed files.


This is an automated review by Codex GPT-5.5

@Shekharrajak
Copy link
Copy Markdown
Contributor Author

@FrankChen021 @jtuglu1 - please have a look. CI checks are green.

scan.splitLookback(),
scan.splitOpenFileCost()
);
final ArrowReader arrowReader = new ArrowReader(scan, batchSize, true);
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reads Iceberg data files as Arrow columnar batches, then iterates rows lazily as InputRow objects.

{
// Pin Arrow to Unsafe allocator: Netty backend fails on JDK 25 (EmptyByteBuf.memoryAddress UnsupportedOperationException).
static {
if (System.getProperty("arrow.allocation.manager.type") == null) {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NettyAllocationManager. throws UnsupportedOperationException on JDK 25 (incompatible with newer JDK module encapsulation). Pinning to "Unsafe" uses sun.misc.Unsafe which works on all supported JDKs (21, 25).

CI checks were failing .

* Column projection and predicate push-down are applied at scan planning time so only requested
* columns and matching files are read from storage.
*
* Note: iceberg-arrow currently supports Parquet data files only. ORC and Avro files will throw
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Supports only parquet data file

private final ResidualFilterMode residualFilterMode;

@JsonProperty
private final boolean useArrowReader;
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adds useArrowReader opt-in flag

Copy link
Copy Markdown
Member

@FrankChen021 FrankChen021 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I reviewed the follow-up changes for correctness, edge cases, concurrency, and integration risks; no new issues found.

Reviewed 7 of 7 changed files.


This is an automated review by Codex GPT-5.5

@Shekharrajak
Copy link
Copy Markdown
Contributor Author

Hi @jtuglu1 @clintropolis @FrankChen021 - Please have a look, I have benchmark PR drafted which we will use across all features to see the improvements we are getting through vectorization & arrow implementations.

Copy link
Copy Markdown
Contributor

@cecemei cecemei left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The benchmark results look good! I don't see any similar benchmark coverage in this area, but it's definitely worth adding.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The arrow based approach is so different from the file catalog based, it also doesn't really implement SplittableInputSource any more, have you considered adding it as a separate InputSource? maybe we can extract some shared logic into a separate abstract class.

}

@Override
public CloseableIterator<InputRow> read(final InputStats inputStats) throws IOException
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

InputStats is nullable, since the default read() just pass in null.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Iceberg source: adopt Iceberg's native Arrow reader stack for forward-compatibility with Iceberg spec evolution and performance improvements

3 participants