Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/event-write-occ-fence.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@workflow/core": patch
"@workflow/world": patch
"@workflow/world-vercel": patch
---

Add optional `lastKnownEventId` and `asOfTimestamp` params to `events.create`, which the World can use to do optimisti concurrency control fencing. Conflict surfaces as existing `EntityConflictError`, which the runtime already reloads-and-continues on.
102 changes: 90 additions & 12 deletions packages/core/src/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -796,23 +796,101 @@ export function workflowEntrypoint(
},
}));

// The last known event ID is used for optimistic concurrency
// control, being provided to the World, which will reject if any
// other events arrive between our event log read and event write.
// On fail, we retry *in-place* with a freshly-loaded fence rather than
// terminating the invocation. Falling back to queue redelivery directly
// could cause retry-storms under high load.
let fenceEventId: string | undefined =
events.length > 0
? events[events.length - 1].eventId
: undefined;
const MAX_FENCE_RETRIES = 5;
for (const waitEvent of waitsToComplete) {
try {
await world.events.create(runId, waitEvent, {
requestId,
});
} catch (err) {
if (EntityConflictError.is(err)) {
runtimeLogger.info(
'Wait already completed, skipping',
let attempts = 0;
let written = false;
while (!written) {
try {
const result = await world.events.create(
runId,
waitEvent,
{
workflowRunId: runId,
correlationId: waitEvent.correlationId,
requestId,
...(fenceEventId
? { lastKnownEventId: fenceEventId }
: {}),
}
);
continue;
if (result.event) {
fenceEventId = result.event.eventId;
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AI Review: EventResultResolveWireSchema.event is .optional() (packages/world-vercel/src/events.ts:63), so on any response without event you don't refresh fenceEventId and the next iteration's write uses the now-stale tail — that triggers a spurious fence conflict and forces a full reload/backoff cycle for every subsequent wait in waitsToComplete. In practice the server probably always returns the created event, but the type allows the foot-gun. Either tighten the response schema to require event on create, or fall back to a deterministic value when missing.

written = true;
} catch (err) {
if (!EntityConflictError.is(err)) {
throw err;
}
// Fence conflicts surface a specific error
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: would be good to add a log message here so we can capture these in datadog

// message from workflow-server.
// Most 409s will simply exit since we assume a separate
// invocation is active. This should hold true for fence conflicts
// too, but to guarantee correctness, will be re-tried here directly.
// TODO: We can remove the retry here after extensive validation.
// The cost is low in the meantime.
const isFenceConflict = /fence conflict/i.test(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AI Review: brittle coupling to server error wording. /fence conflict/i.test(err.message) against the free-form 409 message means any rewording on the workflow-server side silently routes fence conflicts into the Wait already completed, skipping branch below — workflows keep moving but stale-snapshot protection is gone, and you'd only notice via the CORRUPTED_EVENT_LOG you're trying to eliminate. Prefer surfacing a typed code from the server (e.g. errorData.code === 'FENCE_CONFLICT' carried on EntityConflictError) and matching on that. Worth doing in the paired server PR so this regex never has to ship.

err.message
);
if (!isFenceConflict) {
runtimeLogger.info(
'Wait already completed, skipping',
{
workflowRunId: runId,
correlationId: waitEvent.correlationId,
}
);
break;
}
attempts += 1;
if (attempts > MAX_FENCE_RETRIES) {
runtimeLogger.warn(
'Wait completion gave up after fence retries; falling back to queue redelivery',
{
workflowRunId: runId,
correlationId: waitEvent.correlationId,
attempts,
}
);
throw err;
}
const loaded = eventsCursor
? await loadWorkflowRunEvents(runId, eventsCursor)
: await loadWorkflowRunEvents(runId);
if (eventsCursor) {
for (const e of loaded.events) {
if (
!events.some((x) => x.eventId === e.eventId)
) {
events.push(e);
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AI Review: events.some(x => x.eventId === e.eventId) inside the for makes this O(n²) over the existing log on every fence-retry reload. Event logs aren't huge today, but the retry path is exactly where they'll be longest. A Set of existing ids built once before the loop avoids it for free.

}
eventsCursor = loaded.cursor ?? eventsCursor;
} else {
events = loaded.events;
eventsCursor = loaded.cursor;
}
const alreadyCompleted = events.some(
(e) =>
e.eventType === 'wait_completed' &&
e.correlationId === waitEvent.correlationId
);
if (alreadyCompleted) {
break;
}
fenceEventId = events[events.length - 1]?.eventId;
await new Promise((r) =>
setTimeout(r, 25 * attempts)
);
}
throw err;
}
}

Expand Down
6 changes: 5 additions & 1 deletion packages/core/src/runtime/resume-hook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,11 @@ export async function resumeHook<T = any>(
})
);

// Create a hook_received event with the payload
// Create a hook_received event with the payload
//
// From a concurrency control perspective, this is done unconditionally.
// Any other event creations or invocations use the `lastKnownEventId`
// fence to ensure hook_received being added won't cause ordering issues.
await world.events.create(
hook.runId,
{
Expand Down
12 changes: 12 additions & 0 deletions packages/world-vercel/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,12 @@ async function createWorkflowRunEventInner(
...data,
remoteRefBehavior,
...(params?.requestId ? { vercelId: params.requestId } : {}),
...(params?.lastKnownEventId
? { lastKnownEventId: params.lastKnownEventId }
: {}),
...(params?.asOfTimestamp !== undefined
? { asOfTimestamp: params.asOfTimestamp }
: {}),
},
config,
schema: EventResultResolveWireSchema,
Expand All @@ -482,6 +488,12 @@ async function createWorkflowRunEventInner(
...data,
remoteRefBehavior,
...(params?.requestId ? { vercelId: params.requestId } : {}),
...(params?.lastKnownEventId
? { lastKnownEventId: params.lastKnownEventId }
: {}),
...(params?.asOfTimestamp !== undefined
? { asOfTimestamp: params.asOfTimestamp }
: {}),
},
config,
schema: EventResultLazyWireSchema,
Expand Down
15 changes: 15 additions & 0 deletions packages/world/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,21 @@ export interface CreateEventParams {
resolveData?: ResolveData;
/** Request ID (x-vercel-id when on Vercel) for correlating request logs with workflow events. */
requestId?: string;
/**
* Optimistic concurrency control fence: when set, the event write is
* rejected with a conflict unless the run's materialized `lastKnownEventId`
* equals this value. Lets the runtime stop an invocation operating on a stale
* event log snapshot from advancing the log.
*/
lastKnownEventId?: string;
/**
* Optimistic concurrency control fence (alternative form), see above for reasoning.
* This is a unix-ms cutoff. Server resolves to the highest eventId strictly before this
* timestamp and uses that as the expected fence. Lets `resumeHook` fence
* `hook_received` after anything the caller could have observed without paying
* for a separate read. Ignored when `lastKnownEventId` is also set.
*/
asOfTimestamp?: number;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AI Review: asOfTimestamp is added to the public CreateEventParams and threaded through world-vercel, but no caller in this PR uses it — the docstring points at resumeHook, which the PR explicitly keeps unfenced. Public API surface with no exerciser tends to rot (or drift from the server's interpretation) before its first real caller arrives. Consider dropping it from this PR until resumeHook (or another caller) actually needs it, or wiring up that single caller now so the contract is tested end-to-end.

}

/**
Expand Down
Loading