Adds a new coder translator for Java SchemaCoder. #37631

Open
acrites wants to merge 3 commits into apache:master from acrites:new-schema-coder

@acrites commented Feb 17, 2026

The new translator separates the Schema proto from the toRowFn and fromRowFn, so that runners can inspect the Schema proto when checking update compatibility, for example when fields are reordered. Previously, the coder was simply translated as a custom Java coder, which runners couldn't reason about.
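
To illustrate why an inspectable schema helps, here is a minimal sketch of a reorder-tolerant comparison. It does not use Beam's Schema classes: the `SchemaCompatSketch` class, the `schema` helper, and the name-to-type map representation are all assumptions made for illustration, and a real runner's compatibility check is likely to cover more cases.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only; not Beam's Schema API.
final class SchemaCompatSketch {

  // Builds a stand-in schema: field name -> type name, kept in declaration order.
  static Map<String, String> schema(String... nameTypePairs) {
    Map<String, String> fields = new LinkedHashMap<>();
    for (int i = 0; i + 1 < nameTypePairs.length; i += 2) {
      fields.put(nameTypePairs[i], nameTypePairs[i + 1]);
    }
    return fields;
  }

  // Map.equals compares entries without regard to iteration order, so two
  // schemas with the same fields in a different order compare as equal.
  static boolean sameFieldsIgnoringOrder(Map<String, String> before, Map<String, String> after) {
    return before.equals(after);
  }

  public static void main(String[] args) {
    Map<String, String> before = schema("id", "INT64", "name", "STRING");
    Map<String, String> reordered = schema("name", "STRING", "id", "INT64");
    System.out.println(sameFieldsIgnoringOrder(before, reordered));
  }
}
```

With an opaque serialized coder, a check like this is not possible because the field list is not visible outside the SDK.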

Since this is a potentially backwards-incompatible change, this PR also adds PipelineOptions to the CoderTranslator contexts, so that the old-style translation is preserved when the compatibility version is 2.71 or below (this assumes the change makes it into the 2.72 release).
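
The version gate described above could look roughly like the following sketch. The class and method names are hypothetical, the version is assumed to arrive as a dotted string such as "2.71.0", and an unset version is assumed to mean "use the new translation"; the PR's actual check may differ.

```java
// Hypothetical illustration only; not the PR's implementation.
final class CompatVersionGateSketch {

  // Returns true when the requested compatibility version is 2.71 or below,
  // meaning the old custom-coder translation should be kept.
  static boolean useLegacyTranslation(String compatibilityVersion) {
    if (compatibilityVersion == null || compatibilityVersion.isEmpty()) {
      return false; // Assumption: no version requested, so use the new translation.
    }
    String[] parts = compatibilityVersion.split("\\.");
    int major = Integer.parseInt(parts[0]);
    int minor = parts.length > 1 ? Integer.parseInt(parts[1]) : 0;
    // Compare numerically so that "2.9" sorts below "2.71".
    return major < 2 || (major == 2 && minor <= 71);
  }

  public static void main(String[] args) {
    System.out.println(useLegacyTranslation("2.71.0"));
    System.out.println(useLegacyTranslation("2.72.0"));
  }
}
```

Comparing the components as integers rather than as strings matters here, since a lexicographic comparison would place "2.9" above "2.71".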

This fixes a few issues related to Schemas and pipeline updates: addresses #36496, #30276, #29245.



…s to translation context so we can disable the new translator based on pipeline compatibility version.
@gemini-code-assist

Summary of Changes

Hello @acrites, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the Apache Beam Java SDK's ability to handle SchemaCoders in a portable and forward-compatible manner. By introducing a dedicated CoderTranslator for SchemaCoder, the system can now separate the schema definition from the toRowFn and fromRowFn logic, which allows runners to better understand and adapt to schema changes during pipeline updates. A crucial aspect of this change is the inclusion of PipelineOptions in the translation context, providing a mechanism to maintain backward compatibility for older pipelines by reverting to the previous custom Java coder translation when a specific compatibility version is set.


Changelog
  • CHANGES.md
    • Updated breaking changes section to document portable SchemaCoder encoding, compatibility options, and fixed issues.
  • runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
    • Modified SdkComponents.create calls to include PipelineOptions for context.
  • runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
    • Updated SdkComponents.create calls to pass PipelineOptions and refined a comment regarding job submission types.
  • runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
    • Added necessary imports for AutoValue and schema-related classes.
    • Updated createSdkComponents to use PipelineOptions.
    • Introduced a new test case for SchemaCoder translation with compatibility checks.
  • sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslation.java
    • Modified the toKnownCoder method to pass PipelineOptions via a new TranslationContextWithOptions to CoderTranslator methods.
  • sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslator.java
    • Updated getUrn and getPayload methods to accept a TranslationContext.
    • Provided a default getUrn implementation.
  • sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslators.java
    • Defined TranslationContextWithOptions to expose PipelineOptions.
    • Updated getPayload signatures for ParamWindowedValueCoder and RowCoder to accept TranslationContext.
    • Implemented a new schema() translator for SchemaCoder that handles portable encoding and backward compatibility.
  • sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ModelCoderRegistrar.java
    • Registered the new SchemaCoder and its corresponding translator.
  • sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ModelCoders.java
    • Defined a new URN constant for SchemaCoder.
  • sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/RehydratedComponents.java
    • Modified getSdkComponents to include PipelineOptions when creating SdkComponents.
  • sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SdkComponents.java
    • Added a PipelineOptions field.
    • Updated constructors and create methods to manage these options.
    • Exposed a getPipelineOptions method.
  • sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/CoderTranslationTest.java
    • Added a test AutoValue class, SchemaRegistry setup, and a helper for SchemaCoder creation.
    • Included SchemaCoder in the set of known coders for testing.
  • sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
    • Corrected a typo in a log message.
    • Added a clarifying comment about SdkComponents and PipelineOptions.
  • sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/AvroGenericCoderTranslator.java
    • Updated the getPayload method signature to accept a TranslationContext.
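
As a rough illustration of the context-aware translator shape summarized in the changelog, the sketch below shows an interface whose `getUrn` takes a context and has a default implementation, plus one translator that overrides it. Everything here is hypothetical: the interface names, the `legacyEncodingRequested` flag, and the `urn:example:...` strings are placeholders, not the signatures or URNs used in the PR.

```java
// Hypothetical illustration only; names and URNs are placeholders.
final class ContextAwareTranslatorSketch {

  // Stand-in for a translation context that can expose option-derived settings.
  interface Context {
    boolean legacyEncodingRequested();
  }

  interface Translator {
    String registeredUrn();

    // Default: most translators ignore the context and return their registered URN.
    default String getUrn(Context context) {
      return registeredUrn();
    }
  }

  // A translator that relies on the default getUrn.
  static final Translator PLAIN = () -> "urn:example:coder:plain:v1";

  // A translator that falls back to an opaque custom-coder URN when asked to.
  static final Translator SCHEMA_LIKE =
      new Translator() {
        @Override
        public String registeredUrn() {
          return "urn:example:coder:schema:v1";
        }

        @Override
        public String getUrn(Context context) {
          return context.legacyEncodingRequested()
              ? "urn:example:coder:custom_java:v1"
              : registeredUrn();
        }
      };

  public static void main(String[] args) {
    System.out.println(SCHEMA_LIKE.getUrn(() -> true));
    System.out.println(SCHEMA_LIKE.getUrn(() -> false));
  }
}
```

Keeping a default implementation means existing translators only need a signature change, while a translator with version-dependent behavior can override it.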

@github-actions

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment "assign set of reviewers".

@Override
public SchemaCoder<T> fromComponents(
    List<Coder<?>> components, byte[] payload, CoderTranslation.TranslationContext context) {
  checkArgument(
Contributor

enforce that this isn't called if !isStructuredCoderEnabled

Contributor Author

Added a checkArgument call.
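
A guard of the kind discussed in this thread might look like the sketch below. It is not the PR's code: `checkArgument` is a local stand-in that mimics the Guava-style precondition helper, and the `structuredCoderEnabled` parameter and the string return type are simplifications assumed for illustration.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical illustration only; not the PR's fromComponents.
final class GuardedRehydrationSketch {

  // Local stand-in for a Guava-style precondition check.
  static void checkArgument(boolean condition, String message) {
    if (!condition) {
      throw new IllegalArgumentException(message);
    }
  }

  // Refuses to rebuild from a structured payload when the structured
  // translation is disabled, instead of silently decoding it.
  static String fromComponents(byte[] payload, boolean structuredCoderEnabled) {
    checkArgument(
        structuredCoderEnabled,
        "fromComponents must not be called when the structured coder is disabled");
    return new String(payload, StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    byte[] payload = "schema-bytes".getBytes(StandardCharsets.UTF_8);
    System.out.println(fromComponents(payload, true));
  }
}
```

Failing fast here turns a misrouted call into an immediate IllegalArgumentException rather than a coder built from the wrong payload format.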


// Needed to find which transform was new...
// This SdkComponents comes from rehydratedComponents, but doesn't take into account any
// additional translation options specified in PipelineOptions.
Contributor

should it? Does this mean that the new format doesn't work with xlang?

Contributor Author

I actually think it will use pipeline options because the rehydratedComponents object is constructed using withPipeline(), which copies over the pipeline options.

…factors common code into helper method, and removes an incorrect comment.