From 60e3c47b527aff1bd5031584e14f7bf896815477 Mon Sep 17 00:00:00 2001 From: Durgaprasad M L Date: Sat, 30 May 2026 23:12:31 +0530 Subject: [PATCH 1/5] [CI] Stabilize PostCommit Java ValidatesRunner Dataflow Streaming workflow - throttle validatesRunner parallelism - migrate workflow to Streaming Engine - add metrics-driven streaming termination - delay transient JOB_MESSAGE_ERROR cancellations - add Gradle test retry support - add streaming runner tests Fixes #38710 --- ...ava_ValidatesRunner_Dataflow_Streaming.yml | 2 +- buildSrc/build.gradle.kts | 1 + .../beam/gradle/BeamModulePlugin.groovy | 10 +++ .../google-cloud-dataflow-java/build.gradle | 12 +-- .../runners/dataflow/TestDataflowRunner.java | 74 +++++++++++++++---- .../dataflow/TestDataflowRunnerTest.java | 48 ++++++++++++ 6 files changed, 127 insertions(+), 20 deletions(-) diff --git a/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.yml b/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.yml index f70cbf0db32f..aced5a641b08 100644 --- a/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.yml +++ b/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.yml @@ -79,7 +79,7 @@ jobs: - name: run validatesRunnerStreaming script uses: ./.github/actions/gradle-command-self-hosted-action with: - gradle-command: :runners:google-cloud-dataflow-java:validatesRunnerStreaming + gradle-command: :runners:google-cloud-dataflow-java:validatesRunnerStreamingEngine max-workers: 12 - name: Archive JUnit Test Results uses: actions/upload-artifact@v7 diff --git a/buildSrc/build.gradle.kts b/buildSrc/build.gradle.kts index 96b1cb12dd4c..b2fbfee75bec 100644 --- a/buildSrc/build.gradle.kts +++ b/buildSrc/build.gradle.kts @@ -78,6 +78,7 @@ dependencies { runtimeOnly("ca.cutterslade.gradle:gradle-dependency-analyze:1.8.3") // Enable dep analysis runtimeOnly("gradle.plugin.net.ossindex:ossindex-gradle-plugin:0.4.11") // Enable dep vulnerability analysis runtimeOnly("org.checkerframework:checkerframework-gradle-plugin:0.6.56") // Enable enhanced static checking plugin + runtimeOnly("org.gradle.test-retry:org.gradle.test-retry.gradle.plugin:1.6.0") } // Because buildSrc is built and tested automatically _before_ gradle diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 5ca0de9de846..8191508981fe 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -1117,6 +1117,7 @@ class BeamModulePlugin implements Plugin { } project.apply plugin: "java" + project.apply plugin: "org.gradle.test-retry" // We create a testRuntimeMigration configuration here to extend // testImplementation, testRuntimeOnly, and default (similar to what @@ -1218,6 +1219,15 @@ class BeamModulePlugin implements Plugin { useJUnit {} // default maxHeapSize on gradle 5 is 512m, lets increase to handle more demanding tests maxHeapSize = '2g' + + def isCI = System.getenv("GITHUB_ACTIONS") != null || System.getenv("JENKINS_HOME") != null + if (project.plugins.hasPlugin('org.gradle.test-retry')) { + retry { + maxRetries = isCI ? 3 : 0 + maxFailures = 15 + failOnPassedAfterRetry = false + } + } } List skipDefRegexes = [] diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index 943a01daa7d0..a7f79869531c 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -230,9 +230,9 @@ def createLegacyWorkerValidatesRunnerTest = { Map args -> systemProperty "beamTestPipelineOptions", JsonOutput.toJson(pipelineOptions) - // Increase test parallelism up to the number of Gradle workers. By default this is equal - // to the number of CPU cores, but can be increased by setting --max-workers=N. - maxParallelForks Integer.MAX_VALUE + // By default throttle parallelism to 4 to avoid GHA and GCP quota exhaustion. + // Can be overridden via -PmaxParallelForks=N. + maxParallelForks project.findProperty('maxParallelForks') ? (project.findProperty('maxParallelForks') as Integer) : 4 classpath = configurations.validatesRunner testClassesDirs = files(project(":sdks:java:core").sourceSets.test.output.classesDirs) + files(project(project.path).sourceSets.test.output.classesDirs) @@ -263,9 +263,9 @@ def createRunnerV2ValidatesRunnerTest = { Map args -> dependsOn buildAndPushDockerJavaContainer systemProperty "beamTestPipelineOptions", JsonOutput.toJson(pipelineOptions) - // Increase test parallelism up to the number of Gradle workers. By default this is equal - // to the number of CPU cores, but can be increased by setting --max-workers=N. - maxParallelForks Integer.MAX_VALUE + // By default throttle parallelism to 4 to avoid GHA and GCP quota exhaustion. + // Can be overridden via -PmaxParallelForks=N. + maxParallelForks project.findProperty('maxParallelForks') ? (project.findProperty('maxParallelForks') as Integer) : 4 classpath = configurations.validatesRunner testClassesDirs = files(project(":sdks:java:core").sourceSets.test.output.classesDirs) + files(project(project.path).sourceSets.test.output.classesDirs) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java index b00194dacb08..c9b52c78c281 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java @@ -121,19 +121,28 @@ DataflowPipelineJob run(Pipeline pipeline, DataflowRunner runner) { ErrorMonitorMessagesHandler messageHandler = new ErrorMonitorMessagesHandler(job, new MonitoringUtil.LoggingHandler()); + java.util.concurrent.atomic.AtomicReference> assertionsPassedRef = + new java.util.concurrent.atomic.AtomicReference<>(Optional.absent()); + if (options.isStreaming()) { if (options.isBlockOnRun()) { - jobSuccess = waitForStreamingJobTermination(job, messageHandler); + jobSuccess = waitForStreamingJobTermination(job, messageHandler, assertionsPassedRef); } else { jobSuccess = true; } - // No metrics in streaming - allAssertionsPassed = Optional.absent(); + allAssertionsPassed = assertionsPassedRef.get(); + if (!allAssertionsPassed.isPresent()) { + allAssertionsPassed = checkForPAssertSuccess(job); + } } else { jobSuccess = waitForBatchJobTermination(job, messageHandler); allAssertionsPassed = checkForPAssertSuccess(job); } + if (allAssertionsPassed.isPresent() && allAssertionsPassed.get()) { + jobSuccess = true; + } + // If there is a certain assertion failure, throw the most precise exception we can. // There are situations where the metric will not be available, but as long as we recover // the actionable message from the logs it is acceptable. @@ -160,11 +169,13 @@ DataflowPipelineJob run(Pipeline pipeline, DataflowRunner runner) { @SuppressWarnings("FutureReturnValueIgnored") // Job status checked via job.waitUntilFinish @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_BAD_PRACTICE") private boolean waitForStreamingJobTermination( - final DataflowPipelineJob job, ErrorMonitorMessagesHandler messageHandler) { + final DataflowPipelineJob job, + ErrorMonitorMessagesHandler messageHandler, + java.util.concurrent.atomic.AtomicReference> assertionsPassedRef) { // In streaming, there are infinite retries, so rather than timeout // we try to terminate early by polling and canceling if we see - // an error message - options.getExecutorService().submit(new CancelOnError(job, messageHandler)); + // an error message or when all assertions have succeeded + options.getExecutorService().submit(new CancelOnError(job, messageHandler, this, assertionsPassedRef)); // Whether we canceled or not, this gets the final state of the job or times out State finalState; @@ -373,29 +384,66 @@ private static class CancelOnError implements Callable { private final DataflowPipelineJob job; private final ErrorMonitorMessagesHandler messageHandler; - - public CancelOnError(DataflowPipelineJob job, ErrorMonitorMessagesHandler messageHandler) { + private final TestDataflowRunner runner; + private final java.util.concurrent.atomic.AtomicReference> assertionsPassedRef; + + public CancelOnError( + DataflowPipelineJob job, + ErrorMonitorMessagesHandler messageHandler, + TestDataflowRunner runner, + java.util.concurrent.atomic.AtomicReference> assertionsPassedRef) { this.job = job; this.messageHandler = messageHandler; + this.runner = runner; + this.assertionsPassedRef = assertionsPassedRef; } @Override public Void call() throws Exception { + int checkMetricsIntervalSteps = 5; // Check metrics every 15 seconds (5 * 3s) + int steps = 0; while (true) { State jobState = job.getState(); - // If we see an error, cancel and note failure - if (messageHandler.hasSeenError() && !job.getState().isTerminal()) { - job.cancel(); - LOG.info("Cancelling Dataflow job {}", job.getJobId()); + if (jobState.isTerminal()) { return null; } - if (jobState.isTerminal()) { + // Check metrics for early success/failure cancellation + if (steps % checkMetricsIntervalSteps == 0) { + Optional assertionsPassed = runner.checkForPAssertSuccess(job); + if (assertionsPassed.isPresent()) { + assertionsPassedRef.set(assertionsPassed); + if (assertionsPassed.get()) { + LOG.info( + "All assertions passed for streaming job {}, cancelling job.", + job.getJobId()); + job.cancel(); + return null; + } else { + LOG.info( + "Found failed assertion for streaming job {}, cancelling job.", + job.getJobId()); + job.cancel(); + return null; + } + } + } + + // If we see a terminal error message and no assertions have passed yet, cancel and fail + long runningTimeMillis = steps * 3000L; + if (messageHandler.hasSeenError() + && !jobState.isTerminal() + && (runningTimeMillis > 300000L || runner.expectedNumberOfAssertions == 0)) { + LOG.info( + "Cancelling Dataflow job due to error messages seen: {}", + messageHandler.getErrorMessage()); + job.cancel(); return null; } Thread.sleep(3000L); + steps++; } } } diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java index ed6259a3ee22..95dcd88dc234 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java @@ -609,6 +609,54 @@ public void testStreamingOnSuccessMatcherWhenPipelineFails() throws Exception { // If the onSuccessMatcher were invoked, it would have crashed here with AssertionError } + @Test + public void testRunStreamingJobEarlySuccess() throws Exception { + options.setStreaming(true); + Pipeline p = TestPipeline.create(options); + PCollection pc = p.apply(Create.of(1, 2, 3)); + PAssert.that(pc).containsInAnyOrder(1, 2, 3); + + DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); + when(mockJob.getState()).thenReturn(State.CANCELLED); + when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class))) + .thenReturn(State.CANCELLED); + when(mockJob.getProjectId()).thenReturn("test-project"); + when(mockJob.getJobId()).thenReturn("test-job"); + + DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); + when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); + + when(mockClient.getJobMetrics(anyString())) + .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */)); + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); + runner.run(p, mockRunner); + } + + @Test + public void testRunStreamingJobEarlyFailure() throws Exception { + options.setStreaming(true); + Pipeline p = TestPipeline.create(options); + PCollection pc = p.apply(Create.of(1, 2, 3)); + PAssert.that(pc).containsInAnyOrder(1, 2, 3); + + DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); + when(mockJob.getState()).thenReturn(State.CANCELLED); + when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class))) + .thenReturn(State.CANCELLED); + when(mockJob.getProjectId()).thenReturn("test-project"); + when(mockJob.getJobId()).thenReturn("test-job"); + + DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); + when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); + + when(mockClient.getJobMetrics(anyString())) + .thenReturn(generateMockMetricResponse(false /* success */, true /* tentative */)); + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); + + expectedException.expect(AssertionError.class); + runner.run(p, mockRunner); + } + static class TestSuccessMatcher extends BaseMatcher implements SerializableMatcher { private final transient DataflowPipelineJob mockJob; From 15e5ce1e645f83dec877199746ddb1fe18f1ed84 Mon Sep 17 00:00:00 2001 From: Durgaprasad M L Date: Sat, 30 May 2026 23:34:19 +0530 Subject: [PATCH 2/5] Harden streaming monitor retry handling and async cancellation tests --- .../runners/dataflow/TestDataflowRunner.java | 89 ++++++++++++------- .../dataflow/TestDataflowRunnerTest.java | 12 ++- 2 files changed, 69 insertions(+), 32 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java index c9b52c78c281..44b1d79c34ac 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java @@ -402,44 +402,73 @@ public CancelOnError( public Void call() throws Exception { int checkMetricsIntervalSteps = 5; // Check metrics every 15 seconds (5 * 3s) int steps = 0; + boolean cancellationPending = false; while (true) { - State jobState = job.getState(); + try { + State jobState = job.getState(); - if (jobState.isTerminal()) { - return null; - } + if (jobState.isTerminal()) { + return null; + } - // Check metrics for early success/failure cancellation - if (steps % checkMetricsIntervalSteps == 0) { - Optional assertionsPassed = runner.checkForPAssertSuccess(job); - if (assertionsPassed.isPresent()) { - assertionsPassedRef.set(assertionsPassed); - if (assertionsPassed.get()) { - LOG.info( - "All assertions passed for streaming job {}, cancelling job.", - job.getJobId()); - job.cancel(); - return null; - } else { + // Check if we should initiate cancellation based on metrics (only if assertion state is not yet known) + if (!assertionsPassedRef.get().isPresent() && !cancellationPending) { + if (steps % checkMetricsIntervalSteps == 0) { + try { + Optional assertionsPassed = runner.checkForPAssertSuccess(job); + if (assertionsPassed.isPresent()) { + assertionsPassedRef.set(assertionsPassed); + cancellationPending = true; + if (assertionsPassed.get()) { + LOG.info( + "All assertions passed for streaming job {}, cancelling job.", + job.getJobId()); + } else { + LOG.info( + "Found failed assertion for streaming job {}, cancelling job.", + job.getJobId()); + } + } + } catch (Exception e) { + LOG.warn( + "Transient error polling metrics for job {}: {}", + job.getJobId(), + e.getMessage()); + } + } + } + + // Check if we should initiate cancellation based on error logs (only if not already cancellationPending) + if (!cancellationPending) { + long runningTimeMillis = steps * 3000L; + if (messageHandler.hasSeenError() + && (runningTimeMillis > 300000L || runner.expectedNumberOfAssertions == 0)) { LOG.info( - "Found failed assertion for streaming job {}, cancelling job.", - job.getJobId()); + "Cancelling Dataflow job due to error messages seen: {}", + messageHandler.getErrorMessage()); + cancellationPending = true; + } + } + + // Perform or retry cancellation if cancellation is pending + if (cancellationPending) { + try { job.cancel(); - return null; + return null; // Successful cancellation + } catch (Exception e) { + LOG.warn( + "Failed to cancel Dataflow job {}. Will retry on next iteration. Error: {}", + job.getJobId(), + e.getMessage()); } } - } - // If we see a terminal error message and no assertions have passed yet, cancel and fail - long runningTimeMillis = steps * 3000L; - if (messageHandler.hasSeenError() - && !jobState.isTerminal() - && (runningTimeMillis > 300000L || runner.expectedNumberOfAssertions == 0)) { - LOG.info( - "Cancelling Dataflow job due to error messages seen: {}", - messageHandler.getErrorMessage()); - job.cancel(); - return null; + } catch (Exception e) { + LOG.warn( + "Exception in streaming job monitor loop for job {}: {}", + job.getJobId(), + e.getMessage(), + e); } Thread.sleep(3000L); diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java index 95dcd88dc234..52ca0e802a98 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java @@ -630,6 +630,8 @@ public void testRunStreamingJobEarlySuccess() throws Exception { .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */)); TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); runner.run(p, mockRunner); + + Mockito.verify(mockJob, Mockito.timeout(5000)).cancel(); } @Test @@ -653,8 +655,14 @@ public void testRunStreamingJobEarlyFailure() throws Exception { .thenReturn(generateMockMetricResponse(false /* success */, true /* tentative */)); TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); - expectedException.expect(AssertionError.class); - runner.run(p, mockRunner); + try { + runner.run(p, mockRunner); + fail("Expected AssertionError to be thrown"); + } catch (AssertionError expected) { + // Expected exception + } + + Mockito.verify(mockJob, Mockito.timeout(5000)).cancel(); } static class TestSuccessMatcher extends BaseMatcher From 824315933ea9f9fadce0c471b96206f4bc0bfaba Mon Sep 17 00:00:00 2001 From: Durgaprasad M L Date: Sat, 30 May 2026 23:49:28 +0530 Subject: [PATCH 3/5] Fix Develocity retry plugin conflict in CI --- buildSrc/build.gradle.kts | 1 - .../org/apache/beam/gradle/BeamModulePlugin.groovy | 9 +++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/buildSrc/build.gradle.kts b/buildSrc/build.gradle.kts index b2fbfee75bec..96b1cb12dd4c 100644 --- a/buildSrc/build.gradle.kts +++ b/buildSrc/build.gradle.kts @@ -78,7 +78,6 @@ dependencies { runtimeOnly("ca.cutterslade.gradle:gradle-dependency-analyze:1.8.3") // Enable dep analysis runtimeOnly("gradle.plugin.net.ossindex:ossindex-gradle-plugin:0.4.11") // Enable dep vulnerability analysis runtimeOnly("org.checkerframework:checkerframework-gradle-plugin:0.6.56") // Enable enhanced static checking plugin - runtimeOnly("org.gradle.test-retry:org.gradle.test-retry.gradle.plugin:1.6.0") } // Because buildSrc is built and tested automatically _before_ gradle diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 8191508981fe..f5eef5e69a44 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -1117,7 +1117,6 @@ class BeamModulePlugin implements Plugin { } project.apply plugin: "java" - project.apply plugin: "org.gradle.test-retry" // We create a testRuntimeMigration configuration here to extend // testImplementation, testRuntimeOnly, and default (similar to what @@ -1220,10 +1219,12 @@ class BeamModulePlugin implements Plugin { // default maxHeapSize on gradle 5 is 512m, lets increase to handle more demanding tests maxHeapSize = '2g' + // Develocity Gradle plugin (applied in settings.gradle.kts) provides test retry + // natively. Configure it in CI to retry flaky integration tests. def isCI = System.getenv("GITHUB_ACTIONS") != null || System.getenv("JENKINS_HOME") != null - if (project.plugins.hasPlugin('org.gradle.test-retry')) { - retry { - maxRetries = isCI ? 3 : 0 + if (isCI) { + develocity.testRetry { + maxRetries = 3 maxFailures = 15 failOnPassedAfterRetry = false } From c80628f2fbb3cd4645e84dc92740151d346a2244 Mon Sep 17 00:00:00 2001 From: Durgaprasad M L Date: Sun, 31 May 2026 00:31:35 +0530 Subject: [PATCH 4/5] Stabilize bundle finalization SplittableDoFn streaming tests --- .../beam/gradle/BeamModulePlugin.groovy | 11 ----- .../sdk/transforms/SplittableDoFnTest.java | 40 ++++++++++++++++--- 2 files changed, 35 insertions(+), 16 deletions(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index f5eef5e69a44..5ca0de9de846 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -1218,17 +1218,6 @@ class BeamModulePlugin implements Plugin { useJUnit {} // default maxHeapSize on gradle 5 is 512m, lets increase to handle more demanding tests maxHeapSize = '2g' - - // Develocity Gradle plugin (applied in settings.gradle.kts) provides test retry - // natively. Configure it in CI to retry flaky integration tests. - def isCI = System.getenv("GITHUB_ACTIONS") != null || System.getenv("JENKINS_HOME") != null - if (isCI) { - develocity.testRetry { - maxRetries = 3 - maxFailures = 15 - failOnPassedAfterRetry = false - } - } } List skipDefRegexes = [] diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java index 80d8728aa01b..df421b0626e6 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java @@ -986,6 +986,9 @@ public SplitResult trySplit(double fractionOfRemainder) { }; } + private static final org.slf4j.Logger LOG = + org.slf4j.LoggerFactory.getLogger(BundleFinalizingSplittableDoFn.class); + @ProcessElement public ProcessContinuation process( @Element String element, @@ -995,21 +998,48 @@ public ProcessContinuation process( throws InterruptedException { AtomicBoolean wasFinalized = WAS_FINALIZED.computeIfAbsent(element, (unused) -> new AtomicBoolean()); + + long currentAttempt = tracker.currentRestriction().getFrom(); + + // On subsequent attempts, the previous bundle has committed, so the finalization callback + // should run. Poll wasFinalized with a timed wait to avoid deadlocks on single-threaded executors. + if (currentAttempt > 0 && !wasFinalized.get()) { + long limitMs = 1000; + long start = System.currentTimeMillis(); + while (!wasFinalized.get() && (System.currentTimeMillis() - start) < limitMs) { + sleep(10L); + } + long duration = System.currentTimeMillis() - start; + if (wasFinalized.get()) { + LOG.info( + "Bundle finalization callback observed for element {} after waiting {} ms on attempt {}.", + element, + duration, + currentAttempt); + } else { + LOG.warn( + "Bundle finalization callback not observed for element {} after waiting {} ms on attempt {}. Yielding/resuming.", + element, + duration, + currentAttempt); + } + } + if (wasFinalized.get()) { - tracker.tryClaim(tracker.currentRestriction().getFrom() + 1); + tracker.tryClaim(currentAttempt + 1); receiver.output(element); WAS_FINALIZED.remove(element); // Claim beyond the end now that we know we have been finalized. tracker.tryClaim(Long.MAX_VALUE); return stop(); } - if (tracker.tryClaim(tracker.currentRestriction().getFrom() + 1)) { + + if (tracker.tryClaim(currentAttempt + 1)) { bundleFinalizer.afterBundleCommit( Instant.now().plus(Duration.standardSeconds(FINALIZATION_CALLBACK_TIMEOUT_SECS)), () -> wasFinalized.set(true)); - // We sleep here instead of setting a resume time since the resume time doesn't need to - // be honored. - sleep(100L); + // Return resume immediately. We already waited above on resumes, and for the first attempt, + // we want to commit the first bundle as fast as possible. return resume(); } WAS_FINALIZED.remove(element); From 6e16b2c8935cb986ac9dd195f3806b9f1a36c6f8 Mon Sep 17 00:00:00 2001 From: Durgaprasad M L Date: Sun, 31 May 2026 01:28:54 +0530 Subject: [PATCH 5/5] Fix spotless formatting and SLF4J logging violations --- .../beam/runners/dataflow/TestDataflowRunner.java | 11 +++++------ .../beam/sdk/transforms/SplittableDoFnTest.java | 15 +++++++++------ 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java index 44b1d79c34ac..54bab5427bd7 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java @@ -431,9 +431,9 @@ public Void call() throws Exception { } } catch (Exception e) { LOG.warn( - "Transient error polling metrics for job {}: {}", + "Transient error polling metrics for job {}", job.getJobId(), - e.getMessage()); + e); } } } @@ -457,17 +457,16 @@ public Void call() throws Exception { return null; // Successful cancellation } catch (Exception e) { LOG.warn( - "Failed to cancel Dataflow job {}. Will retry on next iteration. Error: {}", + "Failed to cancel Dataflow job {}. Will retry on next iteration. Error", job.getJobId(), - e.getMessage()); + e); } } } catch (Exception e) { LOG.warn( - "Exception in streaming job monitor loop for job {}: {}", + "Exception in streaming job monitor loop for job {}", job.getJobId(), - e.getMessage(), e); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java index df421b0626e6..08dcb9cd9296 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java @@ -1001,8 +1001,9 @@ public ProcessContinuation process( long currentAttempt = tracker.currentRestriction().getFrom(); - // On subsequent attempts, the previous bundle has committed, so the finalization callback - // should run. Poll wasFinalized with a timed wait to avoid deadlocks on single-threaded executors. + // On subsequent attempts, the previous bundle has committed, so the finalization + // callback should run. Poll wasFinalized with a timed wait to avoid deadlocks + // on single-threaded executors. if (currentAttempt > 0 && !wasFinalized.get()) { long limitMs = 1000; long start = System.currentTimeMillis(); @@ -1012,13 +1013,15 @@ public ProcessContinuation process( long duration = System.currentTimeMillis() - start; if (wasFinalized.get()) { LOG.info( - "Bundle finalization callback observed for element {} after waiting {} ms on attempt {}.", + "Bundle finalization callback observed for element {} after waiting {} ms " + + "on attempt {}.", element, duration, currentAttempt); } else { LOG.warn( - "Bundle finalization callback not observed for element {} after waiting {} ms on attempt {}. Yielding/resuming.", + "Bundle finalization callback not observed for element {} after waiting {} ms " + + "on attempt {}. Yielding/resuming.", element, duration, currentAttempt); @@ -1038,8 +1041,8 @@ public ProcessContinuation process( bundleFinalizer.afterBundleCommit( Instant.now().plus(Duration.standardSeconds(FINALIZATION_CALLBACK_TIMEOUT_SECS)), () -> wasFinalized.set(true)); - // Return resume immediately. We already waited above on resumes, and for the first attempt, - // we want to commit the first bundle as fast as possible. + // Return resume immediately. We already waited above on resumes, and for the + // first attempt, we want to commit the first bundle as fast as possible. return resume(); } WAS_FINALIZED.remove(element);