diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 964d0d988..9297f198d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -108,6 +108,7 @@ jobs: --dynamic-config-value history.MaxBufferedQueryCount=10000 \ --dynamic-config-value frontend.workerVersioningDataAPIs=true \ --dynamic-config-value history.enableRequestIdRefLinks=true \ + --dynamic-config-value frontend.ListWorkersEnabled=true \ --dynamic-config-value 'component.callbacks.allowedAddresses=[{"Pattern":"localhost:7243","AllowInsecure":true}]' & sleep 10s diff --git a/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientInternalImpl.java b/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientInternalImpl.java index 76c55d3c5..fa9cc82b6 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientInternalImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientInternalImpl.java @@ -23,6 +23,7 @@ import io.temporal.internal.client.external.ManualActivityCompletionClientFactory; import io.temporal.internal.common.PluginUtils; import io.temporal.internal.sync.StubMarker; +import io.temporal.internal.worker.HeartbeatManager; import io.temporal.serviceclient.MetricsTag; import io.temporal.serviceclient.WorkflowServiceStubs; import io.temporal.serviceclient.WorkflowServiceStubsPlugin; @@ -53,6 +54,8 @@ final class WorkflowClientInternalImpl implements WorkflowClient, WorkflowClient private final Scope metricsScope; private final WorkflowClientInterceptor[] interceptors; private final WorkerFactoryRegistry workerFactoryRegistry = new WorkerFactoryRegistry(); + private final String workerGroupingKey = java.util.UUID.randomUUID().toString(); + private final @Nullable HeartbeatManager heartbeatManager; /** * Creates client that connects to an instance of the Temporal Service. 
Cannot be used from within @@ -112,6 +115,14 @@ public static WorkflowClient newInstance( options.getNamespace(), options.getIdentity(), options.getDataConverter()); + + java.time.Duration heartbeatInterval = options.getWorkerHeartbeatInterval(); + if (!heartbeatInterval.isNegative()) { + this.heartbeatManager = + new HeartbeatManager(workflowServiceStubs, options.getIdentity(), heartbeatInterval); + } else { + this.heartbeatManager = null; + } } private WorkflowClientCallsInterceptor initializeClientInvoker() { @@ -790,6 +801,17 @@ public void deregisterWorkerFactory(WorkerFactory workerFactory) { workerFactoryRegistry.deregister(workerFactory); } + @Override + public String getWorkerGroupingKey() { + return workerGroupingKey; + } + + @Override + @Nullable + public HeartbeatManager getHeartbeatManager() { + return heartbeatManager; + } + @Override public NexusStartWorkflowResponse startNexus( NexusStartWorkflowRequest request, Functions.Proc workflow) { diff --git a/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientOptions.java b/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientOptions.java index ca6785275..e10defba5 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientOptions.java +++ b/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientOptions.java @@ -1,5 +1,6 @@ package io.temporal.client; +import com.google.common.base.Preconditions; import io.temporal.api.enums.v1.QueryRejectCondition; import io.temporal.common.Experimental; import io.temporal.common.context.ContextPropagator; @@ -7,6 +8,7 @@ import io.temporal.common.converter.GlobalDataConverter; import io.temporal.common.interceptors.WorkflowClientInterceptor; import java.lang.management.ManagementFactory; +import java.time.Duration; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -49,6 +51,7 @@ public static final class Builder { private List contextPropagators; private QueryRejectCondition queryRejectCondition; private 
WorkflowClientPlugin[] plugins; + private Duration workerHeartbeatInterval; private Builder() {} @@ -64,6 +67,7 @@ private Builder(WorkflowClientOptions options) { contextPropagators = options.contextPropagators; queryRejectCondition = options.queryRejectCondition; plugins = options.plugins; + workerHeartbeatInterval = options.workerHeartbeatInterval; } public Builder setNamespace(String namespace) { @@ -153,6 +157,19 @@ public Builder setPlugins(WorkflowClientPlugin... plugins) { return this; } + /** + * Sets the interval at which workers send heartbeat RPCs to the server. If not set or set to + * zero, defaults to 60 seconds. A negative duration disables heartbeating. Positive values must + * be between 1 and 60 seconds inclusive. + * + * @param workerHeartbeatInterval the heartbeat interval, or a negative duration to disable + */ + @Experimental + public Builder setWorkerHeartbeatInterval(Duration workerHeartbeatInterval) { + this.workerHeartbeatInterval = workerHeartbeatInterval; + return this; + } + public WorkflowClientOptions build() { return new WorkflowClientOptions( namespace, @@ -162,7 +179,8 @@ public WorkflowClientOptions build() { binaryChecksum, contextPropagators, queryRejectCondition, - plugins == null ? EMPTY_PLUGINS : plugins); + plugins == null ? EMPTY_PLUGINS : plugins, + resolveHeartbeatInterval(workerHeartbeatInterval)); } /** @@ -188,7 +206,22 @@ public WorkflowClientOptions validateAndBuildWithDefaults() { queryRejectCondition == null ? QueryRejectCondition.QUERY_REJECT_CONDITION_UNSPECIFIED : queryRejectCondition, - plugins == null ? EMPTY_PLUGINS : plugins); + plugins == null ? 
EMPTY_PLUGINS : plugins, + resolveHeartbeatInterval(workerHeartbeatInterval)); + } + + private static Duration resolveHeartbeatInterval(Duration raw) { + if (raw == null || raw.isZero()) { + return Duration.ofSeconds(60); + } + if (raw.isNegative()) { + return raw; + } + Preconditions.checkArgument( + raw.compareTo(Duration.ofSeconds(1)) >= 0 && raw.compareTo(Duration.ofSeconds(60)) <= 0, + "workerHeartbeatInterval must be between 1s and 60s, got %s", + raw); + return raw; } } @@ -215,6 +248,8 @@ public WorkflowClientOptions validateAndBuildWithDefaults() { private final WorkflowClientPlugin[] plugins; + private final Duration workerHeartbeatInterval; + private WorkflowClientOptions( String namespace, DataConverter dataConverter, @@ -223,7 +258,8 @@ private WorkflowClientOptions( String binaryChecksum, List contextPropagators, QueryRejectCondition queryRejectCondition, - WorkflowClientPlugin[] plugins) { + WorkflowClientPlugin[] plugins, + Duration workerHeartbeatInterval) { this.namespace = namespace; this.dataConverter = dataConverter; this.interceptors = interceptors; @@ -232,6 +268,7 @@ private WorkflowClientOptions( this.contextPropagators = contextPropagators; this.queryRejectCondition = queryRejectCondition; this.plugins = plugins; + this.workerHeartbeatInterval = workerHeartbeatInterval; } /** @@ -289,6 +326,15 @@ public WorkflowClientPlugin[] getPlugins() { return plugins; } + /** + * Returns the worker heartbeat interval. Defaults to 60 seconds if not configured. A negative + * duration means heartbeating is explicitly disabled. 
+ */ + @Experimental + public Duration getWorkerHeartbeatInterval() { + return workerHeartbeatInterval; + } + @Override public String toString() { return "WorkflowClientOptions{" @@ -311,6 +357,8 @@ public String toString() { + queryRejectCondition + ", plugins=" + Arrays.toString(plugins) + + ", workerHeartbeatInterval=" + + workerHeartbeatInterval + '}'; } @@ -326,7 +374,9 @@ public boolean equals(Object o) { && com.google.common.base.Objects.equal(binaryChecksum, that.binaryChecksum) && com.google.common.base.Objects.equal(contextPropagators, that.contextPropagators) && queryRejectCondition == that.queryRejectCondition - && Arrays.equals(plugins, that.plugins); + && Arrays.equals(plugins, that.plugins) + && com.google.common.base.Objects.equal( + workerHeartbeatInterval, that.workerHeartbeatInterval); } @Override @@ -339,6 +389,7 @@ public int hashCode() { binaryChecksum, contextPropagators, queryRejectCondition, - Arrays.hashCode(plugins)); + Arrays.hashCode(plugins), + workerHeartbeatInterval); } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/client/WorkflowClientInternal.java b/temporal-sdk/src/main/java/io/temporal/internal/client/WorkflowClientInternal.java index 438b2bac7..fc034a366 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/client/WorkflowClientInternal.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/WorkflowClientInternal.java @@ -1,8 +1,10 @@ package io.temporal.internal.client; import io.temporal.client.WorkflowClient; +import io.temporal.internal.worker.HeartbeatManager; import io.temporal.worker.WorkerFactory; import io.temporal.workflow.Functions; +import javax.annotation.Nullable; /** * From OOP point of view, there is no reason for this interface not to extend {@link @@ -18,4 +20,9 @@ public interface WorkflowClientInternal { void deregisterWorkerFactory(WorkerFactory workerFactory); NexusStartWorkflowResponse startNexus(NexusStartWorkflowRequest request, Functions.Proc workflow); + + String 
getWorkerGroupingKey(); + + @Nullable + HeartbeatManager getHeartbeatManager(); } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityPollTask.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityPollTask.java index 7072d163e..b01503956 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityPollTask.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityPollTask.java @@ -17,7 +17,6 @@ import io.temporal.worker.PollerTypeMetricsTag; import io.temporal.worker.tuning.*; import java.util.Objects; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import javax.annotation.Nonnull; import org.slf4j.Logger; @@ -30,7 +29,7 @@ final class ActivityPollTask implements MultiThreadedPoller.PollTask slotSupplier; private final Scope metricsScope; private final PollActivityTaskQueueRequest pollRequest; - private final AtomicInteger pollGauge = new AtomicInteger(); + private final PollerTracker pollerTracker; @SuppressWarnings("deprecation") public ActivityPollTask( @@ -42,10 +41,12 @@ public ActivityPollTask( double activitiesPerSecond, @Nonnull TrackingSlotSupplier slotSupplier, @Nonnull Scope metricsScope, - @Nonnull Supplier serverCapabilities) { + @Nonnull Supplier serverCapabilities, + @Nonnull PollerTracker pollerTracker) { this.service = Objects.requireNonNull(service); this.slotSupplier = slotSupplier; this.metricsScope = Objects.requireNonNull(metricsScope); + this.pollerTracker = Objects.requireNonNull(pollerTracker); PollActivityTaskQueueRequest.Builder pollRequest = PollActivityTaskQueueRequest.newBuilder() @@ -100,7 +101,7 @@ public ActivityTask poll() { MetricsTag.tagged(metricsScope, PollerTypeMetricsTag.PollerType.ACTIVITY_TASK) .gauge(MetricsType.NUM_POLLERS) - .update(pollGauge.incrementAndGet()); + .update(pollerTracker.pollStarted()); try { response = @@ -119,6 +120,7 @@ public ActivityTask poll() { ProtobufTimeUtils.toM3Duration( 
response.getStartedTime(), response.getCurrentAttemptScheduledTime())); isSuccessful = true; + pollerTracker.pollSucceeded(); return new ActivityTask( response, permit, @@ -126,7 +128,7 @@ public ActivityTask poll() { } finally { MetricsTag.tagged(metricsScope, PollerTypeMetricsTag.PollerType.ACTIVITY_TASK) .gauge(MetricsType.NUM_POLLERS) - .update(pollGauge.decrementAndGet()); + .update(pollerTracker.pollCompleted()); if (!isSuccessful) slotSupplier.releaseSlot(SlotReleaseReason.neverUsed(), permit); } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityWorker.java index 3387ee760..520ce7a37 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityWorker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityWorker.java @@ -26,7 +26,6 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nonnull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,7 +48,9 @@ final class ActivityWorker implements SuspendableWorker { private final GrpcRetryer grpcRetryer; private final GrpcRetryer.GrpcRetryerOptions replyGrpcRetryerOptions; private final TrackingSlotSupplier slotSupplier; - private final AtomicBoolean serverSupportsAutoscaling; + private final TaskCounter taskCounter = new TaskCounter(); + private final PollerTracker pollerTracker; + private final NamespaceCapabilities namespaceCapabilities; public ActivityWorker( @Nonnull WorkflowServiceStubs service, @@ -59,7 +60,7 @@ public ActivityWorker( @Nonnull SingleWorkerOptions options, @Nonnull ActivityTaskHandler handler, @Nonnull SlotSupplier slotSupplier, - @Nonnull AtomicBoolean serverSupportsAutoscaling) { + @Nonnull NamespaceCapabilities namespaceCapabilities) { this.service = Objects.requireNonNull(service); this.namespace = 
Objects.requireNonNull(namespace); this.taskQueue = Objects.requireNonNull(taskQueue); @@ -75,7 +76,8 @@ public ActivityWorker( DefaultStubServiceOperationRpcRetryOptions.INSTANCE, null); this.slotSupplier = new TrackingSlotSupplier<>(slotSupplier, this.workerMetricsScope); - this.serverSupportsAutoscaling = serverSupportsAutoscaling; + this.pollerTracker = new PollerTracker(); + this.namespaceCapabilities = namespaceCapabilities; } @Override @@ -107,10 +109,11 @@ public boolean start() { taskQueueActivitiesPerSecond, this.slotSupplier, workerMetricsScope, - service.getServerCapabilities()), + service.getServerCapabilities(), + pollerTracker), this.pollTaskExecutor, pollerOptions, - serverSupportsAutoscaling.get(), + namespaceCapabilities.isPollerAutoscaling(), workerMetricsScope); } else { @@ -126,7 +129,8 @@ public boolean start() { taskQueueActivitiesPerSecond, this.slotSupplier, workerMetricsScope, - service.getServerCapabilities()), + service.getServerCapabilities(), + pollerTracker), this.pollTaskExecutor, pollerOptions, workerMetricsScope); @@ -216,6 +220,22 @@ private PollerOptions getPollerOptions(SingleWorkerOptions options) { return pollerOptions; } + public TrackingSlotSupplier getSlotSupplier() { + return slotSupplier; + } + + public TaskCounter getTaskCounter() { + return taskCounter; + } + + public PollerOptions getPollerOptions() { + return pollerOptions; + } + + public PollerTracker getPollerTracker() { + return pollerTracker; + } + @Override public String toString() { return String.format( @@ -259,9 +279,22 @@ public void handle(ActivityTask task) throws Exception { MDC.put(LoggerTag.ATTEMPT, Integer.toString(pollResponse.getAttempt())); ActivityTaskHandler.Result result = null; + boolean taskFailed = false; try { result = handleActivity(task, metricsScope); + if (result.getTaskFailed() != null + && !io.temporal.internal.common.FailureUtils.isBenignApplicationFailure( + result.getTaskFailed().getFailure())) { + taskFailed = true; + } + } catch 
(Exception e) { + taskFailed = true; + throw e; } finally { + taskCounter.recordProcessed(); + if (taskFailed) { + taskCounter.recordFailed(); + } MDC.remove(LoggerTag.ACTIVITY_ID); MDC.remove(LoggerTag.ACTIVITY_TYPE); MDC.remove(LoggerTag.WORKFLOW_ID); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncActivityPollTask.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncActivityPollTask.java index 7f5573e24..60ebcbf65 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncActivityPollTask.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncActivityPollTask.java @@ -20,8 +20,8 @@ import io.temporal.worker.tuning.ActivitySlotInfo; import io.temporal.worker.tuning.SlotPermit; import io.temporal.worker.tuning.SlotReleaseReason; +import java.util.Objects; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import javax.annotation.Nonnull; import org.slf4j.Logger; @@ -34,8 +34,8 @@ public class AsyncActivityPollTask implements AsyncPoller.PollTaskAsync slotSupplier, @Nonnull Scope metricsScope, - @Nonnull Supplier serverCapabilities) { + @Nonnull Supplier serverCapabilities, + @Nonnull PollerTracker pollerTracker) { this.service = service; this.slotSupplier = slotSupplier; this.metricsScope = metricsScope; + this.pollerTracker = Objects.requireNonNull(pollerTracker); PollActivityTaskQueueRequest.Builder pollRequest = PollActivityTaskQueueRequest.newBuilder() @@ -86,7 +88,7 @@ public CompletableFuture poll(SlotPermit permit) { MetricsTag.tagged(metricsScope, PollerTypeMetricsTag.PollerType.ACTIVITY_TASK) .gauge(MetricsType.NUM_POLLERS) - .update(pollGauge.incrementAndGet()); + .update(pollerTracker.pollStarted()); CompletableFuture response = null; try { @@ -101,7 +103,7 @@ public CompletableFuture poll(SlotPermit permit) { } catch (Exception e) { MetricsTag.tagged(metricsScope, 
PollerTypeMetricsTag.PollerType.ACTIVITY_TASK) .gauge(MetricsType.NUM_POLLERS) - .update(pollGauge.decrementAndGet()); + .update(pollerTracker.pollCompleted()); throw new RuntimeException(e); } @@ -112,6 +114,7 @@ public CompletableFuture poll(SlotPermit permit) { metricsScope.counter(MetricsType.ACTIVITY_POLL_NO_TASK_COUNTER).inc(1); return null; } + pollerTracker.pollSucceeded(); metricsScope .timer(MetricsType.ACTIVITY_SCHEDULE_TO_START_LATENCY) .record( @@ -123,10 +126,11 @@ public CompletableFuture poll(SlotPermit permit) { () -> slotSupplier.releaseSlot(SlotReleaseReason.taskComplete(), permit)); }) .whenComplete( - (r, e) -> - MetricsTag.tagged(metricsScope, PollerTypeMetricsTag.PollerType.ACTIVITY_TASK) - .gauge(MetricsType.NUM_POLLERS) - .update(pollGauge.decrementAndGet())); + (r, e) -> { + MetricsTag.tagged(metricsScope, PollerTypeMetricsTag.PollerType.ACTIVITY_TASK) + .gauge(MetricsType.NUM_POLLERS) + .update(pollerTracker.pollCompleted()); + }); } @Override diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncNexusPollTask.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncNexusPollTask.java index 10be3b588..efc4dc807 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncNexusPollTask.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncNexusPollTask.java @@ -20,7 +20,6 @@ import io.temporal.worker.tuning.SlotReleaseReason; import java.util.Objects; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import javax.annotation.Nonnull; import org.slf4j.Logger; @@ -33,8 +32,8 @@ public class AsyncNexusPollTask implements AsyncPoller.PollTaskAsync private final WorkflowServiceStubs service; private final Scope metricsScope; private final PollNexusTaskQueueRequest pollRequest; - private final AtomicInteger pollGauge = new AtomicInteger(); private final Context.CancellableContext grpcContext = 
Context.ROOT.withCancellation(); + private final PollerTracker pollerTracker; @SuppressWarnings("deprecation") public AsyncNexusPollTask( @@ -45,10 +44,12 @@ public AsyncNexusPollTask( @Nonnull WorkerVersioningOptions versioningOptions, @Nonnull Scope metricsScope, @Nonnull Supplier serverCapabilities, - TrackingSlotSupplier slotSupplier) { + TrackingSlotSupplier slotSupplier, + @Nonnull PollerTracker pollerTracker) { this.service = Objects.requireNonNull(service); this.metricsScope = Objects.requireNonNull(metricsScope); this.slotSupplier = slotSupplier; + this.pollerTracker = Objects.requireNonNull(pollerTracker); PollNexusTaskQueueRequest.Builder pollRequest = PollNexusTaskQueueRequest.newBuilder() @@ -79,7 +80,7 @@ public CompletableFuture poll(SlotPermit permit) { MetricsTag.tagged(metricsScope, PollerTypeMetricsTag.PollerType.NEXUS_TASK) .gauge(MetricsType.NUM_POLLERS) - .update(pollGauge.incrementAndGet()); + .update(pollerTracker.pollStarted()); CompletableFuture response = null; try { @@ -94,7 +95,7 @@ public CompletableFuture poll(SlotPermit permit) { } catch (Exception e) { MetricsTag.tagged(metricsScope, PollerTypeMetricsTag.PollerType.NEXUS_TASK) .gauge(MetricsType.NUM_POLLERS) - .update(pollGauge.decrementAndGet()); + .update(pollerTracker.pollCompleted()); throw new RuntimeException(e); } @@ -105,6 +106,7 @@ public CompletableFuture poll(SlotPermit permit) { metricsScope.counter(MetricsType.NEXUS_POLL_NO_TASK_COUNTER).inc(1); return null; } + pollerTracker.pollSucceeded(); Timestamp startedTime = ProtobufTimeUtils.getCurrentProtoTime(); metricsScope .timer(MetricsType.NEXUS_SCHEDULE_TO_START_LATENCY) @@ -117,10 +119,11 @@ public CompletableFuture poll(SlotPermit permit) { () -> slotSupplier.releaseSlot(SlotReleaseReason.taskComplete(), permit)); }) .whenComplete( - (r, e) -> - MetricsTag.tagged(metricsScope, PollerTypeMetricsTag.PollerType.NEXUS_TASK) - .gauge(MetricsType.NUM_POLLERS) - .update(pollGauge.decrementAndGet())); + (r, e) -> { + 
MetricsTag.tagged(metricsScope, PollerTypeMetricsTag.PollerType.NEXUS_TASK) + .gauge(MetricsType.NUM_POLLERS) + .update(pollerTracker.pollCompleted()); + }); } @Override diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncWorkflowPollTask.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncWorkflowPollTask.java index 4a7f793cc..c30dbc9e1 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncWorkflowPollTask.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncWorkflowPollTask.java @@ -20,7 +20,6 @@ import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -35,11 +34,11 @@ public class AsyncWorkflowPollTask private final Scope metricsScope; private final Scope pollerMetricScope; private final PollWorkflowTaskQueueRequest pollRequest; - private final AtomicInteger pollGauge = new AtomicInteger(); private final MetricsTag.TagValue taskQueueTagValue; private final boolean stickyPoller; private final Context.CancellableContext grpcContext = Context.ROOT.withCancellation(); private final AtomicBoolean shutdown = new AtomicBoolean(false); + private final PollerTracker pollerTracker; @Override public String toString() { @@ -56,10 +55,12 @@ public AsyncWorkflowPollTask( @Nonnull WorkerVersioningOptions versioningOptions, @Nonnull TrackingSlotSupplier slotSupplier, @Nonnull Scope metricsScope, - @Nonnull Supplier serverCapabilities) { + @Nonnull Supplier serverCapabilities, + @Nonnull PollerTracker pollerTracker) { this.service = service; this.slotSupplier = slotSupplier; this.metricsScope = metricsScope; + this.pollerTracker = Objects.requireNonNull(pollerTracker); PollWorkflowTaskQueueRequest.Builder pollRequestBuilder = PollWorkflowTaskQueueRequest.newBuilder() @@ -122,7 +123,7 @@ 
public CompletableFuture poll(SlotPermit permit) MetricsTag.tagged(metricsScope, taskQueueTagValue) .gauge(MetricsType.NUM_POLLERS) - .update(pollGauge.incrementAndGet()); + .update(pollerTracker.pollStarted()); CompletableFuture response = null; try { @@ -137,7 +138,7 @@ public CompletableFuture poll(SlotPermit permit) } catch (Exception e) { MetricsTag.tagged(metricsScope, taskQueueTagValue) .gauge(MetricsType.NUM_POLLERS) - .update(pollGauge.decrementAndGet()); + .update(pollerTracker.pollCompleted()); throw new RuntimeException(e); } @@ -150,6 +151,7 @@ public CompletableFuture poll(SlotPermit permit) .inc(1); return null; } + pollerTracker.pollSucceeded(); slotSupplier.markSlotUsed(new WorkflowSlotInfo(r, pollRequest), permit); pollerMetricScope .counter(MetricsType.WORKFLOW_TASK_QUEUE_POLL_SUCCEED_COUNTER) @@ -160,10 +162,11 @@ public CompletableFuture poll(SlotPermit permit) return new WorkflowTask(r, (reason) -> slotSupplier.releaseSlot(reason, permit)); }) .whenComplete( - (r, e) -> - MetricsTag.tagged(metricsScope, taskQueueTagValue) - .gauge(MetricsType.NUM_POLLERS) - .update(pollGauge.decrementAndGet())); + (r, e) -> { + MetricsTag.tagged(metricsScope, taskQueueTagValue) + .gauge(MetricsType.NUM_POLLERS) + .update(pollerTracker.pollCompleted()); + }); } @Override diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/HeartbeatManager.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/HeartbeatManager.java new file mode 100644 index 000000000..43b99808c --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/HeartbeatManager.java @@ -0,0 +1,181 @@ +package io.temporal.internal.worker; + +import io.temporal.api.worker.v1.WorkerHeartbeat; +import io.temporal.api.workflowservice.v1.RecordWorkerHeartbeatRequest; +import io.temporal.serviceclient.WorkflowServiceStubs; +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import 
java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Manages periodic worker heartbeat RPCs. Routes workers to per-namespace {@link + * SharedNamespaceWorker} instances, each with its own scheduler. + */ +public class HeartbeatManager { + private static final Logger log = LoggerFactory.getLogger(HeartbeatManager.class); + + private final WorkflowServiceStubs service; + private final String identity; + private final Duration interval; + private final Map namespaceWorkers = new HashMap<>(); + + private final Object lock = new Object(); + + public HeartbeatManager(WorkflowServiceStubs service, String identity, Duration interval) { + this.service = service; + this.identity = identity; + this.interval = interval; + } + + /** + * Register a worker's heartbeat callback. Creates a per-namespace SharedNamespaceWorker if this + * is the first worker for the given namespace. + */ + public void registerWorker( + String namespace, String workerInstanceKey, Supplier callback) { + synchronized (lock) { + namespaceWorkers.compute( + namespace, + (ns, existing) -> { + if (existing != null && !existing.isShutdown()) { + existing.registerWorker(workerInstanceKey, callback); + return existing; + } + SharedNamespaceWorker nsWorker = + new SharedNamespaceWorker(service, ns, identity, interval); + nsWorker.registerWorker(workerInstanceKey, callback); + return nsWorker; + }); + } + } + + /** Unregister a worker. Stops the namespace worker if no workers remain for that namespace. 
*/ + public void unregisterWorker(String namespace, String workerInstanceKey) { + synchronized (lock) { + SharedNamespaceWorker nsWorker = namespaceWorkers.get(namespace); + if (nsWorker == null) { + // Already cleaned up by shutdown() + return; + } + nsWorker.unregisterWorker(workerInstanceKey); + if (nsWorker.isEmpty()) { + nsWorker.shutdown(); + namespaceWorkers.remove(namespace); + } + } + } + + public void shutdown() { + synchronized (lock) { + for (SharedNamespaceWorker nsWorker : namespaceWorkers.values()) { + nsWorker.shutdown(); + } + namespaceWorkers.clear(); + } + } + + /** + * Handles heartbeating for all workers in a specific namespace. Each instance owns its own + * scheduler thread and callback map. + */ + static class SharedNamespaceWorker { + private final WorkflowServiceStubs service; + private final String namespace; + private final String identity; + private final ConcurrentHashMap> callbacks = + new ConcurrentHashMap<>(); + private final ScheduledExecutorService scheduler; + private final AtomicBoolean shuttingDown = new AtomicBoolean(false); + + SharedNamespaceWorker( + WorkflowServiceStubs service, String namespace, String identity, Duration interval) { + this.service = service; + this.namespace = namespace; + this.identity = identity; + this.scheduler = + Executors.newSingleThreadScheduledExecutor( + r -> { + Thread t = new Thread(r, "worker-heartbeat-" + namespace); + t.setDaemon(true); + return t; + }); + scheduler.scheduleAtFixedRate( + this::heartbeatTick, 0, interval.toMillis(), TimeUnit.MILLISECONDS); + } + + void registerWorker(String workerInstanceKey, Supplier callback) { + callbacks.put(workerInstanceKey, callback); + } + + void unregisterWorker(String workerInstanceKey) { + callbacks.remove(workerInstanceKey); + } + + boolean isEmpty() { + return callbacks.isEmpty(); + } + + boolean isShutdown() { + return scheduler.isShutdown(); + } + + void shutdown() { + if (!shuttingDown.compareAndSet(false, true)) return; + 
scheduler.shutdown(); + try { + scheduler.awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + private void heartbeatTick() { + if (callbacks.isEmpty()) return; + + try { + List heartbeats = new ArrayList<>(); + for (Map.Entry> entry : callbacks.entrySet()) { + try { + heartbeats.add(entry.getValue().get()); + } catch (Exception e) { + log.warn( + "Failed to build heartbeat for worker {} in namespace {}", + entry.getKey(), + namespace, + e); + } + } + + if (!heartbeats.isEmpty()) { + service + .blockingStub() + .recordWorkerHeartbeat( + RecordWorkerHeartbeatRequest.newBuilder() + .setNamespace(namespace) + .setIdentity(identity) + .addAllWorkerHeartbeat(heartbeats) + .build()); + } + } catch (io.grpc.StatusRuntimeException e) { + if (e.getStatus().getCode() == io.grpc.Status.Code.UNIMPLEMENTED) { + log.warn( + "Server does not support worker heartbeats for namespace {}, disabling", namespace); + // Only signal shutdown — don't awaitTermination from within the scheduler's own thread + shuttingDown.set(true); + scheduler.shutdown(); + return; + } + log.warn("Failed to send worker heartbeat for namespace {}", namespace, e); + } catch (Exception e) { + log.warn("Failed to send worker heartbeat for namespace {}", namespace, e); + } + } + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityWorker.java index 5fdea211d..d3dbf1d71 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityWorker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityWorker.java @@ -47,6 +47,7 @@ final class LocalActivityWorker implements Startable, Shutdownable { private final LocalActivityDispatcherImpl laScheduler; + private final TaskCounter taskCounter = new TaskCounter(); private final PollerOptions pollerOptions; private final Scope workerMetricsScope; 
@@ -397,6 +398,8 @@ public void handle(LocalActivityAttemptTask attemptTask) throws Exception { LocalActivityExecutionContext executionContext = attemptTask.getExecutionContext(); executionContext.newAttempt(); PollActivityTaskQueueResponseOrBuilder activityTask = attemptTask.getAttemptTask(); + boolean taskFailed = false; + boolean taskExecuted = false; try { // if an activity was already completed by any mean like scheduleToClose or scheduleToStart, @@ -456,6 +459,7 @@ public void handle(LocalActivityAttemptTask attemptTask) throws Exception { } finally { sw.stop(); } + taskExecuted = true; // Cancel startToCloseTimeoutFuture if it's not yet fired. boolean startToCloseTimeoutFired = @@ -473,14 +477,26 @@ public void handle(LocalActivityAttemptTask attemptTask) throws Exception { } reason = handleResult(activityHandlerResult, attemptTask, metricsScope); + if (activityHandlerResult.getTaskFailed() != null + && !io.temporal.internal.common.FailureUtils.isBenignApplicationFailure( + activityHandlerResult.getTaskFailed().getFailure())) { + taskFailed = true; + } } catch (Throwable ex) { // handleLocalActivity is expected to never throw an exception and return a result // that can be used for a workflow callback if this method throws, it's a bug. 
log.error("[BUG] Code that expected to never throw an exception threw an exception", ex); + taskFailed = true; executionContext.callback( processingFailed(activityTask.getActivityId(), activityTask.getAttempt(), ex)); throw ex; } finally { + if (taskExecuted) { + taskCounter.recordProcessed(); + if (taskFailed) { + taskCounter.recordFailed(); + } + } slotSupplier.releaseSlot(reason, executionContext.getPermit()); MDC.remove(LoggerTag.ACTIVITY_ID); MDC.remove(LoggerTag.ACTIVITY_TYPE); @@ -748,10 +764,18 @@ private PollerOptions getPollerOptions(SingleWorkerOptions options) { return pollerOptions; } + public TrackingSlotSupplier getSlotSupplier() { + return slotSupplier; + } + public LocalActivityDispatcher getLocalActivityScheduler() { return laScheduler; } + public TaskCounter getTaskCounter() { + return taskCounter; + } + private static Failure newTimeoutFailure(TimeoutType timeoutType, @Nullable Failure cause) { TimeoutFailureInfo.Builder info = TimeoutFailureInfo.newBuilder().setTimeoutType(timeoutType); Failure.Builder result = Failure.newBuilder().setTimeoutFailureInfo(info); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/NamespaceCapabilities.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/NamespaceCapabilities.java new file mode 100644 index 000000000..4fa9d09a5 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/NamespaceCapabilities.java @@ -0,0 +1,29 @@ +package io.temporal.internal.worker; + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Holds namespace-level capabilities discovered from the server's DescribeNamespace response. A + * single instance is shared across all workers in a WorkerFactory and is populated at startup. Uses + * AtomicBooleans so capabilities can be set after construction. 
+ */ +public final class NamespaceCapabilities { + private final AtomicBoolean pollerAutoscaling = new AtomicBoolean(false); + private final AtomicBoolean workerHeartbeats = new AtomicBoolean(false); + + public boolean isPollerAutoscaling() { + return pollerAutoscaling.get(); + } + + public void setPollerAutoscaling(boolean value) { + pollerAutoscaling.set(value); + } + + public boolean isWorkerHeartbeats() { + return workerHeartbeats.get(); + } + + public void setWorkerHeartbeats(boolean value) { + workerHeartbeats.set(value); + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/NexusPollTask.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/NexusPollTask.java index 13e88690e..4116825b9 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/NexusPollTask.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/NexusPollTask.java @@ -14,7 +14,6 @@ import io.temporal.worker.PollerTypeMetricsTag; import io.temporal.worker.tuning.*; import java.util.Objects; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import javax.annotation.Nonnull; import org.slf4j.Logger; @@ -27,7 +26,7 @@ final class NexusPollTask implements MultiThreadedPoller.PollTask { private final TrackingSlotSupplier slotSupplier; private final Scope metricsScope; private final PollNexusTaskQueueRequest pollRequest; - private final AtomicInteger pollGauge = new AtomicInteger(); + private final PollerTracker pollerTracker; @SuppressWarnings("deprecation") public NexusPollTask( @@ -38,10 +37,12 @@ public NexusPollTask( @Nonnull WorkerVersioningOptions versioningOptions, @Nonnull TrackingSlotSupplier slotSupplier, @Nonnull Scope metricsScope, - @Nonnull Supplier serverCapabilities) { + @Nonnull Supplier serverCapabilities, + @Nonnull PollerTracker pollerTracker) { this.service = Objects.requireNonNull(service); this.slotSupplier = slotSupplier; this.metricsScope = Objects.requireNonNull(metricsScope); + 
this.pollerTracker = Objects.requireNonNull(pollerTracker); PollNexusTaskQueueRequest.Builder pollRequest = PollNexusTaskQueueRequest.newBuilder() @@ -89,7 +90,7 @@ public NexusTask poll() { MetricsTag.tagged(metricsScope, PollerTypeMetricsTag.PollerType.NEXUS_TASK) .gauge(MetricsType.NUM_POLLERS) - .update(pollGauge.incrementAndGet()); + .update(pollerTracker.pollStarted()); try { response = @@ -111,6 +112,7 @@ public NexusTask poll() { startedTime, response.getRequest().getScheduledTime())); isSuccessful = true; + pollerTracker.pollSucceeded(); return new NexusTask( response, permit, @@ -118,7 +120,7 @@ public NexusTask poll() { } finally { MetricsTag.tagged(metricsScope, PollerTypeMetricsTag.PollerType.NEXUS_TASK) .gauge(MetricsType.NUM_POLLERS) - .update(pollGauge.decrementAndGet()); + .update(pollerTracker.pollCompleted()); if (!isSuccessful) slotSupplier.releaseSlot(SlotReleaseReason.neverUsed(), permit); } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/NexusWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/NexusWorker.java index c1b999cdb..d826e5543 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/NexusWorker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/NexusWorker.java @@ -30,7 +30,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nonnull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,8 +52,10 @@ final class NexusWorker implements SuspendableWorker { private final GrpcRetryer grpcRetryer; private final GrpcRetryer.GrpcRetryerOptions replyGrpcRetryerOptions; private final TrackingSlotSupplier slotSupplier; - private final AtomicBoolean serverSupportsAutoscaling; + private final NamespaceCapabilities namespaceCapabilities; private final boolean forceOldFailureFormat; + private final TaskCounter 
taskCounter = new TaskCounter(); + private final PollerTracker pollerTracker = new PollerTracker(); public NexusWorker( @Nonnull WorkflowServiceStubs service, @@ -64,7 +65,7 @@ public NexusWorker( @Nonnull NexusTaskHandler handler, @Nonnull DataConverter dataConverter, @Nonnull SlotSupplier slotSupplier, - @Nonnull AtomicBoolean serverSupportsAutoscaling) { + @Nonnull NamespaceCapabilities namespaceCapabilities) { this.service = Objects.requireNonNull(service); this.namespace = Objects.requireNonNull(namespace); this.taskQueue = Objects.requireNonNull(taskQueue); @@ -80,7 +81,7 @@ public NexusWorker( DefaultStubServiceOperationRpcRetryOptions.INSTANCE, null); this.slotSupplier = new TrackingSlotSupplier<>(slotSupplier, this.workerMetricsScope); - this.serverSupportsAutoscaling = serverSupportsAutoscaling; + this.namespaceCapabilities = namespaceCapabilities; // Allow tests to force old format for backward compatibility testing String forceOldFormat = System.getProperty("temporal.nexus.forceOldFailureFormat"); this.forceOldFailureFormat = "true".equalsIgnoreCase(forceOldFormat); @@ -113,10 +114,11 @@ public boolean start() { options.getWorkerVersioningOptions(), workerMetricsScope, service.getServerCapabilities(), - this.slotSupplier), + this.slotSupplier, + pollerTracker), this.pollTaskExecutor, pollerOptions, - serverSupportsAutoscaling.get(), + namespaceCapabilities.isPollerAutoscaling(), workerMetricsScope); } else { poller = @@ -130,7 +132,8 @@ public boolean start() { options.getWorkerVersioningOptions(), this.slotSupplier, workerMetricsScope, - service.getServerCapabilities()), + service.getServerCapabilities(), + pollerTracker), this.pollTaskExecutor, pollerOptions, workerMetricsScope); @@ -214,6 +217,22 @@ private PollerOptions getPollerOptions(SingleWorkerOptions options) { return pollerOptions; } + public TrackingSlotSupplier getSlotSupplier() { + return slotSupplier; + } + + public TaskCounter getTaskCounter() { + return taskCounter; + } + + public 
PollerOptions getPollerOptions() { + return pollerOptions; + } + + public PollerTracker getPollerTracker() { + return pollerTracker; + } + @Override public String toString() { return String.format( @@ -271,9 +290,17 @@ public void handle(NexusTask task) { service, operation, taskQueue, options.getIdentity(), options.getBuildId()), task.getPermit()); + boolean taskFailed = false; try { - handleNexusTask(task, metricsScope); + taskFailed = handleNexusTask(task, metricsScope); + } catch (Throwable e) { + taskFailed = true; + throw e; } finally { + taskCounter.recordProcessed(); + if (taskFailed) { + taskCounter.recordFailed(); + } task.getCompletionCallback().apply(); MDC.remove(LoggerTag.NEXUS_SERVICE); MDC.remove(LoggerTag.NEXUS_OPERATION); @@ -288,16 +315,18 @@ public Throwable wrapFailure(NexusTask task, Throwable failure) { } @SuppressWarnings("deprecation") // Uses hasOperationError()/getOperationError() for compat - private void handleNexusTask(NexusTask task, Scope metricsScope) { + private boolean handleNexusTask(NexusTask task, Scope metricsScope) { PollNexusTaskQueueResponseOrBuilder pollResponse = task.getResponse(); ByteString taskToken = pollResponse.getTaskToken(); NexusTaskHandler.Result result; + boolean failed = false; Stopwatch sw = metricsScope.timer(MetricsType.NEXUS_EXEC_LATENCY).start(); try { result = handler.handle(task, metricsScope); if (result.getHandlerException() != null) { + failed = true; metricsScope .tagged( Collections.singletonMap( @@ -307,6 +336,7 @@ private void handleNexusTask(NexusTask task, Scope metricsScope) { .inc(1); } else if (result.getResponse().hasStartOperation() && result.getResponse().getStartOperation().hasOperationError()) { + failed = true; String operationState = result.getResponse().getStartOperation().getOperationError().getOperationState(); metricsScope @@ -315,6 +345,7 @@ private void handleNexusTask(NexusTask task, Scope metricsScope) { .inc(1); } else if (result.getResponse().hasStartOperation() && 
result.getResponse().getStartOperation().hasFailure()) { + failed = true; Failure f = result.getResponse().getStartOperation().getFailure(); String operationState; if (f.hasApplicationFailureInfo()) { @@ -333,7 +364,7 @@ private void handleNexusTask(NexusTask task, Scope metricsScope) { .tagged(Collections.singletonMap(TASK_FAILURE_TYPE, "timeout")) .counter(MetricsType.NEXUS_EXEC_FAILED_COUNTER) .inc(1); - return; + return true; } catch (Throwable e) { metricsScope .tagged(Collections.singletonMap(TASK_FAILURE_TYPE, "internal_sdk_error")) @@ -364,6 +395,7 @@ private void handleNexusTask(NexusTask task, Scope metricsScope) { Duration e2eDuration = ProtobufTimeUtils.toM3DurationSinceNow(pollResponse.getRequest().getScheduledTime()); metricsScope.timer(MetricsType.NEXUS_TASK_E2E_LATENCY).record(e2eDuration); + return failed; } private void logExceptionDuringResultReporting( diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/PollerTracker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/PollerTracker.java new file mode 100644 index 000000000..e91d1f608 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/PollerTracker.java @@ -0,0 +1,46 @@ +package io.temporal.internal.worker; + +import java.time.Instant; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Tracks in-flight poll count and last successful poll time for heartbeat reporting. + * + *

A single counter feeds both the metrics system and heartbeat population, avoiding the need for + * separate counters tracking the same value. + * + *

This object bridges the gap between poll tasks (created inside {@code start()}) and heartbeat + * building: the intermediate worker (e.g. ActivityWorker) creates it, passes it to the poll task, + * and later reads it when building heartbeats. + * + *

{@link #pollStarted()} and {@link #pollCompleted()} return the updated count so callers can + * forward the value to the Tally gauge in a single operation. + */ +public class PollerTracker { + private final AtomicInteger inFlightPolls = new AtomicInteger(); + private final AtomicReference lastSuccessfulPollTime = new AtomicReference<>(); + + /** Increments in-flight count. Returns the new value for forwarding to the Tally gauge. */ + public int pollStarted() { + return inFlightPolls.incrementAndGet(); + } + + /** Decrements in-flight count. Returns the new value for forwarding to the Tally gauge. */ + public int pollCompleted() { + return inFlightPolls.decrementAndGet(); + } + + /** Records the current time as the last successful poll (a poll that returned a task). */ + public void pollSucceeded() { + lastSuccessfulPollTime.set(Instant.now()); + } + + public int getInFlightPolls() { + return inFlightPolls.get(); + } + + public Instant getLastSuccessfulPollTime() { + return lastSuccessfulPollTime.get(); + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncActivityWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncActivityWorker.java index 87ddc7c4a..2d015f2e8 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncActivityWorker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncActivityWorker.java @@ -11,7 +11,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nonnull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,7 +34,7 @@ public SyncActivityWorker( double taskQueueActivitiesPerSecond, SingleWorkerOptions options, SlotSupplier slotSupplier, - @Nonnull AtomicBoolean serverSupportsAutoscaling) { + @Nonnull NamespaceCapabilities namespaceCapabilities) { this.identity = options.getIdentity(); this.namespace = 
namespace; this.taskQueue = taskQueue; @@ -76,7 +75,7 @@ public SyncActivityWorker( options, taskHandler, slotSupplier, - serverSupportsAutoscaling); + namespaceCapabilities); } public void registerActivityImplementations(Object... activitiesImplementation) { @@ -151,6 +150,22 @@ public EagerActivityDispatcher getEagerActivityDispatcher() { return this.worker.getEagerActivityDispatcher(); } + public TrackingSlotSupplier getSlotSupplier() { + return worker.getSlotSupplier(); + } + + public TaskCounter getTaskCounter() { + return worker.getTaskCounter(); + } + + public PollerOptions getPollerOptions() { + return worker.getPollerOptions(); + } + + public PollerTracker getPollerTracker() { + return worker.getPollerTracker(); + } + @Override public String toString() { return String.format( diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncNexusWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncNexusWorker.java index 23471f86e..adb6f7dff 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncNexusWorker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncNexusWorker.java @@ -6,7 +6,6 @@ import io.temporal.worker.tuning.SlotSupplier; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nonnull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,7 +25,7 @@ public SyncNexusWorker( String taskQueue, SingleWorkerOptions options, SlotSupplier slotSupplier, - @Nonnull AtomicBoolean serverSupportsAutoscaling) { + @Nonnull NamespaceCapabilities namespaceCapabilities) { this.identity = options.getIdentity(); this.namespace = namespace; this.taskQueue = taskQueue; @@ -47,7 +46,7 @@ public SyncNexusWorker( taskHandler, options.getDataConverter(), slotSupplier, - serverSupportsAutoscaling); + namespaceCapabilities); } @Override @@ -98,6 +97,22 @@ public boolean isTerminated() { return 
worker.isTerminated(); } + public TrackingSlotSupplier getSlotSupplier() { + return worker.getSlotSupplier(); + } + + public TaskCounter getTaskCounter() { + return worker.getTaskCounter(); + } + + public PollerOptions getPollerOptions() { + return worker.getPollerOptions(); + } + + public PollerTracker getPollerTracker() { + return worker.getPollerTracker(); + } + @Override public WorkerLifecycleState getLifecycleState() { return worker.getLifecycleState(); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncWorkflowWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncWorkflowWorker.java index 2b94effb6..ed8efafe9 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncWorkflowWorker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncWorkflowWorker.java @@ -3,7 +3,9 @@ import static io.temporal.internal.common.InternalUtils.createStickyTaskQueue; import io.temporal.api.common.v1.Payloads; +import io.temporal.api.enums.v1.TaskQueueType; import io.temporal.api.taskqueue.v1.TaskQueue; +import io.temporal.api.worker.v1.WorkerHeartbeat; import io.temporal.client.WorkflowClient; import io.temporal.common.converter.DataConverter; import io.temporal.common.converter.EncodedValues; @@ -22,10 +24,11 @@ import io.temporal.workflow.Functions.Func1; import java.lang.reflect.Type; import java.time.Duration; +import java.util.List; import java.util.Objects; import java.util.Optional; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.slf4j.Logger; @@ -61,6 +64,8 @@ public SyncWorkflowWorker( @Nonnull WorkflowClient client, @Nonnull String namespace, @Nonnull String taskQueue, + @Nonnull String workerInstanceKey, + @Nonnull Supplier> activeTaskQueueTypesSupplier, @Nonnull SingleWorkerOptions singleWorkerOptions, @Nonnull SingleWorkerOptions localActivityOptions, 
@Nonnull WorkflowRunLockManager runLocks, @@ -70,7 +75,7 @@ public SyncWorkflowWorker( @Nonnull EagerActivityDispatcher eagerActivityDispatcher, @Nonnull SlotSupplier slotSupplier, @Nonnull SlotSupplier laSlotSupplier, - @Nonnull AtomicBoolean serverSupportsAutoscaling) { + @Nonnull NamespaceCapabilities namespaceCapabilities) { this.identity = singleWorkerOptions.getIdentity(); this.namespace = namespace; this.taskQueue = taskQueue; @@ -118,6 +123,8 @@ public SyncWorkflowWorker( client.getWorkflowServiceStubs(), namespace, taskQueue, + workerInstanceKey, + activeTaskQueueTypesSupplier, stickyTaskQueueName, singleWorkerOptions, runLocks, @@ -125,7 +132,7 @@ public SyncWorkflowWorker( taskHandler, eagerActivityDispatcher, slotSupplier, - serverSupportsAutoscaling); + namespaceCapabilities); // Exists to support Worker#replayWorkflowExecution functionality. // This handler has to be non-sticky to avoid evicting actual executions from the cache @@ -235,7 +242,46 @@ public WorkerLifecycleState getLifecycleState() { return null; } - @Override + public TrackingSlotSupplier getWorkflowSlotSupplier() { + return workflowWorker.getSlotSupplier(); + } + + public TrackingSlotSupplier getLocalActivitySlotSupplier() { + return laWorker.getSlotSupplier(); + } + + public void setHeartbeatSupplier(Supplier supplier) { + workflowWorker.setHeartbeatSupplier(supplier); + } + + public boolean hasStickyQueue() { + return workflowWorker.hasStickyQueue(); + } + + public TaskCounter getWorkflowTaskCounter() { + return workflowWorker.getTaskCounter(); + } + + public TaskCounter getLocalActivityTaskCounter() { + return laWorker.getTaskCounter(); + } + + public PollerOptions getWorkflowPollerOptions() { + return workflowWorker.getPollerOptions(); + } + + public PollerTracker getWorkflowPollerTracker() { + return workflowWorker.getPollerTracker(); + } + + public PollerTracker getStickyPollerTracker() { + return workflowWorker.getStickyPollerTracker(); + } + + public String 
getStickyTaskQueueName() { + return workflowWorker.getStickyTaskQueueName(); + } + public String toString() { return String.format( "SyncWorkflowWorker{namespace=%s, taskQueue=%s, identity=%s}", diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/TaskCounter.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/TaskCounter.java new file mode 100644 index 000000000..991e78cf6 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/TaskCounter.java @@ -0,0 +1,25 @@ +package io.temporal.internal.worker; + +import java.util.concurrent.atomic.AtomicInteger; + +/** Tracks total processed and failed task counts for a worker. */ +public final class TaskCounter { + private final AtomicInteger totalProcessed = new AtomicInteger(); + private final AtomicInteger totalFailed = new AtomicInteger(); + + void recordProcessed() { + totalProcessed.incrementAndGet(); + } + + void recordFailed() { + totalFailed.incrementAndGet(); + } + + public int getTotalProcessed() { + return totalProcessed.get(); + } + + public int getTotalFailed() { + return totalFailed.get(); + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/TrackingSlotSupplier.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/TrackingSlotSupplier.java index ccd85a469..4bbee2aa1 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/TrackingSlotSupplier.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/TrackingSlotSupplier.java @@ -24,13 +24,29 @@ public class TrackingSlotSupplier { private final AtomicInteger issuedSlots = new AtomicInteger(); private final Map usedSlots = new ConcurrentHashMap<>(); private final Scope metricsScope; + private final String supplierKind; public TrackingSlotSupplier(SlotSupplier inner, Scope metricsScope) { this.inner = inner; this.metricsScope = metricsScope; + this.supplierKind = determineSupplierKind(inner); publishSlotsMetric(); } + private static String 
determineSupplierKind(SlotSupplier supplier) { + if (supplier instanceof FixedSizeSlotSupplier) return "Fixed"; + if (supplier instanceof ResourceBasedSlotSupplier) return "ResourceBased"; + return supplier.getClass().getSimpleName(); + } + + public String getSupplierKind() { + return supplierKind; + } + + public int getUsedSlotCount() { + return usedSlots.size(); + } + public SlotSupplierFuture reserveSlot(SlotReservationData data) { final SlotSupplierFuture future; try { diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowExecutorCache.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowExecutorCache.java index 69ed8beb4..bdd4c9901 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowExecutorCache.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowExecutorCache.java @@ -12,6 +12,7 @@ import io.temporal.worker.MetricsType; import java.util.Objects; import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import org.slf4j.Logger; @@ -23,6 +24,8 @@ public final class WorkflowExecutorCache { private final WorkflowRunLockManager runLockManager; private final Cache cache; private final Scope metricsScope; + private final AtomicInteger cacheHits = new AtomicInteger(); + private final AtomicInteger cacheMisses = new AtomicInteger(); public WorkflowExecutorCache( int workflowCacheSize, WorkflowRunLockManager runLockManager, Scope scope) { @@ -77,6 +80,7 @@ public WorkflowRunTaskHandler getOrCreate( if (workflowRunTaskHandler != null) { workflowTypeScope.counter(MetricsType.STICKY_CACHE_HIT).inc(1); + cacheHits.incrementAndGet(); return workflowRunTaskHandler; } @@ -85,6 +89,7 @@ public WorkflowRunTaskHandler getOrCreate( execution.getWorkflowId(), runId); workflowTypeScope.counter(MetricsType.STICKY_CACHE_MISS).inc(1); + cacheMisses.incrementAndGet(); return 
workflowExecutorFn.call(); } @@ -166,4 +171,16 @@ public void invalidateAll() { cache.invalidateAll(); metricsScope.gauge(MetricsType.STICKY_CACHE_SIZE).update(size()); } + + public int getCacheHits() { + return cacheHits.get(); + } + + public int getCacheMisses() { + return cacheMisses.get(); + } + + public int getCurrentCacheSize() { + return (int) cache.size(); + } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowPollTask.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowPollTask.java index fa5e3cc79..cdb5e5163 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowPollTask.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowPollTask.java @@ -21,7 +21,6 @@ import io.temporal.worker.tuning.SlotSupplierFuture; import io.temporal.worker.tuning.WorkflowSlotInfo; import java.util.Objects; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -38,8 +37,8 @@ final class WorkflowPollTask implements MultiThreadedPoller.PollTask slotSupplier, @Nonnull StickyQueueBalancer stickyQueueBalancer, @Nonnull Scope workerMetricsScope, - @Nonnull Supplier serverCapabilities) { + @Nonnull Supplier serverCapabilities, + @Nonnull PollerTracker pollerTracker, + @Nonnull PollerTracker stickyPollerTracker) { this.slotSupplier = Objects.requireNonNull(slotSupplier); this.stickyQueueBalancer = Objects.requireNonNull(stickyQueueBalancer); this.metricsScope = Objects.requireNonNull(workerMetricsScope); + this.pollerTracker = Objects.requireNonNull(pollerTracker); + this.stickyPollerTracker = Objects.requireNonNull(stickyPollerTracker); this.stickyMetricsScope = workerMetricsScope.tagged( new ImmutableMap.Builder(1) @@ -133,16 +136,17 @@ public WorkflowTask poll() { boolean isSticky = TaskQueueKind.TASK_QUEUE_KIND_STICKY.equals(taskQueueKind); PollWorkflowTaskQueueRequest request = isSticky ? 
stickyPollRequest : pollRequest; Scope scope = isSticky ? stickyMetricsScope : metricsScope; + PollerTracker tracker = isSticky ? stickyPollerTracker : pollerTracker; log.trace("poll request begin: {}", request); if (isSticky) { MetricsTag.tagged(metricsScope, PollerTypeMetricsTag.PollerType.WORKFLOW_STICKY_TASK) .gauge(MetricsType.NUM_POLLERS) - .update(stickyPollGauge.incrementAndGet()); + .update(stickyPollerTracker.pollStarted()); } else { MetricsTag.tagged(metricsScope, PollerTypeMetricsTag.PollerType.WORKFLOW_TASK) .gauge(MetricsType.NUM_POLLERS) - .update(normalPollGauge.incrementAndGet()); + .update(pollerTracker.pollStarted()); } try { @@ -151,19 +155,19 @@ public WorkflowTask poll() { return null; } isSuccessful = true; + tracker.pollSucceeded(); stickyQueueBalancer.finishPoll(taskQueueKind, response.getBacklogCountHint()); slotSupplier.markSlotUsed(new WorkflowSlotInfo(response, pollRequest), permit); return new WorkflowTask(response, (rr) -> slotSupplier.releaseSlot(rr, permit)); } finally { - if (isSticky) { MetricsTag.tagged(metricsScope, PollerTypeMetricsTag.PollerType.WORKFLOW_STICKY_TASK) .gauge(MetricsType.NUM_POLLERS) - .update(stickyPollGauge.decrementAndGet()); + .update(stickyPollerTracker.pollCompleted()); } else { MetricsTag.tagged(metricsScope, PollerTypeMetricsTag.PollerType.WORKFLOW_TASK) .gauge(MetricsType.NUM_POLLERS) - .update(normalPollGauge.decrementAndGet()); + .update(pollerTracker.pollCompleted()); } if (!isSuccessful) { diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java index 4986808c4..f98316d5d 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java @@ -13,8 +13,11 @@ import io.temporal.api.common.v1.WorkflowExecution; import io.temporal.api.enums.v1.QueryResultType; import 
io.temporal.api.enums.v1.TaskQueueKind; +import io.temporal.api.enums.v1.TaskQueueType; +import io.temporal.api.enums.v1.WorkerStatus; import io.temporal.api.enums.v1.WorkflowTaskFailedCause; import io.temporal.api.failure.v1.Failure; +import io.temporal.api.worker.v1.WorkerHeartbeat; import io.temporal.api.workflowservice.v1.*; import io.temporal.failure.ApplicationFailure; import io.temporal.internal.logging.LoggerTag; @@ -30,7 +33,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.slf4j.Logger; @@ -55,7 +58,14 @@ final class WorkflowWorker implements SuspendableWorker { private final GrpcRetryer grpcRetryer; private final EagerActivityDispatcher eagerActivityDispatcher; private final TrackingSlotSupplier slotSupplier; - private final AtomicBoolean serverSupportsAutoscaling; + private volatile Supplier heartbeatSupplier; + private final String workerInstanceKey; + private final Supplier> activeTaskQueueTypesSupplier; + + private final TaskCounter taskCounter = new TaskCounter(); + private final PollerTracker pollerTracker = new PollerTracker(); + private final PollerTracker stickyPollerTracker = new PollerTracker(); + private final NamespaceCapabilities namespaceCapabilities; private PollTaskExecutor pollTaskExecutor; @@ -69,6 +79,8 @@ public WorkflowWorker( @Nonnull WorkflowServiceStubs service, @Nonnull String namespace, @Nonnull String taskQueue, + @Nonnull String workerInstanceKey, + @Nonnull Supplier> activeTaskQueueTypesSupplier, @Nullable String stickyTaskQueueName, @Nonnull SingleWorkerOptions options, @Nonnull WorkflowRunLockManager runLocks, @@ -76,10 +88,12 @@ public WorkflowWorker( @Nonnull WorkflowTaskHandler handler, @Nonnull EagerActivityDispatcher eagerActivityDispatcher, @Nonnull 
SlotSupplier slotSupplier, - @Nonnull AtomicBoolean serverSupportsAutoscaling) { + @Nonnull NamespaceCapabilities namespaceCapabilities) { this.service = Objects.requireNonNull(service); this.namespace = Objects.requireNonNull(namespace); this.taskQueue = Objects.requireNonNull(taskQueue); + this.workerInstanceKey = Objects.requireNonNull(workerInstanceKey); + this.activeTaskQueueTypesSupplier = Objects.requireNonNull(activeTaskQueueTypesSupplier); this.options = Objects.requireNonNull(options); this.stickyTaskQueueName = stickyTaskQueueName; this.pollerOptions = getPollerOptions(options); @@ -91,7 +105,7 @@ public WorkflowWorker( this.grpcRetryer = new GrpcRetryer(service.getServerCapabilities()); this.eagerActivityDispatcher = eagerActivityDispatcher; this.slotSupplier = new TrackingSlotSupplier<>(slotSupplier, this.workerMetricsScope); - this.serverSupportsAutoscaling = serverSupportsAutoscaling; + this.namespaceCapabilities = namespaceCapabilities; } @Override @@ -122,7 +136,8 @@ public boolean start() { options.getWorkerVersioningOptions(), slotSupplier, workerMetricsScope, - service.getServerCapabilities()); + service.getServerCapabilities(), + pollerTracker); pollers = Arrays.asList( new AsyncWorkflowPollTask( @@ -134,7 +149,8 @@ public boolean start() { options.getWorkerVersioningOptions(), slotSupplier, workerMetricsScope, - service.getServerCapabilities()), + service.getServerCapabilities(), + stickyPollerTracker), normalPoller); this.stickyQueueBalancer = normalPoller; } else { @@ -149,7 +165,8 @@ public boolean start() { options.getWorkerVersioningOptions(), slotSupplier, workerMetricsScope, - service.getServerCapabilities())); + service.getServerCapabilities(), + pollerTracker)); } poller = new AsyncPoller<>( @@ -158,7 +175,7 @@ public boolean start() { pollers, this.pollTaskExecutor, pollerOptions, - serverSupportsAutoscaling.get(), + namespaceCapabilities.isPollerAutoscaling(), workerMetricsScope); } else { PollerBehaviorSimpleMaximum pollerBehavior 
= @@ -180,7 +197,9 @@ public boolean start() { slotSupplier, stickyQueueBalancer, workerMetricsScope, - service.getServerCapabilities()), + service.getServerCapabilities(), + pollerTracker, + stickyPollerTracker), pollTaskExecutor, pollerOptions, workerMetricsScope); @@ -216,19 +235,25 @@ public CompletableFuture shutdown(ShutdownManager shutdownManager, boolean return CompletableFuture.allOf( pollerShutdown.thenCompose( ignore -> { - if (!interruptTasks && stickyTaskQueueName != null) { - return shutdownManager.waitOnWorkerShutdownRequest( - service - .futureStub() - .shutdownWorker( - ShutdownWorkerRequest.newBuilder() - .setIdentity(options.getIdentity()) - .setNamespace(namespace) - .setStickyTaskQueue(stickyTaskQueueName) - .setReason(GRACEFUL_SHUTDOWN_MESSAGE) - .build())); + ShutdownWorkerRequest.Builder shutdownReq = + ShutdownWorkerRequest.newBuilder() + .setIdentity(options.getIdentity()) + .setNamespace(namespace) + .setTaskQueue(taskQueue) + .setWorkerInstanceKey(workerInstanceKey) + .setReason(GRACEFUL_SHUTDOWN_MESSAGE) + .addAllTaskQueueTypes(activeTaskQueueTypesSupplier.get()); + if (stickyTaskQueueName != null) { + shutdownReq.setStickyTaskQueue(stickyTaskQueueName); + } + if (heartbeatSupplier != null) { + shutdownReq.setWorkerHeartbeat( + heartbeatSupplier.get().toBuilder() + .setStatus(WorkerStatus.WORKER_STATUS_SHUTTING_DOWN) + .build()); } - return CompletableFuture.completedFuture(null); + return shutdownManager.waitOnWorkerShutdownRequest( + service.futureStub().shutdownWorker(shutdownReq.build())); }), pollerShutdown .thenCompose( @@ -338,6 +363,38 @@ public WorkflowTaskDispatchHandle reserveWorkflowExecutor() { .orElse(null); } + public void setHeartbeatSupplier(Supplier supplier) { + this.heartbeatSupplier = supplier; + } + + public TrackingSlotSupplier getSlotSupplier() { + return slotSupplier; + } + + public boolean hasStickyQueue() { + return stickyTaskQueueName != null; + } + + public String getStickyTaskQueueName() { + return 
stickyTaskQueueName; + } + + public TaskCounter getTaskCounter() { + return taskCounter; + } + + public PollerOptions getPollerOptions() { + return pollerOptions; + } + + public PollerTracker getPollerTracker() { + return pollerTracker; + } + + public PollerTracker getStickyPollerTracker() { + return stickyPollerTracker; + } + @Override public String toString() { return String.format( @@ -405,139 +462,152 @@ public void handle(WorkflowTask task) throws Exception { do { PollWorkflowTaskQueueResponse currentTask = nextWFTResponse.get(); nextWFTResponse = Optional.empty(); - WorkflowTaskHandler.Result result = handleTask(currentTask, workflowTypeScope); - WorkflowTaskFailedCause taskFailedCause = null; + boolean iterationFailed = false; try { - RespondWorkflowTaskCompletedRequest taskCompleted = result.getTaskCompleted(); - RespondWorkflowTaskFailedRequest taskFailed = result.getTaskFailed(); - RespondQueryTaskCompletedRequest queryCompleted = result.getQueryCompleted(); - - if (queryCompleted != null) { - try { - sendDirectQueryCompletedResponse( - currentTask.getTaskToken(), queryCompleted.toBuilder(), workflowTypeScope); - } catch (StatusRuntimeException e) { - GrpcMessageTooLargeException tooLargeException = - GrpcMessageTooLargeException.tryWrap(e); - if (tooLargeException == null) { - throw e; + WorkflowTaskHandler.Result result = handleTask(currentTask, workflowTypeScope); + WorkflowTaskFailedCause taskFailedCause = null; + try { + RespondWorkflowTaskCompletedRequest taskCompleted = result.getTaskCompleted(); + RespondWorkflowTaskFailedRequest taskFailed = result.getTaskFailed(); + RespondQueryTaskCompletedRequest queryCompleted = result.getQueryCompleted(); + + if (queryCompleted != null) { + try { + sendDirectQueryCompletedResponse( + currentTask.getTaskToken(), queryCompleted.toBuilder(), workflowTypeScope); + } catch (StatusRuntimeException e) { + GrpcMessageTooLargeException tooLargeException = + GrpcMessageTooLargeException.tryWrap(e); + if 
(tooLargeException == null) { + throw e; + } + Failure failure = + grpcMessageTooLargeFailure( + workflowExecution.getWorkflowId(), + tooLargeException, + "Failed to send query response"); + RespondQueryTaskCompletedRequest.Builder queryFailedBuilder = + RespondQueryTaskCompletedRequest.newBuilder() + .setTaskToken(currentTask.getTaskToken()) + .setNamespace(namespace) + .setCompletedType(QueryResultType.QUERY_RESULT_TYPE_FAILED) + .setErrorMessage(failure.getMessage()) + .setFailure(failure); + sendDirectQueryCompletedResponse( + currentTask.getTaskToken(), queryFailedBuilder, workflowTypeScope); } - Failure failure = - grpcMessageTooLargeFailure( - workflowExecution.getWorkflowId(), - tooLargeException, - "Failed to send query response"); - RespondQueryTaskCompletedRequest.Builder queryFailedBuilder = - RespondQueryTaskCompletedRequest.newBuilder() - .setTaskToken(currentTask.getTaskToken()) - .setNamespace(namespace) - .setCompletedType(QueryResultType.QUERY_RESULT_TYPE_FAILED) - .setErrorMessage(failure.getMessage()) - .setFailure(failure); - sendDirectQueryCompletedResponse( - currentTask.getTaskToken(), queryFailedBuilder, workflowTypeScope); - } - } else { - try { - if (taskCompleted != null) { - RespondWorkflowTaskCompletedRequest.Builder requestBuilder = - taskCompleted.toBuilder(); - try (EagerActivitySlotsReservation activitySlotsReservation = - new EagerActivitySlotsReservation(eagerActivityDispatcher)) { - activitySlotsReservation.applyToRequest(requestBuilder); - RespondWorkflowTaskCompletedResponse response = - sendTaskCompleted( - currentTask.getTaskToken(), - requestBuilder, - result.getRequestRetryOptions(), - workflowTypeScope); - // If we were processing a speculative WFT the server may instruct us that the - // task was dropped by resting out event ID. 
- long resetEventId = response.getResetHistoryEventId(); - if (resetEventId != 0) { - result.getResetEventIdHandle().apply(resetEventId); + } else { + try { + if (taskCompleted != null) { + RespondWorkflowTaskCompletedRequest.Builder requestBuilder = + taskCompleted.toBuilder(); + try (EagerActivitySlotsReservation activitySlotsReservation = + new EagerActivitySlotsReservation(eagerActivityDispatcher)) { + activitySlotsReservation.applyToRequest(requestBuilder); + RespondWorkflowTaskCompletedResponse response = + sendTaskCompleted( + currentTask.getTaskToken(), + requestBuilder, + result.getRequestRetryOptions(), + workflowTypeScope); + // If we were processing a speculative WFT the server may instruct us that the + // task was dropped by resting out event ID. + long resetEventId = response.getResetHistoryEventId(); + if (resetEventId != 0) { + result.getResetEventIdHandle().apply(resetEventId); + } + nextWFTResponse = + response.hasWorkflowTask() + ? Optional.of(response.getWorkflowTask()) + : Optional.empty(); + // TODO we don't have to do this under the runId lock + activitySlotsReservation.handleResponse(response); } - nextWFTResponse = - response.hasWorkflowTask() - ? 
Optional.of(response.getWorkflowTask()) - : Optional.empty(); - // TODO we don't have to do this under the runId lock - activitySlotsReservation.handleResponse(response); + } else if (taskFailed != null) { + taskFailedCause = taskFailed.getCause(); + sendTaskFailed( + currentTask.getTaskToken(), + taskFailed.toBuilder(), + result.getRequestRetryOptions(), + workflowTypeScope); } - } else if (taskFailed != null) { - taskFailedCause = taskFailed.getCause(); + + // Apply post-completion metrics only if runnable present and the above succeeded + if (result.getApplyPostCompletionMetrics() != null) { + result.getApplyPostCompletionMetrics().run(); + } + } catch (GrpcMessageTooLargeException e) { + // Only fail workflow task on the first attempt, subsequent failures of the same + // workflow task should timeout. + if (currentTask.getAttempt() > 1) { + throw e; + } + + releaseReason = SlotReleaseReason.error(e); + handleReportingFailure( + e, currentTask, result, workflowExecution, workflowTypeScope); + // setting/replacing failure cause for metrics purposes + taskFailedCause = + WorkflowTaskFailedCause.WORKFLOW_TASK_FAILED_CAUSE_GRPC_MESSAGE_TOO_LARGE; + + String messagePrefix = + String.format( + "Failed to send workflow task %s", + taskFailed == null ? 
"completion" : "failure"); + RespondWorkflowTaskFailedRequest.Builder taskFailedBuilder = + RespondWorkflowTaskFailedRequest.newBuilder() + .setFailure( + grpcMessageTooLargeFailure( + workflowExecution.getWorkflowId(), e, messagePrefix)) + .setCause( + WorkflowTaskFailedCause + .WORKFLOW_TASK_FAILED_CAUSE_GRPC_MESSAGE_TOO_LARGE); sendTaskFailed( currentTask.getTaskToken(), - taskFailed.toBuilder(), + taskFailedBuilder, result.getRequestRetryOptions(), workflowTypeScope); } + } + } catch (Exception e) { + iterationFailed = true; + releaseReason = SlotReleaseReason.error(e); + handleReportingFailure(e, currentTask, result, workflowExecution, workflowTypeScope); + throw e; + } - // Apply post-completion metrics only if runnable present and the above succeeded - if (result.getApplyPostCompletionMetrics() != null) { - result.getApplyPostCompletionMetrics().run(); - } - } catch (GrpcMessageTooLargeException e) { - // Only fail workflow task on the first attempt, subsequent failures of the same - // workflow task should timeout. - if (currentTask.getAttempt() > 1) { - throw e; - } - - releaseReason = SlotReleaseReason.error(e); - handleReportingFailure( - e, currentTask, result, workflowExecution, workflowTypeScope); - // setting/replacing failure cause for metrics purposes - taskFailedCause = - WorkflowTaskFailedCause.WORKFLOW_TASK_FAILED_CAUSE_GRPC_MESSAGE_TOO_LARGE; - - String messagePrefix = - String.format( - "Failed to send workflow task %s", - taskFailed == null ? 
"completion" : "failure"); - RespondWorkflowTaskFailedRequest.Builder taskFailedBuilder = - RespondWorkflowTaskFailedRequest.newBuilder() - .setFailure( - grpcMessageTooLargeFailure( - workflowExecution.getWorkflowId(), e, messagePrefix)) - .setCause( - WorkflowTaskFailedCause - .WORKFLOW_TASK_FAILED_CAUSE_GRPC_MESSAGE_TOO_LARGE); - sendTaskFailed( - currentTask.getTaskToken(), - taskFailedBuilder, - result.getRequestRetryOptions(), - workflowTypeScope); + if (taskFailedCause != null) { + iterationFailed = true; + String taskFailureType; + switch (taskFailedCause) { + case WORKFLOW_TASK_FAILED_CAUSE_NON_DETERMINISTIC_ERROR: + taskFailureType = "NonDeterminismError"; + break; + case WORKFLOW_TASK_FAILED_CAUSE_GRPC_MESSAGE_TOO_LARGE: + taskFailureType = "GrpcMessageTooLarge"; + break; + default: + taskFailureType = "WorkflowError"; } + Scope workflowTaskFailureScope = + workflowTypeScope.tagged(ImmutableMap.of(TASK_FAILURE_TYPE, taskFailureType)); + // we don't trigger the counter in case of the legacy query + // (which never has taskFailed set) + workflowTaskFailureScope + .counter(MetricsType.WORKFLOW_TASK_EXECUTION_FAILURE_COUNTER) + .inc(1); + } + if (nextWFTResponse.isPresent()) { + workflowTypeScope.counter(MetricsType.WORKFLOW_TASK_HEARTBEAT_COUNTER).inc(1); } } catch (Exception e) { - releaseReason = SlotReleaseReason.error(e); - handleReportingFailure(e, currentTask, result, workflowExecution, workflowTypeScope); + iterationFailed = true; throw e; - } - - if (taskFailedCause != null) { - String taskFailureType; - switch (taskFailedCause) { - case WORKFLOW_TASK_FAILED_CAUSE_NON_DETERMINISTIC_ERROR: - taskFailureType = "NonDeterminismError"; - break; - case WORKFLOW_TASK_FAILED_CAUSE_GRPC_MESSAGE_TOO_LARGE: - taskFailureType = "GrpcMessageTooLarge"; - break; - default: - taskFailureType = "WorkflowError"; + } finally { + taskCounter.recordProcessed(); + if (iterationFailed) { + taskCounter.recordFailed(); } - Scope workflowTaskFailureScope = - 
workflowTypeScope.tagged(ImmutableMap.of(TASK_FAILURE_TYPE, taskFailureType)); - // we don't trigger the counter in case of the legacy query - // (which never has taskFailed set) - workflowTaskFailureScope - .counter(MetricsType.WORKFLOW_TASK_EXECUTION_FAILURE_COUNTER) - .inc(1); - } - if (nextWFTResponse.isPresent()) { - workflowTypeScope.counter(MetricsType.WORKFLOW_TASK_HEARTBEAT_COUNTER).inc(1); } } while (nextWFTResponse.isPresent()); } finally { diff --git a/temporal-sdk/src/main/java/io/temporal/worker/Worker.java b/temporal-sdk/src/main/java/io/temporal/worker/Worker.java index 5ffd300e9..4f309630a 100644 --- a/temporal-sdk/src/main/java/io/temporal/worker/Worker.java +++ b/temporal-sdk/src/main/java/io/temporal/worker/Worker.java @@ -5,6 +5,14 @@ import com.google.common.base.Strings; import com.uber.m3.tally.Scope; import com.uber.m3.util.ImmutableMap; +import io.temporal.api.deployment.v1.WorkerDeploymentVersion; +import io.temporal.api.enums.v1.TaskQueueType; +import io.temporal.api.enums.v1.WorkerStatus; +import io.temporal.api.worker.v1.PluginInfo; +import io.temporal.api.worker.v1.WorkerHeartbeat; +import io.temporal.api.worker.v1.WorkerHostInfo; +import io.temporal.api.worker.v1.WorkerPollerInfo; +import io.temporal.api.worker.v1.WorkerSlotsInfo; import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowClientOptions; import io.temporal.common.Experimental; @@ -16,12 +24,19 @@ import io.temporal.internal.sync.WorkflowInternal; import io.temporal.internal.sync.WorkflowThreadExecutor; import io.temporal.internal.worker.*; +import io.temporal.internal.worker.TaskCounter; import io.temporal.serviceclient.MetricsTag; +import io.temporal.serviceclient.Version; import io.temporal.worker.tuning.*; import io.temporal.workflow.Functions; import io.temporal.workflow.Functions.Func; import io.temporal.workflow.WorkflowMethod; +import java.lang.management.ManagementFactory; +import java.net.InetAddress; import java.time.Duration; +import 
java.time.Instant; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -29,6 +44,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.slf4j.Logger; @@ -47,6 +64,23 @@ public final class Worker { final SyncActivityWorker activityWorker; final SyncNexusWorker nexusWorker; private final AtomicBoolean started = new AtomicBoolean(); + private volatile boolean shuttingDown = false; + private boolean hasNexusServices = false; + private final String workerInstanceKey = UUID.randomUUID().toString(); + private final Instant startTime = Instant.now(); + private final WorkflowClientOptions clientOptions; + private final @Nonnull WorkflowExecutorCache cache; + private final Map previousSnapshots = new HashMap<>(); + + private static final class TaskSnapshot { + final int processed; + final int failed; + + TaskSnapshot(int processed, int failed) { + this.processed = processed; + this.failed = failed; + } + } /** * Creates worker that connects to an instance of the Temporal Service. 
@@ -70,7 +104,7 @@ public final class Worker { WorkflowThreadExecutor workflowThreadExecutor, List contextPropagators, @Nonnull List plugins, - @Nonnull AtomicBoolean serverSupportsAutoscaling) { + @Nonnull NamespaceCapabilities namespaceCapabilities) { Objects.requireNonNull(client, "client should not be null"); this.plugins = Objects.requireNonNull(plugins, "plugins should not be null"); @@ -78,6 +112,8 @@ public final class Worker { !Strings.isNullOrEmpty(taskQueue), "taskQueue should not be an empty string"); this.taskQueue = taskQueue; this.options = WorkerOptions.newBuilder(options).validateAndBuildWithDefaults(); + this.clientOptions = client.getOptions(); + this.cache = cache; factoryOptions = WorkerFactoryOptions.newBuilder(factoryOptions).validateAndBuildWithDefaults(); WorkflowClientOptions clientOptions = client.getOptions(); String namespace = clientOptions.getNamespace(); @@ -104,7 +140,7 @@ public final class Worker { this.options.getMaxTaskQueueActivitiesPerSecond(), activityOptions, activitySlotSupplier, - serverSupportsAutoscaling); + namespaceCapabilities); } EagerActivityDispatcher eagerActivityDispatcher = @@ -123,12 +159,7 @@ public final class Worker { nexusWorker = new SyncNexusWorker( - client, - namespace, - taskQueue, - nexusOptions, - nexusSlotSupplier, - serverSupportsAutoscaling); + client, namespace, taskQueue, nexusOptions, nexusSlotSupplier, namespaceCapabilities); SingleWorkerOptions singleWorkerOptions = toWorkflowWorkerOptions( @@ -158,6 +189,8 @@ public final class Worker { client, namespace, taskQueue, + workerInstanceKey, + this::getActiveTaskQueueTypes, singleWorkerOptions, localActivityOptions, runLocks, @@ -167,7 +200,7 @@ public final class Worker { eagerActivityDispatcher, workflowSlotSupplier, localActivitySlotSupplier, - serverSupportsAutoscaling); + namespaceCapabilities); } /** @@ -411,6 +444,7 @@ public void registerNexusServiceImplementation(Object... 
nexusServiceImplementat !started.get(), "registerNexusServiceImplementation is not allowed after worker has started"); nexusWorker.registerNexusServiceImplementation(nexusServiceImplementations); + hasNexusServices = true; } void start() { @@ -425,6 +459,7 @@ void start() { } CompletableFuture shutdown(ShutdownManager shutdownManager, boolean interruptUserTasks) { + shuttingDown = true; CompletableFuture workflowWorkerShutdownFuture = workflowWorker.shutdown(shutdownManager, interruptUserTasks); CompletableFuture nexusWorkerShutdownFuture = @@ -457,6 +492,198 @@ void awaitTermination(long timeout, TimeUnit unit) { ShutdownManager.awaitTermination(workflowWorker, timeoutMillis); } + String getWorkerInstanceKey() { + return workerInstanceKey; + } + + List getActiveTaskQueueTypes() { + List types = new ArrayList<>(); + types.add(TaskQueueType.TASK_QUEUE_TYPE_WORKFLOW); + if (activityWorker != null) { + types.add(TaskQueueType.TASK_QUEUE_TYPE_ACTIVITY); + } + if (hasNexusServices) { + types.add(TaskQueueType.TASK_QUEUE_TYPE_NEXUS); + } + return types; + } + + Supplier buildHeartbeatCallback(String workerGroupingKey) { + // The callback can be invoked concurrently from the heartbeat scheduler and the shutdown path + final Object callbackLock = new Object(); + final AtomicReference lastHeartbeatTime = new AtomicReference<>(null); + return () -> { + synchronized (callbackLock) { + Instant now = Instant.now(); + WorkerHeartbeat.Builder hb = + WorkerHeartbeat.newBuilder() + .setWorkerInstanceKey(workerInstanceKey) + .setWorkerIdentity( + options.getIdentity() != null + ? options.getIdentity() + : clientOptions.getIdentity()) + .setTaskQueue(taskQueue) + .setSdkName(Version.SDK_NAME) + .setSdkVersion(Version.LIBRARY_VERSION) + .setStatus( + shuttingDown + ? 
WorkerStatus.WORKER_STATUS_SHUTTING_DOWN + : WorkerStatus.WORKER_STATUS_RUNNING) + .setStartTime(toProtoTimestamp(startTime)) + .setHeartbeatTime(toProtoTimestamp(now)); + + Instant previousHeartbeat = lastHeartbeatTime.get(); + if (previousHeartbeat != null) { + Duration elapsed = Duration.between(previousHeartbeat, now); + hb.setElapsedSinceLastHeartbeat( + com.google.protobuf.Duration.newBuilder() + .setSeconds(elapsed.getSeconds()) + .setNanos(elapsed.getNano()) + .build()); + } + lastHeartbeatTime.set(now); + + // Deployment version + if (options.getDeploymentOptions() != null + && options.getDeploymentOptions().getVersion() != null) { + hb.setDeploymentVersion( + WorkerDeploymentVersion.newBuilder() + .setDeploymentName( + options.getDeploymentOptions().getVersion().getDeploymentName()) + .setBuildId(options.getDeploymentOptions().getVersion().getBuildId()) + .build()); + } + + hb.setHostInfo(buildHostInfo(workerGroupingKey)); + + // Slot info with task counters + hb.setWorkflowTaskSlotsInfo( + buildSlotsInfo( + "workflow", + workflowWorker.getWorkflowSlotSupplier(), + workflowWorker.getWorkflowTaskCounter())); + + if (activityWorker != null) { + hb.setActivityTaskSlotsInfo( + buildSlotsInfo( + "activity", activityWorker.getSlotSupplier(), activityWorker.getTaskCounter())); + } + + hb.setLocalActivitySlotsInfo( + buildSlotsInfo( + "local-activity", + workflowWorker.getLocalActivitySlotSupplier(), + workflowWorker.getLocalActivityTaskCounter())); + + hb.setNexusTaskSlotsInfo( + buildSlotsInfo("nexus", nexusWorker.getSlotSupplier(), nexusWorker.getTaskCounter())); + + // Poller info + hb.setWorkflowPollerInfo( + buildPollerInfo( + workflowWorker.getWorkflowPollerOptions(), + workflowWorker.getWorkflowPollerTracker())); + if (workflowWorker.getStickyTaskQueueName() != null) { + hb.setWorkflowStickyPollerInfo( + buildPollerInfo( + workflowWorker.getWorkflowPollerOptions(), + workflowWorker.getStickyPollerTracker())); + } + if (activityWorker != null) { + 
hb.setActivityPollerInfo( + buildPollerInfo( + activityWorker.getPollerOptions(), activityWorker.getPollerTracker())); + } + hb.setNexusPollerInfo( + buildPollerInfo(nexusWorker.getPollerOptions(), nexusWorker.getPollerTracker())); + + // Sticky cache stats + hb.setTotalStickyCacheHit(cache.getCacheHits()); + hb.setTotalStickyCacheMiss(cache.getCacheMisses()); + hb.setCurrentStickyCacheSize(cache.getCurrentCacheSize()); + + // Plugins + for (WorkerPlugin plugin : plugins) { + hb.addPlugins(PluginInfo.newBuilder().setName(plugin.getName()).build()); + } + + return hb.build(); + } + }; + } + + private WorkerSlotsInfo buildSlotsInfo( + String key, TrackingSlotSupplier tracker, TaskCounter taskCounter) { + int maxSlots = tracker.maximumSlots().orElse(-1); + int usedSlots = tracker.getUsedSlotCount(); + int currentProcessed = taskCounter.getTotalProcessed(); + int currentFailed = taskCounter.getTotalFailed(); + TaskSnapshot previous = + previousSnapshots.put(key, new TaskSnapshot(currentProcessed, currentFailed)); + int intervalProcessed = previous != null ? currentProcessed - previous.processed : 0; + int intervalFailed = previous != null ? currentFailed - previous.failed : 0; + return WorkerSlotsInfo.newBuilder() + .setCurrentAvailableSlots(maxSlots >= 0 ? 
Math.max(0, maxSlots - usedSlots) : -1) + .setCurrentUsedSlots(usedSlots) + .setSlotSupplierKind(tracker.getSupplierKind()) + .setTotalProcessedTasks(currentProcessed) + .setTotalFailedTasks(currentFailed) + .setLastIntervalProcessedTasks(intervalProcessed) + .setLastIntervalFailureTasks(intervalFailed) + .build(); + } + + private static WorkerPollerInfo buildPollerInfo( + PollerOptions pollerOptions, PollerTracker tracker) { + WorkerPollerInfo.Builder builder = WorkerPollerInfo.newBuilder(); + PollerBehavior behavior = pollerOptions.getPollerBehavior(); + if (behavior instanceof PollerBehaviorAutoscaling) { + builder.setIsAutoscaling(true); + } + builder.setCurrentPollers(tracker.getInFlightPolls()); + Instant lastPoll = tracker.getLastSuccessfulPollTime(); + if (lastPoll != null) { + builder.setLastSuccessfulPollTime(toProtoTimestamp(lastPoll)); + } + return builder.build(); + } + + private static final JVMSystemResourceInfo systemResourceInfo = new JVMSystemResourceInfo(); + + private static final String CACHED_HOSTNAME; + private static final String CACHED_PID; + + static { + String h; + try { + h = InetAddress.getLocalHost().getHostName(); + } catch (Exception e) { + h = "unknown"; + } + CACHED_HOSTNAME = h; + + String name = ManagementFactory.getRuntimeMXBean().getName(); + int atIndex = name.indexOf('@'); + CACHED_PID = atIndex > 0 ? 
name.substring(0, atIndex) : "unknown"; + } + + private static WorkerHostInfo buildHostInfo(String workerGroupingKey) { + return WorkerHostInfo.newBuilder() + .setHostName(CACHED_HOSTNAME) + .setWorkerGroupingKey(workerGroupingKey) + .setProcessId(CACHED_PID) + .setCurrentHostCpuUsage((float) systemResourceInfo.getCPUUsagePercent()) + .setCurrentHostMemUsage((float) systemResourceInfo.getMemoryUsagePercent()) + .build(); + } + + private static com.google.protobuf.Timestamp toProtoTimestamp(Instant instant) { + return com.google.protobuf.Timestamp.newBuilder() + .setSeconds(instant.getEpochSecond()) + .setNanos(instant.getNano()) + .build(); + } + @Override public String toString() { return "Worker{" + "options=" + options + '}'; diff --git a/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java b/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java index 3a22b7351..32f795691 100644 --- a/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java +++ b/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java @@ -4,6 +4,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.uber.m3.tally.Scope; +import io.temporal.api.worker.v1.WorkerHeartbeat; import io.temporal.api.workflowservice.v1.DescribeNamespaceRequest; import io.temporal.api.workflowservice.v1.DescribeNamespaceResponse; import io.temporal.client.WorkflowClient; @@ -13,6 +14,8 @@ import io.temporal.internal.common.PluginUtils; import io.temporal.internal.sync.WorkflowThreadExecutor; import io.temporal.internal.task.VirtualThreadDelegate; +import io.temporal.internal.worker.HeartbeatManager; +import io.temporal.internal.worker.NamespaceCapabilities; import io.temporal.internal.worker.ShutdownManager; import io.temporal.internal.worker.WorkflowExecutorCache; import io.temporal.internal.worker.WorkflowRunLockManager; @@ -29,10 +32,10 @@ import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; 
import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.function.Supplier; import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.slf4j.Logger; @@ -58,8 +61,8 @@ public final class WorkerFactory { /** Plugins propagated from the client and applied to this factory. */ private final List plugins; - /** Set during start() if the namespace has the poller_autoscaling capability. */ - private final AtomicBoolean pollerAutoscaling = new AtomicBoolean(false); + /** Namespace capabilities populated during start() from DescribeNamespace response. */ + private final NamespaceCapabilities namespaceCapabilities = new NamespaceCapabilities(); private State state = State.Initial; @@ -198,7 +201,7 @@ public synchronized Worker newWorker(String taskQueue, WorkerOptions options) { workflowThreadExecutor, workflowClient.getOptions().getContextPropagators(), plugins, - pollerAutoscaling); + namespaceCapabilities); workers.put(taskQueue, worker); // Go through the plugins to call plugin initializeWorker hooks (e.g. 
register workflows, @@ -265,8 +268,16 @@ public synchronized void start() { DescribeNamespaceRequest.newBuilder() .setNamespace(workflowClient.getOptions().getNamespace()) .build()); + if (describeNamespaceResponse.getNamespaceInfo().getCapabilities().getWorkerHeartbeats()) { + namespaceCapabilities.setWorkerHeartbeats(true); + } else { + log.debug( + "Server does not support worker heartbeats for namespace {}", + workflowClient.getOptions().getNamespace()); + } + if (describeNamespaceResponse.getNamespaceInfo().getCapabilities().getPollerAutoscaling()) { - pollerAutoscaling.set(true); + namespaceCapabilities.setPollerAutoscaling(true); } // Build plugin execution chain (reverse order for proper nesting) @@ -300,6 +311,20 @@ private void doStart() { startChain.accept(taskQueue, worker); } + // Register heartbeat callbacks after workers are started. + WorkflowClientInternal clientInternal = (WorkflowClientInternal) workflowClient.getInternal(); + HeartbeatManager hbManager = clientInternal.getHeartbeatManager(); + if (hbManager != null && namespaceCapabilities.isWorkerHeartbeats()) { + String namespace = workflowClient.getOptions().getNamespace(); + String workerGroupingKey = clientInternal.getWorkerGroupingKey(); + for (Worker worker : workers.values()) { + Supplier heartbeatSupplier = + worker.buildHeartbeatCallback(workerGroupingKey); + hbManager.registerWorker(namespace, worker.getWorkerInstanceKey(), heartbeatSupplier); + worker.workflowWorker.setHeartbeatSupplier(heartbeatSupplier); + } + } + state = State.Started; ((WorkflowClientInternal) workflowClient.getInternal()).registerWorkerFactory(this); } @@ -418,6 +443,18 @@ private void doShutdown(boolean interruptUserTasks) { CompletableFuture.allOf(shutdownFutures.toArray(new CompletableFuture[0])) .thenApply( r -> { + // Unregister workers from heartbeat manager only after full shutdown, + // so heartbeats continue reporting SHUTTING_DOWN until the worker is fully stopped. 
+ if (namespaceCapabilities.isWorkerHeartbeats()) { + HeartbeatManager hbManager = + ((WorkflowClientInternal) workflowClient.getInternal()).getHeartbeatManager(); + if (hbManager != null) { + String namespace = workflowClient.getOptions().getNamespace(); + for (Worker worker : workers.values()) { + hbManager.unregisterWorker(namespace, worker.getWorkerInstanceKey()); + } + } + } cache.invalidateAll(); workflowThreadPool.shutdownNow(); return null; diff --git a/temporal-sdk/src/test/java/io/temporal/internal/worker/HeartbeatManagerTest.java b/temporal-sdk/src/test/java/io/temporal/internal/worker/HeartbeatManagerTest.java new file mode 100644 index 000000000..96192e4c0 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/internal/worker/HeartbeatManagerTest.java @@ -0,0 +1,231 @@ +package io.temporal.internal.worker; + +import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +import io.temporal.api.worker.v1.WorkerHeartbeat; +import io.temporal.api.workflowservice.v1.*; +import io.temporal.serviceclient.WorkflowServiceStubs; +import java.time.Duration; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +public class HeartbeatManagerTest { + + private static final Duration FAST_INTERVAL = Duration.ofMillis(50); + private static final int VERIFY_TIMEOUT_MS = 1000; + + private WorkflowServiceStubs service; + private WorkflowServiceGrpc.WorkflowServiceBlockingStub blockingStub; + private HeartbeatManager manager; + + @Before + public void setUp() { + service = mock(WorkflowServiceStubs.class); + blockingStub = mock(WorkflowServiceGrpc.WorkflowServiceBlockingStub.class); + when(service.blockingStub()).thenReturn(blockingStub); + when(blockingStub.recordWorkerHeartbeat(any())) + .thenReturn(RecordWorkerHeartbeatResponse.getDefaultInstance()); + } + + @After + public void tearDown() { + if (manager != null) { + manager.shutdown(); + } + } 
+ + @Test + public void testHeartbeatRpcSentAtInterval() throws Exception { + manager = new HeartbeatManager(service, "test-identity", FAST_INTERVAL); + + WorkerHeartbeat hb = + WorkerHeartbeat.newBuilder() + .setWorkerInstanceKey("worker-1") + .setTaskQueue("test-queue") + .build(); + manager.registerWorker("default", "worker-1", () -> hb); + + verify(blockingStub, timeout(VERIFY_TIMEOUT_MS).atLeastOnce()).recordWorkerHeartbeat(any()); + + ArgumentCaptor captor = + ArgumentCaptor.forClass(RecordWorkerHeartbeatRequest.class); + verify(blockingStub, atLeastOnce()).recordWorkerHeartbeat(captor.capture()); + + RecordWorkerHeartbeatRequest request = captor.getValue(); + assertEquals("default", request.getNamespace()); + assertEquals("test-identity", request.getIdentity()); + assertTrue(request.getWorkerHeartbeatCount() > 0); + assertEquals("test-queue", request.getWorkerHeartbeat(0).getTaskQueue()); + } + + @Test + public void testMultipleWorkersInSingleRpc() throws Exception { + manager = new HeartbeatManager(service, "test-identity", FAST_INTERVAL); + + WorkerHeartbeat hb1 = + WorkerHeartbeat.newBuilder() + .setWorkerInstanceKey("worker-1") + .setTaskQueue("queue-1") + .build(); + WorkerHeartbeat hb2 = + WorkerHeartbeat.newBuilder() + .setWorkerInstanceKey("worker-2") + .setTaskQueue("queue-2") + .build(); + manager.registerWorker("default", "worker-1", () -> hb1); + manager.registerWorker("default", "worker-2", () -> hb2); + + verify(blockingStub, timeout(VERIFY_TIMEOUT_MS).atLeast(2)).recordWorkerHeartbeat(any()); + + ArgumentCaptor captor = + ArgumentCaptor.forClass(RecordWorkerHeartbeatRequest.class); + verify(blockingStub, atLeast(2)).recordWorkerHeartbeat(captor.capture()); + + boolean foundBoth = + captor.getAllValues().stream().anyMatch(req -> req.getWorkerHeartbeatCount() == 2); + assertTrue("Expected at least one RPC with 2 worker heartbeats", foundBoth); + } + + @Test + public void testUnregisterStopsRpcWhenEmpty() throws Exception { + manager = new 
HeartbeatManager(service, "test-identity", FAST_INTERVAL); + + WorkerHeartbeat hb = WorkerHeartbeat.newBuilder().setWorkerInstanceKey("worker-1").build(); + manager.registerWorker("default", "worker-1", () -> hb); + + verify(blockingStub, timeout(VERIFY_TIMEOUT_MS).atLeastOnce()).recordWorkerHeartbeat(any()); + + manager.unregisterWorker("default", "worker-1"); + clearInvocations(blockingStub); + + verify(blockingStub, after(VERIFY_TIMEOUT_MS).never()).recordWorkerHeartbeat(any()); + } + + @Test + public void testDifferentNamespacesGetSeparateRpcs() throws Exception { + manager = new HeartbeatManager(service, "test-identity", FAST_INTERVAL); + + WorkerHeartbeat hb1 = WorkerHeartbeat.newBuilder().setWorkerInstanceKey("worker-ns1").build(); + WorkerHeartbeat hb2 = WorkerHeartbeat.newBuilder().setWorkerInstanceKey("worker-ns2").build(); + manager.registerWorker("namespace-1", "worker-ns1", () -> hb1); + manager.registerWorker("namespace-2", "worker-ns2", () -> hb2); + + verify(blockingStub, timeout(VERIFY_TIMEOUT_MS).atLeast(2)).recordWorkerHeartbeat(any()); + + ArgumentCaptor captor = + ArgumentCaptor.forClass(RecordWorkerHeartbeatRequest.class); + verify(blockingStub, atLeast(2)).recordWorkerHeartbeat(captor.capture()); + + boolean foundNs1 = + captor.getAllValues().stream().anyMatch(req -> "namespace-1".equals(req.getNamespace())); + boolean foundNs2 = + captor.getAllValues().stream().anyMatch(req -> "namespace-2".equals(req.getNamespace())); + assertTrue("Expected heartbeat RPC for namespace-1", foundNs1); + assertTrue("Expected heartbeat RPC for namespace-2", foundNs2); + + // Each RPC should only contain workers for its own namespace + boolean noMixing = + captor.getAllValues().stream().allMatch(req -> req.getWorkerHeartbeatCount() == 1); + assertTrue("Each namespace RPC should contain exactly 1 worker", noMixing); + } + + @Test + public void testExceptionsCaughtAndLogged() throws Exception { + when(blockingStub.recordWorkerHeartbeat(any())).thenThrow(new 
RuntimeException("test error")); + + manager = new HeartbeatManager(service, "test-identity", FAST_INTERVAL); + + WorkerHeartbeat hb = WorkerHeartbeat.newBuilder().setWorkerInstanceKey("worker-1").build(); + manager.registerWorker("default", "worker-1", () -> hb); + + // Wait for at least 2 ticks — proves the scheduler survived the exception + verify(blockingStub, timeout(VERIFY_TIMEOUT_MS).atLeast(2)).recordWorkerHeartbeat(any()); + } + + @Test + public void testNoRpcsWhenNoWorkersRegistered() throws Exception { + manager = new HeartbeatManager(service, "test-identity", FAST_INTERVAL); + + verify(blockingStub, after(VERIFY_TIMEOUT_MS).never()).recordWorkerHeartbeat(any()); + } + + @Test + public void testUnimplementedStopsScheduler() throws Exception { + when(blockingStub.recordWorkerHeartbeat(any())) + .thenThrow(new io.grpc.StatusRuntimeException(io.grpc.Status.UNIMPLEMENTED)); + + manager = new HeartbeatManager(service, "test-identity", FAST_INTERVAL); + + WorkerHeartbeat hb = WorkerHeartbeat.newBuilder().setWorkerInstanceKey("worker-1").build(); + manager.registerWorker("default", "worker-1", () -> hb); + + // Wait for the first tick to hit UNIMPLEMENTED + verify(blockingStub, timeout(VERIFY_TIMEOUT_MS).atLeastOnce()).recordWorkerHeartbeat(any()); + + // After UNIMPLEMENTED, scheduler should stop — no more RPCs + clearInvocations(blockingStub); + verify(blockingStub, after(VERIFY_TIMEOUT_MS).never()).recordWorkerHeartbeat(any()); + } + + @Test + public void testUnregisterFromOneNamespaceDoesNotAffectAnother() throws Exception { + manager = new HeartbeatManager(service, "test-identity", FAST_INTERVAL); + + WorkerHeartbeat hb1 = WorkerHeartbeat.newBuilder().setWorkerInstanceKey("worker-ns1").build(); + WorkerHeartbeat hb2 = WorkerHeartbeat.newBuilder().setWorkerInstanceKey("worker-ns2").build(); + manager.registerWorker("namespace-1", "worker-ns1", () -> hb1); + manager.registerWorker("namespace-2", "worker-ns2", () -> hb2); + + // Both namespaces heartbeating + 
verify(blockingStub, timeout(VERIFY_TIMEOUT_MS).atLeast(2)).recordWorkerHeartbeat(any()); + + // Unregister from namespace-1 only + manager.unregisterWorker("namespace-1", "worker-ns1"); + clearInvocations(blockingStub); + + // namespace-2 should continue heartbeating + verify(blockingStub, timeout(VERIFY_TIMEOUT_MS).atLeastOnce()).recordWorkerHeartbeat(any()); + + ArgumentCaptor captor = + ArgumentCaptor.forClass(RecordWorkerHeartbeatRequest.class); + verify(blockingStub, atLeastOnce()).recordWorkerHeartbeat(captor.capture()); + + // All RPCs after unregister should be for namespace-2 only + assertTrue( + "Only namespace-2 RPCs expected after unregistering namespace-1", + captor.getAllValues().stream().allMatch(req -> "namespace-2".equals(req.getNamespace()))); + } + + @Test + public void testNamespaceSchedulerStopsWhenLastWorkerUnregisters() throws Exception { + manager = new HeartbeatManager(service, "test-identity", FAST_INTERVAL); + + WorkerHeartbeat hb1 = WorkerHeartbeat.newBuilder().setWorkerInstanceKey("worker-1").build(); + WorkerHeartbeat hb2 = WorkerHeartbeat.newBuilder().setWorkerInstanceKey("worker-2").build(); + manager.registerWorker("default", "worker-1", () -> hb1); + manager.registerWorker("default", "worker-2", () -> hb2); + + verify(blockingStub, timeout(VERIFY_TIMEOUT_MS).atLeastOnce()).recordWorkerHeartbeat(any()); + + // Unregister first worker — namespace scheduler should still be running + manager.unregisterWorker("default", "worker-1"); + clearInvocations(blockingStub); + verify(blockingStub, timeout(VERIFY_TIMEOUT_MS).atLeastOnce()).recordWorkerHeartbeat(any()); + + // Unregister last worker — namespace scheduler should stop + manager.unregisterWorker("default", "worker-2"); + clearInvocations(blockingStub); + verify(blockingStub, after(VERIFY_TIMEOUT_MS).never()).recordWorkerHeartbeat(any()); + } + + @Test + public void testIntervalValidation() { + HeartbeatManager hm = new HeartbeatManager(service, "test-identity", 
Duration.ofSeconds(30)); + assertNotNull(hm); + hm.shutdown(); + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/internal/worker/PollerTrackerTest.java b/temporal-sdk/src/test/java/io/temporal/internal/worker/PollerTrackerTest.java new file mode 100644 index 000000000..3ae265dff --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/internal/worker/PollerTrackerTest.java @@ -0,0 +1,86 @@ +package io.temporal.internal.worker; + +import static org.junit.Assert.*; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.*; +import org.junit.Test; + +public class PollerTrackerTest { + + @Test + public void testPollStartedIncrementsAndReturnsCount() { + PollerTracker tracker = new PollerTracker(); + assertEquals(0, tracker.getInFlightPolls()); + assertEquals(1, tracker.pollStarted()); + assertEquals(1, tracker.getInFlightPolls()); + assertEquals(2, tracker.pollStarted()); + assertEquals(2, tracker.getInFlightPolls()); + } + + @Test + public void testPollCompletedDecrementsAndReturnsCount() { + PollerTracker tracker = new PollerTracker(); + tracker.pollStarted(); + tracker.pollStarted(); + assertEquals(1, tracker.pollCompleted()); + assertEquals(1, tracker.getInFlightPolls()); + assertEquals(0, tracker.pollCompleted()); + assertEquals(0, tracker.getInFlightPolls()); + } + + @Test + public void testPollSucceededSetsLastPollTime() { + PollerTracker tracker = new PollerTracker(); + tracker.pollSucceeded(); + assertNotNull(tracker.getLastSuccessfulPollTime()); + } + + @Test + public void testLastPollTimeInitiallyNull() { + PollerTracker tracker = new PollerTracker(); + assertNull(tracker.getLastSuccessfulPollTime()); + } + + @Test + public void testConcurrentPollTracking() throws Exception { + PollerTracker tracker = new PollerTracker(); + int threadCount = 8; + int opsPerThread = 100; + ExecutorService executor = Executors.newFixedThreadPool(threadCount); + List> futures = new ArrayList<>(); + + for (int i = 
0; i < threadCount; i++) { + futures.add( + executor.submit( + () -> { + for (int j = 0; j < opsPerThread; j++) { + tracker.pollStarted(); + tracker.pollCompleted(); + } + })); + } + + for (Future f : futures) { + f.get(5, TimeUnit.SECONDS); + } + executor.shutdown(); + + assertEquals(0, tracker.getInFlightPolls()); + } + + @Test + public void testMultipleSuccessfulPolls() { + PollerTracker tracker = new PollerTracker(); + tracker.pollSucceeded(); + Instant first = tracker.getLastSuccessfulPollTime(); + assertNotNull(first); + + tracker.pollSucceeded(); + Instant second = tracker.getLastSuccessfulPollTime(); + assertNotNull(second); + assertFalse("Last poll time should advance with each successful poll", second.isBefore(first)); + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/internal/worker/SlotSupplierTest.java b/temporal-sdk/src/test/java/io/temporal/internal/worker/SlotSupplierTest.java index ee2525547..e4223c0b5 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/worker/SlotSupplierTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/worker/SlotSupplierTest.java @@ -84,7 +84,9 @@ public void supplierIsCalledAppropriately() { trackingSS, stickyQueueBalancer, metricsScope, - () -> GetSystemInfoResponse.Capabilities.newBuilder().build()); + () -> GetSystemInfoResponse.Capabilities.newBuilder().build(), + new PollerTracker(), + new PollerTracker()); PollWorkflowTaskQueueResponse pollResponse = PollWorkflowTaskQueueResponse.newBuilder() @@ -173,7 +175,8 @@ public void asyncPollerSupplierIsCalledAppropriately() throws Exception { new WorkerVersioningOptions("", false, null), trackingSS, metricsScope, - () -> GetSystemInfoResponse.Capabilities.newBuilder().build()); + () -> GetSystemInfoResponse.Capabilities.newBuilder().build(), + new PollerTracker()); SlotPermit permit = new SlotPermit(); diff --git a/temporal-sdk/src/test/java/io/temporal/internal/worker/StickyQueueBacklogTest.java 
b/temporal-sdk/src/test/java/io/temporal/internal/worker/StickyQueueBacklogTest.java index 0a080ec63..59538ac8b 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/worker/StickyQueueBacklogTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/worker/StickyQueueBacklogTest.java @@ -72,7 +72,9 @@ public void stickyQueueBacklogResetTest() { slotSupplier, stickyQueueBalancer, metricsScope, - () -> GetSystemInfoResponse.Capabilities.newBuilder().build()); + () -> GetSystemInfoResponse.Capabilities.newBuilder().build(), + new PollerTracker(), + new PollerTracker()); PollWorkflowTaskQueueResponse pollResponse = PollWorkflowTaskQueueResponse.newBuilder() diff --git a/temporal-sdk/src/test/java/io/temporal/internal/worker/TrackingSlotSupplierKindTest.java b/temporal-sdk/src/test/java/io/temporal/internal/worker/TrackingSlotSupplierKindTest.java new file mode 100644 index 000000000..392bec955 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/internal/worker/TrackingSlotSupplierKindTest.java @@ -0,0 +1,63 @@ +package io.temporal.internal.worker; + +import static org.junit.Assert.*; + +import com.uber.m3.tally.NoopScope; +import io.temporal.worker.tuning.*; +import org.junit.Test; + +public class TrackingSlotSupplierKindTest { + + @Test + public void testFixedSupplierKind() { + SlotSupplier supplier = new FixedSizeSlotSupplier<>(10); + TrackingSlotSupplier tracking = + new TrackingSlotSupplier<>(supplier, new NoopScope()); + assertEquals("Fixed", tracking.getSupplierKind()); + } + + @Test + public void testResourceBasedSupplierKind() { + ResourceBasedController controller = + ResourceBasedController.newSystemInfoController( + ResourceBasedControllerOptions.newBuilder(0.5, 0.5).build()); + SlotSupplier supplier = + ResourceBasedSlotSupplier.createForActivity( + controller, + ResourceBasedSlotOptions.newBuilder().setMinimumSlots(1).setMaximumSlots(10).build()); + TrackingSlotSupplier tracking = + new TrackingSlotSupplier<>(supplier, new NoopScope()); 
+ assertEquals("ResourceBased", tracking.getSupplierKind()); + } + + @Test + public void testCustomSupplierKind() { + SlotSupplier custom = new CustomTestSlotSupplier<>(); + TrackingSlotSupplier tracking = + new TrackingSlotSupplier<>(custom, new NoopScope()); + assertEquals("CustomTestSlotSupplier", tracking.getSupplierKind()); + } + + private static class CustomTestSlotSupplier implements SlotSupplier { + @Override + public SlotSupplierFuture reserveSlot(SlotReserveContext ctx) { + return SlotSupplierFuture.completedFuture(new SlotPermit()); + } + + @Override + public java.util.Optional tryReserveSlot(SlotReserveContext ctx) { + return java.util.Optional.of(new SlotPermit()); + } + + @Override + public void markSlotUsed(SlotMarkUsedContext ctx) {} + + @Override + public void releaseSlot(SlotReleaseContext ctx) {} + + @Override + public java.util.Optional getMaximumSlots() { + return java.util.Optional.of(5); + } + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowWorkerTest.java b/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowWorkerTest.java index cbe194091..d4e6e947c 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowWorkerTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowWorkerTest.java @@ -14,6 +14,7 @@ import com.uber.m3.util.ImmutableMap; import io.temporal.api.common.v1.WorkflowExecution; import io.temporal.api.common.v1.WorkflowType; +import io.temporal.api.enums.v1.TaskQueueType; import io.temporal.api.workflowservice.v1.*; import io.temporal.common.reporter.TestStatsReporter; import io.temporal.internal.common.InternalUtils; @@ -29,9 +30,12 @@ import io.temporal.worker.tuning.SlotSupplier; import io.temporal.worker.tuning.WorkflowSlotInfo; import java.time.Duration; +import java.util.Arrays; +import java.util.List; import java.util.UUID; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicBoolean; +import 
java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; import org.junit.Test; import org.mockito.stubbing.Answer; import org.slf4j.Logger; @@ -70,6 +74,8 @@ public void concurrentPollRequestLockTest() throws Exception { client, "default", "task_queue", + "test-worker-instance-key", + java.util.Collections::emptyList, "sticky_task_queue", SingleWorkerOptions.newBuilder() .setIdentity("test_identity") @@ -85,7 +91,7 @@ public void concurrentPollRequestLockTest() throws Exception { taskHandler, eagerActivityDispatcher, slotSupplier, - new AtomicBoolean(false)); + new NamespaceCapabilities()); WorkflowServiceGrpc.WorkflowServiceFutureStub futureStub = mock(WorkflowServiceGrpc.WorkflowServiceFutureStub.class); @@ -240,6 +246,8 @@ public void respondWorkflowTaskFailureMetricTest() throws Exception { client, "default", "task_queue", + "test-worker-instance-key", + java.util.Collections::emptyList, "sticky_task_queue", SingleWorkerOptions.newBuilder() .setIdentity("test_identity") @@ -255,7 +263,7 @@ public void respondWorkflowTaskFailureMetricTest() throws Exception { taskHandler, eagerActivityDispatcher, slotSupplier, - new AtomicBoolean(false)); + new NamespaceCapabilities()); WorkflowServiceGrpc.WorkflowServiceFutureStub futureStub = mock(WorkflowServiceGrpc.WorkflowServiceFutureStub.class); @@ -383,6 +391,8 @@ public boolean isAnyTypeSupported() { client, "default", "taskQueue", + "test-worker-instance-key", + java.util.Collections::emptyList, "sticky", SingleWorkerOptions.newBuilder() .setIdentity("test_identity") @@ -398,11 +408,17 @@ public boolean isAnyTypeSupported() { taskHandler, eagerActivityDispatcher, slotSupplier, - new AtomicBoolean(false)); + new NamespaceCapabilities()); + + WorkflowServiceGrpc.WorkflowServiceFutureStub futureStub = + mock(WorkflowServiceGrpc.WorkflowServiceFutureStub.class); + when(futureStub.shutdownWorker(any(ShutdownWorkerRequest.class))) + 
.thenReturn(Futures.immediateFuture(ShutdownWorkerResponse.newBuilder().build())); WorkflowServiceGrpc.WorkflowServiceBlockingStub blockingStub = mock(WorkflowServiceGrpc.WorkflowServiceBlockingStub.class); when(client.blockingStub()).thenReturn(blockingStub); + when(client.futureStub()).thenReturn(futureStub); when(blockingStub.withOption(any(), any())).thenReturn(blockingStub); PollWorkflowTaskQueueResponse pollResponse = @@ -428,6 +444,80 @@ public boolean isAnyTypeSupported() { worker.shutdown(new ShutdownManager(), true).get(); } + @Test + public void activeTaskQueueTypesEvaluatedAtShutdownTime() throws Exception { + WorkflowServiceStubs client = mock(WorkflowServiceStubs.class); + when(client.getServerCapabilities()) + .thenReturn(() -> GetSystemInfoResponse.Capabilities.newBuilder().build()); + + WorkflowRunLockManager runLockManager = new WorkflowRunLockManager(); + Scope metricsScope = new NoopScope(); + WorkflowExecutorCache cache = new WorkflowExecutorCache(10, runLockManager, metricsScope); + SlotSupplier slotSupplier = new FixedSizeSlotSupplier<>(10); + + WorkflowTaskHandler taskHandler = mock(WorkflowTaskHandler.class); + when(taskHandler.isAnyTypeSupported()).thenReturn(true); + + // Supplier that starts with WORKFLOW only, then adds NEXUS later + AtomicReference> typesRef = + new AtomicReference<>(Arrays.asList(TaskQueueType.TASK_QUEUE_TYPE_WORKFLOW)); + Supplier> supplier = typesRef::get; + + EagerActivityDispatcher eagerActivityDispatcher = mock(EagerActivityDispatcher.class); + WorkflowWorker worker = + new WorkflowWorker( + client, + "default", + "task_queue", + "test-worker-instance-key", + supplier, + null, + SingleWorkerOptions.newBuilder() + .setIdentity("test_identity") + .setBuildId(UUID.randomUUID().toString()) + .setPollerOptions( + PollerOptions.newBuilder() + .setPollerBehavior(new PollerBehaviorSimpleMaximum(1)) + .build()) + .setMetricsScope(metricsScope) + .build(), + runLockManager, + cache, + taskHandler, + 
eagerActivityDispatcher, + slotSupplier, + new NamespaceCapabilities()); + + // Simulate registering Nexus after construction + typesRef.set( + Arrays.asList( + TaskQueueType.TASK_QUEUE_TYPE_WORKFLOW, + TaskQueueType.TASK_QUEUE_TYPE_ACTIVITY, + TaskQueueType.TASK_QUEUE_TYPE_NEXUS)); + + WorkflowServiceGrpc.WorkflowServiceFutureStub futureStub = + mock(WorkflowServiceGrpc.WorkflowServiceFutureStub.class); + when(client.futureStub()).thenReturn(futureStub); + when(futureStub.shutdownWorker(any(ShutdownWorkerRequest.class))) + .thenReturn(Futures.immediateFuture(ShutdownWorkerResponse.newBuilder().build())); + + worker.shutdown(new ShutdownManager(), true).get(5, TimeUnit.SECONDS); + + org.mockito.ArgumentCaptor captor = + org.mockito.ArgumentCaptor.forClass(ShutdownWorkerRequest.class); + verify(futureStub).shutdownWorker(captor.capture()); + List shutdownTypes = captor.getValue().getTaskQueueTypesList(); + assertTrue( + "ShutdownWorkerRequest should include NEXUS type added after construction", + shutdownTypes.contains(TaskQueueType.TASK_QUEUE_TYPE_NEXUS)); + assertTrue( + "ShutdownWorkerRequest should include WORKFLOW type", + shutdownTypes.contains(TaskQueueType.TASK_QUEUE_TYPE_WORKFLOW)); + assertTrue( + "ShutdownWorkerRequest should include ACTIVITY type", + shutdownTypes.contains(TaskQueueType.TASK_QUEUE_TYPE_ACTIVITY)); + } + private ReplayWorkflowFactory setUpMockWorkflowFactory() throws Throwable { ReplayWorkflow mockWorkflow = mock(ReplayWorkflow.class); ReplayWorkflowFactory mockFactory = mock(ReplayWorkflowFactory.class); diff --git a/temporal-sdk/src/test/java/io/temporal/worker/WorkerHeartbeatDeploymentVersionTest.java b/temporal-sdk/src/test/java/io/temporal/worker/WorkerHeartbeatDeploymentVersionTest.java new file mode 100644 index 000000000..e97f6bcf2 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/worker/WorkerHeartbeatDeploymentVersionTest.java @@ -0,0 +1,135 @@ +package io.temporal.worker; + +import static 
io.temporal.testUtils.Eventually.assertEventually; +import static org.junit.Assert.*; +import static org.junit.Assume.assumeTrue; + +import io.temporal.api.worker.v1.WorkerHeartbeat; +import io.temporal.api.workflowservice.v1.*; +import io.temporal.client.WorkflowClientOptions; +import io.temporal.common.WorkerDeploymentVersion; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import java.time.Duration; +import java.util.List; +import java.util.stream.Collectors; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +public class WorkerHeartbeatDeploymentVersionTest { + + private static final Duration EVENTUALLY_TIMEOUT = Duration.ofSeconds(10); + + private static final String TEST_DEPLOYMENT_NAME = "test-deployment"; + private static final String TEST_BUILD_ID = "1.0.0"; + + @Before + public void checkServerSupportsHeartbeats() { + assumeTrue( + "Requires real server with worker heartbeat support", + SDKTestWorkflowRule.useExternalService); + assumeTrue( + "Server does not support worker heartbeats", + testWorkflowRule + .getWorkflowClient() + .getWorkflowServiceStubs() + .blockingStub() + .describeNamespace( + DescribeNamespaceRequest.newBuilder() + .setNamespace(testWorkflowRule.getWorkflowClient().getOptions().getNamespace()) + .build()) + .getNamespaceInfo() + .getCapabilities() + .getWorkerHeartbeats()); + } + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setUseExternalService(true) + .setTestTimeoutSeconds(15) + .setWorkflowClientOptions( + WorkflowClientOptions.newBuilder() + .setWorkerHeartbeatInterval(Duration.ofSeconds(1)) + .build()) + .setWorkerOptions( + WorkerOptions.newBuilder() + .setDeploymentOptions( + WorkerDeploymentOptions.newBuilder() + .setVersion( + new WorkerDeploymentVersion(TEST_DEPLOYMENT_NAME, TEST_BUILD_ID)) + .build()) + .build()) + .setDoNotStart(true) + .build(); + + @Test + public void testDeploymentVersionInHeartbeat() throws Exception { + 
testWorkflowRule.getTestEnvironment().start(); + + String taskQueue = testWorkflowRule.getTaskQueue(); + + // Discover the worker via ListWorkers, then verify deployment version via DescribeWorker + assertEventually( + EVENTUALLY_TIMEOUT, + () -> { + List workers = listWorkersForQueue(taskQueue); + assertFalse("worker should appear via ListWorkers", workers.isEmpty()); + + WorkerHeartbeat hb = describeWorker(workers.get(0).getWorkerInstanceKey()); + assertNotNull("DescribeWorker should return stored heartbeat", hb); + assertTrue("deployment_version should be set", hb.hasDeploymentVersion()); + assertEquals( + "deployment_version.deployment_name should match configured value", + TEST_DEPLOYMENT_NAME, + hb.getDeploymentVersion().getDeploymentName()); + assertEquals( + "deployment_version.build_id should match configured value", + TEST_BUILD_ID, + hb.getDeploymentVersion().getBuildId()); + }); + } + + /** + * Uses deprecated WorkersInfo field because the replacement is not yet populated by the server. 
+ */ + @SuppressWarnings("deprecation") + private List listWorkersForQueue(String taskQueue) { + ListWorkersResponse resp = + testWorkflowRule + .getWorkflowClient() + .getWorkflowServiceStubs() + .blockingStub() + .listWorkers( + ListWorkersRequest.newBuilder() + .setNamespace(testWorkflowRule.getWorkflowClient().getOptions().getNamespace()) + .setPageSize(100) + .build()); + return resp.getWorkersInfoList().stream() + .map(info -> info.getWorkerHeartbeat()) + .filter(hb -> hb.getTaskQueue().equals(taskQueue)) + .collect(Collectors.toList()); + } + + private WorkerHeartbeat describeWorker(String workerInstanceKey) { + try { + DescribeWorkerResponse resp = + testWorkflowRule + .getWorkflowClient() + .getWorkflowServiceStubs() + .blockingStub() + .describeWorker( + DescribeWorkerRequest.newBuilder() + .setNamespace( + testWorkflowRule.getWorkflowClient().getOptions().getNamespace()) + .setWorkerInstanceKey(workerInstanceKey) + .build()); + return resp.getWorkerInfo().getWorkerHeartbeat(); + } catch (io.grpc.StatusRuntimeException e) { + if (e.getStatus().getCode() == io.grpc.Status.Code.NOT_FOUND) { + return null; + } + throw e; + } + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/worker/WorkerHeartbeatIntegrationTest.java b/temporal-sdk/src/test/java/io/temporal/worker/WorkerHeartbeatIntegrationTest.java new file mode 100644 index 000000000..644b339fb --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/worker/WorkerHeartbeatIntegrationTest.java @@ -0,0 +1,934 @@ +package io.temporal.worker; + +import static io.temporal.testUtils.Eventually.assertEventually; +import static org.junit.Assert.*; +import static org.junit.Assume.assumeTrue; + +import io.temporal.activity.ActivityInterface; +import io.temporal.activity.ActivityMethod; +import io.temporal.activity.ActivityOptions; +import io.temporal.api.enums.v1.WorkerStatus; +import io.temporal.api.worker.v1.WorkerHeartbeat; +import io.temporal.api.worker.v1.WorkerSlotsInfo; +import 
io.temporal.api.workflowservice.v1.*; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowClientOptions; +import io.temporal.client.WorkflowOptions; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.worker.tuning.ResourceBasedControllerOptions; +import io.temporal.worker.tuning.ResourceBasedTuner; +import io.temporal.workflow.Workflow; +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; +import java.time.Duration; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +public class WorkerHeartbeatIntegrationTest { + + private static final Duration EVENTUALLY_TIMEOUT = Duration.ofSeconds(10); + + @Before + public void checkServerSupportsHeartbeats() { + assumeTrue( + "Requires real server with worker heartbeat support", + SDKTestWorkflowRule.useExternalService); + assumeTrue( + "Server does not support worker heartbeats", + testWorkflowRule + .getWorkflowClient() + .getWorkflowServiceStubs() + .blockingStub() + .describeNamespace( + DescribeNamespaceRequest.newBuilder() + .setNamespace(testWorkflowRule.getWorkflowClient().getOptions().getNamespace()) + .build()) + .getNamespaceInfo() + .getCapabilities() + .getWorkerHeartbeats()); + } + + // Shared latches for blocking activity tests + static final CountDownLatch blockingActivityStarted = new CountDownLatch(1); + static final CountDownLatch blockingActivityRelease = new CountDownLatch(1); + + // Separate latches for sticky cache miss test + static final CountDownLatch cacheTestActivityStarted = new CountDownLatch(1); + static final CountDownLatch cacheTestActivityRelease = new CountDownLatch(1); + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + 
.setUseExternalService(true) + .setTestTimeoutSeconds(15) + .setWorkflowClientOptions( + WorkflowClientOptions.newBuilder() + .setWorkerHeartbeatInterval(Duration.ofSeconds(1)) + .build()) + .setActivityImplementations( + new TestActivityImpl(), + new FailingActivityImpl(), + new BlockingActivityImpl(), + new CacheTestActivityImpl()) + .setWorkflowTypes( + TestWorkflowImpl.class, + FailingWorkflowImpl.class, + BlockingWorkflowImpl.class, + CacheTestWorkflowImpl.class) + .setDoNotStart(true) + .build(); + + /** + * Combined test for basic heartbeat fields that only require starting the environment (no + * workflow execution needed). Covers: RPC fields, host info, timestamps, plugins, and + * elapsed_since_last_heartbeat — all verified via DescribeWorker round-trip. + */ + @Test + public void testBasicHeartbeatFields() throws Exception { + testWorkflowRule.getTestEnvironment().start(); + + String workerInstanceKey = waitForWorkerInstanceKey(); + + // --- RPC fields via DescribeWorker round-trip --- + WorkerHeartbeat hb = describeWorker(workerInstanceKey); + assertNotNull("DescribeWorker should return stored heartbeat", hb); + assertEquals("temporal-java", hb.getSdkName()); + assertFalse("sdk version should be set", hb.getSdkVersion().isEmpty()); + assertFalse("task queue should be set", hb.getTaskQueue().isEmpty()); + assertEquals(workerInstanceKey, hb.getWorkerInstanceKey()); + assertEquals(WorkerStatus.WORKER_STATUS_RUNNING, hb.getStatus()); + assertTrue("start time should be set", hb.hasStartTime()); + assertTrue("heartbeat time should be set", hb.hasHeartbeatTime()); + assertTrue("host info should be set", hb.hasHostInfo()); + assertFalse("host name should be set", hb.getHostInfo().getHostName().isEmpty()); + assertFalse("process id should be set", hb.getHostInfo().getProcessId().isEmpty()); + + // --- Host info details --- + assertFalse( + "host_info.worker_grouping_key should not be empty", + hb.getHostInfo().getWorkerGroupingKey().isEmpty()); + assertTrue( + 
"host_info.current_host_cpu_usage should be >= 0", + hb.getHostInfo().getCurrentHostCpuUsage() >= 0.0f); + assertTrue( + "host_info.current_host_mem_usage should be >= 0", + hb.getHostInfo().getCurrentHostMemUsage() >= 0.0f); + + // --- Timestamps --- + long startTimeSec = hb.getStartTime().getSeconds(); + long nowSec = java.time.Instant.now().getEpochSecond(); + assertTrue("start_time should be within 30 seconds of now", nowSec - startTimeSec <= 30); + long heartbeatTimeSec = hb.getHeartbeatTime().getSeconds(); + assertTrue( + "heartbeat_time should be within 30 seconds of now", nowSec - heartbeatTimeSec <= 30); + assertTrue("heartbeat_time should be >= start_time", heartbeatTimeSec >= startTimeSec); + + // --- Plugins --- + assertEquals( + "plugins list should be empty when no plugins are configured", 0, hb.getPluginsCount()); + + // --- Elapsed since last heartbeat (set after the first heartbeat cycle) --- + assertEventually( + EVENTUALLY_TIMEOUT, + () -> { + WorkerHeartbeat latest = describeWorker(workerInstanceKey); + assertNotNull("DescribeWorker should return stored heartbeat", latest); + assertTrue( + "elapsed_since_last_heartbeat should be set after multiple heartbeat cycles", + latest.hasElapsedSinceLastHeartbeat()); + com.google.protobuf.Duration d = latest.getElapsedSinceLastHeartbeat(); + assertTrue( + "elapsed_since_last_heartbeat should be non-zero", + d.getSeconds() > 0 || d.getNanos() > 0); + }); + } + + @Test + public void testShutdownHeartbeatStatus() throws Exception { + testWorkflowRule.getTestEnvironment().start(); + + // Create a separate factory so we can shut it down within the test + String taskQueue = testWorkflowRule.getTaskQueue() + "-shutdown"; + WorkflowClient client = testWorkflowRule.getWorkflowClient(); + WorkerFactory factory = + WorkerFactory.newInstance(client, testWorkflowRule.getWorkerFactoryOptions()); + Worker worker = factory.newWorker(taskQueue); + worker.registerWorkflowImplementationTypes(TestWorkflowImpl.class); + 
worker.registerActivitiesImplementations(new TestActivityImpl()); + factory.start(); + + // Discover worker instance key via ListWorkers + String workerInstanceKey = + assertEventually( + EVENTUALLY_TIMEOUT, + () -> { + List workers = listWorkersForQueue(taskQueue); + assertFalse("worker should appear via ListWorkers", workers.isEmpty()); + return workers.get(0).getWorkerInstanceKey(); + }); + + // Graceful shutdown sends ShutdownWorkerRequest with SHUTTING_DOWN status + factory.shutdown(); + factory.awaitTermination(10, TimeUnit.SECONDS); + + // After shutdown, the server should reflect SHUTTING_DOWN status + WorkerHeartbeat hb = describeWorker(workerInstanceKey); + assertNotNull("DescribeWorker should return stored heartbeat after shutdown", hb); + assertEquals( + "status should be WORKER_STATUS_SHUTTING_DOWN after shutdown", + WorkerStatus.WORKER_STATUS_SHUTTING_DOWN, + hb.getStatus()); + assertFalse("task_queue should be set", hb.getTaskQueue().isEmpty()); + } + + /** + * Combined test for heartbeat fields that require workflow execution. Covers: slot info, task + * counters, and poller info. 
+ */ + @Test + public void testHeartbeatAfterWorkflowExecution() throws Exception { + testWorkflowRule.getTestEnvironment().start(); + + WorkflowClient client = testWorkflowRule.getWorkflowClient(); + TestWorkflow wf = + client.newWorkflowStub( + TestWorkflow.class, + WorkflowOptions.newBuilder() + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .setWorkflowExecutionTimeout(Duration.ofSeconds(30)) + .build()); + assertEquals("done", wf.execute("test")); + + String workerInstanceKey = waitForWorkerInstanceKey(); + + // --- Slot info via DescribeWorker --- + assertEventually( + EVENTUALLY_TIMEOUT, + () -> { + WorkerHeartbeat describedHb = describeWorker(workerInstanceKey); + assertNotNull("DescribeWorker should return stored heartbeat", describedHb); + + assertTrue( + "workflow_task_slots_info should be set", describedHb.hasWorkflowTaskSlotsInfo()); + assertTrue( + "activity_task_slots_info should be set", describedHb.hasActivityTaskSlotsInfo()); + assertTrue( + "local_activity_slots_info should be set", describedHb.hasLocalActivitySlotsInfo()); + assertTrue("nexus_task_slots_info should be set", describedHb.hasNexusTaskSlotsInfo()); + + assertFalse( + "workflow slot supplier kind should be set", + describedHb.getWorkflowTaskSlotsInfo().getSlotSupplierKind().isEmpty()); + assertFalse( + "activity slot supplier kind should be set", + describedHb.getActivityTaskSlotsInfo().getSlotSupplierKind().isEmpty()); + + assertEquals( + "workflow used slots should be 0 after completion", + 0, + describedHb.getWorkflowTaskSlotsInfo().getCurrentUsedSlots()); + assertEquals( + "activity used slots should be 0 after completion", + 0, + describedHb.getActivityTaskSlotsInfo().getCurrentUsedSlots()); + + assertTrue( + "workflow available slots should be > 0", + describedHb.getWorkflowTaskSlotsInfo().getCurrentAvailableSlots() > 0); + assertTrue( + "activity available slots should be > 0", + describedHb.getActivityTaskSlotsInfo().getCurrentAvailableSlots() > 0); + + // --- Task counters 
--- + assertTrue( + "workflow_task_slots_info.total_processed_tasks should be >= 1", + describedHb.getWorkflowTaskSlotsInfo().getTotalProcessedTasks() >= 1); + assertTrue( + "activity_task_slots_info.total_processed_tasks should be >= 1", + describedHb.getActivityTaskSlotsInfo().getTotalProcessedTasks() >= 1); + + // --- Poller info --- + assertTrue("workflow_poller_info should be set", describedHb.hasWorkflowPollerInfo()); + assertTrue( + "workflow_poller_info should have current_pollers > 0", + describedHb.getWorkflowPollerInfo().getCurrentPollers() > 0); + assertTrue("activity_poller_info should be set", describedHb.hasActivityPollerInfo()); + assertTrue( + "activity_poller_info should have current_pollers > 0", + describedHb.getActivityPollerInfo().getCurrentPollers() > 0); + assertTrue("nexus_poller_info should be set", describedHb.hasNexusPollerInfo()); + assertTrue( + "workflow_poller_info should have last_successful_poll_time set", + describedHb.getWorkflowPollerInfo().hasLastSuccessfulPollTime()); + // activity_poller_info.last_successful_poll_time is asserted in + // testActivityInFlightSlotTracking where a blocking activity ensures the task + // goes through the poller (eager execution bypasses it here). 
+ + assertFalse( + "workflow_poller_info.is_autoscaling should be false with default pollers", + describedHb.getWorkflowPollerInfo().getIsAutoscaling()); + assertFalse( + "activity_poller_info.is_autoscaling should be false with default pollers", + describedHb.getActivityPollerInfo().getIsAutoscaling()); + }); + } + + @Test + public void testFailureMetricsInHeartbeat() throws Exception { + testWorkflowRule.getTestEnvironment().start(); + + WorkflowClient client = testWorkflowRule.getWorkflowClient(); + FailingWorkflow wf = + client.newWorkflowStub( + FailingWorkflow.class, + WorkflowOptions.newBuilder() + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .setWorkflowExecutionTimeout(Duration.ofSeconds(30)) + .build()); + try { + wf.execute(); + } catch (Exception e) { + // Expected: the activity fails and the workflow fails + } + + String workerInstanceKey = waitForWorkerInstanceKey(); + + // ApplicationFailure is handled within the activity handler and returned as a result, + // so it counts as a processed task, not a failed task. "Failed tasks" tracks + // infrastructure-level failures where the task handler itself threw an exception. + assertEventually( + EVENTUALLY_TIMEOUT, + () -> { + WorkerHeartbeat hb = describeWorker(workerInstanceKey); + assertNotNull("DescribeWorker should return stored heartbeat", hb); + assertTrue( + "activity_task_slots_info.total_processed_tasks should be >= 1 after activity execution", + hb.hasActivityTaskSlotsInfo() + && hb.getActivityTaskSlotsInfo().getTotalProcessedTasks() >= 1); + + // Invariant: totalFailed must never exceed totalProcessed. Before the fix, + // recordFailed() could be called without recordProcessed() in catch blocks, + // violating this invariant. 
+ for (String slotType : new String[] {"workflow", "activity", "local_activity", "nexus"}) { + WorkerSlotsInfo slots; + switch (slotType) { + case "workflow": + slots = hb.getWorkflowTaskSlotsInfo(); + break; + case "activity": + slots = hb.getActivityTaskSlotsInfo(); + break; + case "local_activity": + slots = hb.getLocalActivitySlotsInfo(); + break; + case "nexus": + slots = hb.getNexusTaskSlotsInfo(); + break; + default: + throw new AssertionError("unexpected slot type"); + } + if (slots != null) { + assertTrue( + slotType + " total_failed_tasks should not exceed total_processed_tasks", + slots.getTotalFailedTasks() <= slots.getTotalProcessedTasks()); + } + } + }); + } + + @Test + public void testWorkflowTaskProcessedCounts() throws Exception { + testWorkflowRule.getTestEnvironment().start(); + + WorkflowClient client = testWorkflowRule.getWorkflowClient(); + for (int i = 0; i < 3; i++) { + TestWorkflow wf = + client.newWorkflowStub( + TestWorkflow.class, + WorkflowOptions.newBuilder() + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .setWorkflowExecutionTimeout(Duration.ofSeconds(30)) + .build()); + assertEquals("done", wf.execute("test" + i)); + } + + String workerInstanceKey = waitForWorkerInstanceKey(); + + assertEventually( + EVENTUALLY_TIMEOUT, + () -> { + WorkerHeartbeat hb = describeWorker(workerInstanceKey); + assertNotNull("DescribeWorker should return stored heartbeat", hb); + assertTrue( + "workflow_task_slots_info.total_processed_tasks should be >= 3 after 3 workflows", + hb.hasWorkflowTaskSlotsInfo() + && hb.getWorkflowTaskSlotsInfo().getTotalProcessedTasks() >= 3); + }); + } + + /** Verifies activity slots are occupied while an activity is running, then released after. 
*/ + @Test + public void testActivityInFlightSlotTracking() throws Exception { + testWorkflowRule.getTestEnvironment().start(); + + String workerInstanceKey = waitForWorkerInstanceKey(); + + WorkflowClient client = testWorkflowRule.getWorkflowClient(); + BlockingWorkflow wf = + client.newWorkflowStub( + BlockingWorkflow.class, + WorkflowOptions.newBuilder() + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .setWorkflowExecutionTimeout(Duration.ofSeconds(30)) + .build()); + + // Start workflow async — the activity will block until we release it + CompletableFuture wfFuture = WorkflowClient.execute(wf::execute); + + // Wait for the blocking activity to start + assertTrue( + "blocking activity should have started", + blockingActivityStarted.await(10, TimeUnit.SECONDS)); + + // While the activity is running, eventually a heartbeat should show used slots >= 1 + // and last_successful_poll_time should be set (the blocking activity goes through the poller, + // unlike eager-executed activities) + assertEventually( + EVENTUALLY_TIMEOUT, + () -> { + WorkerHeartbeat hb = describeWorker(workerInstanceKey); + assertNotNull("DescribeWorker should return stored heartbeat", hb); + assertTrue( + "activity_task_slots_info.current_used_slots should be >= 1 while activity is running", + hb.hasActivityTaskSlotsInfo() + && hb.getActivityTaskSlotsInfo().getCurrentUsedSlots() >= 1); + assertTrue( + "activity_poller_info should have last_successful_poll_time set", + hb.hasActivityPollerInfo() && hb.getActivityPollerInfo().hasLastSuccessfulPollTime()); + }); + + // Release the activity + blockingActivityRelease.countDown(); + wfFuture.get(10, TimeUnit.SECONDS); + + // After completion, used slots should return to 0 + assertEventually( + EVENTUALLY_TIMEOUT, + () -> { + WorkerHeartbeat hb = describeWorker(workerInstanceKey); + assertNotNull("DescribeWorker should return stored heartbeat", hb); + assertEquals( + "activity used slots should be 0 after activity completes", + 0, + 
hb.getActivityTaskSlotsInfo().getCurrentUsedSlots()); + }); + } + + /** Verifies sticky cache counters are reported in heartbeat. */ + @Test + public void testStickyCacheCountersInHeartbeat() throws Exception { + testWorkflowRule.getTestEnvironment().start(); + + // Run a workflow to generate at least one sticky cache hit or populate the cache + WorkflowClient client = testWorkflowRule.getWorkflowClient(); + TestWorkflow wf = + client.newWorkflowStub( + TestWorkflow.class, + WorkflowOptions.newBuilder() + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .setWorkflowExecutionTimeout(Duration.ofSeconds(30)) + .build()); + assertEquals("done", wf.execute("test")); + + String workerInstanceKey = waitForWorkerInstanceKey(); + + assertEventually( + EVENTUALLY_TIMEOUT, + () -> { + WorkerHeartbeat hb = describeWorker(workerInstanceKey); + assertNotNull("DescribeWorker should return stored heartbeat", hb); + + // Sticky cache fields should be present (values may be 0 if no cache hits yet) + assertTrue( + "total_sticky_cache_hit + total_sticky_cache_miss + current_sticky_cache_size should be >= 0", + hb.getTotalStickyCacheHit() >= 0 + && hb.getTotalStickyCacheMiss() >= 0 + && hb.getCurrentStickyCacheSize() >= 0); + }); + } + + /** + * Verifies sticky cache misses are tracked in heartbeat. Starts a workflow with a blocking + * activity, purges the cache while the activity runs, then completes. The workflow task on resume + * triggers a cache miss. Matches Go's TestWorkerHeartbeatStickyCacheMiss and Rust's + * worker_heartbeat_sticky_cache_miss. 
+ */ + @Test + public void testStickyCacheMissInHeartbeat() throws Exception { + testWorkflowRule.getTestEnvironment().start(); + + String workerInstanceKey = waitForWorkerInstanceKey(); + + WorkflowClient client = testWorkflowRule.getWorkflowClient(); + CacheTestWorkflow wf = + client.newWorkflowStub( + CacheTestWorkflow.class, + WorkflowOptions.newBuilder() + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .setWorkflowExecutionTimeout(Duration.ofSeconds(30)) + .build()); + + // Start workflow async — the activity will block until we release it + CompletableFuture wfFuture = WorkflowClient.execute(wf::execute); + + // Wait for the blocking activity to start + assertTrue( + "cache test activity should have started", + cacheTestActivityStarted.await(10, TimeUnit.SECONDS)); + + // Purge the sticky cache so the workflow's next WFT triggers a cache miss + testWorkflowRule.invalidateWorkflowCache(); + + // Release the activity — workflow resumes on non-sticky queue + cacheTestActivityRelease.countDown(); + assertEquals("done", wfFuture.get(10, TimeUnit.SECONDS)); + + // Wait for heartbeat to capture the sticky cache miss + assertEventually( + EVENTUALLY_TIMEOUT, + () -> { + WorkerHeartbeat hb = describeWorker(workerInstanceKey); + assertNotNull("DescribeWorker should return stored heartbeat", hb); + assertTrue( + "should have at least 1 sticky cache miss after cache purge", + hb.getTotalStickyCacheMiss() >= 1); + }); + } + + /** + * Verifies that interval counters (last_interval_processed_tasks) reset between heartbeat + * intervals. After workflow execution, the counter should be >0, then reset to 0 once idle. 
+ */ + @Test + public void testIntervalCounterReset() throws Exception { + testWorkflowRule.getTestEnvironment().start(); + + String workerInstanceKey = waitForWorkerInstanceKey(); + + WorkflowClient client = testWorkflowRule.getWorkflowClient(); + + // Run a workflow to generate processed tasks + TestWorkflow wf = + client.newWorkflowStub( + TestWorkflow.class, + WorkflowOptions.newBuilder() + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .setWorkflowExecutionTimeout(Duration.ofSeconds(30)) + .build()); + assertEquals("done", wf.execute("test")); + + // After the workflow completes and the worker goes idle, interval counter should reset to 0 + assertEventually( + EVENTUALLY_TIMEOUT, + () -> { + WorkerHeartbeat hb = describeWorker(workerInstanceKey); + assertNotNull("DescribeWorker should return stored heartbeat", hb); + assertTrue("workflow_task_slots_info should be set", hb.hasWorkflowTaskSlotsInfo()); + // total_processed_tasks should reflect the work was done + assertTrue( + "total_processed_tasks should be >= 1", + hb.getWorkflowTaskSlotsInfo().getTotalProcessedTasks() >= 1); + // After going idle, the interval counter should reset to 0 + assertEquals( + "last_interval_processed_tasks should reset to 0 when no new work occurs", + 0, + hb.getWorkflowTaskSlotsInfo().getLastIntervalProcessedTasks()); + }); + } + + /** + * Tests that two workers on different task queues produce distinct instance keys but share the + * same worker_grouping_key. Matches Go's TestWorkerHeartbeatMultipleWorkers. 
+ */ + @Test + public void testMultipleWorkersHaveDistinctInstanceKeys() throws Exception { + testWorkflowRule.getTestEnvironment().start(); + + String taskQueue1 = testWorkflowRule.getTaskQueue() + "-multi1"; + String taskQueue2 = testWorkflowRule.getTaskQueue() + "-multi2"; + + WorkflowClient client = testWorkflowRule.getWorkflowClient(); + WorkerFactory factory = + WorkerFactory.newInstance(client, testWorkflowRule.getWorkerFactoryOptions()); + + Worker worker1 = factory.newWorker(taskQueue1); + worker1.registerWorkflowImplementationTypes(TestWorkflowImpl.class); + worker1.registerActivitiesImplementations(new TestActivityImpl()); + + Worker worker2 = factory.newWorker(taskQueue2); + worker2.registerWorkflowImplementationTypes(TestWorkflowImpl.class); + worker2.registerActivitiesImplementations(new TestActivityImpl()); + + factory.start(); + + // Wait for both workers to appear via ListWorkers (e2e server-side verification) + assertEventually( + EVENTUALLY_TIMEOUT, + () -> { + List workers1 = listWorkersForQueue(taskQueue1); + List workers2 = listWorkersForQueue(taskQueue2); + assertFalse("worker 1 should appear via ListWorkers", workers1.isEmpty()); + assertFalse("worker 2 should appear via ListWorkers", workers2.isEmpty()); + + WorkerHeartbeat hb1 = workers1.get(0); + WorkerHeartbeat hb2 = workers2.get(0); + + assertFalse( + "worker 1 instance key should not be empty", hb1.getWorkerInstanceKey().isEmpty()); + assertFalse( + "worker 2 instance key should not be empty", hb2.getWorkerInstanceKey().isEmpty()); + assertNotEquals( + "workers should have distinct instance keys", + hb1.getWorkerInstanceKey(), + hb2.getWorkerInstanceKey()); + + assertTrue("worker 1 should have host info", hb1.hasHostInfo()); + assertTrue("worker 2 should have host info", hb2.hasHostInfo()); + assertEquals( + "workers should share the same worker_grouping_key", + hb1.getHostInfo().getWorkerGroupingKey(), + hb2.getHostInfo().getWorkerGroupingKey()); + + // Verify both are also accessible 
via DescribeWorker + WorkerHeartbeat described1 = describeWorker(hb1.getWorkerInstanceKey()); + WorkerHeartbeat described2 = describeWorker(hb2.getWorkerInstanceKey()); + assertNotNull("worker 1 should be stored server-side", described1); + assertNotNull("worker 2 should be stored server-side", described2); + assertEquals(taskQueue1, described1.getTaskQueue()); + assertEquals(taskQueue2, described2.getTaskQueue()); + }); + + factory.shutdown(); + factory.awaitTermination(10, TimeUnit.SECONDS); + } + + /** + * Tests that resource-based tuner reports SlotSupplierKind as "ResourceBased". Matches Go's + * TestWorkerHeartbeatResourceBasedTuner. + */ + @Test + public void testResourceBasedSlotSupplierKind() throws Exception { + testWorkflowRule.getTestEnvironment().start(); + + String taskQueue = testWorkflowRule.getTaskQueue() + "-resource"; + + WorkflowClient client = testWorkflowRule.getWorkflowClient(); + WorkerFactory factory = + WorkerFactory.newInstance(client, testWorkflowRule.getWorkerFactoryOptions()); + + Worker worker = + factory.newWorker( + taskQueue, + WorkerOptions.newBuilder() + .setWorkerTuner( + ResourceBasedTuner.newBuilder() + .setControllerOptions( + ResourceBasedControllerOptions.newBuilder(0.7, 0.7).build()) + .build()) + .build()); + worker.registerWorkflowImplementationTypes(TestWorkflowImpl.class); + worker.registerActivitiesImplementations(new TestActivityImpl()); + + factory.start(); + + // Discover the worker via ListWorkers, then verify slot supplier kind via DescribeWorker + assertEventually( + EVENTUALLY_TIMEOUT, + () -> { + List workers = listWorkersForQueue(taskQueue); + assertFalse("worker should appear via ListWorkers", workers.isEmpty()); + + WorkerHeartbeat described = describeWorker(workers.get(0).getWorkerInstanceKey()); + assertNotNull("DescribeWorker should return stored heartbeat", described); + assertEquals( + "workflow slot supplier kind should be ResourceBased", + "ResourceBased", + 
described.getWorkflowTaskSlotsInfo().getSlotSupplierKind()); + assertEquals( + "activity slot supplier kind should be ResourceBased", + "ResourceBased", + described.getActivityTaskSlotsInfo().getSlotSupplierKind()); + assertEquals( + "local activity slot supplier kind should be ResourceBased", + "ResourceBased", + described.getLocalActivitySlotsInfo().getSlotSupplierKind()); + }); + + factory.shutdown(); + factory.awaitTermination(10, TimeUnit.SECONDS); + } + + /** + * Tests that no heartbeats are sent when heartbeat interval is not configured. Matches Go's + * TestWorkerHeartbeatDisabled and Rust's worker_heartbeat_no_runtime_heartbeat. + */ + /** + * Verifies no heartbeats are sent when heartbeat interval is not configured. Polls ListWorkers + * repeatedly over 5 seconds to confirm the worker never appears. + */ + @SuppressWarnings("deprecation") + @Test + public void testNoHeartbeatsSentWhenDisabled() throws Exception { + SDKTestWorkflowRule noHeartbeatRule = + SDKTestWorkflowRule.newBuilder() + .setUseExternalService(true) + // No workerHeartbeatInterval — heartbeats should be disabled + .setDoNotStart(true) + .build(); + + try { + noHeartbeatRule.getTestEnvironment().start(); + + String taskQueue = noHeartbeatRule.getTaskQueue(); + String namespace = noHeartbeatRule.getWorkflowClient().getOptions().getNamespace(); + + // Poll ListWorkers repeatedly over 5 seconds — worker should never appear + long deadline = System.currentTimeMillis() + 5000; + while (System.currentTimeMillis() < deadline) { + ListWorkersResponse resp = + noHeartbeatRule + .getWorkflowClient() + .getWorkflowServiceStubs() + .blockingStub() + .listWorkers( + ListWorkersRequest.newBuilder() + .setNamespace(namespace) + .setPageSize(100) + .build()); + List workers = + resp.getWorkersInfoList().stream() + .map(info -> info.getWorkerHeartbeat()) + .filter(hb -> hb.getTaskQueue().equals(taskQueue)) + .collect(Collectors.toList()); + assertTrue( + "no workers should appear via ListWorkers when 
heartbeat interval is not configured", + workers.isEmpty()); + } + } finally { + noHeartbeatRule.getTestEnvironment().shutdown(); + noHeartbeatRule.getTestEnvironment().awaitTermination(10, TimeUnit.SECONDS); + } + } + + /** + * Discovers the workerInstanceKey by polling ListWorkers until a worker appears for the default + * task queue. + */ + private String waitForWorkerInstanceKey() { + String taskQueue = testWorkflowRule.getTaskQueue(); + return assertEventually( + EVENTUALLY_TIMEOUT, + () -> { + List workers = listWorkersForQueue(taskQueue); + assertFalse( + "no workers found via ListWorkers for queue: " + taskQueue, workers.isEmpty()); + String key = workers.get(0).getWorkerInstanceKey(); + assertFalse("workerInstanceKey should not be empty", key.isEmpty()); + return key; + }); + } + + /** + * Lists workers via the ListWorkers RPC, filtered to a specific task queue. Uses the deprecated + * WorkersInfo field because the replacement (WorkerListInfo) is not yet populated by the server. + */ + @SuppressWarnings("deprecation") + private List listWorkersForQueue(String taskQueue) { + ListWorkersResponse resp = + testWorkflowRule + .getWorkflowClient() + .getWorkflowServiceStubs() + .blockingStub() + .listWorkers( + ListWorkersRequest.newBuilder() + .setNamespace(testWorkflowRule.getWorkflowClient().getOptions().getNamespace()) + .setPageSize(100) + .build()); + return resp.getWorkersInfoList().stream() + .map(info -> info.getWorkerHeartbeat()) + .filter(hb -> hb.getTaskQueue().equals(taskQueue)) + .collect(Collectors.toList()); + } + + /** + * Queries the test server for the stored heartbeat of a given worker via the DescribeWorker RPC. 
+ */ + private WorkerHeartbeat describeWorker(String workerInstanceKey) { + try { + DescribeWorkerResponse resp = + testWorkflowRule + .getWorkflowClient() + .getWorkflowServiceStubs() + .blockingStub() + .describeWorker( + DescribeWorkerRequest.newBuilder() + .setNamespace( + testWorkflowRule.getWorkflowClient().getOptions().getNamespace()) + .setWorkerInstanceKey(workerInstanceKey) + .build()); + return resp.getWorkerInfo().getWorkerHeartbeat(); + } catch (io.grpc.StatusRuntimeException e) { + if (e.getStatus().getCode() == io.grpc.Status.Code.NOT_FOUND) { + return null; + } + throw e; + } + } + + // --- Workflow and Activity types --- + + @WorkflowInterface + public interface TestWorkflow { + @WorkflowMethod + String execute(String input); + } + + public static class TestWorkflowImpl implements TestWorkflow { + @Override + public String execute(String input) { + TestActivity activity = + Workflow.newActivityStub( + TestActivity.class, + ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(10)).build()); + return activity.doWork(input); + } + } + + @ActivityInterface + public interface TestActivity { + @ActivityMethod + String doWork(String input); + } + + public static class TestActivityImpl implements TestActivity { + @Override + public String doWork(String input) { + return "done"; + } + } + + @WorkflowInterface + public interface FailingWorkflow { + @WorkflowMethod + void execute(); + } + + public static class FailingWorkflowImpl implements FailingWorkflow { + @Override + public void execute() { + FailingActivity activity = + Workflow.newActivityStub( + FailingActivity.class, + ActivityOptions.newBuilder() + .setStartToCloseTimeout(Duration.ofSeconds(10)) + .setRetryOptions( + io.temporal.common.RetryOptions.newBuilder().setMaximumAttempts(1).build()) + .build()); + activity.fail(); + } + } + + @ActivityInterface + public interface FailingActivity { + @ActivityMethod + void fail(); + } + + public static class FailingActivityImpl implements 
FailingActivity { + @Override + public void fail() { + throw io.temporal.failure.ApplicationFailure.newFailure( + "intentional failure for test", "TestFailure"); + } + } + + @WorkflowInterface + public interface BlockingWorkflow { + @WorkflowMethod + void execute(); + } + + public static class BlockingWorkflowImpl implements BlockingWorkflow { + @Override + public void execute() { + BlockingActivity activity = + Workflow.newActivityStub( + BlockingActivity.class, + ActivityOptions.newBuilder() + .setStartToCloseTimeout(Duration.ofSeconds(30)) + .setDisableEagerExecution(true) + .build()); + activity.block(); + } + } + + @ActivityInterface + public interface BlockingActivity { + @ActivityMethod + void block(); + } + + public static class BlockingActivityImpl implements BlockingActivity { + @Override + public void block() { + blockingActivityStarted.countDown(); + try { + blockingActivityRelease.await(20, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + + @WorkflowInterface + public interface CacheTestWorkflow { + @WorkflowMethod + String execute(); + } + + public static class CacheTestWorkflowImpl implements CacheTestWorkflow { + @Override + public String execute() { + CacheTestActivity activity = + Workflow.newActivityStub( + CacheTestActivity.class, + ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(30)).build()); + return activity.doCacheWork(); + } + } + + @ActivityInterface + public interface CacheTestActivity { + @ActivityMethod + String doCacheWork(); + } + + public static class CacheTestActivityImpl implements CacheTestActivity { + @Override + public String doCacheWork() { + cacheTestActivityStarted.countDown(); + try { + cacheTestActivityRelease.await(20, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return "done"; + } + } +} diff --git 
a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java
index e73e6e7a8..abe814ead 100644
--- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java
+++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java
@@ -733,6 +733,13 @@ public void getSystemInfo(
     responseObserver.onCompleted();
   }
 
+  @Override
+  public void shutdownWorker(
+      ShutdownWorkerRequest request, StreamObserver<ShutdownWorkerResponse> responseObserver) {
+    responseObserver.onNext(ShutdownWorkerResponse.getDefaultInstance());
+    responseObserver.onCompleted();
+  }
+
   private Context.CancellableContext deadlineCtx(Deadline deadline) {
     return Context.current().withDeadline(deadline, this.backgroundScheduler);
   }
@@ -1902,7 +1909,8 @@ public void describeNamespace(
               NamespaceInfo.Capabilities.newBuilder()
                   .setEagerWorkflowStart(true)
                   .setAsyncUpdate(true)
-                  .setSyncUpdate(true))
+                  .setSyncUpdate(true)
+                  .setWorkerHeartbeats(true))
           .build())
           .build();
     responseObserver.onNext(result);