
[IO] Implement DeltaIO reader and add Delta Lake perf tests #38750

Closed
durgaprasadml wants to merge 2 commits into apache:master from durgaprasadml:feature/delta-io-perf-tests-clean

Conversation

@durgaprasadml
Contributor

Description:

What does this PR do?

This PR implements the Delta Lake source reader using the Delta Kernel API and adds performance/integration tests for Delta Lake reads.

The implementation introduces a parallelized read path for Delta tables by planning scans on the coordinator and distributing Parquet file reads across Beam workers.

Changes Included

DeltaIO Reader Implementation

  • Completed DeltaIO.ReadRows
  • Added Delta Kernel snapshot loading support
  • Added scan planning and file descriptor generation
  • Implemented parallel Parquet reads using Beam transforms
  • Added Beam Schema inference from Delta schemas
  • Added logical Delta Row → Beam Row conversion
  • Added support for:
    • primitive types
    • nested structs
    • arrays
    • maps
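
The recursive shape of the schema mapping listed above can be sketched as follows. This is only an illustration, not the code in this PR: `DeltaType`, `Primitive`, `ArrayOf`, `MapOf`, `Field`, and `Struct` are hypothetical stand-ins for the Delta Kernel type classes, and the mapping returns the Beam type name as a string rather than building a real Beam `Schema`. The primitive names (`integer`, `long`, `string`, and so on) follow the Delta schema serialization, and the targets (`INT32`, `INT64`, `STRING`, ...) follow Beam's type names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustration only: the nested types are hypothetical stand-ins, not Delta Kernel or Beam classes. */
class DeltaTypeMappingSketch {

  /** Marker for a Delta-style data type. */
  interface DeltaType {}

  /** Primitive type identified by its Delta schema name, e.g. "long" or "string". */
  static final class Primitive implements DeltaType {
    final String name;
    Primitive(String name) { this.name = name; }
  }

  static final class ArrayOf implements DeltaType {
    final DeltaType element;
    ArrayOf(DeltaType element) { this.element = element; }
  }

  static final class MapOf implements DeltaType {
    final DeltaType key;
    final DeltaType value;
    MapOf(DeltaType key, DeltaType value) { this.key = key; this.value = value; }
  }

  static final class Field {
    final String name;
    final DeltaType type;
    final boolean nullable;
    Field(String name, DeltaType type, boolean nullable) {
      this.name = name;
      this.type = type;
      this.nullable = nullable;
    }
  }

  static final class Struct implements DeltaType {
    final List<Field> fields;
    Struct(List<Field> fields) { this.fields = fields; }
  }

  /** Renders the Beam type name a Delta type would map to, recursing into containers. */
  static String toBeamType(DeltaType type) {
    if (type instanceof Primitive) {
      String name = ((Primitive) type).name;
      switch (name) {
        case "integer": return "INT32";
        case "long": return "INT64";
        case "double": return "DOUBLE";
        case "boolean": return "BOOLEAN";
        case "string": return "STRING";
        default: throw new IllegalArgumentException("Unsupported primitive: " + name);
      }
    }
    if (type instanceof ArrayOf) {
      return "ARRAY<" + toBeamType(((ArrayOf) type).element) + ">";
    }
    if (type instanceof MapOf) {
      MapOf map = (MapOf) type;
      return "MAP<" + toBeamType(map.key) + ", " + toBeamType(map.value) + ">";
    }
    if (type instanceof Struct) {
      List<String> parts = new ArrayList<>();
      for (Field f : ((Struct) type).fields) {
        // A trailing "?" marks a nullable field so nullability survives the mapping.
        parts.add(f.name + ": " + toBeamType(f.type) + (f.nullable ? "?" : ""));
      }
      return "ROW<" + String.join(", ", parts) + ">";
    }
    throw new IllegalArgumentException("Unsupported type: " + type);
  }

  public static void main(String[] args) {
    Struct address = new Struct(Arrays.asList(new Field("city", new Primitive("string"), true)));
    Struct table = new Struct(Arrays.asList(
        new Field("id", new Primitive("long"), false),
        new Field("tags", new ArrayOf(new Primitive("string")), true),
        new Field("attrs", new MapOf(new Primitive("string"), new Primitive("integer")), true),
        new Field("address", address, true)));
    System.out.println(toBeamType(table));
  }
}
```

A real implementation would presumably build Beam field types in the same recursive way. The point of the sketch is that every container case delegates back to the same function, and that the nullable flag is carried per field instead of being dropped.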

Performance / Integration Tests

Added:

  • DeltaIOIT
  • DeltaIOTestPipelineOptions

Test scenarios:

  • testReadSmall
  • testReadLarge
  • testReadPartitioned

The tests:

  • generate Delta tables locally
  • create Delta logs dynamically
  • validate partitioned reads
  • collect throughput and latency metrics
  • publish metrics using IOITMetrics

Build Updates

Updated sdks/java/io/delta/build.gradle with required integration test dependencies and Hadoop runtime dependencies required by Delta Kernel.

Verification

Executed:

    ./gradlew :sdks:java:io:delta:compileJava
    ./gradlew :sdks:java:io:delta:compileTestJava
    ./gradlew :sdks:java:io:delta:test --tests org.apache.beam.sdk.io.delta.DeltaIOIT

Fixes #38559

@gemini-code-assist
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a new Delta Lake source reader for Apache Beam, leveraging the Delta Kernel API to provide a robust and scalable way to read Delta tables. The implementation includes support for parallelized Parquet file reads, automatic mapping of Delta types to Beam schemas, and full integration testing to ensure performance and correctness across various table configurations.

Highlights

  • DeltaIO Reader Implementation: Implemented the DeltaIO.ReadRows transform using the Delta Kernel API, enabling parallelized reads of Delta tables and automatic schema inference.
  • Performance and Integration Testing: Added DeltaIOIT and DeltaIOTestPipelineOptions to support comprehensive testing of small, large, and partitioned Delta table reads, including metric publishing.
  • Build Configuration: Updated the build.gradle file for the Delta IO module to include necessary dependencies for integration testing and Hadoop runtime support.

Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request implements the read path for Delta Lake tables in DeltaIO using the Delta Kernel API, including schema mapping, row conversion, and integration tests. While the implementation is a great step forward, there are several critical architectural and performance issues that need to be addressed. Specifically, planning the table scan during pipeline construction freezes the file list and requires client-side access, while re-scanning the entire file list in ReadFileFn for every file descriptor results in $O(N^2)$ complexity. Additionally, the row conversion logic is missing support for several complex data types inside vectors, and the schema inference does not preserve field nullability.
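
The quadratic cost mentioned above can be illustrated with a small, self-contained sketch. It does not reproduce the PR's `ReadFileFn`, whose code is not shown on this page: `FileDescriptor`, `planScan`, and both counting methods are hypothetical, and the sketch only counts how many scan entries are examined under each approach.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustration only: counts scan entries examined by two ways of locating per-file metadata. */
class ScanPlanningSketch {

  /** Hypothetical descriptor carrying everything a worker needs to read one file. */
  static final class FileDescriptor {
    final String path;
    final long sizeBytes;
    FileDescriptor(String path, long sizeBytes) {
      this.path = path;
      this.sizeBytes = sizeBytes;
    }
  }

  /** Hypothetical stand-in for scan planning: one descriptor per data file. */
  static List<FileDescriptor> planScan(int fileCount) {
    List<FileDescriptor> plan = new ArrayList<>();
    for (int i = 0; i < fileCount; i++) {
      plan.add(new FileDescriptor("part-" + i + ".parquet", 1024L * (i + 1)));
    }
    return plan;
  }

  /** Anti-pattern: workers receive only a path and walk the whole plan again to find its entry. */
  static long entriesVisitedWhenRescanning(List<FileDescriptor> plan) {
    long visited = 0;
    for (FileDescriptor wanted : plan) {
      String path = wanted.path; // all that the worker was handed
      FileDescriptor match = null;
      for (FileDescriptor candidate : plan) {
        visited++; // one scan entry examined on behalf of a single file
        if (candidate.path.equals(path)) {
          match = candidate;
        }
      }
      if (match == null) {
        throw new IllegalStateException("File not found in scan: " + path);
      }
    }
    return visited;
  }

  /** Alternative: each element already carries its descriptor, so no lookup is needed. */
  static long entriesVisitedWhenSelfContained(List<FileDescriptor> plan) {
    long visited = 0;
    for (FileDescriptor descriptor : plan) {
      if (descriptor.sizeBytes > 0) {
        visited++; // the worker touches only its own entry
      }
    }
    return visited;
  }

  public static void main(String[] args) {
    List<FileDescriptor> plan = planScan(1000);
    System.out.println("rescanning:     " + entriesVisitedWhenRescanning(plan));
    System.out.println("self-contained: " + entriesVisitedWhenSelfContained(plan));
  }
}
```

With N files, the first method examines N entries per file and therefore N × N in total, while the second examines N. Shipping a self-contained descriptor to each worker is one way to avoid the repeated scan; it does not by itself address the separate concern about planning during pipeline construction.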

Comment thread sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/DeltaIO.java Outdated
Comment thread sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/DeltaIO.java Outdated

@github-actions
Contributor

Assigning reviewers:

R: @Abacn for label java.

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@derrickaw
Collaborator

CC: @chamikaramj

@chamikaramj
Contributor

Thanks for the PR, but please note that this is ongoing work and a more complete source with SDF support is being implemented here: #38706

Could you please comment there instead of re-implementing, so that we don't repeat work?

Feel free to take any unassigned sub-tasks here: #21100

Also, for more context:

Design: https://s.apache.org/beam-delta-lake-source

Dev-list thread: https://lists.apache.org/thread/8wqox64s68o2mbqmpr1mlcg30pq3r91k

@durgaprasadml
Contributor Author

Thanks for the clarification and for pointing me to #38706.

I wasn’t aware that a more complete SDF-based implementation was already in progress. I’ll avoid duplicating the work here and will continue the discussion/contributions on #38706 instead.

I appreciate the references to the design doc and dev-list thread as well — I’ll review those for better alignment with the ongoing implementation effort.

Thanks again for the guidance.

@chamikaramj
Contributor

Great! Thanks. As mentioned, please feel free to take any unassigned sub-tasks from here to contribute: #21100

(for example perf testing)

You can add them once the initial source is in.

Also, any reviews or comments on the existing source PR or the design doc are welcome, since you already have the context :)

Development

Successfully merging this pull request may close these issues.

[Task]: Add perf tests
