Conversation
Implements periodic worker heartbeat RPCs that report worker status, slot usage, poller info, and task counters to the server. Key components: - HeartbeatManager: per-namespace scheduler that aggregates heartbeats from all workers sharing that namespace - PollerTracker: tracks in-flight poll count and last successful poll time - WorkflowClientOptions.workerHeartbeatInterval: configurable interval (default 60s, range 1-60s, negative to disable) - TrackingSlotSupplier: extended with slot type reporting - Worker: builds SharedNamespaceWorker heartbeat data from activity, workflow, and nexus worker stats - TestWorkflowService: implements recordWorkerHeartbeat, describeWorker, and shutdownWorker RPCs for testing
9eeea9b to
bfd3fc6
Compare
temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityWorker.java
Show resolved
Hide resolved
temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityWorker.java
Show resolved
Hide resolved
| private final AtomicInteger totalProcessedTasks = new AtomicInteger(); | ||
| private final AtomicInteger totalFailedTasks = new AtomicInteger(); |
There was a problem hiding this comment.
All the workers now have something like this - can we abstract these out into something else? Seems like we could use a shared interface or base class.
There was a problem hiding this comment.
moved into a new TaskCounter class
| DescribeNamespaceRequest.newBuilder() | ||
| .setNamespace(workflowClient.getOptions().getNamespace()) | ||
| .build()); | ||
| boolean heartbeatsSupported = |
There was a problem hiding this comment.
In my PR I've bundled a bunch of capability stuff up into a class, FYI, this will want to go in there
There was a problem hiding this comment.
Thanks for the heads up, had Claude pull in your new class, so merge conflict should be minimal now. Feel free to merge first
temporal-sdk/src/test/java/io/temporal/internal/worker/HeartbeatManagerTest.java
Outdated
Show resolved
Hide resolved
temporal-sdk/src/test/java/io/temporal/worker/WorkerHeartbeatIntegrationTest.java
Outdated
Show resolved
Hide resolved
temporal-sdk/src/test/java/io/temporal/worker/WorkerHeartbeatIntegrationTest.java
Outdated
Show resolved
Hide resolved
temporal-sdk/src/test/java/io/temporal/worker/WorkerHeartbeatIntegrationTest.java
Outdated
Show resolved
Hide resolved
… use separate TaskCounter, get TaskQueueType live not just on start
temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityWorker.java
Show resolved
Hide resolved
temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java
Outdated
Show resolved
Hide resolved
temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java
Outdated
Show resolved
Hide resolved
temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java
Show resolved
Hide resolved
… anticipate merge conflict
temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java
Show resolved
Hide resolved
temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java
Outdated
Show resolved
Hide resolved
temporal-sdk/src/main/java/io/temporal/internal/worker/HeartbeatManager.java
Show resolved
Hide resolved
| if (hasNexusServices) { | ||
| types.add(TaskQueueType.TASK_QUEUE_TYPE_NEXUS); | ||
| } | ||
| return types; |
There was a problem hiding this comment.
Active queue types reported inaccurately
Medium Severity
getActiveTaskQueueTypes() derives types from object presence (workflowWorker, activityWorker) instead of whether those workers are actually polling. This reports TASK_QUEUE_TYPE_WORKFLOW (and often TASK_QUEUE_TYPE_ACTIVITY) even when no corresponding implementations are registered, so ShutdownWorkerRequest.task_queue_types can be incorrect.
| private volatile boolean shuttingDown = false; | ||
| private boolean hasNexusServices = false; | ||
| private final String workerInstanceKey = UUID.randomUUID().toString(); | ||
| private final Instant startTime = Instant.now(); |
There was a problem hiding this comment.
temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java
Show resolved
Hide resolved
| } catch (InterruptedException e) { | ||
| Thread.currentThread().interrupt(); | ||
| } | ||
| } |
There was a problem hiding this comment.
Heartbeat thread may leak during shutdown
Medium Severity
SharedNamespaceWorker.shutdown() only calls scheduler.shutdown() and waits 5 seconds. If heartbeatTick() is blocked in recordWorkerHeartbeat, the running task is never interrupted, so the scheduler thread can remain alive after shutdown and keep resources pinned.
Additional Locations (1)
| private final Instant startTime = Instant.now(); | ||
| private final WorkflowClientOptions clientOptions; | ||
| private final @Nonnull WorkflowExecutorCache cache; | ||
| private final Map<String, TaskSnapshot> previousSnapshots = new HashMap<>(); |
There was a problem hiding this comment.
Non-thread-safe HashMap used from heartbeat scheduler thread
Low Severity
previousSnapshots is a plain HashMap mutated by buildSlotsInfo from the heartbeat scheduler thread. While currently protected by callbackLock inside the callback closure, the buildSlotsInfo method itself has no synchronization and doesn't document this threading requirement. Using a ConcurrentHashMap or moving the snapshot state into the synchronized callback closure (alongside lastHeartbeatTime) would make the thread-safety guarantee self-evident and prevent future accidental unsynchronized access.


What was changed
Implements periodic worker heartbeat RPC that reports worker status, slot usage, poller info, host metrics, and sticky cache counters to the Temporal server. Includes HeartbeatManager, PollerTracker, and integration tests covering all heartbeat fields.
HeartbeatManager— Per-namespace heartbeat scheduler. Workers register/unregister; scheduler fires at the configured interval. Gracefully shuts down if server returns UNIMPLEMENTED.PollerTracker— Tracks in-flight poll count and last successful poll time per worker type. Only records success when a poll returns actual work.WorkflowClientOptions.workerHeartbeatInterval— New option to configure heartbeat interval. Defaults to 60s. Can be set between 1-60s, or a negative duration to disable.Worker.getActiveTaskQueueTypes()— Reports WORKFLOW, ACTIVITY, and NEXUS (only when Nexus services are registered, matching Go SDK).Worker.buildHeartbeat()— Assembles the full WorkerHeartbeat proto with slot info, poller info, host metrics, sticky cache counters, and timestamps.TrackingSlotSupplier.getSlotSupplierKind()— Reports FixedSize vs ResourceBased in heartbeats.Why?
New feature!
Checklist
Closes Worker Heartbeating #2716
How was this tested:
Note
Medium Risk
Touches core worker polling/shutdown paths and introduces periodic RPCs and new configuration; issues could impact worker performance or shutdown semantics, though behavior is capability-gated and can be disabled.
Overview
Adds periodic worker heartbeat reporting to Temporal Server, including a new
HeartbeatManagerscheduler that aggregates per-namespace heartbeats and disables itself onUNIMPLEMENTED.Extends
WorkflowClientOptionswith experimentalworkerHeartbeatInterval(default 60s; negative disables) and wires heartbeating intoWorkerFactory/Workerlifecycle: workers generate detailedWorkerHeartbeatpayloads (status, active task queue types, slot usage, poller stats, sticky cache hit/miss counters, host metrics, plugin info, deployment version) and send a final shutting down heartbeat duringshutdownWorker.Refactors poller/task accounting to support heartbeats by introducing
PollerTracker(in-flight polls + last successful poll time) andTaskCounter(processed/failed), updating poll tasks/workers to use these trackers, enhancingTrackingSlotSupplierwith supplier-kind/used-slot introspection, and enabling CI’s dev server withfrontend.ListWorkersEnabled=truefor tests.Written by Cursor Bugbot for commit cb9b3b7. This will update automatically on new commits. Configure here.