diff --git a/temporal-sdk/src/main/java/io/temporal/client/ActivityWorkerShutdownException.java b/temporal-sdk/src/main/java/io/temporal/client/ActivityWorkerShutdownException.java
index 3afcfc7e83..3c42854096 100644
--- a/temporal-sdk/src/main/java/io/temporal/client/ActivityWorkerShutdownException.java
+++ b/temporal-sdk/src/main/java/io/temporal/client/ActivityWorkerShutdownException.java
@@ -9,6 +9,12 @@
* called. It is OK to ignore the exception to let the activity complete. It assumes that {@link
* WorkerFactory#awaitTermination(long, TimeUnit)} is called with a timeout larger than the activity
* execution time.
+ *
+ *
Before this exception is thrown, the heartbeat is sent to the server on a best-effort,
+ * throttled basis. A caller that keeps heartbeating during the {@link
+ * WorkerFactory#awaitTermination(long, TimeUnit)} grace period thus prevents the server from timing
+ * the activity out before it finishes. Throttled heartbeats may skip sending details, so persist
+ * final progress through the activity's completion result rather than the last heartbeat.
*/
public final class ActivityWorkerShutdownException extends ActivityCompletionException {
diff --git a/temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContextImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContextImpl.java
index 48993d0da1..b4b1bbb57f 100644
--- a/temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContextImpl.java
+++ b/temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContextImpl.java
@@ -73,6 +73,11 @@ static long getLocalHeartbeatTimeoutBufferMillis() {
private long heartbeatTimeoutDeadlineNanos;
private boolean heartbeatTimedOut;
+ // Throttle state for the worker-shutdown heartbeat path, which has no scheduled executor:
+ // last send attempt (nanos, 0 = none) and whether it reached the server.
+ private long lastHeartbeatAttemptNanos;
+ private boolean lastHeartbeatSucceeded;
+
private ActivityCompletionException lastException;
public HeartbeatContextImpl(
@@ -144,6 +149,26 @@ public HeartbeatContextImpl(
@Override
public void heartbeat(V details) throws ActivityCompletionException {
if (heartbeatExecutor.isShutdown()) {
+ // Worker shutting down: scheduled executor is gone, so emit synchronously then
+ // signal shutdown. Caller may ignore the exception and keep heartbeating.
+ lock.lock();
+ try {
+ // Throttle as the scheduled path would, so a caller looping on heartbeat() doesn't flood.
+ long nowNanos = System.nanoTime();
+ long throttleMillis =
+ lastHeartbeatSucceeded ? heartbeatIntervalMillis : HEARTBEAT_RETRY_WAIT_MILLIS;
+ if (lastHeartbeatAttemptNanos == 0
+ || nowNanos - lastHeartbeatAttemptNanos
+ >= TimeUnit.MILLISECONDS.toNanos(throttleMillis)) {
+ sendHeartbeatRequest(details);
+ }
+ } catch (Exception e) {
+ // Best-effort: don't let a send error mask the shutdown signal the caller relies on.
+ log.warn("Heartbeat during worker shutdown failed", e);
+ } finally {
+ lock.unlock();
+ }
+ // Cancel/pause/reset from a successful send is intentionally not surfaced during shutdown.
throw new ActivityWorkerShutdownException(info);
}
lock.lock();
@@ -297,6 +322,8 @@ private void checkHeartbeatTimeoutDeadlineLocked() {
}
private void sendHeartbeatRequest(Object details) {
+ lastHeartbeatAttemptNanos = System.nanoTime();
+ lastHeartbeatSucceeded = false;
try {
RecordActivityTaskHeartbeatResponse status =
ActivityClientHelper.sendHeartbeatRequest(
@@ -306,6 +333,8 @@ private void sendHeartbeatRequest(Object details) {
info.getTaskToken(),
dataConverterWithActivityContext.toPayloads(details),
metricsScope);
+ // Reached the server, so the server-side heartbeat deadline was refreshed.
+ lastHeartbeatSucceeded = true;
if (status.getCancelRequested()) {
lastException = new ActivityCanceledException(info);
} else if (status.getActivityReset()) {
diff --git a/temporal-sdk/src/test/java/io/temporal/internal/activity/HeartbeatContextImplTest.java b/temporal-sdk/src/test/java/io/temporal/internal/activity/HeartbeatContextImplTest.java
index 686bc566f8..6640b404fa 100644
--- a/temporal-sdk/src/test/java/io/temporal/internal/activity/HeartbeatContextImplTest.java
+++ b/temporal-sdk/src/test/java/io/temporal/internal/activity/HeartbeatContextImplTest.java
@@ -13,6 +13,7 @@
import io.temporal.api.workflowservice.v1.WorkflowServiceGrpc;
import io.temporal.client.ActivityCanceledException;
import io.temporal.client.ActivityCompletionException;
+import io.temporal.client.ActivityWorkerShutdownException;
import io.temporal.common.converter.GlobalDataConverter;
import io.temporal.failure.TimeoutFailure;
import io.temporal.serviceclient.WorkflowServiceStubs;
@@ -183,6 +184,221 @@ public void heartbeatTimeoutPersistsAcrossMultipleCalls() {
ctx.cancelOutstandingHeartbeat();
}
+ @Test
+ public void heartbeatEmitsThenThrowsDuringWorkerShutdown() {
+ // Simulate worker shutdown: the heartbeat executor is shut down first.
+ heartbeatExecutor.shutdown();
+
+ when(blockingStub.recordActivityTaskHeartbeat(any()))
+ .thenReturn(RecordActivityTaskHeartbeatResponse.getDefaultInstance());
+
+ ActivityInfo info = activityInfoWithHeartbeatTimeout(Duration.ofSeconds(60));
+ HeartbeatContextImpl ctx = createHeartbeatContext(info);
+
+ try {
+ ctx.heartbeat("final-progress");
+ fail("Expected ActivityWorkerShutdownException");
+ } catch (ActivityWorkerShutdownException e) {
+ // expected: the caller is still notified that the worker is shutting down
+ }
+
+ // The heartbeat must be emitted to the server before the shutdown signal is thrown,
+ // so the server does not time out the activity during the awaitTermination grace period.
+ verify(blockingStub, times(1)).recordActivityTaskHeartbeat(any());
+ }
+
+ @Test
+ public void heartbeatDuringWorkerShutdownThrowsShutdownEvenIfEmitFails() {
+ // Simulate worker shutdown.
+ heartbeatExecutor.shutdown();
+
+ // The final heartbeat RPC fails transiently.
+ when(blockingStub.recordActivityTaskHeartbeat(any()))
+ .thenThrow(new StatusRuntimeException(Status.UNAVAILABLE));
+
+ ActivityInfo info = activityInfoWithHeartbeatTimeout(Duration.ofSeconds(60));
+ HeartbeatContextImpl ctx = createHeartbeatContext(info);
+
+ // A transient RPC failure must not mask the shutdown signal with a raw gRPC error.
+ try {
+ ctx.heartbeat("final-progress");
+ fail("Expected ActivityWorkerShutdownException");
+ } catch (ActivityWorkerShutdownException e) {
+ // expected
+ }
+
+ verify(blockingStub, times(1)).recordActivityTaskHeartbeat(any());
+ }
+
+ @Test
+ public void heartbeatDuringWorkerShutdownThrottlesRepeatedHeartbeats() {
+ // Simulate worker shutdown.
+ heartbeatExecutor.shutdown();
+
+ when(blockingStub.recordActivityTaskHeartbeat(any()))
+ .thenReturn(RecordActivityTaskHeartbeatResponse.getDefaultInstance());
+
+ // Large heartbeat timeout => large throttle interval, so rapid successive calls during the
+ // grace period are throttled instead of flooding the server (the scheduled-executor throttle is
+ // unavailable once the executor is shut down).
+ ActivityInfo info = activityInfoWithHeartbeatTimeout(Duration.ofSeconds(60));
+ HeartbeatContextImpl ctx = createHeartbeatContext(info);
+
+ for (int i = 0; i < 5; i++) {
+ try {
+ ctx.heartbeat("progress-" + i);
+ fail("Expected ActivityWorkerShutdownException");
+ } catch (ActivityWorkerShutdownException e) {
+ // expected on every call
+ }
+ }
+
+ // Only the first heartbeat reaches the server; the rest fall within the throttle interval.
+ verify(blockingStub, times(1)).recordActivityTaskHeartbeat(any());
+ }
+
+ @Test
+ public void heartbeatDuringWorkerShutdownThrowsShutdownEvenIfEmitThrowsNonGrpcException() {
+ // Simulate worker shutdown.
+ heartbeatExecutor.shutdown();
+
+ // The final heartbeat fails with a non-gRPC runtime exception (e.g. a serialization error).
+ when(blockingStub.recordActivityTaskHeartbeat(any()))
+ .thenThrow(new RuntimeException("serialization boom"));
+
+ ActivityInfo info = activityInfoWithHeartbeatTimeout(Duration.ofSeconds(60));
+ HeartbeatContextImpl ctx = createHeartbeatContext(info);
+
+ // Any failure while emitting the heartbeat must not mask the shutdown signal the caller relies
+ // on; the caller must still observe ActivityWorkerShutdownException.
+ try {
+ ctx.heartbeat("final-progress");
+ fail("Expected ActivityWorkerShutdownException");
+ } catch (ActivityWorkerShutdownException e) {
+ // expected
+ }
+
+ verify(blockingStub, times(1)).recordActivityTaskHeartbeat(any());
+ }
+
+ @Test
+ public void heartbeatDuringWorkerShutdownRetriesPromptlyAfterFailedSend() {
+ // Simulate worker shutdown.
+ heartbeatExecutor.shutdown();
+
+ // The first send fails transiently; subsequent sends succeed.
+ when(blockingStub.recordActivityTaskHeartbeat(any()))
+ .thenThrow(new StatusRuntimeException(Status.UNAVAILABLE))
+ .thenReturn(RecordActivityTaskHeartbeatResponse.getDefaultInstance());
+
+ // Heartbeat timeout 60s => full throttle interval 48s, but a failed send must only block
+ // re-sending for the short retry interval (HEARTBEAT_RETRY_WAIT_MILLIS, 1s), mirroring the
+ // normal heartbeat path. Otherwise a transient failure would suppress heartbeats for a full
+ // interval and let the server time the activity out during the grace period.
+ ActivityInfo info = activityInfoWithHeartbeatTimeout(Duration.ofSeconds(60));
+ HeartbeatContextImpl ctx = createHeartbeatContext(info);
+
+ // First send fails.
+ try {
+ ctx.heartbeat("p1");
+ fail("Expected ActivityWorkerShutdownException");
+ } catch (ActivityWorkerShutdownException e) {
+ // expected
+ }
+
+ // An immediate retry is still throttled: a failed send does not open the floodgates.
+ try {
+ ctx.heartbeat("p2");
+ fail("Expected ActivityWorkerShutdownException");
+ } catch (ActivityWorkerShutdownException e) {
+ // expected
+ }
+ verify(blockingStub, times(1)).recordActivityTaskHeartbeat(any());
+
+ // A resend must follow within the short retry interval. Polling well under the 48s
+ // full interval (but over the 1s retry interval) proves the failed send is retried
+ // promptly, not suppressed for a full interval. If the full interval were wrongly
+ // applied, this would never reach 2.
+ Eventually.assertEventually(
+ Duration.ofSeconds(10),
+ () -> {
+ try {
+ ctx.heartbeat("retry");
+ fail("Expected ActivityWorkerShutdownException");
+ } catch (ActivityWorkerShutdownException e) {
+ // expected on every call
+ }
+ verify(blockingStub, times(2)).recordActivityTaskHeartbeat(any());
+ });
+ }
+
+ @Test
+ public void heartbeatDuringWorkerShutdownResendsAfterThrottleIntervalElapses() {
+ // Simulate worker shutdown.
+ heartbeatExecutor.shutdown();
+
+ when(blockingStub.recordActivityTaskHeartbeat(any()))
+ .thenReturn(RecordActivityTaskHeartbeatResponse.getDefaultInstance());
+
+ // Heartbeat timeout 500ms => throttle interval is 400ms (0.8 * timeout).
+ ActivityInfo info = activityInfoWithHeartbeatTimeout(Duration.ofMillis(500));
+ HeartbeatContextImpl ctx = createHeartbeatContext(info);
+
+ // The first heartbeat goes out immediately.
+ try {
+ ctx.heartbeat("p1");
+ fail("Expected ActivityWorkerShutdownException");
+ } catch (ActivityWorkerShutdownException e) {
+ // expected
+ }
+ verify(blockingStub, times(1)).recordActivityTaskHeartbeat(any());
+
+ // Once the throttle interval elapses, a later call resends to keep the server-side deadline
+ // fresh during the grace period.
+ Eventually.assertEventually(
+ Duration.ofSeconds(10),
+ () -> {
+ try {
+ ctx.heartbeat("p2");
+ fail("Expected ActivityWorkerShutdownException");
+ } catch (ActivityWorkerShutdownException e) {
+ // expected on every call
+ }
+ verify(blockingStub, times(2)).recordActivityTaskHeartbeat(any());
+ });
+ }
+
+ @Test
+ public void heartbeatDuringWorkerShutdownThrottlesAgainstPrecedingNormalHeartbeat() {
+ when(blockingStub.recordActivityTaskHeartbeat(any()))
+ .thenReturn(RecordActivityTaskHeartbeatResponse.getDefaultInstance());
+
+ // Large heartbeat timeout => 48s throttle interval, so a heartbeat sent just before shutdown
+ // keeps the next one throttled for far longer than this test runs.
+ ActivityInfo info = activityInfoWithHeartbeatTimeout(Duration.ofSeconds(60));
+ HeartbeatContextImpl ctx = createHeartbeatContext(info);
+
+ // A normal heartbeat (executor still running) sends synchronously and records the send time.
+ ctx.heartbeat("normal");
+ verify(blockingStub, times(1)).recordActivityTaskHeartbeat(any());
+
+ // The worker now shuts down.
+ heartbeatExecutor.shutdown();
+
+ // The first shutdown-path heartbeat must reuse the throttle state left by the normal send: it
+ // falls within the 48s interval, so it throws without sending again. If the shutdown path did
+ // not carry over the normal send's timestamp, it would emit a second heartbeat here.
+ try {
+ ctx.heartbeat("shutdown");
+ fail("Expected ActivityWorkerShutdownException");
+ } catch (ActivityWorkerShutdownException e) {
+ // expected
+ }
+ verify(blockingStub, times(1)).recordActivityTaskHeartbeat(any());
+
+ ctx.cancelOutstandingHeartbeat();
+ }
+
private HeartbeatContextImpl createHeartbeatContext(ActivityInfo info) {
return new HeartbeatContextImpl(
service,