diff --git a/CHANGES.md b/CHANGES.md index 74209bb7499c..89eb775b238b 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -68,7 +68,7 @@ ## New Features / Improvements -* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). +* (Java) Enabled state tag encoding v2 by default for new Dataflow Streaming Engine jobs. It can be disabled by passing `--experiments=disable_streaming_engine_state_tag_encoding_v2` or `--updateCompatibilityVersion=2.74.0` pipeline option. Note that the tag encoding version cannot change during a job update. Jobs using tag encoding v2 (enabled by default for new jobs on 2.75.0+) cannot be downgraded to Beam versions prior to 2.73.0, as only versions 2.73.0 and later support tag encoding v2. ([#38705](https://github.com/apache/beam/issues/38705)). ## Breaking Changes diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index d04afb351e44..4864f2cf4537 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -101,6 +101,7 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsValidator; import org.apache.beam.sdk.options.SdkHarnessOptions; +import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.runners.PTransformOverride; @@ -1310,6 +1311,11 @@ public DataflowPipelineJob run(Pipeline pipeline) { // Experiment marking that the harness supports tag encoding v2 // Backend will enable tag encoding v2 only if the harness supports it. experiments.add("streaming_engine_state_tag_encoding_v2_supported"); + // Experiment requesting tag encoding v2 on new jobs starting with 2.75.0. During job + // updates old job's tag encoding version is carried over by the backend. + if (!StreamingOptions.updateCompatibilityVersionLessThan(options, "2.75.0")) { + experiments.add("enable_streaming_engine_state_tag_encoding_v2"); + } options.setExperiments(ImmutableList.copyOf(experiments)); } diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index 122c4aeee34f..073b30f928dc 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -2923,4 +2923,48 @@ public void processElement( PAssert.that(output).containsInAnyOrder("value:UPDATE_BEFORE"); pipeline.run(); } + + @Test + public void testStreamingStateTagEncodingV2PreCompatibility() throws Exception { + DataflowPipelineOptions options = buildPipelineOptions(); + options.as(StreamingOptions.class).setStreaming(true); + options.as(StreamingOptions.class).setUpdateCompatibilityVersion("2.74.0"); + Pipeline p = Pipeline.create(options); + + p.run(); + + List experiments = options.getExperiments(); + assertNotNull(experiments); + assertTrue(experiments.contains("streaming_engine_state_tag_encoding_v2_supported")); + assertFalse(experiments.contains("enable_streaming_engine_state_tag_encoding_v2")); + } + + @Test + public void testStreamingStateTagEncodingV2PostCompatibility() throws Exception { + DataflowPipelineOptions options = buildPipelineOptions(); + options.as(StreamingOptions.class).setStreaming(true); + options.as(StreamingOptions.class).setUpdateCompatibilityVersion("2.75.0"); + Pipeline p = Pipeline.create(options); + + p.run(); + + List experiments = options.getExperiments(); + assertNotNull(experiments); + assertTrue(experiments.contains("streaming_engine_state_tag_encoding_v2_supported")); + assertTrue(experiments.contains("enable_streaming_engine_state_tag_encoding_v2")); + } + + @Test + public void testStreamingStateTagEncodingV2NoCompatibility() throws Exception { + DataflowPipelineOptions options = buildPipelineOptions(); + options.as(StreamingOptions.class).setStreaming(true); + Pipeline p = Pipeline.create(options); + + p.run(); + + List experiments = options.getExperiments(); + assertNotNull(experiments); + assertTrue(experiments.contains("streaming_engine_state_tag_encoding_v2_supported")); + assertTrue(experiments.contains("enable_streaming_engine_state_tag_encoding_v2")); + } }