Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .changeset/test-promise-race-prefix-determinism.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
---
---
245 changes: 245 additions & 0 deletions packages/core/src/hook-sleep-interaction.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -914,6 +914,251 @@ function defineTests(mode: 'sync' | 'async') {
);
});
});

// ─── Prefix replay determinism: Promise.race([sleep, hook]) twice ────
//
// Pattern from the diagram:
//
// const s = sleep('1d'); // wait_created
// const r1 = Promise.race([s, hook]); // hook_received A → hookA wins
// // "winner: hookA"
// // wait_completed (s resolves)
// const r2 = Promise.race([s, hook]); // hook_received B → sleep wins
// // (s already resolved, beats new hook await)
// // "winner: sleep"
//
// Full event log (in order):
// evnt_0: hook_created
// evnt_1: wait_created
// evnt_2: hook_received A
// evnt_3: wait_completed
// evnt_4: hook_received B
//
// We assert that the consumer goes down the same deterministic path no
// matter where replay stops: at every prefix, the workflow either suspends
// cleanly at the right point with the right invocationsQueue state, or
// completes with the right race winners. No prefix should ever produce an
// unconsumed-event error.
describe(`Promise.race([sleep, hook]) prefix determinism ${label}`, () => {
type RaceResult = { kind: 'hook'; value: unknown } | { kind: 'sleep' };

// Build the canonical 5-event log used by every prefix test. Helper takes
// pre-dehydrated payloads so each test can construct them once.
function buildFullEventLog(payloadA: unknown, payloadB: unknown): Event[] {
return [
{
eventId: 'evnt_0',
runId: 'wrun_test',
eventType: 'hook_created',
correlationId: `hook_${CORR_IDS[0]}`,
eventData: {
token: 'test-token',
isWebhook: false,
},
createdAt: new Date(),
},
{
eventId: 'evnt_1',
runId: 'wrun_test',
eventType: 'wait_created',
correlationId: `wait_${CORR_IDS[1]}`,
eventData: { resumeAt: new Date('2099-01-01') },
createdAt: new Date(),
},
{
eventId: 'evnt_2',
runId: 'wrun_test',
eventType: 'hook_received',
correlationId: `hook_${CORR_IDS[0]}`,
eventData: {
token: 'test-token',
payload: payloadA,
},
createdAt: new Date(),
},
{
eventId: 'evnt_3',
runId: 'wrun_test',
eventType: 'wait_completed',
correlationId: `wait_${CORR_IDS[1]}`,
eventData: { resumeAt: new Date('2099-01-01') },
createdAt: new Date(),
},
{
eventId: 'evnt_4',
runId: 'wrun_test',
eventType: 'hook_received',
correlationId: `hook_${CORR_IDS[0]}`,
eventData: {
token: 'test-token',
payload: payloadB,
},
createdAt: new Date(),
},
];
}

// The workflow body is identical across every prefix test. Returned
// results are wrapped in discriminated unions so the test can tell hook
// and sleep winners apart.
function makeWorkflowFn(ctx: WorkflowOrchestratorContext) {
const createHook = createCreateHook(ctx);
const sleep = createSleep(ctx);

return async () => {
const hook = createHook({ token: 'test-token' });
const s = sleep('1d');

const r1: RaceResult = await Promise.race([
s.then(() => ({ kind: 'sleep' as const })),
(hook as Promise<unknown>).then((value) => ({
kind: 'hook' as const,
value,
})),
]);

const r2: RaceResult = await Promise.race([
s.then(() => ({ kind: 'sleep' as const })),
(hook as Promise<unknown>).then((value) => ({
kind: 'hook' as const,
value,
})),
]);

return [r1, r2];
};
}

async function buildPayloads() {
const ops: Promise<any>[] = [];
const [payloadA, payloadB] = await Promise.all([
dehydrateStepReturnValue('A', 'wrun_test', undefined, ops),
dehydrateStepReturnValue('B', 'wrun_test', undefined, ops),
]);
return { payloadA, payloadB };
}

it('prefix [hook_created]: registers hook, then suspends with wait+hook pending', async () => {
await setupHydrateMock();
const { payloadA, payloadB } = await buildPayloads();
const fullLog = buildFullEventLog(payloadA, payloadB);
const ctx = setupWorkflowContext(fullLog.slice(0, 1));

const { error } = await runWithDiscontinuation(ctx, makeWorkflowFn(ctx));

expect(error).toBeDefined();
expect(WorkflowSuspension.is(error)).toBe(true);

// The wait was created in user code (sleep('1d')) but never saw its
// wait_created event — it sits in invocationsQueue without
// hasCreatedEvent set. The hook is registered but isn't an
// invocationsQueue entry (hooks are only queued when an awaiter is
// pending in some implementations — here, no hook payload arrives so
// the queue snapshot just shows the wait).
const pendingWaits = [...ctx.invocationsQueue.values()].filter(
(i) => i.type === 'wait'
);
expect(pendingWaits).toHaveLength(1);
expect(
pendingWaits[0].type === 'wait' && pendingWaits[0].hasCreatedEvent
).toBeFalsy();
});

it('prefix [hook_created, wait_created]: registers wait too, then suspends with neither race resolved', async () => {
await setupHydrateMock();
const { payloadA, payloadB } = await buildPayloads();
const fullLog = buildFullEventLog(payloadA, payloadB);
const ctx = setupWorkflowContext(fullLog.slice(0, 2));

const { error } = await runWithDiscontinuation(ctx, makeWorkflowFn(ctx));

expect(error).toBeDefined();
expect(WorkflowSuspension.is(error)).toBe(true);

// The wait_created was consumed: the wait item should now be flagged.
const pendingWaits = [...ctx.invocationsQueue.values()].filter(
(i) => i.type === 'wait'
);
expect(pendingWaits).toHaveLength(1);
expect(
pendingWaits[0].type === 'wait' && pendingWaits[0].hasCreatedEvent
).toBe(true);
});

it('prefix [..., hook_received A]: race1 resolves with hookA, then suspends before race2 can resolve', async () => {
await setupHydrateMock();
const { payloadA, payloadB } = await buildPayloads();
const fullLog = buildFullEventLog(payloadA, payloadB);
const ctx = setupWorkflowContext(fullLog.slice(0, 3));

const { error } = await runWithDiscontinuation(ctx, makeWorkflowFn(ctx));

// Race 1 resolved with hookA, race 2 is now awaiting both s (not
// resolved — no wait_completed) and a fresh `await hook` (no more
// hook_received). With nothing left, the workflow must suspend.
expect(error).toBeDefined();
expect(WorkflowSuspension.is(error)).toBe(true);

// Wait should still be in the queue with hasCreatedEvent === true.
const pendingWaits = [...ctx.invocationsQueue.values()].filter(
(i) => i.type === 'wait'
);
expect(pendingWaits).toHaveLength(1);
expect(
pendingWaits[0].type === 'wait' && pendingWaits[0].hasCreatedEvent
).toBe(true);
});

it('prefix [..., wait_completed]: race1 = hookA, race2 = sleep, workflow returns cleanly', async () => {
await setupHydrateMock();
const { payloadA, payloadB } = await buildPayloads();
const fullLog = buildFullEventLog(payloadA, payloadB);
const ctx = setupWorkflowContext(fullLog.slice(0, 4));

const { result, error } = await runWithDiscontinuation(
ctx,
makeWorkflowFn(ctx)
);

// No error: race 1 resolves with hookA (hook_received A), then s
// resolves (wait_completed), and race 2's fresh `await hook` is beaten
// by the already-resolved s — so sleep wins race 2.
expect(error).toBeUndefined();
expect(result).toEqual([{ kind: 'hook', value: 'A' }, { kind: 'sleep' }]);

// After wait_completed, the wait is removed from invocationsQueue.
const pendingWaits = [...ctx.invocationsQueue.values()].filter(
(i) => i.type === 'wait'
);
expect(pendingWaits).toHaveLength(0);
});

it('full event log [..., hook_received B]: race1 = hookA, race2 = sleep; B is consumed by the still-subscribed hook awaiter without orphan error', async () => {
await setupHydrateMock();
const { payloadA, payloadB } = await buildPayloads();
const fullLog = buildFullEventLog(payloadA, payloadB);
const ctx = setupWorkflowContext(fullLog);

const { result, error } = await runWithDiscontinuation(
ctx,
makeWorkflowFn(ctx)
);

// The race outcome must match the 4-event prefix exactly — adding
// hook_received B at the end must not change the deterministic path
// the workflow takes.
expect(error).toBeUndefined();
expect(result).toEqual([{ kind: 'hook', value: 'A' }, { kind: 'sleep' }]);

// The dangling hook awaiter (the loser of race 2) is still subscribed
// when hook_received B arrives, so the event is consumed and no
// unconsumed-event error fires.
const pendingWaits = [...ctx.invocationsQueue.values()].filter(
(i) => i.type === 'wait'
);
expect(pendingWaits).toHaveLength(0);
});
});
}

// ─── Run tests in both modes ────────────────────────────
Expand Down
Loading