From 3565551072ead75879d714677f800cfc1d306073 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Wed, 27 May 2026 07:15:24 +0000 Subject: [PATCH 1/2] Enable state tag encoding v2 by default for new Dataflow Streaming Engine jobs Also added CHANGES.md entry detailing the change, how to disable it, and job update/downgrade limitations. --- CHANGES.md | 36 +++++++++++++++++++ .../beam/runners/dataflow/DataflowRunner.java | 3 ++ 2 files changed, 39 insertions(+) diff --git a/CHANGES.md b/CHANGES.md index c87c1271280b..58df50d335f2 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -55,6 +55,42 @@ * ([#X](https://github.com/apache/beam/issues/X)). --> +# [2.75.0] - Unreleased + +## Highlights + +* New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)). +* New highly anticipated feature Y added to Java SDK ([#Y](https://github.com/apache/beam/issues/Y)). + +## I/Os + +* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). + +## New Features / Improvements + +* (Java) Enabled state tag encoding v2 by default for new Dataflow Streaming Engine jobs. It can be disabled by passing `--experiments=disable_streaming_engine_state_tag_encoding_v2` pipeline option. Note that the tag encoding version cannot change during a job update. Jobs using tag encoding v2 (enabled by default for new jobs on 2.75.0+) cannot be downgraded to Beam versions prior to 2.73.0, as only versions 2.73.0 and later support tag encoding v2. ([#38705](https://github.com/apache/beam/issues/38705)). + +## Breaking Changes + +* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)). + +## Deprecations + +* X behavior is deprecated and will be removed in X versions ([#X](https://github.com/apache/beam/issues/X)). + +## Bugfixes + +* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). 
+ +## Security Fixes + +* Fixed [CVE-YYYY-NNNN](https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN) (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)). + +## Known Issues + +[comment]: # ( When updating known issues after release, make sure also update website blog in website/www/site/content/blog.) +* ([#X](https://github.com/apache/beam/issues/X)). + # [2.74.0] - Unreleased ## Highlights diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 299e7fa21ed1..f613aaa42425 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -1310,6 +1310,9 @@ public DataflowPipelineJob run(Pipeline pipeline) { // Experiment marking that the harness supports tag encoding v2 // Backend will enable tag encoding v2 only if the harness supports it. experiments.add("streaming_engine_state_tag_encoding_v2_supported"); + // Experiment requesting tag encoding v2 on new jobs. During job updates + // old job's tag encoding version is carried over by the backend. 
+ experiments.add("enable_streaming_engine_state_tag_encoding_v2"); options.setExperiments(ImmutableList.copyOf(experiments)); } From e75123d997bf27484d254da499bedf807cfc0f28 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Fri, 29 May 2026 08:43:28 +0000 Subject: [PATCH 2/2] Respect UpdateCompatibility for tag encoding v2 --- CHANGES.md | 2 +- .../beam/runners/dataflow/DataflowRunner.java | 9 ++-- .../runners/dataflow/DataflowRunnerTest.java | 44 +++++++++++++++++++ 3 files changed, 51 insertions(+), 4 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 58df50d335f2..133150a44ece 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -68,7 +68,7 @@ ## New Features / Improvements -* (Java) Enabled state tag encoding v2 by default for new Dataflow Streaming Engine jobs. It can be disabled by passing `--experiments=disable_streaming_engine_state_tag_encoding_v2` pipeline option. Note that the tag encoding version cannot change during a job update. Jobs using tag encoding v2 (enabled by default for new jobs on 2.75.0+) cannot be downgraded to Beam versions prior to 2.73.0, as only versions 2.73.0 and later support tag encoding v2. ([#38705](https://github.com/apache/beam/issues/38705)). +* (Java) Enabled state tag encoding v2 by default for new Dataflow Streaming Engine jobs. It can be disabled by passing either the `--experiments=disable_streaming_engine_state_tag_encoding_v2` or the `--updateCompatibilityVersion=2.74.0` pipeline option. Note that the tag encoding version cannot change during a job update. Jobs using tag encoding v2 (enabled by default for new jobs on 2.75.0+) cannot be downgraded to Beam versions prior to 2.73.0, as only versions 2.73.0 and later support tag encoding v2 ([#38705](https://github.com/apache/beam/issues/38705)). 
## Breaking Changes diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index f613aaa42425..45dde118b949 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -101,6 +101,7 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsValidator; import org.apache.beam.sdk.options.SdkHarnessOptions; +import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.runners.PTransformOverride; @@ -1310,9 +1311,11 @@ public DataflowPipelineJob run(Pipeline pipeline) { // Experiment marking that the harness supports tag encoding v2 // Backend will enable tag encoding v2 only if the harness supports it. experiments.add("streaming_engine_state_tag_encoding_v2_supported"); - // Experiment requesting tag encoding v2 on new jobs. During job updates - // old job's tag encoding version is carried over by the backend. - experiments.add("enable_streaming_engine_state_tag_encoding_v2"); + // Experiment requesting tag encoding v2 on new jobs starting with 2.75.0. During job + // updates old job's tag encoding version is carried over by the backend. 
+ if (!StreamingOptions.updateCompatibilityVersionLessThan(options, "2.75.0")) { + experiments.add("enable_streaming_engine_state_tag_encoding_v2"); + } options.setExperiments(ImmutableList.copyOf(experiments)); } diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index f2f8b44a1626..e94ca4c16ad6 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -2920,4 +2920,48 @@ public void processElement( PAssert.that(output).containsInAnyOrder("value:UPDATE_BEFORE"); pipeline.run(); } + + @Test + public void testStreamingStateTagEncodingV2PreCompatibility() throws Exception { + DataflowPipelineOptions options = buildPipelineOptions(); + options.as(StreamingOptions.class).setStreaming(true); + options.as(StreamingOptions.class).setUpdateCompatibilityVersion("2.74.0"); + Pipeline p = Pipeline.create(options); + + p.run(); + + List experiments = options.getExperiments(); + assertNotNull(experiments); + assertTrue(experiments.contains("streaming_engine_state_tag_encoding_v2_supported")); + assertFalse(experiments.contains("enable_streaming_engine_state_tag_encoding_v2")); + } + + @Test + public void testStreamingStateTagEncodingV2PostCompatibility() throws Exception { + DataflowPipelineOptions options = buildPipelineOptions(); + options.as(StreamingOptions.class).setStreaming(true); + options.as(StreamingOptions.class).setUpdateCompatibilityVersion("2.75.0"); + Pipeline p = Pipeline.create(options); + + p.run(); + + List experiments = options.getExperiments(); + assertNotNull(experiments); + assertTrue(experiments.contains("streaming_engine_state_tag_encoding_v2_supported")); + 
assertTrue(experiments.contains("enable_streaming_engine_state_tag_encoding_v2")); + } + + @Test + public void testStreamingStateTagEncodingV2NoCompatibility() throws Exception { + DataflowPipelineOptions options = buildPipelineOptions(); + options.as(StreamingOptions.class).setStreaming(true); + Pipeline p = Pipeline.create(options); + + p.run(); + + List experiments = options.getExperiments(); + assertNotNull(experiments); + assertTrue(experiments.contains("streaming_engine_state_tag_encoding_v2_supported")); + assertTrue(experiments.contains("enable_streaming_engine_state_tag_encoding_v2")); + } }