Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ public enum SdkFlag {
* condition is resolved before the timeout.
*/
CANCEL_AWAIT_TIMER_ON_CONDITION(4),
/*
* Changes replay behavior of GetVersion to wait for the matching marker event before executing
* the callback.
*/
VERSION_WAIT_FOR_MARKER(5),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're getting more of these. We should think ahead at how to make sure this remains maintainable in the future.

Would you mind adding some comments on this flag and others above, indicating:

  • Which release introduced support for the flag (assume next patch release for new flags)
  • Which release turned it on (or "pending" if not yet turned on)
  • Ticket to the bug that the flag resolves.

UNKNOWN(Integer.MAX_VALUE);

private final int value;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,17 +133,20 @@ class InvocationStateMachine

private final int minSupported;
private final int maxSupported;
private final boolean waitForMarkerRecordedReplaying;
private final Functions.Func1<Integer, SearchAttributes> upsertSearchAttributeCallback;
private final Functions.Proc2<Integer, RuntimeException> resultCallback;

InvocationStateMachine(
int minSupported,
int maxSupported,
boolean waitForMarkerRecordedReplaying,
Functions.Func1<Integer, SearchAttributes> upsertSearchAttributeCallback,
Functions.Proc2<Integer, RuntimeException> callback) {
super(STATE_MACHINE_DEFINITION, VersionStateMachine.this.commandSink, stateMachineSink);
this.minSupported = minSupported;
this.maxSupported = maxSupported;
this.waitForMarkerRecordedReplaying = waitForMarkerRecordedReplaying;
this.upsertSearchAttributeCallback = upsertSearchAttributeCallback;
this.resultCallback = Objects.requireNonNull(callback);
}
Expand Down Expand Up @@ -264,9 +267,14 @@ void notifySkippedExecuting() {
}

void notifyMarkerCreatedReplaying() {
if (waitForMarkerRecordedReplaying) {
// Replay already preloads the version value, so delay the callback until the real marker
// event is matched.
return;
}
try {
// it's a replay and the version to return from the getVersion call should be preloaded from
// the history
// It's a replay and the version to return from the getVersion call should be preloaded
// from the history.
final boolean usePreloadedVersion = true;
validateVersionAndThrow(usePreloadedVersion);
notifyFromVersion(usePreloadedVersion);
Expand Down Expand Up @@ -295,6 +303,14 @@ void flushPreloadedVersionAndUpdateFromEventReplaying() {
Preconditions.checkState(
preloadedVersion != null, "preloadedVersion is expected to be initialized");
flushPreloadedVersionAndUpdateFromEvent(currentEvent);
if (waitForMarkerRecordedReplaying) {
try {
validateVersionAndThrow(false);
notifyFromVersion(false);
} catch (RuntimeException ex) {
notifyFromException(ex);
}
}
}

void notifySkippedReplaying() {
Expand Down Expand Up @@ -393,11 +409,16 @@ private VersionStateMachine(
public Integer getVersion(
int minSupported,
int maxSupported,
boolean waitForMarkerRecordedReplaying,
Functions.Func1<Integer, SearchAttributes> upsertSearchAttributeCallback,
Functions.Proc2<Integer, RuntimeException> callback) {
InvocationStateMachine ism =
new InvocationStateMachine(
minSupported, maxSupported, upsertSearchAttributeCallback, callback);
minSupported,
maxSupported,
waitForMarkerRecordedReplaying,
upsertSearchAttributeCallback,
callback);
ism.explicitEvent(ExplicitEvent.CHECK_EXECUTION_STATE);
ism.explicitEvent(ExplicitEvent.SCHEDULE);
// If the state is SKIPPED_REPLAYING that means we:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1252,6 +1252,7 @@ public Integer getVersion(
return stateMachine.getVersion(
minSupported,
maxSupported,
checkSdkFlag(SdkFlag.VERSION_WAIT_FOR_MARKER),
(version) -> {
if (!workflowImplOptions.isEnableUpsertVersionSearchAttributes()) {
return null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package io.temporal.internal.replay;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;

import com.uber.m3.tally.NoopScope;
import io.temporal.api.query.v1.WorkflowQuery;
import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponse;
import io.temporal.client.WorkflowClient;
import io.temporal.common.WorkflowExecutionHistory;
import io.temporal.internal.worker.QueryReplayHelper;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.testing.TestWorkflowEnvironment;
import io.temporal.worker.Worker;
import io.temporal.workflow.versionTests.GetVersionInterleavedUpdateReplayTest;
import io.temporal.workflow.versionTests.GetVersionInterleavedUpdateReplayTest.GreetingWorkflowImpl;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Arrays;
import org.junit.Test;

public class GetVersionInterleavedUpdateReplayTaskHandlerTest {
private static final String EXPECTED_FIRST_CHANGE_ID = "ChangeId1";
private static final String EXPECTED_SECOND_CHANGE_ID = "ChangeId2";

/** Regression test for the lower-level replay path behind the public replayer API. */
@Test
public void testReplayDirectQueryWorkflowTaskSucceeds() throws Throwable {
WorkflowExecutionHistory history =
GetVersionInterleavedUpdateReplayTest.captureReplayableHistory();
assertEquals(
Arrays.asList(EXPECTED_FIRST_CHANGE_ID, EXPECTED_SECOND_CHANGE_ID),
GetVersionInterleavedUpdateReplayTest.extractVersionChangeIds(history.getEvents()));

TestWorkflowEnvironment testEnvironment = TestWorkflowEnvironment.newInstance();
ReplayWorkflowRunTaskHandler runTaskHandler = null;
try {
Worker worker = testEnvironment.newWorker(GetVersionInterleavedUpdateReplayTest.TASK_QUEUE);
worker.registerWorkflowImplementationTypes(GreetingWorkflowImpl.class);

ReplayWorkflowTaskHandler replayTaskHandler = getNonStickyReplayTaskHandler(worker);
PollWorkflowTaskQueueResponse.Builder replayTask = newReplayTask(history);
runTaskHandler = createStatefulHandler(replayTaskHandler, replayTask);

WorkflowServiceStubs service =
getField(replayTaskHandler, "service", WorkflowServiceStubs.class);
String namespace = getField(replayTaskHandler, "namespace", String.class);
ServiceWorkflowHistoryIterator historyIterator =
new ServiceWorkflowHistoryIterator(service, namespace, replayTask, new NoopScope());

QueryResult result =
runTaskHandler.handleDirectQueryWorkflowTask(replayTask, historyIterator);
assertNotNull(result);
assertFalse(result.isWorkflowMethodCompleted());
assertFalse(result.getResponsePayloads().isPresent());
} finally {
if (runTaskHandler != null) {
runTaskHandler.close();
}
testEnvironment.close();
}
}

private static PollWorkflowTaskQueueResponse.Builder newReplayTask(
WorkflowExecutionHistory history) {
return PollWorkflowTaskQueueResponse.newBuilder()
.setWorkflowExecution(history.getWorkflowExecution())
.setWorkflowType(
history
.getHistory()
.getEvents(0)
.getWorkflowExecutionStartedEventAttributes()
.getWorkflowType())
.setStartedEventId(Long.MAX_VALUE)
.setPreviousStartedEventId(Long.MAX_VALUE)
.setHistory(history.getHistory())
.setQuery(WorkflowQuery.newBuilder().setQueryType(WorkflowClient.QUERY_TYPE_REPLAY_ONLY));
}

private static ReplayWorkflowTaskHandler getNonStickyReplayTaskHandler(Worker worker)
throws Exception {
Object workflowWorker = getField(worker, "workflowWorker", Object.class);
QueryReplayHelper queryReplayHelper =
getField(workflowWorker, "queryReplayHelper", QueryReplayHelper.class);
return getField(queryReplayHelper, "handler", ReplayWorkflowTaskHandler.class);
}

private static ReplayWorkflowRunTaskHandler createStatefulHandler(
ReplayWorkflowTaskHandler replayTaskHandler, PollWorkflowTaskQueueResponse.Builder replayTask)
throws Exception {
Method method =
ReplayWorkflowTaskHandler.class.getDeclaredMethod(
"createStatefulHandler",
PollWorkflowTaskQueueResponse.Builder.class,
com.uber.m3.tally.Scope.class);
method.setAccessible(true);
return (ReplayWorkflowRunTaskHandler)
method.invoke(replayTaskHandler, replayTask, new NoopScope());
}

private static <T> T getField(Object target, String fieldName, Class<T> expectedType)
throws Exception {
Field field = target.getClass().getDeclaredField(fieldName);
field.setAccessible(true);
return expectedType.cast(field.get(target));
}
}
Loading
Loading