Implement Asynchronous wrapper for DoFn in Java SDK#38609
tejasiyer-dev wants to merge 11 commits
…he Apache Beam Java SDK (apache#38529)
Code Review
This pull request introduces AsyncDoFn, a wrapper designed to convert synchronous DoFn operations into asynchronous ones within Apache Beam pipelines. It includes a core implementation using a thread pool and persistent state for buffering, along with a comprehensive test suite. The review identified several critical issues: a performance bottleneck caused by global map scans during timer firing, potential data loss due to ignored timestamps in the output receiver, and logic bugs in the reconciliation loop that could lead to redundant task rescheduling and quadratic complexity. Additionally, the use of a single static lock for all instances may cause thread contention.
…emoved O(N) global activeElements scan. Fixed logic bug where duplicate elements were incorrectly marked for rescheduling. Optimized lookups by converting finishedItems from a list to a HashSet.
…ng sleep. Added two more tests to match Python SDK. Fixed formatting issues.
…ixes. Potential SpotBugs fixes.
… that preserves deterministic jitter behavior and avoids pressure on garbage collector (apache#38529)
/gemini review
Code Review
This pull request introduces AsyncDoFn, a wrapper class designed to process elements asynchronously using a background thread pool, along with a comprehensive test suite AsyncDoFnTest. The code review highlights several critical concurrency and correctness issues in the implementation. Specifically, the wrapped syncFn is shared across background threads without being thread-safe, exceptions during async execution are caught and result in silent data loss, and tearing down cloned instances can prematurely shut down shared thread pools. Additionally, the reviewer noted a potential division-by-zero bug if timerFrequency is non-positive, and recommended documenting the class's limitations in the Javadoc.
…mini-code-assist
- Propagates asynchronous task exceptions as RuntimeExceptions to prevent silent data loss and enable runner-level retries.
- Implements a static refCounts registry to safely tear down the shared executor service only when the last cloned instance is destroyed.
- Validates timerFrequency in the constructor to prevent zero/negative values from entering infinite loops.
- Documents multi-threading requirements, multi-output limitations, and bundle lifecycle behaviors in a class-level comment.
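The reference-counted teardown described in this commit could look roughly like the sketch below. `SharedExecutors`, `acquire`, and `release` are hypothetical names chosen for illustration and are not taken from the PR; the sketch only shows the general pattern of shutting a shared pool down when the last holder releases it.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical helper (not the PR's code): one executor per id, shut down
// only when the last holder releases it.
final class SharedExecutors {
  private static final class Entry {
    final ExecutorService executor;
    int refCount; // only mutated inside ConcurrentHashMap.compute*, which is atomic per key

    Entry(ExecutorService executor) {
      this.executor = executor;
    }
  }

  private static final ConcurrentHashMap<String, Entry> ENTRIES = new ConcurrentHashMap<>();

  private SharedExecutors() {}

  // Intended for a setup hook: creates the pool on first use, otherwise bumps the count.
  static ExecutorService acquire(String id, int threads) {
    Entry entry =
        ENTRIES.compute(
            id,
            (key, existing) -> {
              Entry e =
                  existing != null ? existing : new Entry(Executors.newFixedThreadPool(threads));
              e.refCount++;
              return e;
            });
    return entry.executor;
  }

  // Intended for a teardown hook: returns true if this call shut the pool down.
  static boolean release(String id) {
    boolean[] shutDown = {false};
    ENTRIES.computeIfPresent(
        id,
        (key, entry) -> {
          if (--entry.refCount > 0) {
            return entry; // other clones still use the pool
          }
          entry.executor.shutdownNow();
          shutDown[0] = true;
          return null; // removes the mapping
        });
    return shutDown[0];
  }
}
```

With this shape, two clones that share an id get the same executor, and only the second `release` call actually stops it.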
/gemini review
Code Review
This pull request introduces AsyncDoFn, a wrapper class that converts synchronous Beam DoFns into asynchronous ones using thread pools and state-backed timers, along with a comprehensive test suite. The review feedback highlights several critical areas for improvement: avoiding blocking the runner's timer thread during rescheduling, resolving a performance bottleneck caused by a single JVM-wide static lock, removing redundant locking around atomic counter decrements, ensuring thread safety for the wrapped DoFn via thread-local isolation, preventing potential memory leaks in the static tracking map using an expiring cache, leveraging Beam's relative timer API for deterministic testing, and cleaning up an unused test rule.
private static final ConcurrentHashMap<String, ConcurrentHashMap<Object, InFlightElement<?>>>
    processingElements = new ConcurrentHashMap<>();
The 'processingElements' map is a static 'ConcurrentHashMap' that holds active and completed futures. If a key or window is garbage collected or abandoned by the runner (e.g., due to window expiration), the timer for that key might never fire. Consequently, the corresponding elements will never be removed from 'processingElements', leading to a progressive memory leak and eventual 'OutOfMemoryError' in long-running pipelines.

To prevent this, use a Guava 'Cache' with an expiration policy (e.g., 'expireAfterWrite' of 1 hour) instead of a raw 'ConcurrentHashMap' for 'processingElements'. This ensures that abandoned or leaked futures are eventually evicted.

private static final ConcurrentHashMap<String, com.google.common.cache.Cache<Object, InFlightElement<?>>>
    processingElements = new ConcurrentHashMap<>();
@gemini-code-assist This intentionally maintains parity with the Python SDK's tracking mechanism (async_dofn.py), which also uses a standard dictionary with no expiration or eviction policy for its in-memory task tracking.
While maintaining parity with the Python SDK is a valid design goal, it is important to consider the differences in memory management between the two runtimes. In Java, static ConcurrentHashMap entries that are never evicted will persist for the lifetime of the JVM, leading to inevitable OutOfMemoryError in long-running streaming pipelines. Even if the Python implementation currently lacks an eviction policy, it is safer to implement one in the Java SDK to ensure production stability. If you prefer not to use Guava, you could implement a periodic cleanup task or use a LinkedHashMap with access-order eviction, but some form of lifecycle management is necessary for a robust Java implementation.
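For illustration, the "periodic cleanup task" option mentioned above might be sketched with only the JDK as follows. `ExpiringTracker`, its methods, and the age threshold are hypothetical and not part of the PR; the current time is passed in explicitly so the behavior is easy to test, and a real implementation would presumably also cancel any future it evicts.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical tracker (not the PR's code): entries carry a creation time so
// that a periodic sweep can drop the ones that were abandoned.
final class ExpiringTracker<K, V> {
  private static final class Stamped<V> {
    final V value;
    final long createdMillis;

    Stamped(V value, long createdMillis) {
      this.value = value;
      this.createdMillis = createdMillis;
    }
  }

  private final ConcurrentHashMap<K, Stamped<V>> entries = new ConcurrentHashMap<>();
  private final long maxAgeMillis;

  ExpiringTracker(long maxAgeMillis) {
    this.maxAgeMillis = maxAgeMillis;
  }

  void put(K key, V value, long nowMillis) {
    entries.put(key, new Stamped<>(value, nowMillis));
  }

  V get(K key) {
    Stamped<V> stamped = entries.get(key);
    return stamped == null ? null : stamped.value;
  }

  int size() {
    return entries.size();
  }

  // Removes entries older than maxAgeMillis and returns how many were evicted.
  // remove(key, value) only succeeds if the entry was not replaced concurrently.
  int sweep(long nowMillis) {
    int evicted = 0;
    for (Map.Entry<K, Stamped<V>> e : entries.entrySet()) {
      boolean stale = nowMillis - e.getValue().createdMillis > maxAgeMillis;
      if (stale && entries.remove(e.getKey(), e.getValue())) {
        evicted++;
      }
    }
    return evicted;
  }
}
```

A sweep like this could be driven from the existing timer callback or from a scheduled executor; which of the two fits the wrapper better is a design decision this sketch does not settle.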
Stores the partition key inside InFlightElement and cancels/purges orphaned futures inside commitFinishedItems. This prevents silent memory leaks on bundle rollbacks (apache#38529)
/gemini review
Code Review
This pull request introduces AsyncDoFn, a wrapper class designed to convert synchronous Apache Beam DoFn operations into asynchronous ones, along with a comprehensive test suite. The reviewer identified several critical and high-severity issues in the implementation: event-time timestamp corruption in AccumulatingOutputReceiver, severe JVM-wide lock contention from a static ReentrantLock, timer starvation in processElement under continuous element arrival, redundant rescheduling of completed tasks on retry when sibling tasks fail, unnecessary thread pool creation when useThreadPool is false, and potential state corruption in commitFinishedItems if InputT lacks proper equals and hashCode implementations.
…r to preserve event-time downstream. Initializes the ExecutorService thread pool only when useThreadPool is true. Refactors state filtering to use finishedElementIds instead of finishedItems, preventing duplicate processing (apache#38529)
/gemini review
Code Review
This pull request introduces AsyncDoFn, a wrapper class that converts synchronous DoFn operations into asynchronous ones, along with a comprehensive test suite to verify its behavior under various scenarios. The code review identified several critical issues: high complexity in commitFinishedItems due to flat map iteration over all active elements across all keys, JVM-wide lock contention caused by a static ReentrantLock, thread-safety issues when iterating over a synchronized list in AccumulatingOutputReceiver without manual synchronization, and the ineffectiveness of CompletableFuture.cancel(true) in interrupting background threads.
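The last point can be checked in isolation. The sketch below is a standalone demonstration, not code from the PR: it runs the same blocking task once through `CompletableFuture.runAsync` and once through `ExecutorService.submit`, cancels it with `cancel(true)`, and reports whether the worker thread observed an interrupt. `CancelDemo` and the 200 ms wait are illustrative choices.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class CancelDemo {
  private CancelDemo() {}

  // Returns true if the worker thread was interrupted by cancel(true).
  static boolean workerInterrupted(boolean useCompletableFuture) {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    CountDownLatch started = new CountDownLatch(1);
    CountDownLatch finished = new CountDownLatch(1);
    CountDownLatch never = new CountDownLatch(1); // never counted down
    AtomicBoolean interrupted = new AtomicBoolean(false);

    Runnable task =
        () -> {
          started.countDown();
          try {
            never.await(200, TimeUnit.MILLISECONDS); // stands in for slow I/O
          } catch (InterruptedException e) {
            interrupted.set(true);
          } finally {
            finished.countDown();
          }
        };

    try {
      Future<?> future =
          useCompletableFuture ? CompletableFuture.runAsync(task, pool) : pool.submit(task);
      started.await(); // make sure the task is running before cancelling
      future.cancel(true);
      finished.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      pool.shutdown();
    }
    return interrupted.get();
  }
}
```

The `CompletableFuture` variant only marks the future as cancelled while the task keeps running to completion. If interruption matters, one option is to keep the `Future` returned by `ExecutorService.submit` alongside it.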
/gemini review
Code Review
This pull request introduces AsyncDoFn, a wrapper class designed to convert synchronous DoFn operations into asynchronous ones, along with a comprehensive suite of unit tests in AsyncDoFnTest. The review feedback highlights several critical issues: severe thread contention caused by a JVM-wide static lock, performance degradation during timer firing due to scanning all active elements across all keys, loss of custom windowing and watermark tracking from hardcoding GlobalWindow.INSTANCE and shifting timestamps when rescheduling, and unnecessary synchronization overhead in AccumulatingOutputReceiver from using ConcurrentLinkedQueue instead of ArrayList.
…parameter without hardcoding GlobalWindow.INSTANCE (apache#38529)
/gemini review
Code Review
This pull request introduces AsyncDoFn, a wrapper class designed to convert synchronous DoFn operations into asynchronous ones within Apache Beam pipelines, accompanied by a comprehensive suite of unit tests. The review feedback highlights several critical performance and design improvements: replacing the global static ReentrantLock with per-UUID locks to prevent severe thread contention, partitioning the processingElements map by key to eliminate expensive O(N) scans during timer processing, simplifying the timer firing calculations with pure integer math, and unwrapping ExecutionException to improve error trace readability.
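As one possible reading of the `ExecutionException` suggestion, a small helper could rethrow the task's own exception so that stack traces point at the user code rather than at the wrapper. `FutureUnwrap.getUnwrapped` is a hypothetical name used here for illustration only.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical helper (not the PR's code).
final class FutureUnwrap {
  private FutureUnwrap() {}

  // Waits for the result and rethrows the task's own exception instead of
  // the ExecutionException that Future.get() wraps it in.
  static <T> T getUnwrapped(Future<T> future) {
    try {
      return future.get();
    } catch (ExecutionException e) {
      Throwable cause = e.getCause() != null ? e.getCause() : e;
      if (cause instanceof RuntimeException) {
        throw (RuntimeException) cause;
      }
      if (cause instanceof Error) {
        throw (Error) cause;
      }
      throw new RuntimeException(cause); // checked exceptions still need a wrapper
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt status
      throw new RuntimeException(e);
    }
  }
}
```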
fixes #38529
R: @AMOOOMA
This PR introduces AsyncDoFn and AsyncDoFnTest to the Apache Beam Java SDK.
AsyncDoFn acts as an execution wrapper around a standard synchronous DoFn, offloading element processing to a background thread pool. Decoupling the runner's event loop (main thread) from high-latency, I/O-heavy element processing (background threads) keeps the main thread from blocking on slow calls, lets the wrapper apply backpressure when too many elements are in flight, and is intended to increase pipeline throughput.
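The offloading and capacity check can be sketched outside of Beam with plain JDK types. `BoundedOffloader`, `trySubmit`, and `maxInFlight` below are hypothetical names, not the PR's API, and the sketch leaves out state, timers, and deduplication entirely.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical sketch (not the PR's code): runs a synchronous function on a
// pool and refuses new work once maxInFlight tasks are outstanding.
final class BoundedOffloader<I, O> {
  private final Function<I, O> syncFn;
  private final ExecutorService pool;
  private final int maxInFlight;
  private final AtomicInteger inFlight = new AtomicInteger();

  BoundedOffloader(Function<I, O> syncFn, int threads, int maxInFlight) {
    this.syncFn = syncFn;
    this.pool = Executors.newFixedThreadPool(threads);
    this.maxInFlight = maxInFlight;
  }

  // Returns a future for the result, or null when at capacity so that the
  // caller can wait and retry (the backpressure signal in this sketch).
  CompletableFuture<O> trySubmit(I element) {
    if (inFlight.incrementAndGet() > maxInFlight) {
      inFlight.decrementAndGet();
      return null;
    }
    return CompletableFuture.<O>supplyAsync(() -> syncFn.apply(element), pool)
        // The returned future completes only after this callback has run,
        // so capacity is already released when a caller sees the result.
        .whenComplete((result, error) -> inFlight.decrementAndGet());
  }

  void shutdown() {
    pool.shutdown();
  }
}
```

Returning `null` is just one way to signal "no capacity"; blocking the caller or throwing would be alternatives with different trade-offs.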
1. Ingestion & Local Deduplication (Main Thread)
2. Backpressure & Capacity Check (Main Thread)
3. Task Creation & Durable State Writing (Main Thread)
When capacity is available, the main thread performs the following steps sequentially:
4. Background Execution (Background Worker Threads)
5. Timer Reconciliation & Cleanup (Main Thread)
When the Timer fires for Key K, the main thread executes a synchronous reconciliation cycle:
Early Exit: If BagState for Key K is empty, it exits immediately to free up CPU.
State Reconciliation: Iterates through the elements listed in BagState:
Timer Reset: If any elements remain unfinished, a new timer is scheduled for the next check cycle.
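The reconciliation cycle above can be approximated with plain collections. In this sketch a `List<String>` of element ids stands in for BagState and a `Map` of futures stands in for the in-memory tracking; `Reconciler`, `Result`, and `reconcile` are hypothetical names. How the PR treats an id that has no in-flight future is not stated in this description, so the sketch simply keeps such ids pending, which is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch (not the PR's code) of one timer-driven reconciliation cycle.
final class Reconciler {
  private Reconciler() {}

  // Outcome of one cycle: results to emit and ids that must stay buffered.
  static final class Result<O> {
    final List<O> outputs = new ArrayList<>();
    final List<String> stillPending = new ArrayList<>();

    // A new timer is needed only while something is still unfinished.
    boolean resetTimer() {
      return !stillPending.isEmpty();
    }
  }

  static <O> Result<O> reconcile(
      List<String> bufferedIds, Map<String, CompletableFuture<O>> inFlight) {
    Result<O> result = new Result<>();
    if (bufferedIds.isEmpty()) {
      return result; // early exit: nothing buffered for this key
    }
    for (String id : bufferedIds) {
      CompletableFuture<O> future = inFlight.get(id);
      if (future != null && future.isDone()) {
        // join() rethrows a task failure as CompletionException.
        result.outputs.add(future.join());
        inFlight.remove(id);
      } else {
        // Assumption: unfinished or unknown ids are kept for the next cycle.
        result.stillPending.add(id);
      }
    }
    return result;
  }
}
```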
Testing & Verification
AsyncDoFnTest.java: Includes 15 unit tests covering concurrency and logical paths. Run them with:
./gradlew :sdks:java:core:test --tests "org.apache.beam.sdk.transforms.AsyncDoFnTest"
Considering renaming this to ConcurrentDoFnWrapper or ConcurrentDoFn