diff --git a/.changeset/event-write-occ-fence.md b/.changeset/event-write-occ-fence.md new file mode 100644 index 0000000000..df5ca57ec9 --- /dev/null +++ b/.changeset/event-write-occ-fence.md @@ -0,0 +1,7 @@ +--- +"@workflow/core": patch +"@workflow/world": patch +"@workflow/world-vercel": patch +--- + +Add optional `lastKnownEventId` and `asOfTimestamp` params to `events.create`, which the World can use to do optimistic concurrency control fencing. Conflict surfaces as existing `EntityConflictError`, which the runtime already reloads-and-continues on. diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index 1aa36e32ad..e204eb9e56 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -796,23 +796,101 @@ export function workflowEntrypoint( }, })); + // The last known event ID is used for optimistic concurrency + // control, being provided to the World, which will reject if any + // other events arrive between our event log read and event write. + // On fail, we retry *in-place* with a freshly-loaded fence rather than + // terminating the invocation. Falling back to queue redelivery directly + // could cause retry-storms under high load. + let fenceEventId: string | undefined = + events.length > 0 + ? events[events.length - 1].eventId + : undefined; + const MAX_FENCE_RETRIES = 5; for (const waitEvent of waitsToComplete) { - try { - await world.events.create(runId, waitEvent, { - requestId, - }); - } catch (err) { - if (EntityConflictError.is(err)) { - runtimeLogger.info( - 'Wait already completed, skipping', + let attempts = 0; + let written = false; + while (!written) { + try { + const result = await world.events.create( + runId, + waitEvent, { - workflowRunId: runId, - correlationId: waitEvent.correlationId, + requestId, + ...(fenceEventId + ? 
{ lastKnownEventId: fenceEventId } + : {}), } ); - continue; + if (result.event) { + fenceEventId = result.event.eventId; + } + written = true; + } catch (err) { + if (!EntityConflictError.is(err)) { + throw err; + } + // Fence conflicts surface a specific error + // message from workflow-server. + // Most 409s will simply exit since we assume a separate + // invocation is active. This should hold true for fence conflicts + // too, but to guarantee correctness, will be re-tried here directly. + // TODO: We can remove the retry here after extensive validation. + // The cost is low in the meantime. + const isFenceConflict = /fence conflict/i.test( + err.message + ); + if (!isFenceConflict) { + runtimeLogger.info( + 'Wait already completed, skipping', + { + workflowRunId: runId, + correlationId: waitEvent.correlationId, + } + ); + break; + } + attempts += 1; + if (attempts > MAX_FENCE_RETRIES) { + runtimeLogger.warn( + 'Wait completion gave up after fence retries; falling back to queue redelivery', + { + workflowRunId: runId, + correlationId: waitEvent.correlationId, + attempts, + } + ); + throw err; + } + const loaded = eventsCursor + ? await loadWorkflowRunEvents(runId, eventsCursor) + : await loadWorkflowRunEvents(runId); + if (eventsCursor) { + for (const e of loaded.events) { + if ( + !events.some((x) => x.eventId === e.eventId) + ) { + events.push(e); + } + } + eventsCursor = loaded.cursor ?? 
eventsCursor; + } else { + events = loaded.events; + eventsCursor = loaded.cursor; + } + const alreadyCompleted = events.some( + (e) => + e.eventType === 'wait_completed' && + e.correlationId === waitEvent.correlationId + ); + if (alreadyCompleted) { + break; + } + fenceEventId = events[events.length - 1]?.eventId; + await new Promise((r) => + setTimeout(r, 25 * attempts) + ); } - throw err; } } diff --git a/packages/core/src/runtime/resume-hook.ts b/packages/core/src/runtime/resume-hook.ts index 0787d4d4e5..c6722f822a 100644 --- a/packages/core/src/runtime/resume-hook.ts +++ b/packages/core/src/runtime/resume-hook.ts @@ -156,7 +156,11 @@ export async function resumeHook( }) ); - // Create a hook_received event with the payload + // Create a hook_received event with the payload + // + // From a concurrency control perspective, this is done unconditionally. + // Any other event creations or invocations use the `lastKnownEventId` + // fence to ensure hook_received being added won't cause ordering issues. await world.events.create( hook.runId, { diff --git a/packages/world-vercel/src/events.ts b/packages/world-vercel/src/events.ts index 433cedcc7c..48a417f7e1 100644 --- a/packages/world-vercel/src/events.ts +++ b/packages/world-vercel/src/events.ts @@ -457,6 +457,12 @@ async function createWorkflowRunEventInner( ...data, remoteRefBehavior, ...(params?.requestId ? { vercelId: params.requestId } : {}), + ...(params?.lastKnownEventId + ? { lastKnownEventId: params.lastKnownEventId } + : {}), + ...(params?.asOfTimestamp !== undefined + ? { asOfTimestamp: params.asOfTimestamp } + : {}), }, config, schema: EventResultResolveWireSchema, @@ -482,6 +488,12 @@ async function createWorkflowRunEventInner( ...data, remoteRefBehavior, ...(params?.requestId ? { vercelId: params.requestId } : {}), + ...(params?.lastKnownEventId + ? { lastKnownEventId: params.lastKnownEventId } + : {}), + ...(params?.asOfTimestamp !== undefined + ? 
{ asOfTimestamp: params.asOfTimestamp } + : {}), }, config, schema: EventResultLazyWireSchema, diff --git a/packages/world/src/events.ts b/packages/world/src/events.ts index a541ac0563..0d385175c7 100644 --- a/packages/world/src/events.ts +++ b/packages/world/src/events.ts @@ -391,6 +391,21 @@ export interface CreateEventParams { resolveData?: ResolveData; /** Request ID (x-vercel-id when on Vercel) for correlating request logs with workflow events. */ requestId?: string; + /** + * Optimistic concurrency control fence: when set, the event write is + * rejected with a conflict unless the run's materialized `lastKnownEventId` + * equals this value. Lets the runtime stop an invocation operating on a stale + * event log snapshot from advancing the log. + */ + lastKnownEventId?: string; + /** + * Optimistic concurrency control fence (alternative form), see above for reasoning. + * This is a unix-ms cutoff. Server resolves to the highest eventId strictly before this + * timestamp and uses that as the expected fence. Lets `resumeHook` fence + * `hook_received` after anything the caller could have observed without paying + * for a separate read. Ignored when `lastKnownEventId` is also set. + */ + asOfTimestamp?: number; } /**