[FnApi Java] Add support for separate named data streams to provide bundle isolation#38863
[FnApi Java] Add support for separate named data streams to provide bundle isolation#38863scwhittle wants to merge 1 commit into
Conversation
e3734bd to
86dfa9f
Compare
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces support for named data streams in the FnApi, enabling runners to isolate bundle processing by utilizing separate gRPC streams. This architecture prevents slow bundles from causing head-of-line blocking on shared streams by providing dedicated flow control. The changes include protocol updates, a refactoring of the outbound data aggregator to handle instruction lifecycles, and improved documentation for stream metadata. Highlights
New Features🧠 You can now enable Memory (public preview) to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request introduces support for named data streams in the Apache Beam Fn API, allowing the runner to request specific data stream IDs during bundle processing. It refactors BeamFnDataOutboundAggregator to be reusable across multiple bundles using explicit instruction lifecycle methods, and updates BeamFnDataGrpcClient to multiplex connections based on both the API service descriptor and the data stream ID. The review feedback identifies several critical issues: a potential NullPointerException in the background flushing thread of BeamFnDataOutboundAggregator if a flush occurs after an instruction finishes, the need to reset bytesWrittenSinceFlush at the end of each bundle, and the importance of normalizing dataStreamId in MultiplexerKey to prevent duplicate multiplexers. Additionally, minor documentation copy-paste errors in beam_fn_api.proto and grammatical issues in FnHarness.java should be addressed.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
|
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment |
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces support for named data streams in the Beam Fn API, allowing runners to request specific data stream IDs in bundle processing. Key changes include updating protobuf definitions, refactoring BeamFnDataOutboundAggregator to manage its lifecycle per instruction, and updating BeamFnDataGrpcClient to cache multiplexers using a composite key of ApiServiceDescriptor and dataStreamId. The review feedback identifies critical issues, including potential concurrent NullPointerExceptions and thread-safety violations on gRPC stream observers in BeamFnDataOutboundAggregator. Additionally, it recommends using empty JSON brackets instead of an empty string for empty pipeline options to avoid parsing errors, and normalizing dataStreamId in MultiplexerKey to prevent duplicate multiplexers and simplify the code.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| PipelineOptions options = | ||
| PipelineOptionsTranslation.fromJson(pipelineOptionsJson == null ? "" : pipelineOptionsJson); |
There was a problem hiding this comment.
If pipelineOptionsJson is null, passing "" (empty string) to PipelineOptionsTranslation.fromJson will likely throw a JSON parsing exception because an empty string is not valid JSON. Passing "{}" (empty JSON object) is the standard way to represent empty options and is safe to parse.
| PipelineOptions options = | |
| PipelineOptionsTranslation.fromJson(pipelineOptionsJson == null ? "" : pipelineOptionsJson); | |
| PipelineOptions options = | |
| PipelineOptionsTranslation.fromJson(pipelineOptionsJson == null ? "{}" : pipelineOptionsJson); |
…undle isolation. This is advertised to the runner via a new NAMED_DATA_STREAMS protocol capability. The runner is then free to assign bundles to named data streams as it chooses to isolate bundle processing from each other. Instead of single data stream from the sdk, the sdk will create a data stream for each name. The benefit of doing so is that the multiplexing currently performed on data stream messages being received allows a slow bundle to fill up buffers and block the shared stream. With separate named streams, bundles on other data streams have separate grpc flow control from the blocked stream and are not affected.
|
previous snapshots had all tests pass but some flakes, going to assign reviewers (just made some minor fixups) |
|
assign set of reviewers |
|
Assigning reviewers: R: @kennknowles for label java. Note: If you would like to opt out of this review, comment Available commands:
The PR bot will only process comments in the main thread (not review comments). |
| "nullness" // TODO(https://github.com/apache/beam/issues/20497) | ||
| }) | ||
| // The calling thread that invokes sendBufferedDataAndFinishOutboundStreams synchronizes on | ||
| // The calling thread that invokes sendOrCollectBufferedDataAndFinishOutboundStreams synchronizes on |
There was a problem hiding this comment.
the locking in this class is complicated, the synchronization is just needed if there is the background flush future. With a single implementation the duplicated code is at least co-located but we could possibly restructure to two separate implementations to simplify the non-background flushing implementation.
This is advertised to the runner via a new NAMED_DATA_STREAMS protocol capability. The runner is then free to assign bundles to named data streams as it chooses to isolate bundle processing from each other. Instead of single data stream from the sdk, the sdk will create a data stream for each name. The benefit of doing so is that the multiplexing currently performed on data stream messages being received allows a slow bundle to fill up buffers and block the shared stream. With separate named streams, bundles on other data streams have separate grpc flow control from the blocked stream and are not affected.
While making changes:
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>instead.CHANGES.mdwith noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.