Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ jobs:
- name: run validatesRunnerStreaming script
uses: ./.github/actions/gradle-command-self-hosted-action
with:
gradle-command: :runners:google-cloud-dataflow-java:validatesRunnerStreaming
gradle-command: :runners:google-cloud-dataflow-java:validatesRunnerStreamingEngine
max-workers: 12
- name: Archive JUnit Test Results
uses: actions/upload-artifact@v7
Expand Down
12 changes: 6 additions & 6 deletions runners/google-cloud-dataflow-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -230,9 +230,9 @@ def createLegacyWorkerValidatesRunnerTest = { Map args ->

systemProperty "beamTestPipelineOptions", JsonOutput.toJson(pipelineOptions)

// Increase test parallelism up to the number of Gradle workers. By default this is equal
// to the number of CPU cores, but can be increased by setting --max-workers=N.
maxParallelForks Integer.MAX_VALUE
// By default throttle parallelism to 4 to avoid GHA and GCP quota exhaustion.
// Can be overridden via -PmaxParallelForks=N.
maxParallelForks project.findProperty('maxParallelForks') ? (project.findProperty('maxParallelForks') as Integer) : 4
classpath = configurations.validatesRunner
testClassesDirs = files(project(":sdks:java:core").sourceSets.test.output.classesDirs) +
files(project(project.path).sourceSets.test.output.classesDirs)
Expand Down Expand Up @@ -263,9 +263,9 @@ def createRunnerV2ValidatesRunnerTest = { Map args ->
dependsOn buildAndPushDockerJavaContainer
systemProperty "beamTestPipelineOptions", JsonOutput.toJson(pipelineOptions)

// Increase test parallelism up to the number of Gradle workers. By default this is equal
// to the number of CPU cores, but can be increased by setting --max-workers=N.
maxParallelForks Integer.MAX_VALUE
// By default throttle parallelism to 4 to avoid GHA and GCP quota exhaustion.
// Can be overridden via -PmaxParallelForks=N.
maxParallelForks project.findProperty('maxParallelForks') ? (project.findProperty('maxParallelForks') as Integer) : 4
classpath = configurations.validatesRunner
testClassesDirs = files(project(":sdks:java:core").sourceSets.test.output.classesDirs) +
files(project(project.path).sourceSets.test.output.classesDirs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,19 +121,28 @@ DataflowPipelineJob run(Pipeline pipeline, DataflowRunner runner) {
ErrorMonitorMessagesHandler messageHandler =
new ErrorMonitorMessagesHandler(job, new MonitoringUtil.LoggingHandler());

java.util.concurrent.atomic.AtomicReference<Optional<Boolean>> assertionsPassedRef =
new java.util.concurrent.atomic.AtomicReference<>(Optional.absent());

if (options.isStreaming()) {
if (options.isBlockOnRun()) {
jobSuccess = waitForStreamingJobTermination(job, messageHandler);
jobSuccess = waitForStreamingJobTermination(job, messageHandler, assertionsPassedRef);
} else {
jobSuccess = true;
}
// No metrics in streaming
allAssertionsPassed = Optional.absent();
allAssertionsPassed = assertionsPassedRef.get();
if (!allAssertionsPassed.isPresent()) {
allAssertionsPassed = checkForPAssertSuccess(job);
}
} else {
jobSuccess = waitForBatchJobTermination(job, messageHandler);
allAssertionsPassed = checkForPAssertSuccess(job);
}

if (allAssertionsPassed.isPresent() && allAssertionsPassed.get()) {
jobSuccess = true;
}

// If there is a certain assertion failure, throw the most precise exception we can.
// There are situations where the metric will not be available, but as long as we recover
// the actionable message from the logs it is acceptable.
Expand All @@ -160,11 +169,13 @@ DataflowPipelineJob run(Pipeline pipeline, DataflowRunner runner) {
@SuppressWarnings("FutureReturnValueIgnored") // Job status checked via job.waitUntilFinish
@SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_BAD_PRACTICE")
private boolean waitForStreamingJobTermination(
final DataflowPipelineJob job, ErrorMonitorMessagesHandler messageHandler) {
final DataflowPipelineJob job,
ErrorMonitorMessagesHandler messageHandler,
java.util.concurrent.atomic.AtomicReference<Optional<Boolean>> assertionsPassedRef) {
// In streaming, there are infinite retries, so rather than timeout
// we try to terminate early by polling and canceling if we see
// an error message
options.getExecutorService().submit(new CancelOnError(job, messageHandler));
// an error message or when all assertions have succeeded
options.getExecutorService().submit(new CancelOnError(job, messageHandler, this, assertionsPassedRef));

// Whether we canceled or not, this gets the final state of the job or times out
State finalState;
Expand Down Expand Up @@ -373,29 +384,94 @@ private static class CancelOnError implements Callable<Void> {

private final DataflowPipelineJob job;
private final ErrorMonitorMessagesHandler messageHandler;

public CancelOnError(DataflowPipelineJob job, ErrorMonitorMessagesHandler messageHandler) {
private final TestDataflowRunner runner;
private final java.util.concurrent.atomic.AtomicReference<Optional<Boolean>> assertionsPassedRef;

public CancelOnError(
DataflowPipelineJob job,
ErrorMonitorMessagesHandler messageHandler,
TestDataflowRunner runner,
java.util.concurrent.atomic.AtomicReference<Optional<Boolean>> assertionsPassedRef) {
this.job = job;
this.messageHandler = messageHandler;
this.runner = runner;
this.assertionsPassedRef = assertionsPassedRef;
}

@Override
public Void call() throws Exception {
int checkMetricsIntervalSteps = 5; // Check metrics every 15 seconds (5 * 3s)
int steps = 0;
boolean cancellationPending = false;
while (true) {
State jobState = job.getState();

// If we see an error, cancel and note failure
if (messageHandler.hasSeenError() && !job.getState().isTerminal()) {
job.cancel();
LOG.info("Cancelling Dataflow job {}", job.getJobId());
return null;
}

if (jobState.isTerminal()) {
return null;
try {
State jobState = job.getState();

if (jobState.isTerminal()) {
return null;
}

// Check if we should initiate cancellation based on metrics (only if assertion state is not yet known)
if (!assertionsPassedRef.get().isPresent() && !cancellationPending) {
if (steps % checkMetricsIntervalSteps == 0) {
try {
Optional<Boolean> assertionsPassed = runner.checkForPAssertSuccess(job);
if (assertionsPassed.isPresent()) {
assertionsPassedRef.set(assertionsPassed);
cancellationPending = true;
if (assertionsPassed.get()) {
LOG.info(
"All assertions passed for streaming job {}, cancelling job.",
job.getJobId());
} else {
LOG.info(
"Found failed assertion for streaming job {}, cancelling job.",
job.getJobId());
}
}
} catch (Exception e) {
LOG.warn(
"Transient error polling metrics for job {}",
job.getJobId(),
e);
}
}
}

// Check if we should initiate cancellation based on error logs (only if not already cancellationPending)
if (!cancellationPending) {
long runningTimeMillis = steps * 3000L;
if (messageHandler.hasSeenError()
&& (runningTimeMillis > 300000L || runner.expectedNumberOfAssertions == 0)) {
LOG.info(
"Cancelling Dataflow job due to error messages seen: {}",
messageHandler.getErrorMessage());
cancellationPending = true;
}
}

// Perform or retry cancellation if cancellation is pending
if (cancellationPending) {
try {
job.cancel();
return null; // Successful cancellation
} catch (Exception e) {
LOG.warn(
"Failed to cancel Dataflow job {}. Will retry on next iteration. Error",
job.getJobId(),
e);
}
}

} catch (Exception e) {
LOG.warn(
"Exception in streaming job monitor loop for job {}",
job.getJobId(),
e);
}

Thread.sleep(3000L);
steps++;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,62 @@ public void testStreamingOnSuccessMatcherWhenPipelineFails() throws Exception {
// If the onSuccessMatcher were invoked, it would have crashed here with AssertionError
}

@Test
public void testRunStreamingJobEarlySuccess() throws Exception {
options.setStreaming(true);
Pipeline p = TestPipeline.create(options);
PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
PAssert.that(pc).containsInAnyOrder(1, 2, 3);

DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
when(mockJob.getState()).thenReturn(State.CANCELLED);
when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
.thenReturn(State.CANCELLED);
when(mockJob.getProjectId()).thenReturn("test-project");
when(mockJob.getJobId()).thenReturn("test-job");

DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);

when(mockClient.getJobMetrics(anyString()))
.thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */));
TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
runner.run(p, mockRunner);

Mockito.verify(mockJob, Mockito.timeout(5000)).cancel();
}
Comment thread
durgaprasadml marked this conversation as resolved.

@Test
public void testRunStreamingJobEarlyFailure() throws Exception {
options.setStreaming(true);
Pipeline p = TestPipeline.create(options);
PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
PAssert.that(pc).containsInAnyOrder(1, 2, 3);

DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
when(mockJob.getState()).thenReturn(State.CANCELLED);
when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
.thenReturn(State.CANCELLED);
when(mockJob.getProjectId()).thenReturn("test-project");
when(mockJob.getJobId()).thenReturn("test-job");

DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);

when(mockClient.getJobMetrics(anyString()))
.thenReturn(generateMockMetricResponse(false /* success */, true /* tentative */));
TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);

try {
runner.run(p, mockRunner);
fail("Expected AssertionError to be thrown");
} catch (AssertionError expected) {
// Expected exception
}

Mockito.verify(mockJob, Mockito.timeout(5000)).cancel();
}
Comment thread
durgaprasadml marked this conversation as resolved.

static class TestSuccessMatcher extends BaseMatcher<PipelineResult>
implements SerializableMatcher<PipelineResult> {
private final transient DataflowPipelineJob mockJob;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -986,6 +986,9 @@ public SplitResult<OffsetRange> trySplit(double fractionOfRemainder) {
};
}

private static final org.slf4j.Logger LOG =
org.slf4j.LoggerFactory.getLogger(BundleFinalizingSplittableDoFn.class);

@ProcessElement
public ProcessContinuation process(
@Element String element,
Expand All @@ -995,21 +998,51 @@ public ProcessContinuation process(
throws InterruptedException {
AtomicBoolean wasFinalized =
WAS_FINALIZED.computeIfAbsent(element, (unused) -> new AtomicBoolean());

long currentAttempt = tracker.currentRestriction().getFrom();

// On subsequent attempts, the previous bundle has committed, so the finalization
// callback should run. Poll wasFinalized with a timed wait to avoid deadlocks
// on single-threaded executors.
if (currentAttempt > 0 && !wasFinalized.get()) {
long limitMs = 1000;
long start = System.currentTimeMillis();
while (!wasFinalized.get() && (System.currentTimeMillis() - start) < limitMs) {
sleep(10L);
}
long duration = System.currentTimeMillis() - start;
if (wasFinalized.get()) {
LOG.info(
"Bundle finalization callback observed for element {} after waiting {} ms "
+ "on attempt {}.",
element,
duration,
currentAttempt);
} else {
LOG.warn(
"Bundle finalization callback not observed for element {} after waiting {} ms "
+ "on attempt {}. Yielding/resuming.",
element,
duration,
currentAttempt);
}
}

if (wasFinalized.get()) {
tracker.tryClaim(tracker.currentRestriction().getFrom() + 1);
tracker.tryClaim(currentAttempt + 1);
receiver.output(element);
WAS_FINALIZED.remove(element);
// Claim beyond the end now that we know we have been finalized.
tracker.tryClaim(Long.MAX_VALUE);
return stop();
}
if (tracker.tryClaim(tracker.currentRestriction().getFrom() + 1)) {

if (tracker.tryClaim(currentAttempt + 1)) {
bundleFinalizer.afterBundleCommit(
Instant.now().plus(Duration.standardSeconds(FINALIZATION_CALLBACK_TIMEOUT_SECS)),
() -> wasFinalized.set(true));
// We sleep here instead of setting a resume time since the resume time doesn't need to
// be honored.
sleep(100L);
// Return resume immediately. We already waited above on resumes, and for the
// first attempt, we want to commit the first bundle as fast as possible.
return resume();
}
WAS_FINALIZED.remove(element);
Expand Down
Loading