From 06bd1d48e66b376ebfeb302446a61d75bc7b84cb Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Tue, 26 May 2026 18:08:29 +0200 Subject: [PATCH 1/6] feat(core,world-vercel): fence event writes against a stale snapshot MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The elapsed-wait scan now snapshots the loaded events' tail eventId and passes it as `lastKnownEventId` on each `wait_completed` write, so a concurrent `resumeHook` that has already advanced the canonical log is detected — the server's CAS rejects the write, we surface it as the existing `EntityConflictError`, and the next iteration re-replays against the fresh event list (mirroring the duplicate-wait fall-through that was already there). `resumeHook` sends `asOfTimestamp` (Date.now() at call time) so the server resolves the fence to the highest eventId strictly before resume time — no client-side event pre-read needed. Plumbed through `CreateEventParams` on `@workflow/world` so future worlds can forward as-is. Co-Authored-By: Claude Opus 4.7 (1M context) --- .changeset/event-write-occ-fence.md | 13 ++++++++++ packages/core/src/runtime.ts | 33 +++++++++++++++++++++--- packages/core/src/runtime/resume-hook.ts | 10 +++++-- packages/world-vercel/src/events.ts | 6 +++++ packages/world/src/events.ts | 14 ++++++++++ 5 files changed, 70 insertions(+), 6 deletions(-) create mode 100644 .changeset/event-write-occ-fence.md diff --git a/.changeset/event-write-occ-fence.md b/.changeset/event-write-occ-fence.md new file mode 100644 index 0000000000..ab4b7decc1 --- /dev/null +++ b/.changeset/event-write-occ-fence.md @@ -0,0 +1,13 @@ +--- +"@workflow/core": patch +"@workflow/world": patch +"@workflow/world-vercel": patch +--- + +Add an optimistic-concurrency fence to event writes that talk to workflow-server. + +- The elapsed-wait scan now passes `lastKnownEventId` snapshotted from the loaded events when committing `wait_completed`, so a stale-snapshot tick can't slip a sleep-branch event past a freshly-committed `hook_received`. +- `resumeHook` sends `asOfTimestamp` with the new `hook_received` event so the server-side fence is anchored at the resume call's wall-clock without paying for a client-side event pre-read. +- The `CreateEventParams` shape on `@workflow/world` grows two optional fields (`lastKnownEventId`, `asOfTimestamp`) that worlds may forward as-is. + +Conflict surfaces as the existing `EntityConflictError`, which the runtime already reloads-and-continues on. diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index 1aa36e32ad..f7a4e2c304 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -796,18 +796,43 @@ export function workflowEntrypoint( }, })); + // Snapshot the loaded events' tail eventId as the OCC + // fence. If a concurrent writer (e.g. `resumeHook`) + // committed something between our load and these + // writes, the server's CAS will reject and we'll + // surface that as `EntityConflictError` — same handling + // as a duplicate wait completion (skip + continue, + // we'll re-replay the next iteration with fresh events). + let fenceEventId: string | undefined = + events.length > 0 + ? events[events.length - 1].eventId + : undefined; for (const waitEvent of waitsToComplete) { try { - await world.events.create(runId, waitEvent, { - requestId, - }); + const result = await world.events.create( + runId, + waitEvent, + { + requestId, + ...(fenceEventId + ? { lastKnownEventId: fenceEventId } + : {}), + } + ); + // Advance the local fence so the next wait_completed + // (or subsequent write) chains off the just-committed + // event, not the snapshot tail. + if (result.event) { + fenceEventId = result.event.eventId; + } } catch (err) { if (EntityConflictError.is(err)) { runtimeLogger.info( - 'Wait already completed, skipping', + 'Wait already completed or fence conflict, skipping', { workflowRunId: runId, correlationId: waitEvent.correlationId, + fenceEventId, } ); continue; diff --git a/packages/core/src/runtime/resume-hook.ts b/packages/core/src/runtime/resume-hook.ts index 0787d4d4e5..85381ec6a8 100644 --- a/packages/core/src/runtime/resume-hook.ts +++ b/packages/core/src/runtime/resume-hook.ts @@ -156,7 +156,13 @@ export async function resumeHook( }) ); - // Create a hook_received event with the payload + // Create a hook_received event with the payload. `asOfTimestamp` + // anchors the OCC fence: the server resolves it to the highest + // eventId strictly before `now` and uses that as the expected + // previous fence, so this `hook_received` is guaranteed to land + // after anything the caller could have observed at resume time — + // no client-side event pre-load required. + const asOfTimestamp = Date.now(); await world.events.create( hook.runId, { @@ -168,7 +174,7 @@ export async function resumeHook( payload: dehydratedPayload, }, }, - { v1Compat } + { v1Compat, asOfTimestamp } ); span?.setAttributes({ diff --git a/packages/world-vercel/src/events.ts b/packages/world-vercel/src/events.ts index 433cedcc7c..a896e04330 100644 --- a/packages/world-vercel/src/events.ts +++ b/packages/world-vercel/src/events.ts @@ -457,6 +457,12 @@ async function createWorkflowRunEventInner( ...data, remoteRefBehavior, ...(params?.requestId ? { vercelId: params.requestId } : {}), + ...(params?.lastKnownEventId + ? { lastKnownEventId: params.lastKnownEventId } + : {}), + ...(params?.asOfTimestamp !== undefined + ? { asOfTimestamp: params.asOfTimestamp } + : {}), }, config, schema: EventResultResolveWireSchema, diff --git a/packages/world/src/events.ts b/packages/world/src/events.ts index a541ac0563..3a33e2bfc0 100644 --- a/packages/world/src/events.ts +++ b/packages/world/src/events.ts @@ -391,6 +391,20 @@ export interface CreateEventParams { resolveData?: ResolveData; /** Request ID (x-vercel-id when on Vercel) for correlating request logs with workflow events. */ requestId?: string; + /** + * OCC fence: when set, the event write is rejected with a conflict + * unless the run's materialized `lastKnownEventId` equals this value. + * Lets the runtime stop a stale-snapshot tick from advancing the log. + */ + lastKnownEventId?: string; + /** + * OCC fence (alternative form): unix-ms cutoff. Server resolves to the + * highest eventId strictly before this timestamp and uses that as the + * expected fence. Lets `resumeHook` fence `hook_received` after anything + * the caller could have observed without paying for a separate read. + * Ignored when `lastKnownEventId` is also set. + */ + asOfTimestamp?: number; } /** From e65e9b06c9bbbe7d900086572852cb623313d695 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Tue, 26 May 2026 18:20:49 +0200 Subject: [PATCH 2/6] resume-hook: drop asOfTimestamp fence (let hook_received always append) --- packages/core/src/runtime/resume-hook.ts | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/packages/core/src/runtime/resume-hook.ts b/packages/core/src/runtime/resume-hook.ts index 85381ec6a8..c89aec4621 100644 --- a/packages/core/src/runtime/resume-hook.ts +++ b/packages/core/src/runtime/resume-hook.ts @@ -156,13 +156,12 @@ export async function resumeHook( }) ); - // Create a hook_received event with the payload. `asOfTimestamp` - // anchors the OCC fence: the server resolves it to the highest - // eventId strictly before `now` and uses that as the expected - // previous fence, so this `hook_received` is guaranteed to land - // after anything the caller could have observed at resume time — - // no client-side event pre-load required. - const asOfTimestamp = Date.now(); + // Append `hook_received` unconditionally — ULID ordering already + // places this write after anything committed before us. We do + // NOT send `lastKnownEventId` here: a fence would only ever + // reject the hook in favor of an unrelated concurrent write, + // which would lose the user's hook signal. Stale-snapshot + // protection lives on the *tick* writes that consume hooks. await world.events.create( hook.runId, { @@ -174,7 +173,7 @@ export async function resumeHook( payload: dehydratedPayload, }, }, - { v1Compat, asOfTimestamp } + { v1Compat } ); span?.setAttributes({ From db3cf6292c3a8f8eee75030cf706d9ceab33894b Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Tue, 26 May 2026 20:36:44 +0200 Subject: [PATCH 3/6] runtime: retry in-place on fence conflict (avoid thunder-herd) --- packages/core/src/runtime.ts | 117 +++++++++++++++++++++++++---------- 1 file changed, 86 insertions(+), 31 deletions(-) diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index f7a4e2c304..90d87688f6 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -798,46 +798,101 @@ export function workflowEntrypoint( // Snapshot the loaded events' tail eventId as the OCC // fence. If a concurrent writer (e.g. `resumeHook`) - // committed something between our load and these - // writes, the server's CAS will reject and we'll - // surface that as `EntityConflictError` — same handling - // as a duplicate wait completion (skip + continue, - // we'll re-replay the next iteration with fresh events). + // committed something between our load and this write, + // the server's CAS rejects and we retry *in-place* + // with a freshly-loaded fence rather than throwing + // the whole tick away. Falling back to queue + // redelivery thunder-herds — every redelivery spawns + // another concurrent tick which fences-conflicts + // again, and workflows stall in `running`. let fenceEventId: string | undefined = events.length > 0 ? events[events.length - 1].eventId : undefined; + const MAX_FENCE_RETRIES = 5; for (const waitEvent of waitsToComplete) { - try { - const result = await world.events.create( - runId, - waitEvent, - { - requestId, - ...(fenceEventId - ? { lastKnownEventId: fenceEventId } - : {}), - } - ); - // Advance the local fence so the next wait_completed - // (or subsequent write) chains off the just-committed - // event, not the snapshot tail. - if (result.event) { - fenceEventId = result.event.eventId; - } - } catch (err) { - if (EntityConflictError.is(err)) { - runtimeLogger.info( - 'Wait already completed or fence conflict, skipping', + let attempts = 0; + let written = false; + while (!written) { + try { + const result = await world.events.create( + runId, + waitEvent, { - workflowRunId: runId, - correlationId: waitEvent.correlationId, - fenceEventId, + requestId, + ...(fenceEventId + ? { lastKnownEventId: fenceEventId } + : {}), } ); - continue; + if (result.event) { + fenceEventId = result.event.eventId; + } + written = true; + } catch (err) { + if (!EntityConflictError.is(err)) { + throw err; + } + const isFenceConflict = /fence conflict/i.test( + err.message + ); + const isDuplicateWait = /workflow wait/i.test( + err.message + ); + if (isDuplicateWait) { + runtimeLogger.info( + 'Wait already completed, skipping', + { + workflowRunId: runId, + correlationId: waitEvent.correlationId, + } + ); + break; + } + if (!isFenceConflict) { + throw err; + } + attempts += 1; + if (attempts > MAX_FENCE_RETRIES) { + runtimeLogger.warn( + 'Wait completion gave up after fence retries; falling back to queue redelivery', + { + workflowRunId: runId, + correlationId: waitEvent.correlationId, + attempts, + } + ); + throw err; + } + const loaded = eventsCursor + ? await loadWorkflowRunEvents(runId, eventsCursor) + : await loadWorkflowRunEvents(runId); + if (eventsCursor) { + for (const e of loaded.events) { + if ( + !events.some((x) => x.eventId === e.eventId) + ) { + events.push(e); + } + } + eventsCursor = loaded.cursor ?? eventsCursor; + } else { + events = loaded.events; + eventsCursor = loaded.cursor; + } + const alreadyCompleted = events.some( + (e) => + e.eventType === 'wait_completed' && + e.correlationId === waitEvent.correlationId + ); + if (alreadyCompleted) { + break; + } + fenceEventId = events[events.length - 1]?.eventId; + await new Promise((r) => + setTimeout(r, 25 * attempts) + ); } - throw err; } } From 1fca755fadce9e855a7858b4719ad5f460b9bbc2 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Tue, 26 May 2026 21:25:46 +0200 Subject: [PATCH 4/6] runtime: treat all non-fence wait conflicts as "already completed" MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The earlier revision filtered duplicate-wait conflicts by a workflow-server-specific error message ("Workflow wait ..."), which meant world-local's "Wait \"...\" already completed" (and any other world's duplicate-wait error shape) fell through and bubbled the EntityConflictError out of the elapsed-wait scan. abortHookOrdering e2e suites started failing as a result. Invert the filter: only the fence-conflict message (a workflow-server- only error) drives the retry path. Anything else is the pre-OCC "skip and continue" shape — matches the original behavior across all world implementations. Co-Authored-By: Claude Opus 4.7 (1M context) --- packages/core/src/runtime.ts | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index 90d87688f6..69f0d8db14 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -833,13 +833,19 @@ export function workflowEntrypoint( if (!EntityConflictError.is(err)) { throw err; } + // Fence conflicts surface a specific error + // message from workflow-server. Anything + // else (workflow-server "Workflow wait …", + // world-local 'Wait "…" already completed', + // and any other world's duplicate-wait + // shape) is the existing + // wait-already-completed conflict — skip + // and continue, matching pre-OCC behavior + // across worlds. const isFenceConflict = /fence conflict/i.test( err.message ); - const isDuplicateWait = /workflow wait/i.test( - err.message - ); - if (isDuplicateWait) { + if (!isFenceConflict) { runtimeLogger.info( 'Wait already completed, skipping', { @@ -849,9 +855,6 @@ export function workflowEntrypoint( ); break; } - if (!isFenceConflict) { - throw err; - } attempts += 1; if (attempts > MAX_FENCE_RETRIES) { runtimeLogger.warn( From 1e69c82b3bbf433a56f731c1673506d55a93306d Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Wed, 27 May 2026 09:59:56 +0200 Subject: [PATCH 5/6] world-vercel: forward fence params in the lazy event-create branch The lazy-refs branch of createWorkflowRunEventInner forgot to thread `lastKnownEventId` and `asOfTimestamp` into the request body, so the fence was silently dropped for any event whose type went through the lazy path (i.e., not in `eventsNeedingResolve`). The resolve branch already had the forwarding. Caught by Vercel Agent Review. Co-Authored-By: Claude Opus 4.7 (1M context) --- packages/world-vercel/src/events.ts | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/packages/world-vercel/src/events.ts b/packages/world-vercel/src/events.ts index a896e04330..48a417f7e1 100644 --- a/packages/world-vercel/src/events.ts +++ b/packages/world-vercel/src/events.ts @@ -488,6 +488,12 @@ async function createWorkflowRunEventInner( ...data, remoteRefBehavior, ...(params?.requestId ? { vercelId: params.requestId } : {}), + ...(params?.lastKnownEventId + ? { lastKnownEventId: params.lastKnownEventId } + : {}), + ...(params?.asOfTimestamp !== undefined + ? { asOfTimestamp: params.asOfTimestamp } + : {}), }, config, schema: EventResultLazyWireSchema, From ec7cad1bd2b0910160a7af4ccb815e3929a28c76 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Wed, 27 May 2026 11:53:40 +0200 Subject: [PATCH 6/6] Apply suggestions from code review Co-authored-by: Peter Wielander Signed-off-by: Peter Wielander --- .changeset/event-write-occ-fence.md | 8 +------ packages/core/src/runtime.ts | 29 ++++++++++-------------- packages/core/src/runtime/resume-hook.ts | 11 ++++----- packages/world/src/events.ts | 17 +++++++------- 4 files changed, 27 insertions(+), 38 deletions(-) diff --git a/.changeset/event-write-occ-fence.md b/.changeset/event-write-occ-fence.md index ab4b7decc1..df5ca57ec9 100644 --- a/.changeset/event-write-occ-fence.md +++ b/.changeset/event-write-occ-fence.md @@ -4,10 +4,4 @@ "@workflow/world-vercel": patch --- -Add an optimistic-concurrency fence to event writes that talk to workflow-server. - -- The elapsed-wait scan now passes `lastKnownEventId` snapshotted from the loaded events when committing `wait_completed`, so a stale-snapshot tick can't slip a sleep-branch event past a freshly-committed `hook_received`. -- `resumeHook` sends `asOfTimestamp` with the new `hook_received` event so the server-side fence is anchored at the resume call's wall-clock without paying for a client-side event pre-read. -- The `CreateEventParams` shape on `@workflow/world` grows two optional fields (`lastKnownEventId`, `asOfTimestamp`) that worlds may forward as-is. - -Conflict surfaces as the existing `EntityConflictError`, which the runtime already reloads-and-continues on. +Add optional `lastKnownEventId` and `asOfTimestamp` params to `events.create`, which the World can use to do optimisti concurrency control fencing. Conflict surfaces as existing `EntityConflictError`, which the runtime already reloads-and-continues on. diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index 69f0d8db14..e204eb9e56 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -796,15 +796,12 @@ export function workflowEntrypoint( }, })); - // Snapshot the loaded events' tail eventId as the OCC - // fence. If a concurrent writer (e.g. `resumeHook`) - // committed something between our load and this write, - // the server's CAS rejects and we retry *in-place* - // with a freshly-loaded fence rather than throwing - // the whole tick away. Falling back to queue - // redelivery thunder-herds — every redelivery spawns - // another concurrent tick which fences-conflicts - // again, and workflows stall in `running`. + // The last known event ID is used for optimistic concurrency + // control, being provided to the World, which will reject if any + // other events arrive between our event log read and event write. + // On fail, we retry *in-place* with a freshly-loaded fence rather than + // terminating the invocation. Falling back to queue redelivery directly + // could cause retry-storms under high load. let fenceEventId: string | undefined = events.length > 0 ? events[events.length - 1].eventId @@ -834,14 +831,12 @@ export function workflowEntrypoint( throw err; } // Fence conflicts surface a specific error - // message from workflow-server. Anything - // else (workflow-server "Workflow wait …", - // world-local 'Wait "…" already completed', - // and any other world's duplicate-wait - // shape) is the existing - // wait-already-completed conflict — skip - // and continue, matching pre-OCC behavior - // across worlds. + // message from workflow-server. + // Most 409s will simply exit since we assume a separate + // invocation is active. This should hold true for fence conflicts + // too, but to guarantee correctness, will be re-tried here directly. + // TODO: We can remove the retry here after extensive validation. + // The cost is low in the meantime. const isFenceConflict = /fence conflict/i.test( err.message ); diff --git a/packages/core/src/runtime/resume-hook.ts b/packages/core/src/runtime/resume-hook.ts index c89aec4621..c6722f822a 100644 --- a/packages/core/src/runtime/resume-hook.ts +++ b/packages/core/src/runtime/resume-hook.ts @@ -156,12 +156,11 @@ export async function resumeHook( }) ); - // Append `hook_received` unconditionally — ULID ordering already - // places this write after anything committed before us. We do - // NOT send `lastKnownEventId` here: a fence would only ever - // reject the hook in favor of an unrelated concurrent write, - // which would lose the user's hook signal. Stale-snapshot - // protection lives on the *tick* writes that consume hooks. + // Create a hook_received event with the payload + // + // From a concurrency control perspective, this is done unconditionally. + // Any other event creations or invocations use the `lastKnownEventId` + // fence to ensure hook_received being added won't cause ordering issues. await world.events.create( hook.runId, { diff --git a/packages/world/src/events.ts b/packages/world/src/events.ts index 3a33e2bfc0..0d385175c7 100644 --- a/packages/world/src/events.ts +++ b/packages/world/src/events.ts @@ -392,17 +392,18 @@ export interface CreateEventParams { /** Request ID (x-vercel-id when on Vercel) for correlating request logs with workflow events. */ requestId?: string; /** - * OCC fence: when set, the event write is rejected with a conflict - * unless the run's materialized `lastKnownEventId` equals this value. - * Lets the runtime stop a stale-snapshot tick from advancing the log. + * Optimistic concurrency control fence: when set, the event write is + * rejected with a conflict unless the run's materialized `lastKnownEventId` + * equals this value. Lets the runtime stop an invocation operating on a stale + * event log snapshot from advancing the log. */ lastKnownEventId?: string; /** - * OCC fence (alternative form): unix-ms cutoff. Server resolves to the - * highest eventId strictly before this timestamp and uses that as the - * expected fence. Lets `resumeHook` fence `hook_received` after anything - * the caller could have observed without paying for a separate read. - * Ignored when `lastKnownEventId` is also set. + * Optimistic concurrency control fence (alternative form), see above for reasoning. + * This is a unix-ms cutoff. Server resolves to the highest eventId strictly before this + * timestamp and uses that as the expected fence. Lets `resumeHook` fence + * `hook_received` after anything the caller could have observed without paying + * for a separate read. Ignored when `lastKnownEventId` is also set. */ asOfTimestamp?: number; }