[GOBBLIN-2247] Add cleanup logic for Staging directories in temporal flow #4166
pratapaditya04 wants to merge 15 commits into apache:master
…to cleanup_temp
Codecov Report
❌ Patch coverage is …
Coverage Diff

| Metric | master | #4166 | +/- |
|---|---|---|---|
| Coverage | 48.36% | 49.33% | +0.96% |
| Complexity | 8742 | 10362 | +1620 |
| Files | 1616 | 1926 | +310 |
| Lines | 64748 | 75616 | +10868 |
| Branches | 7302 | 8388 | +1086 |
| Hits | 31317 | 37305 | +5988 |
| Misses | 30645 | 35014 | +4369 |
| Partials | 2786 | 3297 | +511 |
Pull request overview
This pull request implements graceful shutdown support with work directory cleanup across Gobblin Yarn and Temporal deployments. The changes enable proper cleanup of staging and working directories during job termination, addressing the problem of accumulated temporary files from abrupt container terminations.
Changes:
- Added graceful shutdown signaling in Yarn launcher to send GRACEFUL_SHUTDOWN to AM containers and wait for terminal state before exiting
- Implemented JVM shutdown hook in Temporal job launcher to perform cleanup of staging/working directories during abnormal termination
- Added configuration keys for graceful shutdown timeouts and cleanup behavior with corresponding test coverage
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 16 comments.
| File | Description |
|---|---|
| gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java | Implements graceful shutdown signaling to AM container with polling logic and timeout handling |
| gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java | Adds configuration keys for graceful shutdown wait time and poll interval |
| gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java | Adds unit tests for graceful shutdown behavior and application completion state checking |
| gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java | Registers shutdown hook for cleanup, implements staging directory cleanup with multi-filesystem support |
| gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java | Adds configuration keys for work directory paths to delete and cleanup timeout |
| gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java | Stores work directory paths in JobState for later cleanup use |
| gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java | Adds tests for staging directory cleanup with various configurations |
| gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImplTest.java | Adds tests verifying work directory paths are correctly stored in JobState |
Comments suppressed due to low confidence (1)
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java:462
- If the graceful shutdown signal fails or times out, the method proceeds with stopping the launcher normally. However, the ApplicationMaster container may still be running and performing cleanup when cleanUpAppWorkDirectory is called, potentially causing conflicts. The AM might be trying to write final state while the launcher is deleting directories. Consider adding a warning log or checking application state again before cleanup to ensure the AM has actually terminated.
      // Only signal and wait when we are staying attached: if detachOnExit is enabled, we leave the app running.
      if (this.applicationId.isPresent() && !this.detachOnExitEnabled) {
        signalGracefulShutdownAndWaitForTerminal();
      }
      if (this.serviceManager.isPresent()) {
        this.serviceManager.get().stopAsync().awaitStopped(5, TimeUnit.MINUTES);
      }
      ExecutorsUtils.shutdownExecutorService(this.applicationStatusMonitor, Optional.of(LOGGER), 5, TimeUnit.MINUTES);
      stopYarnClient();
      if (!this.detachOnExitEnabled) {
        LOGGER.info("Disabling all live Helix instances..");
      }
    } finally {
      try {
        if (this.applicationId.isPresent() && !this.detachOnExitEnabled) {
          cleanUpAppWorkDirectory(this.applicationId.get());
        }
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java
    private void deleteSinglePath(Path path, java.util.List<FileSystem> writerFileSystems) {
      for (FileSystem writerFs : writerFileSystems) {
        try {
          if (writerFs.exists(path)) {
            log.info("Deleting work directory: {}", path);
            writerFs.delete(path, true);
            return;
          }
        } catch (Exception e) {
          // Try next filesystem
        }
      }
      log.info("Work directory does not exist or not accessible: {}", path);
    }
The method silently swallows exceptions when trying different filesystems (lines 506-508). While this allows trying multiple filesystems, it makes debugging difficult when all filesystems fail to delete a path. The final log message "does not exist or not accessible" doesn't distinguish between a path that truly doesn't exist and a path that failed to delete due to permissions or other errors. Consider logging the exceptions at debug level or collecting them to provide more detailed error information.
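One way to act on this suggestion is to collect the exceptions and report a distinct outcome for "not found" versus "failed". The following is only a minimal sketch, not the PR's code: `SimpleFs` is a hypothetical stand-in for the two Hadoop `FileSystem` calls used above, and `Outcome`, `WorkDirDeleter`, and the `failures` parameter are names invented for illustration.

```java
import java.io.IOException;
import java.util.List;

final class WorkDirDeleter {
  /** Hypothetical stand-in for the exists/delete subset of a filesystem API. */
  interface SimpleFs {
    boolean exists(String path) throws IOException;
    boolean delete(String path, boolean recursive) throws IOException;
  }

  /** Distinguishes a missing path from one that could not be checked or deleted. */
  enum Outcome { DELETED, NOT_FOUND, FAILED }

  /**
   * Tries each filesystem in turn. Every exception is appended to {@code failures}
   * instead of being dropped, so the caller can log the details.
   */
  static Outcome deleteSinglePath(String path, List<SimpleFs> fileSystems, List<IOException> failures) {
    for (SimpleFs fs : fileSystems) {
      try {
        if (fs.exists(path)) {
          if (fs.delete(path, true)) {
            return Outcome.DELETED;
          }
          failures.add(new IOException("delete returned false for " + path));
        }
      } catch (IOException e) {
        failures.add(e); // remember the error, then try the next filesystem
      }
    }
    return failures.isEmpty() ? Outcome.NOT_FOUND : Outcome.FAILED;
  }
}
```

A caller could then log `NOT_FOUND` at info level and `FAILED` at warn level together with the collected exceptions.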
    private void pollForApplicationCompletionUntil(long timeoutMs) throws InterruptedException, YarnException, IOException {
      int pollIntervalSec = ConfigUtils.getInt(this.config,
          GobblinYarnConfigurationKeys.GRACEFUL_SHUTDOWN_POLL_INTERVAL_SECONDS_KEY,
          GobblinYarnConfigurationKeys.DEFAULT_GRACEFUL_SHUTDOWN_POLL_INTERVAL_SECONDS);
      long pollIntervalMs = pollIntervalSec * 1000L;
      if (pollIntervalMs > timeoutMs) {
        pollIntervalMs = timeoutMs;
      }
      long deadlineMs = System.currentTimeMillis() + timeoutMs;
      ApplicationId appId = this.applicationId.get();
      while (true) {
        ApplicationReport report = this.yarnClient.getApplicationReport(appId);
        if (isApplicationCompleted(report)) {
          LOGGER.info("Application {} reached terminal state {}", appId, report.getYarnApplicationState());
          return;
        }
        long nowMs = System.currentTimeMillis();
        if (nowMs >= deadlineMs) {
          LOGGER.info("Graceful shutdown wait timeout reached for application {}", appId);
          return;
        }
        long remainingMs = deadlineMs - nowMs;
        Thread.sleep(Math.min(pollIntervalMs, remainingMs));
      }
The polling logic has an issue where if pollIntervalMs > timeoutMs, it sets pollIntervalMs = timeoutMs. Then if remainingMs < pollIntervalMs on the first iteration (which is common when timeoutMs is small), the thread will sleep for the remaining time, effectively making the entire timeout period a single sleep without any polling. This defeats the purpose of polling. Consider using a minimum poll interval or restructuring the logic to ensure at least one poll happens before timeout.
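For discussion, here is a minimal sketch of a deadline-bounded polling loop that checks the condition before every sleep, so at least one poll happens even with a zero timeout, and that reports whether it timed out. It is not the PR's implementation: `DeadlinePoller` and `pollUntil` are hypothetical names, a `BooleanSupplier` stands in for the YARN application-report check, a 1 ms sleep floor is an added assumption, and `System.nanoTime()` is swapped in for `System.currentTimeMillis()` to avoid wall-clock jumps.

```java
import java.util.function.BooleanSupplier;

final class DeadlinePoller {
  /**
   * Polls {@code isDone} until it returns true or {@code timeoutMs} elapses.
   *
   * @return true if completion was observed, false if the deadline passed first
   */
  static boolean pollUntil(BooleanSupplier isDone, long timeoutMs, long pollIntervalMs)
      throws InterruptedException {
    long deadlineNanos = System.nanoTime() + timeoutMs * 1_000_000L;
    while (true) {
      // Check first: the condition is evaluated before any sleep.
      if (isDone.getAsBoolean()) {
        return true;
      }
      long remainingMs = (deadlineNanos - System.nanoTime()) / 1_000_000L;
      if (remainingMs <= 0) {
        return false;
      }
      // Sleep for one interval, capped by the time left, with a 1 ms floor to avoid spinning.
      Thread.sleep(Math.max(1L, Math.min(pollIntervalMs, remainingMs)));
    }
  }
}
```

Returning a boolean lets the caller log a timeout differently from a terminal state instead of returning silently in both cases.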
    /**
     * Poll interval (seconds) when waiting for application completion after graceful shutdown signal.
     * Default 5. Should be less than the total wait time (see {@link #GRACEFUL_SHUTDOWN_WAIT_TIME_MINUTES_KEY}).
The documentation comment states "Default 5" but the actual default value is 60. This inconsistency could confuse users. The documentation should be updated to match the implementation.
Suggested change:

    -  * Default 5. Should be less than the total wait time (see {@link #GRACEFUL_SHUTDOWN_WAIT_TIME_MINUTES_KEY}).
    +  * Default 60. Should be less than the total wait time (see {@link #GRACEFUL_SHUTDOWN_WAIT_TIME_MINUTES_KEY}).
      stagingDir.delete();
      tmpDir.delete();
    }
The test cleanup at the end only deletes tmpDir without recursively deleting its contents. If the cleanup was disabled, stagingDir still exists and tmpDir.delete() will fail silently. Use recursive deletion or FileUtils.deleteDirectory() to ensure proper cleanup.
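As an alternative to `FileUtils.deleteDirectory()`, the recursive cleanup can be sketched with `java.nio.file` alone. This is an illustrative helper, not code from the PR; the class name `TestDirs` and the method `deleteRecursively` are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

final class TestDirs {
  /** Deletes {@code root} and everything beneath it; does nothing if it is absent. */
  static void deleteRecursively(Path root) throws IOException {
    if (!Files.exists(root)) {
      return;
    }
    try (Stream<Path> paths = Files.walk(root)) {
      // Reverse order visits children before their parent directories.
      paths.sorted(Comparator.reverseOrder()).forEach(p -> {
        try {
          Files.delete(p);
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
      });
    }
  }
}
```

Unlike `File.delete()`, `Files.delete()` throws when a deletion fails, so a leftover staging directory surfaces as a test error rather than a silent leak.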
          throws Exception {
        this.workflowId = "someWorkflowId";
      }

      // Expose jobContext for testing
      public org.apache.gobblin.runtime.JobContext getJobContext() {
        return this.jobContext;
      }
    }
The GobblinTemporalJobLauncherForTest exposes getJobContext() for testing, which is good. However, since each test now creates a launcher instance that registers a shutdown hook in the parent class constructor, these shutdown hooks will accumulate and all execute when the JVM terminates. This could cause issues in test execution. Consider adding a cleanup mechanism to remove the shutdown hook after each test, or implementing a way to disable shutdown hook registration during testing.
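One possible shape for such a cleanup mechanism is to keep a reference to the hook thread so it can be deregistered later. The sketch below uses only the standard `Runtime.addShutdownHook` and `Runtime.removeShutdownHook` calls; the `CleanupHook` class and its `unregister()` method are hypothetical and do not appear in the PR.

```java
final class CleanupHook implements AutoCloseable {
  private final Thread hook;

  /** Registers {@code cleanup} to run at JVM shutdown. */
  CleanupHook(Runnable cleanup) {
    this.hook = new Thread(cleanup, "cleanup-hook");
    Runtime.getRuntime().addShutdownHook(this.hook);
  }

  /**
   * Deregisters the hook.
   *
   * @return true if the hook was registered and has now been removed
   */
  boolean unregister() {
    try {
      return Runtime.getRuntime().removeShutdownHook(this.hook);
    } catch (IllegalStateException e) {
      // The JVM is already shutting down, so the hook can no longer be removed.
      return false;
    }
  }

  @Override
  public void close() {
    unregister();
  }
}
```

A test could hold the hook in a try-with-resources block, or call `unregister()` from a teardown method, so hooks do not accumulate across test cases.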
Dear Gobblin maintainers,
Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
JIRA
Description
Add Graceful Shutdown Support with Work Directory Cleanup
Summary
Implements graceful shutdown support across Gobblin Yarn and Temporal by:
This ensures proper cleanup during job termination while allowing the Application Master time to finish cleanup tasks.
Problem
When Gobblin applications are stopped, there was no mechanism to:
This led to:
Solution
GobblinYarnAppLauncher.java:
GobblinYarnConfigurationKeys.java:
gobblin.yarn.graceful.shutdown.wait.time.minutes=10 # Max wait for terminal state
gobblin.yarn.graceful.shutdown.poll.interval.seconds=60 # Poll interval
YarnHelixUtils.java:
GobblinJobLauncher.java:
GenerateWorkUnitsImpl.java:
Tests
Added unit tests