GOBBLIN-2265: Reliable terminal job-completion status for temporal-on-YARN (AM final status + launcher hooks + exit codes + prompt un-register) #4197
…-YARN (AM final status + launcher hooks + exit codes)

Make a temporal-on-YARN job's true outcome reliably observable end-to-end without depending on a fragile JVM shutdown hook, and expose clean extension points for a single terminal-GTE source.

- AM un-registers with a FinalApplicationStatus derived from the Temporal workflow outcome (COMPLETED->SUCCEEDED, CANCELED->KILLED, else FAILED) instead of an unconditional SUCCEEDED, so a launcher polling the ApplicationReport sees the real result. Kept in lockstep with the AM JVM exit code via mapWorkflowStatusToFinalAppStatus/computeExitCode.
- Remove the unreliable JVM-shutdown-hook GTE emitter (RootMetricContext closes the Kafka reporters in its own shutdown hook, so the GTE could be silently dropped). Status capture and exit-code propagation are retained.
- Restore the in-workflow JOB_SUCCEEDED/JOB_FAILED GTEs and gate them behind a new gobblin.temporal.job.completion.gte.emission.enabled flag (default true), so a standalone OSS deployment stays self-complete; deployments that designate a launcher subclass as the single GTE source set it false.
- GobblinYarnAppLauncher: surface FAILED/KILLED/UNDEFINED, lost-AM-visibility, and never-launched applications as a non-zero launcher exit code, and expose protected no-op hooks (onTerminalApplicationStatus / onLostAmVisibility / onApplicationLaunchFailure) plus getApplicationId/getApplicationName/getConfig accessors so a subclass can emit the single terminal GTE. The OSS launcher itself emits no GTE.
- On graceful shutdown, force-kill the YARN application after the wait so the RM does not re-attempt the AM on cancel.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
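The status mapping described in this commit can be sketched as below. This is a minimal illustration, not the PR's code: the two enums are hypothetical stand-ins for Temporal's `WorkflowExecutionStatus` and YARN's `FinalApplicationStatus`, and the method bodies are assumed from the commit message. The exit-code rule (0 only for SUCCEEDED, 1 otherwise) is an assumption; the source confirms only exit 0 for COMPLETED and exit 1 for FAILED.

```java
/** Illustrative sketch only; the enums are stand-ins, not the real Temporal/YARN types. */
final class TerminalStatusSketch {
  enum WorkflowStatus { COMPLETED, CANCELED, FAILED, TERMINATED, TIMED_OUT, UNSPECIFIED }

  enum FinalAppStatus { SUCCEEDED, KILLED, FAILED }

  private TerminalStatusSketch() { }

  /** COMPLETED -> SUCCEEDED, CANCELED -> KILLED, anything else (including null) -> FAILED. */
  static FinalAppStatus mapWorkflowStatusToFinalAppStatus(WorkflowStatus status) {
    if (status == null) {
      return FinalAppStatus.FAILED; // no captured status: never report success by default
    }
    switch (status) {
      case COMPLETED:
        return FinalAppStatus.SUCCEEDED;
      case CANCELED:
        return FinalAppStatus.KILLED;
      default:
        return FinalAppStatus.FAILED;
    }
  }

  /** Assumed rule: derive the exit code from the same mapping so the two cannot drift apart. */
  static int computeExitCode(WorkflowStatus status) {
    return mapWorkflowStatusToFinalAppStatus(status) == FinalAppStatus.SUCCEEDED ? 0 : 1;
  }
}
```

Deriving the exit code from the mapped status, rather than from the workflow status directly, is one way to keep the two values in lockstep as the commit message requires.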
…p debug logging

Fix a notify race in the temporal AM shutdown path: shutDown() waits on allContainersStopped, but the two callbacks that observe a container going away (AMRM onContainersCompleted -> handleContainerCompletion, and NM onContainerStopped) did not reliably notify the waiter. DynamicScalingYarnService additionally early-returned on the shutdownInProgress branch without notifying. As a result the waiter slept the full wait timeout, delaying unregisterApplicationMaster by minutes and causing YARN to fail the attempt even on a successful workflow.

- Add YarnService.notifyIfAllContainersStopped(); call it from handleContainerCompletion and onContainerStopped (base) and from both early-return paths of DynamicScalingYarnService.handleContainerCompletion.
- Reduce the container-stop fallback wait from 5m to 2m (the notify makes the normal path immediate; this is only a missed-notify backstop).
- Replace the temporary debug markers with normal INFO-level logging.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Codecov Report

Coverage diff for #4197 against master:

| Metric | master | #4197 | +/- |
|---|---|---|---|
| Coverage | 43.26% | 48.53% | +5.26% |
| Complexity | 2583 | 8837 | +6254 |
| Files | 516 | 1618 | +1102 |
| Lines | 22087 | 65199 | +43112 |
| Branches | 2505 | 7351 | +4846 |
| Hits | 9557 | 31642 | +22085 |
| Misses | 11566 | 30747 | +19181 |
| Partials | 964 | 2810 | +1846 |
…larify terminal GTE is emitted by the Temporal worker, not the AM

- Route writes to the static lastTerminalStatus cache through a private static setter (GobblinTemporalJobLauncher) so FindBugs ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD no longer fires.
- Rename isAmTerminalGteEmissionEnabled -> isWorkflowTerminalGteEmissionEnabled and correct the javadoc/comments: ExecuteGobblinWorkflowImpl runs on a Temporal worker, not the YARN AM.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Pull request overview
This PR improves end-to-end observability and correctness of terminal job outcomes for temporal-on-YARN by ensuring the YARN FinalApplicationStatus, AM JVM exit code, and launcher exit code/hook dispatch align with the actual Temporal workflow outcome, while also fixing an AM shutdown wait/notify race that could delay YARN un-registration.
Changes:
- Derive and propagate true terminal outcome via AM exit codes + YARN `FinalApplicationStatus` (based on captured Temporal workflow status).
- Add launcher-side `exitCode` propagation and protected terminal hooks (plus focused unit tests) intended for subclasses to emit a single terminal event.
- Fix container-stop notification paths to avoid multi-minute un-register delays; add a feature flag to gate workflow-side terminal GTE emission.
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java | Adds launcher exit-code propagation + terminal hooks and integrates them into terminal-report and failure paths. |
| gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTerminalGteTest.java | New unit tests for launcher terminal-status mapping, exit-code wiring, and hook dispatch/guarding. |
| gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java | Captures workflow terminal status for reuse by AM exit code + YARN un-register status; adds mapping helpers. |
| gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java | Adds tests for workflow-status→final-status mapping, exit codes, and status-capture caching/reset behavior. |
| gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java | Uses captured terminal status to un-register with correct FinalApplicationStatus; adds notify helper to wake shutdown waiter. |
| gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java | Ensures shutdown waiter is notified even on early-return/edge container-completion paths. |
| gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/GobblinTemporalApplicationMaster.java | Exits AM JVM with code derived from captured workflow terminal status. |
| gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java | Adds feature flag to gate workflow-side terminal job-completion GTE emission. |
| gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java | Gates terminal success/failure GTE emission on the new feature flag. |
    try {
      gobblinYarnAppLauncher.launch();
    } catch (Exception e) {
      // launch() already invoked handleApplicationLaunchFailure (sets exitCode=1, emits terminal GTE) before
      // rethrowing. Swallow here so we reach the explicit System.exit below with the non-zero code.
      LOGGER.error("GobblinYarnAppLauncher launch failed", e);
    }

    // Surface AM-level failures (FAILED/KILLED/UNDEFINED FinalApplicationStatus, lost-AM-visibility on
    // exhausted report fetches, or a never-launched application) as a non-zero launcher process exit so
    // GGW/Grid Gateway dashboards see them end-to-end. SUCCEEDED keeps exitCode at the field's default of 0.
    int finalExitCode = gobblinYarnAppLauncher.getExitCode();
    LOGGER.info("GobblinYarnAppLauncher exiting with code {}", finalExitCode);
    System.exit(finalExitCode);
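How the exit code in the snippet above and the protected no-op hooks could fit together is sketched below. This is an assumption-laden illustration: the class names, the `handle*` dispatch methods other than `handleApplicationLaunchFailure`, and all hook signatures are hypothetical (the real hooks take YARN types such as `ApplicationReport`), and the event names recorded by the subclass are placeholders for a real GTE emitter.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the launcher: sets the exit code, then calls a no-op hook. */
class LauncherSketch {
  enum FinalStatus { SUCCEEDED, FAILED, KILLED, UNDEFINED }

  private volatile int exitCode = 0; // stays 0 unless a failure path runs

  // Protected no-op hooks: the base launcher itself emits nothing.
  protected void onTerminalApplicationStatus(FinalStatus status) { }

  protected void onLostAmVisibility() { }

  protected void onApplicationLaunchFailure(Throwable cause) { }

  final void handleTerminalStatus(FinalStatus status) {
    if (status != FinalStatus.SUCCEEDED) {
      exitCode = 1; // FAILED / KILLED / UNDEFINED all surface as non-zero
    }
    onTerminalApplicationStatus(status);
  }

  final void handleLostAmVisibility() {
    exitCode = 1;
    onLostAmVisibility();
  }

  final void handleApplicationLaunchFailure(Throwable cause) {
    exitCode = 1;
    onApplicationLaunchFailure(cause);
  }

  int getExitCode() {
    return exitCode;
  }
}

/** Hypothetical subclass acting as the single terminal-event source. */
class SingleEventLauncherSketch extends LauncherSketch {
  final List<String> emitted = new ArrayList<>(); // placeholder for a real event emitter

  @Override
  protected void onTerminalApplicationStatus(FinalStatus status) {
    emitted.add(status == FinalStatus.SUCCEEDED ? "JOB_SUCCEEDED" : "JOB_FAILED");
  }

  @Override
  protected void onLostAmVisibility() {
    emitted.add("JOB_FAILED");
  }

  @Override
  protected void onApplicationLaunchFailure(Throwable cause) {
    emitted.add("JOB_FAILED");
  }
}
```

Keeping the exit-code update in the base class and the emission in the override means a deployment that never subclasses the launcher still gets a correct process exit code.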
      return response.getWorkflowExecutionInfo().getStatus();
    } catch (Exception e) {
      log.warn("Failed to describe workflow {} for completion GTE; treating as UNSPECIFIED (will emit JOB_FAILED)",
          this.workflowId, e);
      return WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_UNSPECIFIED;
    }
The blanket exception catch will emit the wrong GTE when Temporal is momentarily down or a namespace limit is exceeded, for example:
- DEADLINE_EXCEEDED
- UNAVAILABLE
- RESOURCE_EXHAUSTED

Can we distinguish between these cases?
Good catch. Let me check ways to handle it.
Thanks — addressed in 11ba7500a. fetchWorkflowStatus() now retries on transient gRPC StatusRuntimeException codes (UNAVAILABLE, DEADLINE_EXCEEDED, RESOURCE_EXHAUSTED) with bounded linear backoff (3 attempts) before falling back; non-retryable codes fall back immediately.
One inherent caveat: if Temporal is still unreachable after the retries, the true status is genuinely unknowable, so we still fall back to UNSPECIFIED → JOB_FAILED rather than emit nothing (leaving GaaS without any terminal state would be worse). Happy to tune the attempt count / backoff. Leaving this open for your re-review.
Update in 03e7bf9e0: on a closer look, the Temporal service stubs are already built with RpcRetryOptions (TemporalWorkflowClientFactory#buildRpcRetryOptions, tunable via gobblin.temporal.rpc.retry.options.*), so transient gRPC codes (UNAVAILABLE / DEADLINE_EXCEEDED / RESOURCE_EXHAUSTED) are already retried with backoff at the stub layer before this catch is ever reached. I've therefore dropped the manual retry loop + Thread.sleep I'd added in the previous commit and now rely on that existing, configurable retry — the catch only handles the genuinely-unreachable-after-retries fallback to UNSPECIFIED. Happy to add describe-specific tuning if you'd prefer it over the shared stub options.
I guess the Temporal SDK already retries its API calls internally; that should apply here.
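The outcome of this thread (no manual retry loop, a single catch that handles only the post-retry fallback) might look roughly like the sketch below. Everything here is hypothetical: the enum stands in for Temporal's `WorkflowExecutionStatus`, the `Supplier` stands in for a describe call whose transient gRPC failures are assumed to have already been retried by the service stubs, and the event names follow the `UNSPECIFIED` → `JOB_FAILED` rule stated in the thread.

```java
import java.util.function.Supplier;

/** Illustrative sketch of the fallback-only catch; not the PR's fetchWorkflowStatus(). */
final class WorkflowStatusFallbackSketch {
  enum WorkflowStatus { COMPLETED, FAILED, UNSPECIFIED }

  private WorkflowStatusFallbackSketch() { }

  /**
   * Runs the describe call once. Retrying is assumed to happen inside the call
   * (stub-level retry options), so any exception reaching this point means the
   * status is genuinely unknown and we fall back to UNSPECIFIED.
   */
  static WorkflowStatus fetchWorkflowStatus(Supplier<WorkflowStatus> describeCall) {
    try {
      WorkflowStatus status = describeCall.get();
      return status != null ? status : WorkflowStatus.UNSPECIFIED;
    } catch (RuntimeException e) {
      return WorkflowStatus.UNSPECIFIED;
    }
  }

  /** An unknown outcome is reported as a failure rather than leaving no terminal event. */
  static String terminalEventFor(WorkflowStatus status) {
    return status == WorkflowStatus.COMPLETED ? "JOB_SUCCEEDED" : "JOB_FAILED";
  }
}
```

The design trade-off is the one named in the thread: after retries are exhausted, a possibly wrong `JOB_FAILED` is preferred over emitting no terminal state at all.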
    }
    sendGracefulShutdownSignal(amContainerId.get());
    pollForApplicationCompletionUntil(timeoutMs);
    // Always kill so the RM cannot re-attempt the AM (no-op if the app already finished). If the terminal
Will this be a no-op, or will we see exceptions in the log?
killApplication on an already-finished application is idempotent on the RM side — it returns without throwing (no YarnException), so it is effectively a no-op and won't produce error logs; the only line emitted is our own preceding LOGGER.info(... force-killing ...). In 11ba7500a I also now re-check the report just before this call and dispatch the real terminal status if the app already finished, so for a self-completed app we no longer rely on the kill for the outcome at all. Leaving open for your confirmation.
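The re-check described in this reply (look at the report once more after polling, surface the real outcome if the app already finished, otherwise force-kill) can be sketched as follows. The `AppClient` interface and its method names are hypothetical stand-ins for the YARN client calls; the sketch follows the later revision mentioned in the commits, where the kill is skipped entirely for an already-finished app.

```java
/** Illustrative sketch of the post-poll decision; AppClient is a hypothetical stand-in. */
final class GracefulShutdownSketch {
  enum FinalStatus { SUCCEEDED, FAILED, KILLED, UNDEFINED }

  /** Minimal view of the cluster client needed for the decision. */
  interface AppClient {
    /** Returns the terminal status, or null while the application is still running. */
    FinalStatus fetchFinalStatusOrNull();

    void killApplication();
  }

  private GracefulShutdownSketch() { }

  /** Called after the graceful-shutdown signal has been sent and the poll has timed out or returned. */
  static FinalStatus resolveTerminalStatus(AppClient client) {
    FinalStatus status = client.fetchFinalStatusOrNull();
    if (status != null) {
      return status; // app finished on its own: report the real outcome, skip the kill
    }
    client.killApplication(); // still running: stop it so the RM cannot re-attempt the AM
    return FinalStatus.KILLED; // synthetic status for the forced stop
  }
}
```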
…tatus

- GobblinTemporalJobLauncher.fetchWorkflowStatus: retry transient gRPC failures (UNAVAILABLE / DEADLINE_EXCEEDED / RESOURCE_EXHAUSTED) with bounded linear backoff before falling back to UNSPECIFIED, so a momentary Temporal outage is not mis-reported as a terminal JOB_FAILED GTE.
- temporal YarnService.shutDown: re-check containerMap emptiness inside the allContainersStopped monitor and loop on the condition, fixing a lost-wakeup where a notify firing between the outer check and wait() stalled the AM un-register for the full 2-minute timeout. Also correct the getFinalApplicationStatusForUnregister Javadoc (a null captured status maps to FAILED, not SUCCEEDED).
- GobblinYarnAppLauncher.signalGracefulShutdownAndWaitForTerminal: re-fetch the ApplicationReport after polling and dispatch the real terminal status if the app already finished, instead of an unconditional synthetic KILLED. Document that onTerminalApplicationStatus may receive a null ApplicationReport.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
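The wait/notify fix for YarnService.shutDown described in this commit follows the standard guarded-wait pattern, sketched below under stated assumptions. The class, the `onContainerGone` callback, and `awaitAllContainersStopped` are hypothetical names; only `allContainersStopped`, `containerMap`, and `notifyIfAllContainersStopped` come from the PR text, and their bodies here are guesses at the shape rather than the actual implementation.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/** Illustrative guarded wait: the condition is re-checked inside the monitor, in a loop. */
final class ContainerStopWaitSketch {
  private final Object allContainersStopped = new Object();
  private final Map<String, String> containerMap = new ConcurrentHashMap<>();

  void addContainer(String containerId) {
    containerMap.put(containerId, containerId);
  }

  /** Every callback that removes a container funnels through here, so none can forget to notify. */
  void onContainerGone(String containerId) {
    containerMap.remove(containerId);
    notifyIfAllContainersStopped();
  }

  void notifyIfAllContainersStopped() {
    synchronized (allContainersStopped) {
      if (containerMap.isEmpty()) {
        allContainersStopped.notifyAll();
      }
    }
  }

  /** Returns true if all containers stopped before the timeout, false on timeout or interrupt. */
  boolean awaitAllContainersStopped(long timeout, TimeUnit unit) {
    long deadlineNanos = System.nanoTime() + unit.toNanos(timeout);
    synchronized (allContainersStopped) {
      // Checking inside the monitor closes the window between "check" and "wait";
      // looping handles spurious wakeups and notifies that arrive too early.
      while (!containerMap.isEmpty()) {
        long remainingNanos = deadlineNanos - System.nanoTime();
        if (remainingNanos <= 0) {
          return false;
        }
        try {
          // wait(0) would block forever, so never pass less than 1 ms.
          allContainersStopped.wait(Math.max(1L, TimeUnit.NANOSECONDS.toMillis(remainingNanos)));
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return false;
        }
      }
      return true;
    }
  }
}
```

With this shape the timeout is only a backstop: a notify that fires before the waiter enters the monitor is not lost, because the waiter sees the empty map and never calls `wait()`.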
…configurable container-stop wait; skip kill when app already finished)

- fetchWorkflowStatus: remove the manual retry loop + Thread.sleep. Transient gRPC failures (UNAVAILABLE / DEADLINE_EXCEEDED / RESOURCE_EXHAUSTED) are already retried with backoff by the Temporal service stubs' configured RpcRetryOptions (gobblin.temporal.rpc.retry.options.*); the catch now only handles the post-retry-exhausted fallback to UNSPECIFIED.
- YarnService.shutDown: make the bounded container-stop wait configurable via gobblin.temporal.containers.stop.wait.timeout.minutes (default 2).
- GobblinYarnAppLauncher.signalGracefulShutdownAndWaitForTerminal: only force-kill when the app is not already finished; if it already reached a terminal state, dispatch the real status and skip the kill.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
testGracefulShutdownSendsSignalAndPolls expected getApplicationReport to be invoked twice, but signalGracefulShutdownAndWaitForTerminal now re-fetches the report once after the poll to decide between surfacing the real terminal status and force-killing. Update the expected count to 3 and assert that the kill is skipped when the application is already terminal.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Summary
Makes a temporal-on-YARN job's true outcome reliably observable end-to-end without a fragile JVM shutdown hook, exposes clean extension points so a single launcher subclass can be the sole terminal-`GobblinTrackingEvent` (GTE) source, and fixes a shutdown race that delayed the AM's YARN un-register by minutes. (Reworks the earlier shutdown-hook approach after finding it could silently drop the GTE.)

Tracks GOBBLIN-2265.
Problem
- `ApplicationMaster` un-registered with a hardcoded `FinalApplicationStatus.SUCCEEDED`, so a launcher polling the `ApplicationReport` saw SUCCEEDED even when the workflow FAILED.
- The JVM-shutdown-hook GTE emitter was unreliable: `RootMetricContext` registers its own shutdown hook that closes the Kafka event reporters, and hook ordering is non-deterministic, so the GTE could be dropped.
- `YarnService.shutDown()` waits on `allContainersStopped`, but the last container's removal did not reliably notify that monitor (the AMRM `onContainersCompleted` path never notified, and `DynamicScalingYarnService` additionally early-returned on the `shutdownInProgress` branch without notifying). The waiter then slept the full timeout, so `unregisterApplicationMaster` fired minutes late and YARN failed the attempt even for a successful workflow.

Changes
- `YarnService` un-registers with a `FinalApplicationStatus` derived from the captured Temporal workflow status (`COMPLETED`→`SUCCEEDED`, `CANCELED`→`KILLED`, else `FAILED`), kept in lockstep with the AM JVM exit code (`mapWorkflowStatusToFinalAppStatus` / `computeExitCode`).
- New flag (default `true`): `gobblin.temporal.job.completion.gte.emission.enabled` gates the in-workflow `JOB_SUCCEEDED`/`JOB_FAILED` GTEs, so OSS stays self-complete; deployments that designate a launcher subclass as the single GTE source set it `false`.
- `GobblinYarnAppLauncher`: exit code + hooks, no emission. Surfaces FAILED/KILLED/UNDEFINED, lost-AM-visibility, and never-launched applications as a non-zero exit code, and exposes protected no-op hooks (`onTerminalApplicationStatus` / `onLostAmVisibility` / `onApplicationLaunchFailure`) plus `getApplicationId` / `getApplicationName` / `getConfig` accessors. The OSS launcher itself emits no GTE; a subclass overrides the hooks to emit the single terminal GTE.
- Add `YarnService.notifyIfAllContainersStopped()` and call it from `handleContainerCompletion` and `onContainerStopped`, and from both early-return paths of `DynamicScalingYarnService.handleContainerCompletion`, so whichever callback removes the last container wakes `shutDown()` immediately. Reduced the fallback wait from 5m to 2m (now only a missed-notify backstop). In testing this collapsed the time-to-un-register from ~8 min to seconds after the last container stops.

Downstream idempotency
`KafkaJobStatusMonitor` only fires the observability event / REEVALUATE dag action on the first transition into a finished status (`isNewStateTransitionToFinal`), so a duplicate terminal GTE (or a Kafka redelivery) for the same flow execution is a no-op; no source-side dedup is required for correctness.

Testing
- `gobblin-temporal` and `gobblin-yarn` compile clean; unit tests pass, incl. new `mapWorkflowStatusToFinalAppStatus` cases and launcher exit-code/hook-dispatch tests.
- Observed runs: workflow `COMPLETED` → un-register `SUCCEEDED`, AM exit `0`; a permission-denied source produced workflow `FAILED` → un-register `FAILED`, AM exit `1`. With the shutdown fix, the un-register fired within seconds of the last container stopping instead of waiting out the timeout.

Note
The LinkedIn-internal `RobinGobblinYarnAppLauncher` (separate repo) overrides the new hooks to be the single terminal-GTE source; that change lands in a follow-up PR after this publishes.

🤖 Generated with Claude Code
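The downstream idempotency argument in the description (only the first transition into a finished status triggers side effects) can be illustrated with the sketch below. It is not the `KafkaJobStatusMonitor` code: the class, the status strings, and the method names are hypothetical, chosen only to show why a duplicate terminal event is harmless.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative first-transition-only check; names and status values are hypothetical. */
final class TerminalTransitionSketch {
  private final Map<String, String> lastStatusByFlowExecution = new ConcurrentHashMap<>();

  static boolean isFinal(String status) {
    return "COMPLETE".equals(status) || "FAILED".equals(status) || "CANCELLED".equals(status);
  }

  /**
   * Records the new status and reports whether side effects should fire:
   * true only when the status is final and the previously stored one was not.
   */
  boolean recordAndCheckNewTransitionToFinal(String flowExecutionId, String newStatus) {
    String previous = lastStatusByFlowExecution.put(flowExecutionId, newStatus);
    return isFinal(newStatus) && (previous == null || !isFinal(previous));
  }
}
```

Because `put` returns the previous value atomically, two concurrent terminal events for the same flow execution cannot both observe a non-final predecessor in this sketch.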