Skip to content

[FnApi Java] Add support for separate named data streams to provide bundle isolation#38863

Open
scwhittle wants to merge 1 commit into
apache:masterfrom
scwhittle:named_streams
Open

[FnApi Java] Add support for separate named data streams to provide bundle isolation#38863
scwhittle wants to merge 1 commit into
apache:masterfrom
scwhittle:named_streams

Conversation

@scwhittle

@scwhittle scwhittle commented Jun 9, 2026

Copy link
Copy Markdown
Contributor

This is advertised to the runner via a new NAMED_DATA_STREAMS protocol capability. The runner is then free to assign bundles to named data streams as it chooses to isolate bundle processing from each other. Instead of single data stream from the sdk, the sdk will create a data stream for each name. The benefit of doing so is that the multiplexing currently performed on data stream messages being received allows a slow bundle to fill up buffers and block the shared stream. With separate named streams, bundles on other data streams have separate grpc flow control from the blocked stream and are not affected.

While making changes:

  • remove some nullness suppressions
  • document the existing metadata headers used on fnapi streams

Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

@scwhittle scwhittle changed the title [FnApi] Add support for a new name field to ApiServiceDescriptor under StandardProtocols.API_SERVICE_DESCRIPTOR_NAME [FnApi] Add support for a sending header metadata when connecting to ApiServiceDescriptor under StandardProtocols.API_SERVICE_DESCRIPTOR_HEADER_METADATA Jun 9, 2026
@scwhittle scwhittle changed the title [FnApi] Add support for a sending header metadata when connecting to ApiServiceDescriptor under StandardProtocols.API_SERVICE_DESCRIPTOR_HEADER_METADATA [FnApi Java] Add support for separate named data streams to provide bundle isolation Jun 12, 2026
@scwhittle scwhittle force-pushed the named_streams branch 2 times, most recently from e3734bd to 86dfa9f Compare June 12, 2026 12:07
@scwhittle scwhittle marked this pull request as ready for review June 12, 2026 13:37
@gemini-code-assist

Copy link
Copy Markdown
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces support for named data streams in the FnApi, enabling runners to isolate bundle processing by utilizing separate gRPC streams. This architecture prevents slow bundles from causing head-of-line blocking on shared streams by providing dedicated flow control. The changes include protocol updates, a refactoring of the outbound data aggregator to handle instruction lifecycles, and improved documentation for stream metadata.

Highlights

  • Protocol Changes: Added the NAMED_DATA_STREAMS protocol capability and a data_stream_id field to the FnApi to support bundle isolation.
  • API Refactoring: Refactored BeamFnDataOutboundAggregator to explicitly manage instruction lifecycles using prepareForInstruction and finishInstruction methods.
  • Code Cleanup and Documentation: Removed several nullness suppressions and added comprehensive documentation for metadata headers used in FnApi streams.
New Features

🧠 You can now enable Memory (public preview) to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces support for named data streams in the Apache Beam Fn API, allowing the runner to request specific data stream IDs during bundle processing. It refactors BeamFnDataOutboundAggregator to be reusable across multiple bundles using explicit instruction lifecycle methods, and updates BeamFnDataGrpcClient to multiplex connections based on both the API service descriptor and the data stream ID. The review feedback identifies several critical issues: a potential NullPointerException in the background flushing thread of BeamFnDataOutboundAggregator if a flush occurs after an instruction finishes, the need to reset bytesWrittenSinceFlush at the end of each bundle, and the importance of normalizing dataStreamId in MultiplexerKey to prevent duplicate multiplexers. Additionally, minor documentation copy-paste errors in beam_fn_api.proto and grammatical issues in FnHarness.java should be addressed.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment thread sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java Outdated
@github-actions

Copy link
Copy Markdown
Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

@scwhittle

Copy link
Copy Markdown
Contributor Author

/gemini review

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces support for named data streams in the Beam Fn API, allowing runners to request specific data stream IDs in bundle processing. Key changes include updating protobuf definitions, refactoring BeamFnDataOutboundAggregator to manage its lifecycle per instruction, and updating BeamFnDataGrpcClient to cache multiplexers using a composite key of ApiServiceDescriptor and dataStreamId. The review feedback identifies critical issues, including potential concurrent NullPointerExceptions and thread-safety violations on gRPC stream observers in BeamFnDataOutboundAggregator. Additionally, it recommends using empty JSON brackets instead of an empty string for empty pipeline options to avoid parsing errors, and normalizing dataStreamId in MultiplexerKey to prevent duplicate multiplexers and simplify the code.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment on lines +195 to +196
PipelineOptions options =
PipelineOptionsTranslation.fromJson(pipelineOptionsJson == null ? "" : pipelineOptionsJson);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

If pipelineOptionsJson is null, passing "" (empty string) to PipelineOptionsTranslation.fromJson will likely throw a JSON parsing exception because an empty string is not valid JSON. Passing "{}" (empty JSON object) is the standard way to represent empty options and is safe to parse.

Suggested change
PipelineOptions options =
PipelineOptionsTranslation.fromJson(pipelineOptionsJson == null ? "" : pipelineOptionsJson);
PipelineOptions options =
PipelineOptionsTranslation.fromJson(pipelineOptionsJson == null ? "{}" : pipelineOptionsJson);

…undle isolation.

This is advertised to the runner via a new NAMED_DATA_STREAMS protocol capability. The runner is then free to assign bundles to named data streams as it chooses
to isolate bundle processing from each other. Instead of single data stream
from the sdk, the sdk will create a data stream for each name.  The benefit of
doing so is that the multiplexing currently performed on data stream messages being received allows a slow bundle to fill up buffers and block the shared stream.  With separate named streams, bundles on other data streams have separate grpc flow control from the blocked stream and are not affected.
@scwhittle

Copy link
Copy Markdown
Contributor Author

previous snapshots had all tests pass but some flakes, going to assign reviewers (just made some minor fixups)

@scwhittle

Copy link
Copy Markdown
Contributor Author

assign set of reviewers

@github-actions

Copy link
Copy Markdown
Contributor

Assigning reviewers:

R: @kennknowles for label java.

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
// The calling thread that invokes sendBufferedDataAndFinishOutboundStreams synchronizes on
// The calling thread that invokes sendOrCollectBufferedDataAndFinishOutboundStreams synchronizes on

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the locking in this class is complicated, the synchronization is just needed if there is the background flush future. With a single implementation the duplicated code is at least co-located but we could possibly restructure to two separate implementations to simplify the non-background flushing implementation.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant