From 6ba8743ce8ffb24830ac2553f82111096284e91a Mon Sep 17 00:00:00 2001 From: Baekgyu Date: Mon, 8 Jun 2026 22:18:03 +0900 Subject: [PATCH] Let activities heartbeat during worker shutdown --- .../ActivityWorkerShutdownException.java | 6 + .../activity/HeartbeatContextImpl.java | 29 +++ .../activity/HeartbeatContextImplTest.java | 216 ++++++++++++++++++ 3 files changed, 251 insertions(+) diff --git a/temporal-sdk/src/main/java/io/temporal/client/ActivityWorkerShutdownException.java b/temporal-sdk/src/main/java/io/temporal/client/ActivityWorkerShutdownException.java index 3afcfc7e83..3c42854096 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/ActivityWorkerShutdownException.java +++ b/temporal-sdk/src/main/java/io/temporal/client/ActivityWorkerShutdownException.java @@ -9,6 +9,12 @@ * called. It is OK to ignore the exception to let the activity complete. It assumes that {@link * WorkerFactory#awaitTermination(long, TimeUnit)} is called with a timeout larger than the activity * execution time. + * + *

<p>Before this exception is thrown, the heartbeat is sent to the server on a best-effort, + * throttled basis. A caller that keeps heartbeating during the {@link + * WorkerFactory#awaitTermination(long, TimeUnit)} grace period thus prevents the server from timing + * the activity out before it finishes. Throttled heartbeats may skip sending details, so persist + * final progress through the activity's completion result rather than the last heartbeat. */ public final class ActivityWorkerShutdownException extends ActivityCompletionException { diff --git a/temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContextImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContextImpl.java index 48993d0da1..b4b1bbb57f 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContextImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContextImpl.java @@ -73,6 +73,11 @@ static long getLocalHeartbeatTimeoutBufferMillis() { private long heartbeatTimeoutDeadlineNanos; private boolean heartbeatTimedOut; + // Throttle state for the worker-shutdown heartbeat path, which has no scheduled executor: + // last send attempt (nanos, 0 = none) and whether it reached the server. + private long lastHeartbeatAttemptNanos; + private boolean lastHeartbeatSucceeded; + private ActivityCompletionException lastException; public HeartbeatContextImpl( @@ -144,6 +149,26 @@ public HeartbeatContextImpl( @Override public void heartbeat(V details) throws ActivityCompletionException { if (heartbeatExecutor.isShutdown()) { + // Worker shutting down: scheduled executor is gone, so emit synchronously then + // signal shutdown. Caller may ignore the exception and keep heartbeating. + lock.lock(); + try { + // Throttle as the scheduled path would, so a caller looping on heartbeat() doesn't flood. + long nowNanos = System.nanoTime(); + long throttleMillis = + lastHeartbeatSucceeded ? 
heartbeatIntervalMillis : HEARTBEAT_RETRY_WAIT_MILLIS; + if (lastHeartbeatAttemptNanos == 0 + || nowNanos - lastHeartbeatAttemptNanos + >= TimeUnit.MILLISECONDS.toNanos(throttleMillis)) { + sendHeartbeatRequest(details); + } + } catch (Exception e) { + // Best-effort: don't let a send error mask the shutdown signal the caller relies on. + log.warn("Heartbeat during worker shutdown failed", e); + } finally { + lock.unlock(); + } + // Cancel/pause/reset from a successful send is intentionally not surfaced during shutdown. throw new ActivityWorkerShutdownException(info); } lock.lock(); @@ -297,6 +322,8 @@ private void checkHeartbeatTimeoutDeadlineLocked() { } private void sendHeartbeatRequest(Object details) { + lastHeartbeatAttemptNanos = System.nanoTime(); + lastHeartbeatSucceeded = false; try { RecordActivityTaskHeartbeatResponse status = ActivityClientHelper.sendHeartbeatRequest( @@ -306,6 +333,8 @@ private void sendHeartbeatRequest(Object details) { info.getTaskToken(), dataConverterWithActivityContext.toPayloads(details), metricsScope); + // Reached the server, so the server-side heartbeat deadline was refreshed. 
+ lastHeartbeatSucceeded = true; if (status.getCancelRequested()) { lastException = new ActivityCanceledException(info); } else if (status.getActivityReset()) { diff --git a/temporal-sdk/src/test/java/io/temporal/internal/activity/HeartbeatContextImplTest.java b/temporal-sdk/src/test/java/io/temporal/internal/activity/HeartbeatContextImplTest.java index 686bc566f8..6640b404fa 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/activity/HeartbeatContextImplTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/activity/HeartbeatContextImplTest.java @@ -13,6 +13,7 @@ import io.temporal.api.workflowservice.v1.WorkflowServiceGrpc; import io.temporal.client.ActivityCanceledException; import io.temporal.client.ActivityCompletionException; +import io.temporal.client.ActivityWorkerShutdownException; import io.temporal.common.converter.GlobalDataConverter; import io.temporal.failure.TimeoutFailure; import io.temporal.serviceclient.WorkflowServiceStubs; @@ -183,6 +184,221 @@ public void heartbeatTimeoutPersistsAcrossMultipleCalls() { ctx.cancelOutstandingHeartbeat(); } + @Test + public void heartbeatEmitsThenThrowsDuringWorkerShutdown() { + // Simulate worker shutdown: the heartbeat executor is shut down first. + heartbeatExecutor.shutdown(); + + when(blockingStub.recordActivityTaskHeartbeat(any())) + .thenReturn(RecordActivityTaskHeartbeatResponse.getDefaultInstance()); + + ActivityInfo info = activityInfoWithHeartbeatTimeout(Duration.ofSeconds(60)); + HeartbeatContextImpl ctx = createHeartbeatContext(info); + + try { + ctx.heartbeat("final-progress"); + fail("Expected ActivityWorkerShutdownException"); + } catch (ActivityWorkerShutdownException e) { + // expected: the caller is still notified that the worker is shutting down + } + + // The heartbeat must be emitted to the server before the shutdown signal is thrown, + // so the server does not time out the activity during the awaitTermination grace period. 
+ verify(blockingStub, times(1)).recordActivityTaskHeartbeat(any()); + } + + @Test + public void heartbeatDuringWorkerShutdownThrowsShutdownEvenIfEmitFails() { + // Simulate worker shutdown. + heartbeatExecutor.shutdown(); + + // The final heartbeat RPC fails transiently. + when(blockingStub.recordActivityTaskHeartbeat(any())) + .thenThrow(new StatusRuntimeException(Status.UNAVAILABLE)); + + ActivityInfo info = activityInfoWithHeartbeatTimeout(Duration.ofSeconds(60)); + HeartbeatContextImpl ctx = createHeartbeatContext(info); + + // A transient RPC failure must not mask the shutdown signal with a raw gRPC error. + try { + ctx.heartbeat("final-progress"); + fail("Expected ActivityWorkerShutdownException"); + } catch (ActivityWorkerShutdownException e) { + // expected + } + + verify(blockingStub, times(1)).recordActivityTaskHeartbeat(any()); + } + + @Test + public void heartbeatDuringWorkerShutdownThrottlesRepeatedHeartbeats() { + // Simulate worker shutdown. + heartbeatExecutor.shutdown(); + + when(blockingStub.recordActivityTaskHeartbeat(any())) + .thenReturn(RecordActivityTaskHeartbeatResponse.getDefaultInstance()); + + // Large heartbeat timeout => large throttle interval, so rapid successive calls during the + // grace period are throttled instead of flooding the server (the scheduled-executor throttle is + // unavailable once the executor is shut down). + ActivityInfo info = activityInfoWithHeartbeatTimeout(Duration.ofSeconds(60)); + HeartbeatContextImpl ctx = createHeartbeatContext(info); + + for (int i = 0; i < 5; i++) { + try { + ctx.heartbeat("progress-" + i); + fail("Expected ActivityWorkerShutdownException"); + } catch (ActivityWorkerShutdownException e) { + // expected on every call + } + } + + // Only the first heartbeat reaches the server; the rest fall within the throttle interval. 
+ verify(blockingStub, times(1)).recordActivityTaskHeartbeat(any()); + } + + @Test + public void heartbeatDuringWorkerShutdownThrowsShutdownEvenIfEmitThrowsNonGrpcException() { + // Simulate worker shutdown. + heartbeatExecutor.shutdown(); + + // The final heartbeat fails with a non-gRPC runtime exception (e.g. a serialization error). + when(blockingStub.recordActivityTaskHeartbeat(any())) + .thenThrow(new RuntimeException("serialization boom")); + + ActivityInfo info = activityInfoWithHeartbeatTimeout(Duration.ofSeconds(60)); + HeartbeatContextImpl ctx = createHeartbeatContext(info); + + // Any failure while emitting the heartbeat must not mask the shutdown signal the caller relies + // on; the caller must still observe ActivityWorkerShutdownException. + try { + ctx.heartbeat("final-progress"); + fail("Expected ActivityWorkerShutdownException"); + } catch (ActivityWorkerShutdownException e) { + // expected + } + + verify(blockingStub, times(1)).recordActivityTaskHeartbeat(any()); + } + + @Test + public void heartbeatDuringWorkerShutdownRetriesPromptlyAfterFailedSend() { + // Simulate worker shutdown. + heartbeatExecutor.shutdown(); + + // The first send fails transiently; subsequent sends succeed. + when(blockingStub.recordActivityTaskHeartbeat(any())) + .thenThrow(new StatusRuntimeException(Status.UNAVAILABLE)) + .thenReturn(RecordActivityTaskHeartbeatResponse.getDefaultInstance()); + + // Heartbeat timeout 60s => full throttle interval 48s, but a failed send must only block + // re-sending for the short retry interval (HEARTBEAT_RETRY_WAIT_MILLIS, 1s), mirroring the + // normal heartbeat path. Otherwise a transient failure would suppress heartbeats for a full + // interval and let the server time the activity out during the grace period. + ActivityInfo info = activityInfoWithHeartbeatTimeout(Duration.ofSeconds(60)); + HeartbeatContextImpl ctx = createHeartbeatContext(info); + + // First send fails. 
+ try { + ctx.heartbeat("p1"); + fail("Expected ActivityWorkerShutdownException"); + } catch (ActivityWorkerShutdownException e) { + // expected + } + + // An immediate retry is still throttled: a failed send does not open the floodgates. + try { + ctx.heartbeat("p2"); + fail("Expected ActivityWorkerShutdownException"); + } catch (ActivityWorkerShutdownException e) { + // expected + } + verify(blockingStub, times(1)).recordActivityTaskHeartbeat(any()); + + // A resend must follow within the short retry interval. Polling well under the 48s + // full interval (but over the 1s retry interval) proves the failed send is retried + // promptly, not suppressed for a full interval. If the full interval were wrongly + // applied, this would never reach 2. + Eventually.assertEventually( + Duration.ofSeconds(10), + () -> { + try { + ctx.heartbeat("retry"); + fail("Expected ActivityWorkerShutdownException"); + } catch (ActivityWorkerShutdownException e) { + // expected on every call + } + verify(blockingStub, times(2)).recordActivityTaskHeartbeat(any()); + }); + } + + @Test + public void heartbeatDuringWorkerShutdownResendsAfterThrottleIntervalElapses() { + // Simulate worker shutdown. + heartbeatExecutor.shutdown(); + + when(blockingStub.recordActivityTaskHeartbeat(any())) + .thenReturn(RecordActivityTaskHeartbeatResponse.getDefaultInstance()); + + // Heartbeat timeout 500ms => throttle interval is 400ms (0.8 * timeout). + ActivityInfo info = activityInfoWithHeartbeatTimeout(Duration.ofMillis(500)); + HeartbeatContextImpl ctx = createHeartbeatContext(info); + + // The first heartbeat goes out immediately. + try { + ctx.heartbeat("p1"); + fail("Expected ActivityWorkerShutdownException"); + } catch (ActivityWorkerShutdownException e) { + // expected + } + verify(blockingStub, times(1)).recordActivityTaskHeartbeat(any()); + + // Once the throttle interval elapses, a later call resends to keep the server-side deadline + // fresh during the grace period. 
+ Eventually.assertEventually( + Duration.ofSeconds(10), + () -> { + try { + ctx.heartbeat("p2"); + fail("Expected ActivityWorkerShutdownException"); + } catch (ActivityWorkerShutdownException e) { + // expected on every call + } + verify(blockingStub, times(2)).recordActivityTaskHeartbeat(any()); + }); + } + + @Test + public void heartbeatDuringWorkerShutdownThrottlesAgainstPrecedingNormalHeartbeat() { + when(blockingStub.recordActivityTaskHeartbeat(any())) + .thenReturn(RecordActivityTaskHeartbeatResponse.getDefaultInstance()); + + // Large heartbeat timeout => 48s throttle interval, so a heartbeat sent just before shutdown + // keeps the next one throttled for far longer than this test runs. + ActivityInfo info = activityInfoWithHeartbeatTimeout(Duration.ofSeconds(60)); + HeartbeatContextImpl ctx = createHeartbeatContext(info); + + // A normal heartbeat (executor still running) sends synchronously and records the send time. + ctx.heartbeat("normal"); + verify(blockingStub, times(1)).recordActivityTaskHeartbeat(any()); + + // The worker now shuts down. + heartbeatExecutor.shutdown(); + + // The first shutdown-path heartbeat must reuse the throttle state left by the normal send: it + // falls within the 48s interval, so it throws without sending again. If the shutdown path did + // not carry over the normal send's timestamp, it would emit a second heartbeat here. + try { + ctx.heartbeat("shutdown"); + fail("Expected ActivityWorkerShutdownException"); + } catch (ActivityWorkerShutdownException e) { + // expected + } + verify(blockingStub, times(1)).recordActivityTaskHeartbeat(any()); + + ctx.cancelOutstandingHeartbeat(); + } + private HeartbeatContextImpl createHeartbeatContext(ActivityInfo info) { return new HeartbeatContextImpl( service,