Skip to content

GOBBLIN-2265: Reliable terminal job-completion status for temporal-on-YARN (AM final status + launcher hooks + exit codes + prompt un-register)#4197

Open
pratapaditya04 wants to merge 6 commits into
apache:masterfrom
pratapaditya04:pratapaditya04/temporal-am-job-completion-gte-v2
Open

GOBBLIN-2265: Reliable terminal job-completion status for temporal-on-YARN (AM final status + launcher hooks + exit codes + prompt un-register)#4197
pratapaditya04 wants to merge 6 commits into
apache:masterfrom
pratapaditya04:pratapaditya04/temporal-am-job-completion-gte-v2

Conversation

@pratapaditya04
Copy link
Copy Markdown
Contributor

@pratapaditya04 pratapaditya04 commented May 26, 2026

Summary

Makes a temporal-on-YARN job's true outcome reliably observable end-to-end without a fragile JVM shutdown hook, exposes clean extension points so a single launcher subclass can be the sole terminal-GobblinTrackingEvent (GTE) source, and fixes a shutdown race that delayed the AM's YARN un-register by minutes. (Reworks the earlier shutdown-hook approach after finding it could silently drop the GTE.)

Tracks GOBBLIN-2265.

Problem

  1. The YARN ApplicationMaster un-registered with a hardcoded FinalApplicationStatus.SUCCEEDED, so a launcher polling the ApplicationReport saw SUCCEEDED even when the workflow FAILED.
  2. An earlier attempt emitted the terminal GTE from a JVM shutdown hook — but RootMetricContext registers its own shutdown hook that closes the Kafka event reporters, and hook ordering is non-deterministic, so the GTE could be dropped.
  3. The AM and launcher returned success exit codes in some failure scenarios, masking failures on dashboards.
  4. AM un-register was delayed by minutes on shutdown. YarnService.shutDown() waits on allContainersStopped, but the last container's removal did not reliably notify that monitor (the AMRM onContainersCompleted path never notified, and DynamicScalingYarnService additionally early-returned on the shutdownInProgress branch without notifying). The waiter then slept the full timeout, so unregisterApplicationMaster fired minutes late and YARN failed the attempt even for a successful workflow.

Changes

  • AM reports the real outcome to YARN. The temporal YarnService un-registers with a FinalApplicationStatus derived from the captured Temporal workflow status (COMPLETED→SUCCEEDED, CANCELED→KILLED, else FAILED), kept in lockstep with the AM JVM exit code (mapWorkflowStatusToFinalAppStatus / computeExitCode).
  • Removed the unreliable shutdown-hook GTE emitter; kept status capture + exit-code propagation.
  • Feature flag (default true)gobblin.temporal.job.completion.gte.emission.enabled gates the AM in-workflow JOB_SUCCEEDED/JOB_FAILED, so OSS stays self-complete; deployments that designate a launcher subclass as the single GTE source set it false.
  • GobblinYarnAppLauncher — exit code + hooks, no emission. Surfaces FAILED/KILLED/UNDEFINED, lost-AM-visibility, and never-launched applications as a non-zero exit code, and exposes protected no-op hooks (onTerminalApplicationStatus / onLostAmVisibility / onApplicationLaunchFailure) plus getApplicationId/getApplicationName/getConfig accessors. The OSS launcher itself emits no GTE — a subclass overrides the hooks to emit the single terminal GTE.
  • Prompt AM un-register on shutdown. Added YarnService.notifyIfAllContainersStopped() and call it from handleContainerCompletion and onContainerStopped, and from both early-return paths of DynamicScalingYarnService.handleContainerCompletion — so whichever callback removes the last container wakes shutDown() immediately. Reduced the fallback wait from 5m to 2m (now only a missed-notify backstop). In testing this collapsed the time-to-un-register from ~8 min to seconds after the last container stops.

Downstream idempotency

KafkaJobStatusMonitor only fires the observability event / REEVALUATE dag action on the first transition into a finished status (isNewStateTransitionToFinal), so a duplicate terminal GTE (or a Kafka redelivery) for the same flow execution is a no-op — no source-side dedup is required for correctness.

Testing

  • gobblin-temporal and gobblin-yarn compile clean; unit tests pass, incl. new mapWorkflowStatusToFinalAppStatus cases and launcher exit-code/hook-dispatch tests.
  • End-to-end on a prod-ltx1 GaaS distcp flow: workflow COMPLETED → un-register SUCCEEDED, AM exit 0; a permission-denied source produced workflow FAILED → un-register FAILED, AM exit 1. With the shutdown fix, the un-register fired within seconds of the last container stopping instead of waiting out the timeout.

Note

The LinkedIn-internal RobinGobblinYarnAppLauncher (separate repo) overrides the new hooks to be the single terminal-GTE source; that change lands in a follow-up PR after this publishes.

🤖 Generated with Claude Code

@pratapaditya04 pratapaditya04 force-pushed the pratapaditya04/temporal-am-job-completion-gte-v2 branch from 2029579 to f5e17bf Compare June 1, 2026 08:03
@pratapaditya04 pratapaditya04 changed the title GOBBLIN-XXXX: GGW-emitted terminal GTE on AM failure + AM/launcher exit-code propagation GOBBLIN-XXXX: Reliable terminal job-completion status for temporal-on-YARN (AM final status + launcher GTE hooks + exit codes) Jun 1, 2026
@pratapaditya04 pratapaditya04 force-pushed the pratapaditya04/temporal-am-job-completion-gte-v2 branch from f5e17bf to 190cad9 Compare June 1, 2026 08:27
@pratapaditya04 pratapaditya04 changed the title GOBBLIN-XXXX: Reliable terminal job-completion status for temporal-on-YARN (AM final status + launcher GTE hooks + exit codes) GOBBLIN-XXXX: Reliable terminal job-completion status for temporal-on-YARN (AM final status + launcher hooks + exit codes) Jun 1, 2026
@pratapaditya04 pratapaditya04 force-pushed the pratapaditya04/temporal-am-job-completion-gte-v2 branch 3 times, most recently from 820af02 to 6e617be Compare June 1, 2026 09:20
…-YARN (AM final status + launcher hooks + exit codes)

Make a temporal-on-YARN job's true outcome reliably observable end-to-end without depending on a
fragile JVM shutdown hook, and expose clean extension points for a single terminal-GTE source.

- AM un-registers with a FinalApplicationStatus derived from the Temporal workflow outcome
  (COMPLETED->SUCCEEDED, CANCELED->KILLED, else FAILED) instead of an unconditional SUCCEEDED, so a
  launcher polling the ApplicationReport sees the real result. Kept in lockstep with the AM JVM exit
  code via mapWorkflowStatusToFinalAppStatus/computeExitCode.
- Remove the unreliable JVM-shutdown-hook GTE emitter (RootMetricContext closes the Kafka reporters
  in its own shutdown hook, so the GTE could be silently dropped). Status capture and exit-code
  propagation are retained.
- Restore the in-workflow JOB_SUCCEEDED/JOB_FAILED GTEs and gate them behind a new
  gobblin.temporal.job.completion.gte.emission.enabled flag (default true), so a standalone OSS
  deployment stays self-complete; deployments that designate a launcher subclass as the single GTE
  source set it false.
- GobblinYarnAppLauncher: surface FAILED/KILLED/UNDEFINED, lost-AM-visibility, and never-launched
  applications as a non-zero launcher exit code, and expose protected no-op hooks
  (onTerminalApplicationStatus / onLostAmVisibility / onApplicationLaunchFailure) plus
  getApplicationId/getApplicationName/getConfig accessors so a subclass can emit the single terminal
  GTE. The OSS launcher itself emits no GTE.
- On graceful shutdown, force-kill the YARN application after the wait so the RM does not re-attempt
  the AM on cancel.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@pratapaditya04 pratapaditya04 force-pushed the pratapaditya04/temporal-am-job-completion-gte-v2 branch from 6e617be to bfb3a20 Compare June 1, 2026 09:35
…p debug logging

Fix a notify race in the temporal AM shutdown path: shutDown() waits on
allContainersStopped, but the two callbacks that observe a container going
away (AMRM onContainersCompleted -> handleContainerCompletion, and NM
onContainerStopped) did not reliably notify the waiter. DynamicScalingYarnService
additionally early-returned on the shutdownInProgress branch without notifying.
As a result the waiter slept the full wait timeout, delaying
unregisterApplicationMaster by minutes and causing YARN to fail the attempt
even on a successful workflow.

- Add YarnService.notifyIfAllContainersStopped(); call it from
  handleContainerCompletion and onContainerStopped (base) and from both
  early-return paths of DynamicScalingYarnService.handleContainerCompletion.
- Reduce the container-stop fallback wait from 5m to 2m (the notify makes the
  normal path immediate; this is only a missed-notify backstop).
- Replace the temporary debug markers with normal INFO-level logging.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@pratapaditya04 pratapaditya04 changed the title GOBBLIN-XXXX: Reliable terminal job-completion status for temporal-on-YARN (AM final status + launcher hooks + exit codes) GOBBLIN-2265: Reliable terminal job-completion status for temporal-on-YARN (AM final status + launcher hooks + exit codes + prompt un-register) Jun 1, 2026
@pratapaditya04 pratapaditya04 marked this pull request as ready for review June 1, 2026 17:32
@codecov-commenter
Copy link
Copy Markdown

codecov-commenter commented Jun 2, 2026

Codecov Report

❌ Patch coverage is 39.20000% with 76 lines in your changes missing coverage. Please review.
✅ Project coverage is 48.53%. Comparing base (26406a8) to head (57cfed4).
⚠️ Report is 13 commits behind head on master.

Files with missing lines Patch % Lines
...rg/apache/gobblin/yarn/GobblinYarnAppLauncher.java 39.70% 37 Missing and 4 partials ⚠️
.../org/apache/gobblin/temporal/yarn/YarnService.java 0.00% 16 Missing ⚠️
.../ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java 0.00% 6 Missing ⚠️
...mporal/joblauncher/GobblinTemporalJobLauncher.java 78.57% 5 Missing and 1 partial ⚠️
...emporal/yarn/GobblinTemporalApplicationMaster.java 0.00% 5 Missing ⚠️
...bblin/temporal/yarn/DynamicScalingYarnService.java 0.00% 2 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master    #4197      +/-   ##
============================================
+ Coverage     43.26%   48.53%   +5.26%     
- Complexity     2583     8837    +6254     
============================================
  Files           516     1618    +1102     
  Lines         22087    65199   +43112     
  Branches       2505     7351    +4846     
============================================
+ Hits           9557    31642   +22085     
- Misses        11566    30747   +19181     
- Partials        964     2810    +1846     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

…larify terminal GTE is emitted by the Temporal worker, not the AM

- Route writes to the static lastTerminalStatus cache through a private static setter
  (GobblinTemporalJobLauncher) so FindBugs ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD no longer fires.
- Rename isAmTerminalGteEmissionEnabled -> isWorkflowTerminalGteEmissionEnabled and correct the
  javadoc/comments: ExecuteGobblinWorkflowImpl runs on a Temporal worker, not the YARN AM.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR improves end-to-end observability and correctness of terminal job outcomes for temporal-on-YARN by ensuring the YARN FinalApplicationStatus, AM JVM exit code, and launcher exit code/hook dispatch align with the actual Temporal workflow outcome, while also fixing an AM shutdown wait/notify race that could delay YARN un-registration.

Changes:

  • Derive and propagate true terminal outcome via AM exit codes + YARN FinalApplicationStatus (based on captured Temporal workflow status).
  • Add launcher-side exitCode propagation and protected terminal hooks (plus focused unit tests) intended for subclasses to emit a single terminal event.
  • Fix container-stop notification paths to avoid multi-minute un-register delays; add a feature flag to gate workflow-side terminal GTE emission.

Reviewed changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java Adds launcher exit-code propagation + terminal hooks and integrates them into terminal-report and failure paths.
gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTerminalGteTest.java New unit tests for launcher terminal-status mapping, exit-code wiring, and hook dispatch/guarding.
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java Captures workflow terminal status for reuse by AM exit code + YARN un-register status; adds mapping helpers.
gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java Adds tests for workflow-status→final-status mapping, exit codes, and status-capture caching/reset behavior.
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java Uses captured terminal status to un-register with correct FinalApplicationStatus; adds notify helper to wake shutdown waiter.
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java Ensures shutdown waiter is notified even on early-return/edge container-completion paths.
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/GobblinTemporalApplicationMaster.java Exits AM JVM with code derived from captured workflow terminal status.
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java Adds feature flag to gate workflow-side terminal job-completion GTE emission.
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java Gates terminal success/failure GTE emission on the new feature flag.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +1312 to +1325
try {
gobblinYarnAppLauncher.launch();
} catch (Exception e) {
// launch() already invoked handleApplicationLaunchFailure (sets exitCode=1, emits terminal GTE) before
// rethrowing. Swallow here so we reach the explicit System.exit below with the non-zero code.
LOGGER.error("GobblinYarnAppLauncher launch failed", e);
}

// Surface AM-level failures (FAILED/KILLED/UNDEFINED FinalApplicationStatus, lost-AM-visibility on
// exhausted report fetches, or a never-launched application) as a non-zero launcher process exit so
// GGW/Grid Gateway dashboards see them end-to-end. SUCCEEDED keeps exitCode at the field's default of 0.
int finalExitCode = gobblinYarnAppLauncher.getExitCode();
LOGGER.info("GobblinYarnAppLauncher exiting with code {}", finalExitCode);
System.exit(finalExitCode);
Comment thread gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java Outdated
Comment on lines +130 to +135
return response.getWorkflowExecutionInfo().getStatus();
} catch (Exception e) {
log.warn("Failed to describe workflow {} for completion GTE; treating as UNSPECIFIED (will emit JOB_FAILED)",
this.workflowId, e);
return WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_UNSPECIFIED;
}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the blanket exception catch will lead to wrong GTE in case of temporal is momentarily down / namespace limit exceeded exception

DEADLINE_EXCEEDED
UNAVAILABLE
RESOURCE_EXHAUSTED

can we see if we can distinguish between them ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch.let me check ways for handling it

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks — addressed in 11ba7500a. fetchWorkflowStatus() now retries on transient gRPC StatusRuntimeException codes (UNAVAILABLE, DEADLINE_EXCEEDED, RESOURCE_EXHAUSTED) with bounded linear backoff (3 attempts) before falling back; non-retryable codes fall back immediately.

One inherent caveat: if Temporal is still unreachable after the retries, the true status is genuinely unknowable, so we still fall back to UNSPECIFIEDJOB_FAILED rather than emit nothing (leaving GaaS without any terminal state would be worse). Happy to tune the attempt count / backoff. Leaving this open for your re-review.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update in 03e7bf9e0: on a closer look, the Temporal service stubs are already built with RpcRetryOptions (TemporalWorkflowClientFactory#buildRpcRetryOptions, tunable via gobblin.temporal.rpc.retry.options.*), so transient gRPC codes (UNAVAILABLE / DEADLINE_EXCEEDED / RESOURCE_EXHAUSTED) are already retried with backoff at the stub layer before this catch is ever reached. I've therefore dropped the manual retry loop + Thread.sleep I'd added in the previous commit and now rely on that existing, configurable retry — the catch only handles the genuinely-unreachable-after-retries fallback to UNSPECIFIED. Happy to add describe-specific tuning if you'd prefer it over the shared stub options.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess temporal sdk already internally retries on its apis,, that should be applicable here

}
sendGracefulShutdownSignal(amContainerId.get());
pollForApplicationCompletionUntil(timeoutMs);
// Always kill so the RM cannot re-attempt the AM (no-op if the app already finished). If the terminal
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will this be a no-op or we will see some exceptions in log ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

killApplication on an already-finished application is idempotent on the RM side — it returns without throwing (no YarnException), so it is effectively a no-op and won't produce error logs; the only line emitted is our own preceding LOGGER.info(... force-killing ...). In 11ba7500a I also now re-check the report just before this call and dispatch the real terminal status if the app already finished, so for a self-completed app we no longer rely on the kill for the outcome at all. Leaving open for your confirmation.

pratapaditya04 and others added 3 commits June 3, 2026 14:49
…tatus

- GobblinTemporalJobLauncher.fetchWorkflowStatus: retry transient gRPC failures
  (UNAVAILABLE / DEADLINE_EXCEEDED / RESOURCE_EXHAUSTED) with bounded linear
  backoff before falling back to UNSPECIFIED, so a momentary Temporal outage is
  not mis-reported as a terminal JOB_FAILED GTE.
- temporal YarnService.shutDown: re-check containerMap emptiness inside the
  allContainersStopped monitor and loop on the condition, fixing a lost-wakeup
  where a notify firing between the outer check and wait() stalled the AM
  un-register for the full 2-minute timeout. Also correct the
  getFinalApplicationStatusForUnregister Javadoc (a null captured status maps
  to FAILED, not SUCCEEDED).
- GobblinYarnAppLauncher.signalGracefulShutdownAndWaitForTerminal: re-fetch the
  ApplicationReport after polling and dispatch the real terminal status if the
  app already finished, instead of an unconditional synthetic KILLED. Document
  that onTerminalApplicationStatus may receive a null ApplicationReport.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…configurable container-stop wait; skip kill when app already finished)

- fetchWorkflowStatus: remove the manual retry loop + Thread.sleep. Transient gRPC
  failures (UNAVAILABLE / DEADLINE_EXCEEDED / RESOURCE_EXHAUSTED) are already retried
  with backoff by the Temporal service stubs' configured RpcRetryOptions
  (gobblin.temporal.rpc.retry.options.*); the catch now only handles the
  post-retry-exhausted fallback to UNSPECIFIED.
- YarnService.shutDown: make the bounded container-stop wait configurable via
  gobblin.temporal.containers.stop.wait.timeout.minutes (default 2).
- GobblinYarnAppLauncher.signalGracefulShutdownAndWaitForTerminal: only force-kill
  when the app is not already finished; if it already reached a terminal state,
  dispatch the real status and skip the kill.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
testGracefulShutdownSendsSignalAndPolls expected getApplicationReport to be
invoked twice, but signalGracefulShutdownAndWaitForTerminal now re-fetches the
report once after the poll to decide between surfacing the real terminal status
and force-killing. Update the expected count to 3 and assert that the kill is
skipped when the application is already terminal.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants