Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@
* called. It is OK to ignore the exception to let the activity complete. It assumes that {@link
* WorkerFactory#awaitTermination(long, TimeUnit)} is called with a timeout larger than the activity
* execution time.
*
* <p>Before this exception is thrown, the heartbeat is sent to the server on a best-effort,
* throttled basis. A caller that keeps heartbeating during the {@link
* WorkerFactory#awaitTermination(long, TimeUnit)} grace period thus prevents the server from timing
* the activity out before it finishes. Throttled heartbeats may skip sending details, so persist
* final progress through the activity's completion result rather than the last heartbeat.
*/
public final class ActivityWorkerShutdownException extends ActivityCompletionException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ static long getLocalHeartbeatTimeoutBufferMillis() {
private long heartbeatTimeoutDeadlineNanos;
private boolean heartbeatTimedOut;

// Throttle state for the worker-shutdown heartbeat path, which has no scheduled executor:
// last send attempt (nanos, 0 = none) and whether it reached the server.
private long lastHeartbeatAttemptNanos;
private boolean lastHeartbeatSucceeded;

private ActivityCompletionException lastException;

public HeartbeatContextImpl(
Expand Down Expand Up @@ -144,6 +149,26 @@ public HeartbeatContextImpl(
@Override
public <V> void heartbeat(V details) throws ActivityCompletionException {
if (heartbeatExecutor.isShutdown()) {
// Worker shutting down: scheduled executor is gone, so emit synchronously then
// signal shutdown. Caller may ignore the exception and keep heartbeating.
lock.lock();
try {
// Throttle as the scheduled path would, so a caller looping on heartbeat() doesn't flood.
long nowNanos = System.nanoTime();
long throttleMillis =
lastHeartbeatSucceeded ? heartbeatIntervalMillis : HEARTBEAT_RETRY_WAIT_MILLIS;
if (lastHeartbeatAttemptNanos == 0
|| nowNanos - lastHeartbeatAttemptNanos
>= TimeUnit.MILLISECONDS.toNanos(throttleMillis)) {
sendHeartbeatRequest(details);
}
} catch (Exception e) {
// Best-effort: don't let a send error mask the shutdown signal the caller relies on.
log.warn("Heartbeat during worker shutdown failed", e);
} finally {
lock.unlock();
}
// Cancel/pause/reset from a successful send is intentionally not surfaced during shutdown.
throw new ActivityWorkerShutdownException(info);
}
lock.lock();
Expand Down Expand Up @@ -297,6 +322,8 @@ private void checkHeartbeatTimeoutDeadlineLocked() {
}

private void sendHeartbeatRequest(Object details) {
lastHeartbeatAttemptNanos = System.nanoTime();
lastHeartbeatSucceeded = false;
try {
RecordActivityTaskHeartbeatResponse status =
ActivityClientHelper.sendHeartbeatRequest(
Expand All @@ -306,6 +333,8 @@ private void sendHeartbeatRequest(Object details) {
info.getTaskToken(),
dataConverterWithActivityContext.toPayloads(details),
metricsScope);
// Reached the server, so the server-side heartbeat deadline was refreshed.
lastHeartbeatSucceeded = true;
if (status.getCancelRequested()) {
lastException = new ActivityCanceledException(info);
} else if (status.getActivityReset()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.temporal.api.workflowservice.v1.WorkflowServiceGrpc;
import io.temporal.client.ActivityCanceledException;
import io.temporal.client.ActivityCompletionException;
import io.temporal.client.ActivityWorkerShutdownException;
import io.temporal.common.converter.GlobalDataConverter;
import io.temporal.failure.TimeoutFailure;
import io.temporal.serviceclient.WorkflowServiceStubs;
Expand Down Expand Up @@ -183,6 +184,221 @@ public void heartbeatTimeoutPersistsAcrossMultipleCalls() {
ctx.cancelOutstandingHeartbeat();
}

@Test
public void heartbeatEmitsThenThrowsDuringWorkerShutdown() {
// Simulate worker shutdown: the heartbeat executor is shut down first.
heartbeatExecutor.shutdown();

when(blockingStub.recordActivityTaskHeartbeat(any()))
.thenReturn(RecordActivityTaskHeartbeatResponse.getDefaultInstance());

ActivityInfo info = activityInfoWithHeartbeatTimeout(Duration.ofSeconds(60));
HeartbeatContextImpl ctx = createHeartbeatContext(info);

try {
ctx.heartbeat("final-progress");
fail("Expected ActivityWorkerShutdownException");
} catch (ActivityWorkerShutdownException e) {
// expected: the caller is still notified that the worker is shutting down
}

// The heartbeat must be emitted to the server before the shutdown signal is thrown,
// so the server does not time out the activity during the awaitTermination grace period.
verify(blockingStub, times(1)).recordActivityTaskHeartbeat(any());
}

@Test
public void heartbeatDuringWorkerShutdownThrowsShutdownEvenIfEmitFails() {
// Simulate worker shutdown.
heartbeatExecutor.shutdown();

// The final heartbeat RPC fails transiently.
when(blockingStub.recordActivityTaskHeartbeat(any()))
.thenThrow(new StatusRuntimeException(Status.UNAVAILABLE));

ActivityInfo info = activityInfoWithHeartbeatTimeout(Duration.ofSeconds(60));
HeartbeatContextImpl ctx = createHeartbeatContext(info);

// A transient RPC failure must not mask the shutdown signal with a raw gRPC error.
try {
ctx.heartbeat("final-progress");
fail("Expected ActivityWorkerShutdownException");
} catch (ActivityWorkerShutdownException e) {
// expected
}

verify(blockingStub, times(1)).recordActivityTaskHeartbeat(any());
}

@Test
public void heartbeatDuringWorkerShutdownThrottlesRepeatedHeartbeats() {
// Simulate worker shutdown.
heartbeatExecutor.shutdown();

when(blockingStub.recordActivityTaskHeartbeat(any()))
.thenReturn(RecordActivityTaskHeartbeatResponse.getDefaultInstance());

// Large heartbeat timeout => large throttle interval, so rapid successive calls during the
// grace period are throttled instead of flooding the server (the scheduled-executor throttle is
// unavailable once the executor is shut down).
ActivityInfo info = activityInfoWithHeartbeatTimeout(Duration.ofSeconds(60));
HeartbeatContextImpl ctx = createHeartbeatContext(info);

for (int i = 0; i < 5; i++) {
try {
ctx.heartbeat("progress-" + i);
fail("Expected ActivityWorkerShutdownException");
} catch (ActivityWorkerShutdownException e) {
// expected on every call
}
}

// Only the first heartbeat reaches the server; the rest fall within the throttle interval.
verify(blockingStub, times(1)).recordActivityTaskHeartbeat(any());
}

@Test
public void heartbeatDuringWorkerShutdownThrowsShutdownEvenIfEmitThrowsNonGrpcException() {
// Simulate worker shutdown.
heartbeatExecutor.shutdown();

// The final heartbeat fails with a non-gRPC runtime exception (e.g. a serialization error).
when(blockingStub.recordActivityTaskHeartbeat(any()))
.thenThrow(new RuntimeException("serialization boom"));

ActivityInfo info = activityInfoWithHeartbeatTimeout(Duration.ofSeconds(60));
HeartbeatContextImpl ctx = createHeartbeatContext(info);

// Any failure while emitting the heartbeat must not mask the shutdown signal the caller relies
// on; the caller must still observe ActivityWorkerShutdownException.
try {
ctx.heartbeat("final-progress");
fail("Expected ActivityWorkerShutdownException");
} catch (ActivityWorkerShutdownException e) {
// expected
}

verify(blockingStub, times(1)).recordActivityTaskHeartbeat(any());
}

@Test
public void heartbeatDuringWorkerShutdownRetriesPromptlyAfterFailedSend() {
// Simulate worker shutdown.
heartbeatExecutor.shutdown();

// The first send fails transiently; subsequent sends succeed.
when(blockingStub.recordActivityTaskHeartbeat(any()))
.thenThrow(new StatusRuntimeException(Status.UNAVAILABLE))
.thenReturn(RecordActivityTaskHeartbeatResponse.getDefaultInstance());

// Heartbeat timeout 60s => full throttle interval 48s, but a failed send must only block
// re-sending for the short retry interval (HEARTBEAT_RETRY_WAIT_MILLIS, 1s), mirroring the
// normal heartbeat path. Otherwise a transient failure would suppress heartbeats for a full
// interval and let the server time the activity out during the grace period.
ActivityInfo info = activityInfoWithHeartbeatTimeout(Duration.ofSeconds(60));
HeartbeatContextImpl ctx = createHeartbeatContext(info);

// First send fails.
try {
ctx.heartbeat("p1");
fail("Expected ActivityWorkerShutdownException");
} catch (ActivityWorkerShutdownException e) {
// expected
}

// An immediate retry is still throttled: a failed send does not open the floodgates.
try {
ctx.heartbeat("p2");
fail("Expected ActivityWorkerShutdownException");
} catch (ActivityWorkerShutdownException e) {
// expected
}
verify(blockingStub, times(1)).recordActivityTaskHeartbeat(any());

// A resend must follow within the short retry interval. Polling well under the 48s
// full interval (but over the 1s retry interval) proves the failed send is retried
// promptly, not suppressed for a full interval. If the full interval were wrongly
// applied, this would never reach 2.
Eventually.assertEventually(
Duration.ofSeconds(10),
() -> {
try {
ctx.heartbeat("retry");
fail("Expected ActivityWorkerShutdownException");
} catch (ActivityWorkerShutdownException e) {
// expected on every call
}
verify(blockingStub, times(2)).recordActivityTaskHeartbeat(any());
});
}

@Test
public void heartbeatDuringWorkerShutdownResendsAfterThrottleIntervalElapses() {
// Simulate worker shutdown.
heartbeatExecutor.shutdown();

when(blockingStub.recordActivityTaskHeartbeat(any()))
.thenReturn(RecordActivityTaskHeartbeatResponse.getDefaultInstance());

// Heartbeat timeout 500ms => throttle interval is 400ms (0.8 * timeout).
ActivityInfo info = activityInfoWithHeartbeatTimeout(Duration.ofMillis(500));
HeartbeatContextImpl ctx = createHeartbeatContext(info);

// The first heartbeat goes out immediately.
try {
ctx.heartbeat("p1");
fail("Expected ActivityWorkerShutdownException");
} catch (ActivityWorkerShutdownException e) {
// expected
}
verify(blockingStub, times(1)).recordActivityTaskHeartbeat(any());

// Once the throttle interval elapses, a later call resends to keep the server-side deadline
// fresh during the grace period.
Eventually.assertEventually(
Duration.ofSeconds(10),
() -> {
try {
ctx.heartbeat("p2");
fail("Expected ActivityWorkerShutdownException");
} catch (ActivityWorkerShutdownException e) {
// expected on every call
}
verify(blockingStub, times(2)).recordActivityTaskHeartbeat(any());
});
}

@Test
public void heartbeatDuringWorkerShutdownThrottlesAgainstPrecedingNormalHeartbeat() {
when(blockingStub.recordActivityTaskHeartbeat(any()))
.thenReturn(RecordActivityTaskHeartbeatResponse.getDefaultInstance());

// Large heartbeat timeout => 48s throttle interval, so a heartbeat sent just before shutdown
// keeps the next one throttled for far longer than this test runs.
ActivityInfo info = activityInfoWithHeartbeatTimeout(Duration.ofSeconds(60));
HeartbeatContextImpl ctx = createHeartbeatContext(info);

// A normal heartbeat (executor still running) sends synchronously and records the send time.
ctx.heartbeat("normal");
verify(blockingStub, times(1)).recordActivityTaskHeartbeat(any());

// The worker now shuts down.
heartbeatExecutor.shutdown();

// The first shutdown-path heartbeat must reuse the throttle state left by the normal send: it
// falls within the 48s interval, so it throws without sending again. If the shutdown path did
// not carry over the normal send's timestamp, it would emit a second heartbeat here.
try {
ctx.heartbeat("shutdown");
fail("Expected ActivityWorkerShutdownException");
} catch (ActivityWorkerShutdownException e) {
// expected
}
verify(blockingStub, times(1)).recordActivityTaskHeartbeat(any());

ctx.cancelOutstandingHeartbeat();
}

private HeartbeatContextImpl createHeartbeatContext(ActivityInfo info) {
return new HeartbeatContextImpl(
service,
Expand Down