Skip to content

[GOBBLIN-2247]Add cleanup logic for Staging directories in temporal flow#4166

Open
pratapaditya04 wants to merge 15 commits intoapache:masterfrom
pratapaditya04:cleanup_temp
Open

[GOBBLIN-2247]Add cleanup logic for Staging directories in temporal flow#4166
pratapaditya04 wants to merge 15 commits intoapache:masterfrom
pratapaditya04:cleanup_temp

Conversation

@pratapaditya04
Copy link
Contributor

@pratapaditya04 pratapaditya04 commented Feb 16, 2026

Dear Gobblin maintainers,

Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!

JIRA

Description

  • Here are some details about my PR, including screenshots (if applicable):

Add Graceful Shutdown Support with Work Directory Cleanup

Summary

Implements graceful shutdown support across Gobblin Yarn and Temporal by:

  1. Yarn Launcher: Sends GRACEFUL_SHUTDOWN signal to AM container and waits for terminal state
  2. Temporal Job Launcher: Executes shutdown hook to clean up staging and working directories

This ensures proper cleanup during job termination while allowing the Application Master time to finish cleanup tasks.

Problem

When Gobblin applications are stopped, there was no mechanism to:

  • Signal the AM for graceful shutdown before killing containers
  • Clean up staging directories and temporary work files during abnormal termination
  • Wait for cleanup to complete before launcher exits

This led to:

  • Abrupt container termination without cleanup opportunity
  • Accumulated temporary files consuming disk space
  • Manual intervention required to clean up orphaned directories

Solution

  1. Yarn: Graceful Shutdown Signaling (gobblin-yarn)

GobblinYarnAppLauncher.java:

  • signalGracefulShutdownAndWaitForTerminal() - Sends GRACEFUL_SHUTDOWN signal to AM container via YarnClient
  • getAmContainerId() - Resolves AM container ID from application report
  • sendGracefulShutdownSignal() - Sends signal to container
  • pollForApplicationCompletionUntil() - Polls application state until terminal or timeout
  • Only signals when detachOnExit is disabled (stays attached to monitor completion)

GobblinYarnConfigurationKeys.java:
gobblin.yarn.graceful.shutdown.wait.time.minutes=10 # Max wait for terminal state
gobblin.yarn.graceful.shutdown.poll.interval.seconds=60 # Poll interval

YarnHelixUtils.java:

  • Updated jar cache retention from monthly to bi-monthly for better cleanup
  1. Temporal: Shutdown Hook for Cleanup (gobblin-temporal)

GobblinJobLauncher.java:

  • JVM shutdown hook that executes on SIGTERM/SIGKILL
  • Loads persisted JobState from disk
  • Creates non-cached FileSystem instances (avoids closure race conditions)
  • Runs staging and working directory cleanup in parallel with timeout
  • Supports multi-branch workflows with branch-specific filesystem URIs

GenerateWorkUnitsImpl.java:

  • Stores work directory paths in JobState before persistence

Tests

  • My PR adds the following unit tests OR does not need testing for this extremely good reason:

Added unit tests

Commits

  • My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

@pratapaditya04 pratapaditya04 changed the title [DRAFT]added cleanup logic for working directories [GOBBLIN-2247]Add cleanup logic for Staging directories in temporal flow Feb 19, 2026
@codecov-commenter
Copy link

codecov-commenter commented Feb 19, 2026

Codecov Report

❌ Patch coverage is 47.85276% with 85 lines in your changes missing coverage. Please review.
✅ Project coverage is 49.33%. Comparing base (d445b1e) to head (7dae02d).
⚠️ Report is 1 commits behind head on master.

Files with missing lines Patch % Lines
...bblin/temporal/joblauncher/GobblinJobLauncher.java 46.42% 58 Missing and 2 partials ⚠️
...rg/apache/gobblin/yarn/GobblinYarnAppLauncher.java 54.16% 18 Missing and 4 partials ⚠️
...poral/ddm/activity/impl/GenerateWorkUnitsImpl.java 0.00% 3 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master    #4166      +/-   ##
============================================
+ Coverage     48.36%   49.33%   +0.96%     
- Complexity     8742    10362    +1620     
============================================
  Files          1616     1926     +310     
  Lines         64748    75616   +10868     
  Branches       7302     8388    +1086     
============================================
+ Hits          31317    37305    +5988     
- Misses        30645    35014    +4369     
- Partials       2786     3297     +511     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This pull request implements graceful shutdown support with work directory cleanup across Gobblin Yarn and Temporal deployments. The changes enable proper cleanup of staging and working directories during job termination, addressing the problem of accumulated temporary files from abrupt container terminations.

Changes:

  • Added graceful shutdown signaling in Yarn launcher to send GRACEFUL_SHUTDOWN to AM containers and wait for terminal state before exiting
  • Implemented JVM shutdown hook in Temporal job launcher to perform cleanup of staging/working directories during abnormal termination
  • Added configuration keys for graceful shutdown timeouts and cleanup behavior with corresponding test coverage

Reviewed changes

Copilot reviewed 8 out of 8 changed files in this pull request and generated 16 comments.

Show a summary per file
File Description
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java Implements graceful shutdown signaling to AM container with polling logic and timeout handling
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java Adds configuration keys for graceful shutdown wait time and poll interval
gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java Adds unit tests for graceful shutdown behavior and application completion state checking
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java Registers shutdown hook for cleanup, implements staging directory cleanup with multi-filesystem support
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java Adds configuration keys for work directory paths to delete and cleanup timeout
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java Stores work directory paths in JobState for later cleanup use
gobblin-temporal/src/test/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncherTest.java Adds tests for staging directory cleanup with various configurations
gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImplTest.java Adds tests verifying work directory paths are correctly stored in JobState
Comments suppressed due to low confidence (1)

gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java:462

  • If the graceful shutdown signal fails or times out, the method proceeds with stopping the launcher normally. However, the ApplicationMaster container may still be running and performing cleanup when cleanUpAppWorkDirectory is called, potentially causing conflicts. The AM might be trying to write final state while the launcher is deleting directories. Consider adding a warning log or checking application state again before cleanup to ensure the AM has actually terminated.
      // Only signal and wait when we are staying attached: if detachOnExit is enabled, we leave the app running.
      if (this.applicationId.isPresent() && !this.detachOnExitEnabled) {
        signalGracefulShutdownAndWaitForTerminal();
      }

      if (this.serviceManager.isPresent()) {
        this.serviceManager.get().stopAsync().awaitStopped(5, TimeUnit.MINUTES);
      }

      ExecutorsUtils.shutdownExecutorService(this.applicationStatusMonitor, Optional.of(LOGGER), 5, TimeUnit.MINUTES);

      stopYarnClient();

      if (!this.detachOnExitEnabled) {
        LOGGER.info("Disabling all live Helix instances..");
      }
    } finally {
      try {
        if (this.applicationId.isPresent() && !this.detachOnExitEnabled) {
          cleanUpAppWorkDirectory(this.applicationId.get());
        }

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +498 to +511
private void deleteSinglePath(Path path, java.util.List<FileSystem> writerFileSystems) {
for (FileSystem writerFs : writerFileSystems) {
try {
if (writerFs.exists(path)) {
log.info("Deleting work directory: {}", path);
writerFs.delete(path, true);
return;
}
} catch (Exception e) {
// Try next filesystem
}
}
log.info("Work directory does not exist or not accessible: {}", path);
}
Copy link

Copilot AI Feb 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method silently swallows exceptions when trying different filesystems (line 506-508). While this allows trying multiple filesystems, it makes debugging difficult when all filesystems fail to delete a path. The final log message "does not exist or not accessible" doesn't distinguish between a path that truly doesn't exist and a path that failed to delete due to permissions or other errors. Consider logging the exceptions at debug level or collecting them to provide more detailed error information.

Copilot uses AI. Check for mistakes.
Comment on lines +1005 to +1028
private void pollForApplicationCompletionUntil(long timeoutMs) throws InterruptedException, YarnException, IOException {
int pollIntervalSec = ConfigUtils.getInt(this.config,
GobblinYarnConfigurationKeys.GRACEFUL_SHUTDOWN_POLL_INTERVAL_SECONDS_KEY,
GobblinYarnConfigurationKeys.DEFAULT_GRACEFUL_SHUTDOWN_POLL_INTERVAL_SECONDS);
long pollIntervalMs = pollIntervalSec * 1000L;
if (pollIntervalMs > timeoutMs) {
pollIntervalMs = timeoutMs;
}
long deadlineMs = System.currentTimeMillis() + timeoutMs;
ApplicationId appId = this.applicationId.get();
while (true) {
ApplicationReport report = this.yarnClient.getApplicationReport(appId);
if (isApplicationCompleted(report)) {
LOGGER.info("Application {} reached terminal state {}", appId, report.getYarnApplicationState());
return;
}
long nowMs = System.currentTimeMillis();
if (nowMs >= deadlineMs) {
LOGGER.info("Graceful shutdown wait timeout reached for application {}", appId);
return;
}
long remainingMs = deadlineMs - nowMs;
Thread.sleep(Math.min(pollIntervalMs, remainingMs));
}
Copy link

Copilot AI Feb 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The polling logic has an issue where if pollIntervalMs > timeoutMs, it sets pollIntervalMs = timeoutMs. Then if remainingMs < pollIntervalMs on the first iteration (which is common when timeoutMs is small), the thread will sleep for the remaining time, effectively making the entire timeout period a single sleep without any polling. This defeats the purpose of polling. Consider using a minimum poll interval or restructuring the logic to ensure at least one poll happens before timeout.

Copilot uses AI. Check for mistakes.

/**
* Poll interval (seconds) when waiting for application completion after graceful shutdown signal.
* Default 5. Should be less than the total wait time (see {@link #GRACEFUL_SHUTDOWN_WAIT_TIME_MINUTES_KEY}).
Copy link

Copilot AI Feb 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The documentation comment states "Default 5" but the actual default value is 60. This inconsistency could confuse users. The documentation should be updated to match the implementation.

Suggested change
* Default 5. Should be less than the total wait time (see {@link #GRACEFUL_SHUTDOWN_WAIT_TIME_MINUTES_KEY}).
* Default 60. Should be less than the total wait time (see {@link #GRACEFUL_SHUTDOWN_WAIT_TIME_MINUTES_KEY}).

Copilot uses AI. Check for mistakes.
Comment on lines +275 to +277
stagingDir.delete();
tmpDir.delete();
}
Copy link

Copilot AI Feb 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test cleanup at the end only deletes tmpDir without recursively deleting its contents. If the cleanup was disabled, stagingDir still exists and tmpDir.delete() will fail silently. Use recursive deletion or FileUtils.deleteDirectory() to ensure proper cleanup.

Copilot uses AI. Check for mistakes.
Comment on lines 81 to 89
throws Exception {
this.workflowId = "someWorkflowId";
}

// Expose jobContext for testing
public org.apache.gobblin.runtime.JobContext getJobContext() {
return this.jobContext;
}
}
Copy link

Copilot AI Feb 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The GobblinTemporalJobLauncherForTest exposes getJobContext() for testing, which is good. However, since each test now creates a launcher instance that registers a shutdown hook in the parent class constructor, these shutdown hooks will accumulate and all execute when the JVM terminates. This could cause issues in test execution. Consider adding a cleanup mechanism to remove the shutdown hook after each test, or implementing a way to disable shutdown hook registration during testing.

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants

Comments