Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ jobs:
--dynamic-config-value history.MaxBufferedQueryCount=10000 \
--dynamic-config-value frontend.workerVersioningDataAPIs=true \
--dynamic-config-value history.enableRequestIdRefLinks=true \
--dynamic-config-value frontend.ListWorkersEnabled=true \
--dynamic-config-value 'component.callbacks.allowedAddresses=[{"Pattern":"localhost:7243","AllowInsecure":true}]' &
sleep 10s
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.temporal.internal.client.external.ManualActivityCompletionClientFactory;
import io.temporal.internal.common.PluginUtils;
import io.temporal.internal.sync.StubMarker;
import io.temporal.internal.worker.HeartbeatManager;
import io.temporal.serviceclient.MetricsTag;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.serviceclient.WorkflowServiceStubsPlugin;
Expand Down Expand Up @@ -53,6 +54,8 @@ final class WorkflowClientInternalImpl implements WorkflowClient, WorkflowClient
private final Scope metricsScope;
private final WorkflowClientInterceptor[] interceptors;
private final WorkerFactoryRegistry workerFactoryRegistry = new WorkerFactoryRegistry();
private final String workerGroupingKey = java.util.UUID.randomUUID().toString();
private final @Nullable HeartbeatManager heartbeatManager;

/**
* Creates client that connects to an instance of the Temporal Service. Cannot be used from within
Expand Down Expand Up @@ -112,6 +115,14 @@ public static WorkflowClient newInstance(
options.getNamespace(),
options.getIdentity(),
options.getDataConverter());

java.time.Duration heartbeatInterval = options.getWorkerHeartbeatInterval();
if (!heartbeatInterval.isNegative()) {
this.heartbeatManager =
new HeartbeatManager(workflowServiceStubs, options.getIdentity(), heartbeatInterval);
} else {
this.heartbeatManager = null;
}
}

private WorkflowClientCallsInterceptor initializeClientInvoker() {
Expand Down Expand Up @@ -790,6 +801,17 @@ public void deregisterWorkerFactory(WorkerFactory workerFactory) {
workerFactoryRegistry.deregister(workerFactory);
}

@Override
public String getWorkerGroupingKey() {
return workerGroupingKey;
}

@Override
@Nullable
public HeartbeatManager getHeartbeatManager() {
return heartbeatManager;
}

@Override
public NexusStartWorkflowResponse startNexus(
NexusStartWorkflowRequest request, Functions.Proc workflow) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package io.temporal.client;

import com.google.common.base.Preconditions;
import io.temporal.api.enums.v1.QueryRejectCondition;
import io.temporal.common.Experimental;
import io.temporal.common.context.ContextPropagator;
import io.temporal.common.converter.DataConverter;
import io.temporal.common.converter.GlobalDataConverter;
import io.temporal.common.interceptors.WorkflowClientInterceptor;
import java.lang.management.ManagementFactory;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -49,6 +51,7 @@ public static final class Builder {
private List<ContextPropagator> contextPropagators;
private QueryRejectCondition queryRejectCondition;
private WorkflowClientPlugin[] plugins;
private Duration workerHeartbeatInterval;

private Builder() {}

Expand All @@ -64,6 +67,7 @@ private Builder(WorkflowClientOptions options) {
contextPropagators = options.contextPropagators;
queryRejectCondition = options.queryRejectCondition;
plugins = options.plugins;
workerHeartbeatInterval = options.workerHeartbeatInterval;
}

public Builder setNamespace(String namespace) {
Expand Down Expand Up @@ -153,6 +157,19 @@ public Builder setPlugins(WorkflowClientPlugin... plugins) {
return this;
}

/**
* Sets the interval at which workers send heartbeat RPCs to the server. If not set or set to
* zero, defaults to 60 seconds. A negative duration disables heartbeating. Positive values must
* be between 1 and 60 seconds inclusive.
*
* @param workerHeartbeatInterval the heartbeat interval, or a negative duration to disable
*/
@Experimental
public Builder setWorkerHeartbeatInterval(Duration workerHeartbeatInterval) {
this.workerHeartbeatInterval = workerHeartbeatInterval;
return this;
}

public WorkflowClientOptions build() {
return new WorkflowClientOptions(
namespace,
Expand All @@ -162,7 +179,8 @@ public WorkflowClientOptions build() {
binaryChecksum,
contextPropagators,
queryRejectCondition,
plugins == null ? EMPTY_PLUGINS : plugins);
plugins == null ? EMPTY_PLUGINS : plugins,
resolveHeartbeatInterval(workerHeartbeatInterval));
}

/**
Expand All @@ -188,7 +206,22 @@ public WorkflowClientOptions validateAndBuildWithDefaults() {
queryRejectCondition == null
? QueryRejectCondition.QUERY_REJECT_CONDITION_UNSPECIFIED
: queryRejectCondition,
plugins == null ? EMPTY_PLUGINS : plugins);
plugins == null ? EMPTY_PLUGINS : plugins,
resolveHeartbeatInterval(workerHeartbeatInterval));
}

private static Duration resolveHeartbeatInterval(Duration raw) {
if (raw == null || raw.isZero()) {
return Duration.ofSeconds(60);
}
if (raw.isNegative()) {
return raw;
}
Preconditions.checkArgument(
raw.compareTo(Duration.ofSeconds(1)) >= 0 && raw.compareTo(Duration.ofSeconds(60)) <= 0,
"workerHeartbeatInterval must be between 1s and 60s, got %s",
raw);
return raw;
}
}

Expand All @@ -215,6 +248,8 @@ public WorkflowClientOptions validateAndBuildWithDefaults() {

private final WorkflowClientPlugin[] plugins;

private final Duration workerHeartbeatInterval;

private WorkflowClientOptions(
String namespace,
DataConverter dataConverter,
Expand All @@ -223,7 +258,8 @@ private WorkflowClientOptions(
String binaryChecksum,
List<ContextPropagator> contextPropagators,
QueryRejectCondition queryRejectCondition,
WorkflowClientPlugin[] plugins) {
WorkflowClientPlugin[] plugins,
Duration workerHeartbeatInterval) {
this.namespace = namespace;
this.dataConverter = dataConverter;
this.interceptors = interceptors;
Expand All @@ -232,6 +268,7 @@ private WorkflowClientOptions(
this.contextPropagators = contextPropagators;
this.queryRejectCondition = queryRejectCondition;
this.plugins = plugins;
this.workerHeartbeatInterval = workerHeartbeatInterval;
}

/**
Expand Down Expand Up @@ -289,6 +326,15 @@ public WorkflowClientPlugin[] getPlugins() {
return plugins;
}

/**
* Returns the worker heartbeat interval. Defaults to 60 seconds if not configured. A negative
* duration means heartbeating is explicitly disabled.
*/
@Experimental
public Duration getWorkerHeartbeatInterval() {
return workerHeartbeatInterval;
}

@Override
public String toString() {
return "WorkflowClientOptions{"
Expand All @@ -311,6 +357,8 @@ public String toString() {
+ queryRejectCondition
+ ", plugins="
+ Arrays.toString(plugins)
+ ", workerHeartbeatInterval="
+ workerHeartbeatInterval
+ '}';
}

Expand All @@ -326,7 +374,9 @@ public boolean equals(Object o) {
&& com.google.common.base.Objects.equal(binaryChecksum, that.binaryChecksum)
&& com.google.common.base.Objects.equal(contextPropagators, that.contextPropagators)
&& queryRejectCondition == that.queryRejectCondition
&& Arrays.equals(plugins, that.plugins);
&& Arrays.equals(plugins, that.plugins)
&& com.google.common.base.Objects.equal(
workerHeartbeatInterval, that.workerHeartbeatInterval);
}

@Override
Expand All @@ -339,6 +389,7 @@ public int hashCode() {
binaryChecksum,
contextPropagators,
queryRejectCondition,
Arrays.hashCode(plugins));
Arrays.hashCode(plugins),
workerHeartbeatInterval);
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package io.temporal.internal.client;

import io.temporal.client.WorkflowClient;
import io.temporal.internal.worker.HeartbeatManager;
import io.temporal.worker.WorkerFactory;
import io.temporal.workflow.Functions;
import javax.annotation.Nullable;

/**
* From OOP point of view, there is no reason for this interface not to extend {@link
Expand All @@ -18,4 +20,9 @@ public interface WorkflowClientInternal {
void deregisterWorkerFactory(WorkerFactory workerFactory);

NexusStartWorkflowResponse startNexus(NexusStartWorkflowRequest request, Functions.Proc workflow);

String getWorkerGroupingKey();

@Nullable
HeartbeatManager getHeartbeatManager();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import io.temporal.worker.PollerTypeMetricsTag;
import io.temporal.worker.tuning.*;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
Expand All @@ -30,7 +29,7 @@ final class ActivityPollTask implements MultiThreadedPoller.PollTask<ActivityTas
private final TrackingSlotSupplier<ActivitySlotInfo> slotSupplier;
private final Scope metricsScope;
private final PollActivityTaskQueueRequest pollRequest;
private final AtomicInteger pollGauge = new AtomicInteger();
private final PollerTracker pollerTracker;

@SuppressWarnings("deprecation")
public ActivityPollTask(
Expand All @@ -42,10 +41,12 @@ public ActivityPollTask(
double activitiesPerSecond,
@Nonnull TrackingSlotSupplier<ActivitySlotInfo> slotSupplier,
@Nonnull Scope metricsScope,
@Nonnull Supplier<GetSystemInfoResponse.Capabilities> serverCapabilities) {
@Nonnull Supplier<GetSystemInfoResponse.Capabilities> serverCapabilities,
@Nonnull PollerTracker pollerTracker) {
this.service = Objects.requireNonNull(service);
this.slotSupplier = slotSupplier;
this.metricsScope = Objects.requireNonNull(metricsScope);
this.pollerTracker = Objects.requireNonNull(pollerTracker);

PollActivityTaskQueueRequest.Builder pollRequest =
PollActivityTaskQueueRequest.newBuilder()
Expand Down Expand Up @@ -100,7 +101,7 @@ public ActivityTask poll() {

MetricsTag.tagged(metricsScope, PollerTypeMetricsTag.PollerType.ACTIVITY_TASK)
.gauge(MetricsType.NUM_POLLERS)
.update(pollGauge.incrementAndGet());
.update(pollerTracker.pollStarted());

try {
response =
Expand All @@ -119,14 +120,15 @@ public ActivityTask poll() {
ProtobufTimeUtils.toM3Duration(
response.getStartedTime(), response.getCurrentAttemptScheduledTime()));
isSuccessful = true;
pollerTracker.pollSucceeded();
return new ActivityTask(
response,
permit,
() -> slotSupplier.releaseSlot(SlotReleaseReason.taskComplete(), permit));
} finally {
MetricsTag.tagged(metricsScope, PollerTypeMetricsTag.PollerType.ACTIVITY_TASK)
.gauge(MetricsType.NUM_POLLERS)
.update(pollGauge.decrementAndGet());
.update(pollerTracker.pollCompleted());

if (!isSuccessful) slotSupplier.releaseSlot(SlotReleaseReason.neverUsed(), permit);
}
Expand Down
Loading
Loading