-
Notifications
You must be signed in to change notification settings - Fork 267
[core] Optimistic concurrency control for event writes against stale logs #2113
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
06bd1d4
e65e9b0
db3cf62
1fca755
1e69c82
ec7cad1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,7 @@ | ||
| --- | ||
| "@workflow/core": patch | ||
| "@workflow/world": patch | ||
| "@workflow/world-vercel": patch | ||
| --- | ||
|
|
||
| Add optional `lastKnownEventId` and `asOfTimestamp` params to `events.create`, which the World can use for optimistic concurrency control fencing. A conflict surfaces as the existing `EntityConflictError`, which the runtime already handles by reloading and continuing. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -796,23 +796,101 @@ export function workflowEntrypoint( | |
| }, | ||
| })); | ||
|
|
||
| // The last known event ID is used for optimistic concurrency | ||
| // control, being provided to the World, which will reject if any | ||
| // other events arrive between our event log read and event write. | ||
| // On fail, we retry *in-place* with a freshly-loaded fence rather than | ||
| // terminating the invocation. Falling back to queue redelivery directly | ||
| // could cause retry-storms under high load. | ||
| let fenceEventId: string | undefined = | ||
| events.length > 0 | ||
| ? events[events.length - 1].eventId | ||
| : undefined; | ||
| const MAX_FENCE_RETRIES = 5; | ||
| for (const waitEvent of waitsToComplete) { | ||
| try { | ||
| await world.events.create(runId, waitEvent, { | ||
| requestId, | ||
| }); | ||
| } catch (err) { | ||
| if (EntityConflictError.is(err)) { | ||
| runtimeLogger.info( | ||
| 'Wait already completed, skipping', | ||
| let attempts = 0; | ||
| let written = false; | ||
| while (!written) { | ||
| try { | ||
| const result = await world.events.create( | ||
| runId, | ||
| waitEvent, | ||
| { | ||
| workflowRunId: runId, | ||
| correlationId: waitEvent.correlationId, | ||
| requestId, | ||
| ...(fenceEventId | ||
| ? { lastKnownEventId: fenceEventId } | ||
| : {}), | ||
| } | ||
| ); | ||
| continue; | ||
| if (result.event) { | ||
| fenceEventId = result.event.eventId; | ||
| } | ||
| written = true; | ||
| } catch (err) { | ||
| if (!EntityConflictError.is(err)) { | ||
| throw err; | ||
| } | ||
| // Fence conflicts surface a specific error | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. nit: it would be good to add a log message here so we can capture these in Datadog |
||
| // message from workflow-server. | ||
| // Most 409s will simply exit since we assume a separate | ||
| // invocation is active. This should hold true for fence conflicts | ||
| // too, but to guarantee correctness, will be re-tried here directly. | ||
| // TODO: We can remove the retry here after extensive validation. | ||
| // The cost is low in the meantime. | ||
| const isFenceConflict = /fence conflict/i.test( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. AI Review: brittle coupling to server error wording. |
||
| err.message | ||
| ); | ||
| if (!isFenceConflict) { | ||
| runtimeLogger.info( | ||
| 'Wait already completed, skipping', | ||
| { | ||
| workflowRunId: runId, | ||
| correlationId: waitEvent.correlationId, | ||
| } | ||
| ); | ||
| break; | ||
| } | ||
| attempts += 1; | ||
| if (attempts > MAX_FENCE_RETRIES) { | ||
| runtimeLogger.warn( | ||
| 'Wait completion gave up after fence retries; falling back to queue redelivery', | ||
| { | ||
| workflowRunId: runId, | ||
| correlationId: waitEvent.correlationId, | ||
| attempts, | ||
| } | ||
| ); | ||
| throw err; | ||
| } | ||
| const loaded = eventsCursor | ||
| ? await loadWorkflowRunEvents(runId, eventsCursor) | ||
| : await loadWorkflowRunEvents(runId); | ||
| if (eventsCursor) { | ||
| for (const e of loaded.events) { | ||
| if ( | ||
| !events.some((x) => x.eventId === e.eventId) | ||
| ) { | ||
| events.push(e); | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. AI Review: |
||
| } | ||
| eventsCursor = loaded.cursor ?? eventsCursor; | ||
| } else { | ||
| events = loaded.events; | ||
| eventsCursor = loaded.cursor; | ||
| } | ||
| const alreadyCompleted = events.some( | ||
| (e) => | ||
| e.eventType === 'wait_completed' && | ||
| e.correlationId === waitEvent.correlationId | ||
| ); | ||
| if (alreadyCompleted) { | ||
| break; | ||
| } | ||
| fenceEventId = events[events.length - 1]?.eventId; | ||
| await new Promise((r) => | ||
| setTimeout(r, 25 * attempts) | ||
| ); | ||
| } | ||
| throw err; | ||
| } | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -391,6 +391,21 @@ export interface CreateEventParams { | |
| resolveData?: ResolveData; | ||
| /** Request ID (x-vercel-id when on Vercel) for correlating request logs with workflow events. */ | ||
| requestId?: string; | ||
| /** | ||
| * Optimistic concurrency control fence: when set, the event write is | ||
| * rejected with a conflict unless the run's materialized `lastKnownEventId` | ||
| * equals this value. Lets the runtime stop an invocation operating on a stale | ||
| * event log snapshot from advancing the log. | ||
| */ | ||
| lastKnownEventId?: string; | ||
| /** | ||
| * Optimistic concurrency control fence (alternative form), see above for reasoning. | ||
| * This is a unix-ms cutoff. Server resolves to the highest eventId strictly before this | ||
| * timestamp and uses that as the expected fence. Lets `resumeHook` fence | ||
| * `hook_received` after anything the caller could have observed without paying | ||
| * for a separate read. Ignored when `lastKnownEventId` is also set. | ||
| */ | ||
| asOfTimestamp?: number; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. AI Review: |
||
| } | ||
|
|
||
| /** | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AI Review:
`EventResultResolveWireSchema.event` is `.optional()` (packages/world-vercel/src/events.ts:63), so on any response without `event` you don't refresh `fenceEventId`, and the next iteration's write uses the now-stale tail — that triggers a spurious fence conflict and forces a full reload/backoff cycle for every subsequent wait in `waitsToComplete`. In practice the server probably always returns the created event, but the type allows the foot-gun. Either tighten the response schema to require `event` on create, or fall back to a deterministic value when it is missing.