Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ enum HandleEventStatus {
/** Initial set of SDK flags that will be set on all new workflow executions. */
@VisibleForTesting
public static List<SdkFlag> initialFlags =
Collections.unmodifiableList(Arrays.asList(SdkFlag.SKIP_YIELD_ON_DEFAULT_VERSION));
Collections.singletonList(SdkFlag.SKIP_YIELD_ON_DEFAULT_VERSION);

/**
* Keep track of the change versions that have been seen by the SDK. This is used to generate the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1174,7 +1174,7 @@ public int getVersion(String changeId, int minSupported, int maxSupported) {
* Previously the SDK would yield on the getVersion call to the scheduler. This is not ideal because it can lead to non-deterministic
* scheduling if the getVersion call was removed.
* */
if (replayContext.checkSdkFlag(SdkFlag.SKIP_YIELD_ON_VERSION)) {
if (replayContext.tryUseSdkFlag(SdkFlag.SKIP_YIELD_ON_VERSION)) {
// This can happen if we are replaying a workflow and encounter a getVersion call that did not
// exist on the original execution and the range does not include the default version.
if (versionToUse == null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package io.temporal.workflow.failure;

import static io.temporal.testUtils.Eventually.assertEventually;

import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.enums.v1.EventType;
import io.temporal.api.failure.v1.Failure;
import io.temporal.api.history.v1.HistoryEvent;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowException;
import io.temporal.client.WorkflowStub;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.shared.TestWorkflows.TestWorkflow1;
import java.time.Duration;
import java.util.List;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;

public class WorkflowFailureGetVersionTest {

@Rule public TestName testName = new TestName();

@Rule
public SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder()
.setWorkflowTypes(TestWorkflowGetVersionAndException.class)
.build();

@Test
public void getVersionAndException() {
TestWorkflow1 workflow = testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class);
WorkflowExecution execution = WorkflowClient.start(workflow::execute, testName.getMethodName());
WorkflowStub workflowStub = WorkflowStub.fromTyped(workflow);

try {
HistoryEvent workflowTaskFailed =
assertEventually(
Duration.ofSeconds(5),
() -> {
List<HistoryEvent> failedEvents =
testWorkflowRule.getHistoryEvents(
execution.getWorkflowId(), EventType.EVENT_TYPE_WORKFLOW_TASK_FAILED);
Assert.assertFalse("No workflow task failure recorded", failedEvents.isEmpty());
return failedEvents.get(0);
});

Failure failure =
getDeepestFailure(workflowTaskFailed.getWorkflowTaskFailedEventAttributes().getFailure());
Assert.assertEquals("Any error", failure.getMessage());
Assert.assertTrue(failure.hasApplicationFailureInfo());
Assert.assertEquals(
RuntimeException.class.getName(), failure.getApplicationFailureInfo().getType());
} finally {
try {
workflowStub.terminate("terminate test workflow");
} catch (WorkflowException ignored) {
}
}
}

private static Failure getDeepestFailure(Failure failure) {
while (failure.hasCause()) {
failure = failure.getCause();
}
return failure;
}

public static class TestWorkflowGetVersionAndException implements TestWorkflow1 {

@Override
public String execute(String unused) {
String changeId = "change-id";
Workflow.getVersion(changeId, Workflow.DEFAULT_VERSION, 1);
Workflow.getVersion(changeId, Workflow.DEFAULT_VERSION, 1);
throw new RuntimeException("Any error");
}
}
}
Original file line number Diff line number Diff line change
@@ -1,32 +1,33 @@
package io.temporal.workflow.versionTests;

import static org.junit.Assert.*;
import static org.junit.Assume.assumeTrue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import io.temporal.internal.Issue;
import io.temporal.testing.WorkflowReplayer;
import io.temporal.testing.internal.SDKTestOptions;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.testing.internal.TracingWorkerInterceptor;
import io.temporal.worker.WorkerOptions;
import io.temporal.workflow.Async;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.shared.TestActivities;
import io.temporal.workflow.shared.TestWorkflows;
import io.temporal.workflow.unsafe.WorkflowUnsafe;
import java.time.Duration;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

@Issue("https://github.com/temporalio/sdk-java/issues/2307")
public class GetVersionMultithreadingRemoveTest extends BaseVersionTest {
public class GetVersionMultithreadingRemoveTest {

private static boolean hasReplayed;

@Rule
public SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder()
.setWorkflowTypes(
getDefaultWorkflowImplementationOptions(), TestGetVersionWorkflowImpl.class)
.setWorkflowTypes(TestGetVersionWorkflowImpl.class)
.setActivityImplementations(new TestActivities.TestActivitiesImpl())
// Forcing a replay. Full history arrived from a normal queue causing a replay.
.setWorkerOptions(
Expand All @@ -35,18 +36,30 @@ public class GetVersionMultithreadingRemoveTest extends BaseVersionTest {
.build())
.build();

public GetVersionMultithreadingRemoveTest(boolean setVersioningFlag, boolean upsertVersioningSA) {
super(setVersioningFlag, upsertVersioningSA);
@Before
public void setUp() {
hasReplayed = false;
}

@Test
public void testGetVersionMultithreadingRemoval() {
assumeTrue("This test only passes if SKIP_YIELD_ON_VERSION is enabled", setVersioningFlag);
TestWorkflows.TestWorkflow1 workflowStub =
testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflows.TestWorkflow1.class);

String result = workflowStub.execute(testWorkflowRule.getTaskQueue());

assertTrue(hasReplayed);
assertEquals("activity1", result);
testWorkflowRule
.getInterceptor(TracingWorkerInterceptor.class)
.setExpected(
"interceptExecuteWorkflow " + SDKTestWorkflowRule.UUID_REGEXP,
"newThread workflow-method",
"newThread null",
"getVersion",
"executeActivity customActivity1",
"sleep PT1S",
"activity customActivity1");
}

@Test
Expand Down Expand Up @@ -76,9 +89,8 @@ public String execute(String taskQueue) {
} else {
hasReplayed = true;
}
String result =
"activity" + testActivities.activity1(1); // This is executed in non-replay mode.
return result;

return "activity" + testActivities.activity1(1);
}
}
}
Loading