diff --git a/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.yml b/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.yml index f70cbf0db32f..aced5a641b08 100644 --- a/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.yml +++ b/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.yml @@ -79,7 +79,7 @@ jobs: - name: run validatesRunnerStreaming script uses: ./.github/actions/gradle-command-self-hosted-action with: - gradle-command: :runners:google-cloud-dataflow-java:validatesRunnerStreaming + gradle-command: :runners:google-cloud-dataflow-java:validatesRunnerStreamingEngine max-workers: 12 - name: Archive JUnit Test Results uses: actions/upload-artifact@v7 diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index 943a01daa7d0..a7f79869531c 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -230,9 +230,9 @@ def createLegacyWorkerValidatesRunnerTest = { Map args -> systemProperty "beamTestPipelineOptions", JsonOutput.toJson(pipelineOptions) - // Increase test parallelism up to the number of Gradle workers. By default this is equal - // to the number of CPU cores, but can be increased by setting --max-workers=N. - maxParallelForks Integer.MAX_VALUE + // By default throttle parallelism to 4 to avoid GHA and GCP quota exhaustion. + // Can be overridden via -PmaxParallelForks=N. + maxParallelForks project.findProperty('maxParallelForks') ? (project.findProperty('maxParallelForks') as Integer) : 4 classpath = configurations.validatesRunner testClassesDirs = files(project(":sdks:java:core").sourceSets.test.output.classesDirs) + files(project(project.path).sourceSets.test.output.classesDirs) @@ -263,9 +263,9 @@ def createRunnerV2ValidatesRunnerTest = { Map args -> dependsOn buildAndPushDockerJavaContainer systemProperty "beamTestPipelineOptions", JsonOutput.toJson(pipelineOptions) - // Increase test parallelism up to the number of Gradle workers. By default this is equal - // to the number of CPU cores, but can be increased by setting --max-workers=N. - maxParallelForks Integer.MAX_VALUE + // By default throttle parallelism to 4 to avoid GHA and GCP quota exhaustion. + // Can be overridden via -PmaxParallelForks=N. + maxParallelForks project.findProperty('maxParallelForks') ? (project.findProperty('maxParallelForks') as Integer) : 4 classpath = configurations.validatesRunner testClassesDirs = files(project(":sdks:java:core").sourceSets.test.output.classesDirs) + files(project(project.path).sourceSets.test.output.classesDirs) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java index b00194dacb08..54bab5427bd7 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java @@ -121,19 +121,28 @@ DataflowPipelineJob run(Pipeline pipeline, DataflowRunner runner) { ErrorMonitorMessagesHandler messageHandler = new ErrorMonitorMessagesHandler(job, new MonitoringUtil.LoggingHandler()); + java.util.concurrent.atomic.AtomicReference> assertionsPassedRef = + new java.util.concurrent.atomic.AtomicReference<>(Optional.absent()); + if (options.isStreaming()) { if (options.isBlockOnRun()) { - jobSuccess = waitForStreamingJobTermination(job, messageHandler); + jobSuccess = waitForStreamingJobTermination(job, messageHandler, assertionsPassedRef); } else { jobSuccess = true; } - // No metrics in streaming - allAssertionsPassed = Optional.absent(); + allAssertionsPassed = assertionsPassedRef.get(); + if (!allAssertionsPassed.isPresent()) { + allAssertionsPassed = checkForPAssertSuccess(job); + } } else { jobSuccess = waitForBatchJobTermination(job, messageHandler); allAssertionsPassed = checkForPAssertSuccess(job); } + if (allAssertionsPassed.isPresent() && allAssertionsPassed.get()) { + jobSuccess = true; + } + // If there is a certain assertion failure, throw the most precise exception we can. // There are situations where the metric will not be available, but as long as we recover // the actionable message from the logs it is acceptable. @@ -160,11 +169,13 @@ DataflowPipelineJob run(Pipeline pipeline, DataflowRunner runner) { @SuppressWarnings("FutureReturnValueIgnored") // Job status checked via job.waitUntilFinish @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_BAD_PRACTICE") private boolean waitForStreamingJobTermination( - final DataflowPipelineJob job, ErrorMonitorMessagesHandler messageHandler) { + final DataflowPipelineJob job, + ErrorMonitorMessagesHandler messageHandler, + java.util.concurrent.atomic.AtomicReference> assertionsPassedRef) { // In streaming, there are infinite retries, so rather than timeout // we try to terminate early by polling and canceling if we see - // an error message - options.getExecutorService().submit(new CancelOnError(job, messageHandler)); + // an error message or when all assertions have succeeded + options.getExecutorService().submit(new CancelOnError(job, messageHandler, this, assertionsPassedRef)); // Whether we canceled or not, this gets the final state of the job or times out State finalState; @@ -373,29 +384,94 @@ private static class CancelOnError implements Callable { private final DataflowPipelineJob job; private final ErrorMonitorMessagesHandler messageHandler; - - public CancelOnError(DataflowPipelineJob job, ErrorMonitorMessagesHandler messageHandler) { + private final TestDataflowRunner runner; + private final java.util.concurrent.atomic.AtomicReference> assertionsPassedRef; + + public CancelOnError( + DataflowPipelineJob job, + ErrorMonitorMessagesHandler messageHandler, + TestDataflowRunner runner, + java.util.concurrent.atomic.AtomicReference> assertionsPassedRef) { this.job = job; this.messageHandler = messageHandler; + this.runner = runner; + this.assertionsPassedRef = assertionsPassedRef; } @Override public Void call() throws Exception { + int checkMetricsIntervalSteps = 5; // Check metrics every 15 seconds (5 * 3s) + int steps = 0; + boolean cancellationPending = false; while (true) { - State jobState = job.getState(); - - // If we see an error, cancel and note failure - if (messageHandler.hasSeenError() && !job.getState().isTerminal()) { - job.cancel(); - LOG.info("Cancelling Dataflow job {}", job.getJobId()); - return null; - } - - if (jobState.isTerminal()) { - return null; + try { + State jobState = job.getState(); + + if (jobState.isTerminal()) { + return null; + } + + // Check if we should initiate cancellation based on metrics (only if assertion state is not yet known) + if (!assertionsPassedRef.get().isPresent() && !cancellationPending) { + if (steps % checkMetricsIntervalSteps == 0) { + try { + Optional assertionsPassed = runner.checkForPAssertSuccess(job); + if (assertionsPassed.isPresent()) { + assertionsPassedRef.set(assertionsPassed); + cancellationPending = true; + if (assertionsPassed.get()) { + LOG.info( + "All assertions passed for streaming job {}, cancelling job.", + job.getJobId()); + } else { + LOG.info( + "Found failed assertion for streaming job {}, cancelling job.", + job.getJobId()); + } + } + } catch (Exception e) { + LOG.warn( + "Transient error polling metrics for job {}", + job.getJobId(), + e); + } + } + } + + // Check if we should initiate cancellation based on error logs (only if not already cancellationPending) + if (!cancellationPending) { + long runningTimeMillis = steps * 3000L; + if (messageHandler.hasSeenError() + && (runningTimeMillis > 300000L || runner.expectedNumberOfAssertions == 0)) { + LOG.info( + "Cancelling Dataflow job due to error messages seen: {}", + messageHandler.getErrorMessage()); + cancellationPending = true; + } + } + + // Perform or retry cancellation if cancellation is pending + if (cancellationPending) { + try { + job.cancel(); + return null; // Successful cancellation + } catch (Exception e) { + LOG.warn( + "Failed to cancel Dataflow job {}. Will retry on next iteration. Error", + job.getJobId(), + e); + } + } + + } catch (Exception e) { + LOG.warn( + "Exception in streaming job monitor loop for job {}", + job.getJobId(), + e); } Thread.sleep(3000L); + steps++; } } } diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java index ed6259a3ee22..52ca0e802a98 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java @@ -609,6 +609,62 @@ public void testStreamingOnSuccessMatcherWhenPipelineFails() throws Exception { // If the onSuccessMatcher were invoked, it would have crashed here with AssertionError } + @Test + public void testRunStreamingJobEarlySuccess() throws Exception { + options.setStreaming(true); + Pipeline p = TestPipeline.create(options); + PCollection pc = p.apply(Create.of(1, 2, 3)); + PAssert.that(pc).containsInAnyOrder(1, 2, 3); + + DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); + when(mockJob.getState()).thenReturn(State.CANCELLED); + when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class))) + .thenReturn(State.CANCELLED); + when(mockJob.getProjectId()).thenReturn("test-project"); + when(mockJob.getJobId()).thenReturn("test-job"); + + DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); + when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); + + when(mockClient.getJobMetrics(anyString())) + .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */)); + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); + runner.run(p, mockRunner); + + Mockito.verify(mockJob, Mockito.timeout(5000)).cancel(); + } + + @Test + public void testRunStreamingJobEarlyFailure() throws Exception { + options.setStreaming(true); + Pipeline p = TestPipeline.create(options); + PCollection pc = p.apply(Create.of(1, 2, 3)); + PAssert.that(pc).containsInAnyOrder(1, 2, 3); + + DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); + when(mockJob.getState()).thenReturn(State.CANCELLED); + when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class))) + .thenReturn(State.CANCELLED); + when(mockJob.getProjectId()).thenReturn("test-project"); + when(mockJob.getJobId()).thenReturn("test-job"); + + DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class); + when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); + + when(mockClient.getJobMetrics(anyString())) + .thenReturn(generateMockMetricResponse(false /* success */, true /* tentative */)); + TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient); + + try { + runner.run(p, mockRunner); + fail("Expected AssertionError to be thrown"); + } catch (AssertionError expected) { + // Expected exception + } + + Mockito.verify(mockJob, Mockito.timeout(5000)).cancel(); + } + static class TestSuccessMatcher extends BaseMatcher implements SerializableMatcher { private final transient DataflowPipelineJob mockJob; diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java index 80d8728aa01b..08dcb9cd9296 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java @@ -986,6 +986,9 @@ public SplitResult trySplit(double fractionOfRemainder) { }; } + private static final org.slf4j.Logger LOG = + org.slf4j.LoggerFactory.getLogger(BundleFinalizingSplittableDoFn.class); + @ProcessElement public ProcessContinuation process( @Element String element, @@ -995,21 +998,51 @@ public ProcessContinuation process( throws InterruptedException { AtomicBoolean wasFinalized = WAS_FINALIZED.computeIfAbsent(element, (unused) -> new AtomicBoolean()); + + long currentAttempt = tracker.currentRestriction().getFrom(); + + // On subsequent attempts, the previous bundle has committed, so the finalization + // callback should run. Poll wasFinalized with a timed wait to avoid deadlocks + // on single-threaded executors. + if (currentAttempt > 0 && !wasFinalized.get()) { + long limitMs = 1000; + long start = System.currentTimeMillis(); + while (!wasFinalized.get() && (System.currentTimeMillis() - start) < limitMs) { + sleep(10L); + } + long duration = System.currentTimeMillis() - start; + if (wasFinalized.get()) { + LOG.info( + "Bundle finalization callback observed for element {} after waiting {} ms " + + "on attempt {}.", + element, + duration, + currentAttempt); + } else { + LOG.warn( + "Bundle finalization callback not observed for element {} after waiting {} ms " + + "on attempt {}. Yielding/resuming.", + element, + duration, + currentAttempt); + } + } + if (wasFinalized.get()) { - tracker.tryClaim(tracker.currentRestriction().getFrom() + 1); + tracker.tryClaim(currentAttempt + 1); receiver.output(element); WAS_FINALIZED.remove(element); // Claim beyond the end now that we know we have been finalized. tracker.tryClaim(Long.MAX_VALUE); return stop(); } - if (tracker.tryClaim(tracker.currentRestriction().getFrom() + 1)) { + + if (tracker.tryClaim(currentAttempt + 1)) { bundleFinalizer.afterBundleCommit( Instant.now().plus(Duration.standardSeconds(FINALIZATION_CALLBACK_TIMEOUT_SECS)), () -> wasFinalized.set(true)); - // We sleep here instead of setting a resume time since the resume time doesn't need to - // be honored. - sleep(100L); + // Return resume immediately. We already waited above on resumes, and for the + // first attempt, we want to commit the first bundle as fast as possible. return resume(); } WAS_FINALIZED.remove(element);