Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions .github/workflows/build-native-image.yml
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ jobs:
- name: Build native test server (Docker non-musl)
if: matrix.os_family == 'linux' && matrix.musl == false
run: |
IMAGE_ID=$(docker build -q ./docker/native-image)
IMAGE_ID_FILE="$(mktemp)"
docker build --iidfile "$IMAGE_ID_FILE" ./docker/native-image
IMAGE_ID="$(cat "$IMAGE_ID_FILE")"
docker run \
--rm -w /github/workspace -v "$(pwd):/github/workspace" \
"$IMAGE_ID" \
Expand All @@ -96,7 +98,9 @@ jobs:
- name: Build native test server (Docker musl)
if: matrix.os_family == 'linux' && matrix.musl == true
run: |
IMAGE_ID=$(docker build -q ./docker/native-image-musl)
IMAGE_ID_FILE="$(mktemp)"
docker build --iidfile "$IMAGE_ID_FILE" ./docker/native-image-musl
IMAGE_ID="$(cat "$IMAGE_ID_FILE")"
docker run \
--rm -w /github/workspace -v "$(pwd):/github/workspace" \
"$IMAGE_ID" \
Expand Down
12 changes: 6 additions & 6 deletions docker/native-image-musl/dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@ FROM ubuntu:24.04
ENV JAVA_HOME=/usr/lib64/graalvm/graalvm-community-java23
COPY --from=ghcr.io/graalvm/native-image-community:23 $JAVA_HOME $JAVA_HOME
ENV PATH="${JAVA_HOME}/bin:${PATH}"
RUN apt-get -y update --allow-releaseinfo-change && apt-get install -y -V git build-essential curl binutils
RUN apt-get -y update --allow-releaseinfo-change && apt-get install -y -V git build-essential curl ca-certificates binutils
COPY install-musl.sh /opt/install-musl.sh
RUN chmod +x /opt/install-musl.sh
WORKDIR /opt
# We need to build musl and zlibc with musl to for a static build
# See https://www.graalvm.org/21.3/reference-manual/native-image/StaticImages/index.html
# We need to build musl and zlib with musl for a static build.
# See https://www.graalvm.org/21.3/reference-manual/native-image/StaticImages/index.html.
RUN ./install-musl.sh
ENV MUSL_HOME=/opt/musl-toolchain
ENV PATH="$MUSL_HOME/bin:$PATH"
# Verify installation
# Verify installation.
RUN x86_64-linux-musl-gcc --version
# Avoid errors like: "fatal: detected dubious ownership in repository"
RUN git config --global --add safe.directory '*'
# Avoid errors like: "fatal: detected dubious ownership in repository".
RUN git config --global --add safe.directory '*'
41 changes: 24 additions & 17 deletions docker/native-image-musl/install-musl.sh
Original file line number Diff line number Diff line change
@@ -1,28 +1,35 @@
# Specify an installation directory for musl:
export MUSL_HOME=$PWD/musl-toolchain
#!/usr/bin/env bash
set -euo pipefail

# Download musl and zlib sources:
curl -O https://musl.libc.org/releases/musl-1.2.5.tar.gz
curl -O https://zlib.net/fossils/zlib-1.2.13.tar.gz
readonly MUSL_VERSION=1.2.5
readonly ZLIB_VERSION=1.2.13

# Build musl from source
tar -xzvf musl-1.2.5.tar.gz
cd musl-1.2.5 || exit
export MUSL_HOME="$PWD/musl-toolchain"

curl --fail --location --retry 5 --retry-all-errors --output "musl-${MUSL_VERSION}.tar.gz" \
"https://musl.libc.org/releases/musl-${MUSL_VERSION}.tar.gz"
curl --fail --location --retry 5 --retry-all-errors --output "zlib-${ZLIB_VERSION}.tar.gz" \
"https://github.com/madler/zlib/releases/download/v${ZLIB_VERSION}/zlib-${ZLIB_VERSION}.tar.gz"

# Build musl from source.
tar -xzf "musl-${MUSL_VERSION}.tar.gz"
cd "musl-${MUSL_VERSION}"
./configure --prefix=$MUSL_HOME --static
# The next operation may require privileged access to system resources, so use sudo
make && make install
make -j"$(nproc)"
make install
cd ..

# Install a symlink for use by native-image
ln -s $MUSL_HOME/bin/musl-gcc $MUSL_HOME/bin/x86_64-linux-musl-gcc
# Install a symlink for use by native-image.
ln -sf "$MUSL_HOME/bin/musl-gcc" "$MUSL_HOME/bin/x86_64-linux-musl-gcc"

# Extend the system path and confirm that musl is available by printing its version
# Extend the system path and confirm that musl is available by printing its version.
export PATH="$MUSL_HOME/bin:$PATH"
x86_64-linux-musl-gcc --version

# Build zlib with musl from source and install into the MUSL_HOME directory
tar -xzvf zlib-1.2.13.tar.gz
cd zlib-1.2.13 || exit
# Build zlib with musl from source and install into the MUSL_HOME directory.
tar -xzf "zlib-${ZLIB_VERSION}.tar.gz"
cd "zlib-${ZLIB_VERSION}"
CC=musl-gcc ./configure --prefix=$MUSL_HOME --static
make && make install
make -j"$(nproc)"
make install
cd ..
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,17 @@ public enum SdkFlag {
* condition is resolved before the timeout.
*/
CANCEL_AWAIT_TIMER_ON_CONDITION(4),
/*
* Changes replay behavior of GetVersion to wait for the matching marker event before executing
* the callback.
*
* Introduced: 1.36.0
*
* Enabled: (pending)
*
* Bug: https://github.com/temporalio/sdk-java/issues/2796
*/
VERSION_WAIT_FOR_MARKER(5),
Comment thread
mjameswh marked this conversation as resolved.
UNKNOWN(Integer.MAX_VALUE);

private final int value;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,17 +133,20 @@ class InvocationStateMachine

private final int minSupported;
private final int maxSupported;
private final boolean waitForMarkerRecordedReplaying;
private final Functions.Func1<Integer, SearchAttributes> upsertSearchAttributeCallback;
private final Functions.Proc2<Integer, RuntimeException> resultCallback;

InvocationStateMachine(
int minSupported,
int maxSupported,
boolean waitForMarkerRecordedReplaying,
Functions.Func1<Integer, SearchAttributes> upsertSearchAttributeCallback,
Functions.Proc2<Integer, RuntimeException> callback) {
super(STATE_MACHINE_DEFINITION, VersionStateMachine.this.commandSink, stateMachineSink);
this.minSupported = minSupported;
this.maxSupported = maxSupported;
this.waitForMarkerRecordedReplaying = waitForMarkerRecordedReplaying;
this.upsertSearchAttributeCallback = upsertSearchAttributeCallback;
this.resultCallback = Objects.requireNonNull(callback);
}
Expand Down Expand Up @@ -264,9 +267,14 @@ void notifySkippedExecuting() {
}

void notifyMarkerCreatedReplaying() {
if (waitForMarkerRecordedReplaying) {
// Replay already preloads the version value, so delay the callback until the real marker
// event is matched.
return;
}
try {
// it's a replay and the version to return from the getVersion call should be preloaded from
// the history
// It's a replay and the version to return from the getVersion call should be preloaded
// from the history.
final boolean usePreloadedVersion = true;
validateVersionAndThrow(usePreloadedVersion);
notifyFromVersion(usePreloadedVersion);
Expand Down Expand Up @@ -295,6 +303,14 @@ void flushPreloadedVersionAndUpdateFromEventReplaying() {
Preconditions.checkState(
preloadedVersion != null, "preloadedVersion is expected to be initialized");
flushPreloadedVersionAndUpdateFromEvent(currentEvent);
if (waitForMarkerRecordedReplaying) {
try {
validateVersionAndThrow(false);
notifyFromVersion(false);
} catch (RuntimeException ex) {
notifyFromException(ex);
}
}
}

void notifySkippedReplaying() {
Expand Down Expand Up @@ -393,11 +409,16 @@ private VersionStateMachine(
public Integer getVersion(
int minSupported,
int maxSupported,
boolean waitForMarkerRecordedReplaying,
Functions.Func1<Integer, SearchAttributes> upsertSearchAttributeCallback,
Functions.Proc2<Integer, RuntimeException> callback) {
InvocationStateMachine ism =
new InvocationStateMachine(
minSupported, maxSupported, upsertSearchAttributeCallback, callback);
minSupported,
maxSupported,
waitForMarkerRecordedReplaying,
upsertSearchAttributeCallback,
callback);
ism.explicitEvent(ExplicitEvent.CHECK_EXECUTION_STATE);
ism.explicitEvent(ExplicitEvent.SCHEDULE);
// If the state is SKIPPED_REPLAYING that means we:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1253,6 +1253,7 @@ public Integer getVersion(
return stateMachine.getVersion(
minSupported,
maxSupported,
checkSdkFlag(SdkFlag.VERSION_WAIT_FOR_MARKER),
(version) -> {
if (!workflowImplOptions.isEnableUpsertVersionSearchAttributes()) {
return null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package io.temporal.internal.replay;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;

import com.uber.m3.tally.NoopScope;
import io.temporal.api.query.v1.WorkflowQuery;
import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponse;
import io.temporal.client.WorkflowClient;
import io.temporal.common.WorkflowExecutionHistory;
import io.temporal.internal.worker.QueryReplayHelper;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.testing.TestWorkflowEnvironment;
import io.temporal.worker.Worker;
import io.temporal.workflow.versionTests.GetVersionInterleavedUpdateReplayTest;
import io.temporal.workflow.versionTests.GetVersionInterleavedUpdateReplayTest.GreetingWorkflowImpl;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Arrays;
import org.junit.Test;

public class GetVersionInterleavedUpdateReplayTaskHandlerTest {
private static final String EXPECTED_FIRST_CHANGE_ID = "ChangeId1";
private static final String EXPECTED_SECOND_CHANGE_ID = "ChangeId2";

/** Regression test for the lower-level replay path behind the public replayer API. */
@Test
public void testReplayDirectQueryWorkflowTaskSucceeds() throws Throwable {
WorkflowExecutionHistory history =
GetVersionInterleavedUpdateReplayTest.captureReplayableHistory();
assertEquals(
Arrays.asList(EXPECTED_FIRST_CHANGE_ID, EXPECTED_SECOND_CHANGE_ID),
GetVersionInterleavedUpdateReplayTest.extractVersionChangeIds(history.getEvents()));

TestWorkflowEnvironment testEnvironment = TestWorkflowEnvironment.newInstance();
ReplayWorkflowRunTaskHandler runTaskHandler = null;
try {
Worker worker = testEnvironment.newWorker(GetVersionInterleavedUpdateReplayTest.TASK_QUEUE);
worker.registerWorkflowImplementationTypes(GreetingWorkflowImpl.class);

ReplayWorkflowTaskHandler replayTaskHandler = getNonStickyReplayTaskHandler(worker);
PollWorkflowTaskQueueResponse.Builder replayTask = newReplayTask(history);
runTaskHandler = createStatefulHandler(replayTaskHandler, replayTask);

WorkflowServiceStubs service =
getField(replayTaskHandler, "service", WorkflowServiceStubs.class);
String namespace = getField(replayTaskHandler, "namespace", String.class);
ServiceWorkflowHistoryIterator historyIterator =
new ServiceWorkflowHistoryIterator(service, namespace, replayTask, new NoopScope());

QueryResult result =
runTaskHandler.handleDirectQueryWorkflowTask(replayTask, historyIterator);
assertNotNull(result);
assertFalse(result.isWorkflowMethodCompleted());
assertFalse(result.getResponsePayloads().isPresent());
} finally {
if (runTaskHandler != null) {
runTaskHandler.close();
}
testEnvironment.close();
}
}

private static PollWorkflowTaskQueueResponse.Builder newReplayTask(
WorkflowExecutionHistory history) {
return PollWorkflowTaskQueueResponse.newBuilder()
.setWorkflowExecution(history.getWorkflowExecution())
.setWorkflowType(
history
.getHistory()
.getEvents(0)
.getWorkflowExecutionStartedEventAttributes()
.getWorkflowType())
.setStartedEventId(Long.MAX_VALUE)
.setPreviousStartedEventId(Long.MAX_VALUE)
.setHistory(history.getHistory())
.setQuery(WorkflowQuery.newBuilder().setQueryType(WorkflowClient.QUERY_TYPE_REPLAY_ONLY));
}

private static ReplayWorkflowTaskHandler getNonStickyReplayTaskHandler(Worker worker)
throws Exception {
Object workflowWorker = getField(worker, "workflowWorker", Object.class);
QueryReplayHelper queryReplayHelper =
getField(workflowWorker, "queryReplayHelper", QueryReplayHelper.class);
return getField(queryReplayHelper, "handler", ReplayWorkflowTaskHandler.class);
}

private static ReplayWorkflowRunTaskHandler createStatefulHandler(
ReplayWorkflowTaskHandler replayTaskHandler, PollWorkflowTaskQueueResponse.Builder replayTask)
throws Exception {
Method method =
ReplayWorkflowTaskHandler.class.getDeclaredMethod(
"createStatefulHandler",
PollWorkflowTaskQueueResponse.Builder.class,
com.uber.m3.tally.Scope.class);
method.setAccessible(true);
return (ReplayWorkflowRunTaskHandler)
method.invoke(replayTaskHandler, replayTask, new NoopScope());
}

private static <T> T getField(Object target, String fieldName, Class<T> expectedType)
throws Exception {
Field field = target.getClass().getDeclaredField(fieldName);
field.setAccessible(true);
return expectedType.cast(field.get(target));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ public void testBasicWorkerVersioning() {
DescribeWorkerDeploymentResponse describeResp1 = waitUntilWorkerDeploymentVisible(v1);

setCurrentVersion(v1, describeResp1.getConflictToken());
waitForRoutingConfigPropagation(v1);

// Start workflow 1 which will use the 1.0 worker on auto-upgrade
TestWorkflows.QueryableWorkflow wf1 =
Expand All @@ -160,6 +161,7 @@ public void testBasicWorkerVersioning() {
new WorkerDeploymentVersion(testWorkflowRule.getDeploymentName(), "2.0");
DescribeWorkerDeploymentResponse describeResp2 = waitUntilWorkerDeploymentVisible(v2);
setCurrentVersion(v2, describeResp2.getConflictToken());
waitForRoutingConfigPropagation(v2);

TestWorkflows.QueryableWorkflow wf2 =
testWorkflowRule.newWorkflowStubTimeoutOptions(
Expand All @@ -173,6 +175,7 @@ public void testBasicWorkerVersioning() {

// Set current version to 3.0
setCurrentVersion(v3, describeResp3.getConflictToken());
waitForRoutingConfigPropagation(v3);

TestWorkflows.QueryableWorkflow wf3 =
testWorkflowRule.newWorkflowStubTimeoutOptions(
Expand Down Expand Up @@ -224,8 +227,10 @@ public void testRampWorkerVersioning() {
// Set cur ver to 1 & ramp 100% to 2
SetWorkerDeploymentCurrentVersionResponse setCurR =
setCurrentVersion(v1, describeResp1.getConflictToken());
waitForRoutingConfigPropagation(v1);
SetWorkerDeploymentRampingVersionResponse rampResp =
setRampingVersion(v2, 100, setCurR.getConflictToken());
waitForRoutingConfigPropagation(v1, v2);
// Run workflows and verify they've both started & run on v2
for (int i = 0; i < 3; i++) {
String res = runWorkflow("versioning-ramp-100");
Expand All @@ -234,12 +239,14 @@ public void testRampWorkerVersioning() {
// Set ramp to 0, and see them start on v1
SetWorkerDeploymentRampingVersionResponse rampResp2 =
setRampingVersion(v2, 0, rampResp.getConflictToken());
waitForRoutingConfigPropagation(v1, v2);
for (int i = 0; i < 3; i++) {
String res = runWorkflow("versioning-ramp-0");
Assert.assertEquals("version-v1", res);
}
// Set to 50% and see we eventually will have one run on v1 and one on v2
setRampingVersion(v2, 50, rampResp2.getConflictToken());
waitForRoutingConfigPropagation(v1, v2);
HashSet<String> seenRanOn = new HashSet<>();
Eventually.assertEventually(
Duration.ofSeconds(30),
Expand Down
Loading
Loading