fix: prevent heartbeat timer from being permanently killed by slow or delayed heartbeats#18904
fix: prevent heartbeat timer from being permanently killed by slow or delayed heartbeats#18904prashantwason wants to merge 1 commit into
Conversation
… delayed heartbeats HoodieHeartbeatClient could permanently stop generating heartbeats for an instant, causing later commits to abort with "Heartbeat for instant ... has expired" even though the writer was still alive: - The heartbeat file is written synchronously on the Timer thread. Because the timer uses scheduleAtFixedRate, a slow or hung storage write blocks the thread and freezes all subsequent heartbeats for that instant. - When a heartbeat refresh is delayed past the tolerable interval, updateHeartbeat() called Thread.currentThread().interrupt(), permanently killing the timer thread and turning a transient delay into a permanent blackout. Fix: - Perform the heartbeat file write on a bounded daemon executor (Future.get with a per-interval timeout) so a slow or hung storage call cannot block the timer thread; a timed-out write is retried on the next tick. - Remove the self-interrupt; log a warning and continue. The commit-time check HeartbeatUtils.abortIfHeartbeatExpired() remains the sole enforcement point. Add TestHoodieHeartbeatClient.testTimerSurvivesHungHeartbeatWrite.
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #18904 +/- ##
=========================================
Coverage 68.81% 68.81%
- Complexity 29160 29178 +18
=========================================
Files 2520 2520
Lines 140056 140086 +30
Branches 17209 17209
=========================================
+ Hits 96373 96397 +24
- Misses 35909 35912 +3
- Partials 7774 7777 +3
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for working on this! The PR addresses two real issues with the heartbeat client: (a) synchronous storage writes blocking the timer thread, and (b) the self-interrupt permanently killing the timer. The new design (executor with bounded timeout + remove self-interrupt) is reasonable. One edge case worth double-checking in the inline comments around what happens when the very first heartbeat write times out during start(). Please take a look, and this should be ready for a Hudi committer or PMC member to take it from here. One minor type inconsistency worth noting in the new field declaration.
| heartbeat.setInstantTime(instantTime); | ||
| heartbeat.setLastHeartbeatTime(newHeartbeatTime); | ||
| heartbeat.setNumHeartbeats(heartbeat.getNumHeartbeats() + 1); | ||
| } catch (TimeoutException te) { |
There was a problem hiding this comment.
🤖 If the very first writeHeartbeatFile() call (invoked synchronously from start()) hits this timeout path, heartbeat.setLastHeartbeatTime(...) is never reached, so the Heartbeat's lastHeartbeatTime stays null. A subsequent call to isHeartbeatExpired() (e.g., from HeartbeatUtils.abortIfHeartbeatExpired() during a fast write-then-commit) would NPE at line 189 (currentTime - getLastHeartbeatTime() unboxes a null Long). Could lastHeartbeatTime be initialized conservatively here (or in start() before scheduling), or isHeartbeatExpired guarded against the null case, to close this corner?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
| // Maximum time the timer thread will wait for a single heartbeat file write to complete before | ||
| // abandoning it and letting the next tick retry. Bounded to one interval so that a slow/hung | ||
| // storage write cannot block the timer thread (and thus freeze all subsequent heartbeats). | ||
| private final long heartbeatWriteTimeoutMs; |
There was a problem hiding this comment.
🤖 nit: the two sibling duration fields (heartbeatIntervalInMs, maxAllowableHeartbeatIntervalInMs) are boxed Long — could you align heartbeatWriteTimeoutMs to Long as well to keep the field group consistent?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
| public void close() { | ||
| this.stopHeartbeatTimers(); | ||
| this.instantToHeartbeatMap.clear(); | ||
| synchronized (this) { |
There was a problem hiding this comment.
do we have concurernt close on this client, should we add a isClosed flag to avoid repetitive close.
| // future heartbeats for this instant, turning a transient delay into a permanent blackout. | ||
| // Enforcement is done at commit time in HeartbeatUtils.abortIfHeartbeatExpired(), which is the | ||
| // correct and sole enforcement point. | ||
| log.warn("Missed generating heartbeat for instant {} within allowable interval {} ms; continuing to refresh", |
There was a problem hiding this comment.
makes sense somehow, but I do see some risk for correctness: when failed writes rollback strategy is configured as LAZY, the async cleaner would possibility rollback the current instant by removing some data files(not remove the metadata files on timeline yet), and then the write finish to commit, then the commit got data loss.
Should we also increase the tolerale missing cnt, the current default is 2, should we change it to 10 or 20
Describe the issue this Pull Request addresses
Closes #18903
HoodieHeartbeatClientcan permanently stop generating heartbeats for an in-flight instant, causing a later commit to abort withHoodieException: Heartbeat for instant <t> has expiredeven though the writer is still alive. Two independent causes, both inupdateHeartbeat():Timerthread. Since the timer usesscheduleAtFixedRate, a slow or hung storage write blocks the thread and freezes all subsequent heartbeats for the instant.updateHeartbeat()callsThread.currentThread().interrupt(), which permanently kills the timer thread — turning a transient delay (GC pause, driver stall, single slow write) into a permanent blackout.Summary and Changelog
Future.get(heartbeatWriteTimeoutMs)), so a slow or hung storage call can no longer block the timer thread. The write timeout is one heartbeat interval; a timed-out write does not advance the last-heartbeat time and is retried on the next tick. A cached thread pool is used so that if one write hangs, subsequent ticks proceed on a fresh thread.updateHeartbeat(). Instead ofThread.currentThread().interrupt(), log a warning and continue refreshing. The commit-time checkHeartbeatUtils.abortIfHeartbeatExpired()remains the sole enforcement point for staleness.close().TestHoodieHeartbeatClient.testTimerSurvivesHungHeartbeatWrite, which blocks the first heartbeat write and asserts the timer keeps generating heartbeats (covering both fixes).Impact
No public API or config change. Heartbeat refresh becomes resilient to transient storage latency and driver pauses: a transient stall no longer permanently disables heartbeats for an instant. Staleness is still enforced at commit time, so correctness of the concurrency guard is unchanged.
Risk Level
low
Behavior change is confined to
HoodieHeartbeatClient. ExistingTestHoodieHeartbeatClienttests pass and a new regression test was added.Documentation Update
none
Contributor's checklist