From 234412a9d73cb3fc063741c5ad2a2802d5c7b137 Mon Sep 17 00:00:00 2001 From: Nathan Rajlich Date: Wed, 27 May 2026 15:18:08 -0700 Subject: [PATCH] [core] Add Promise.race([sleep, hook]) prefix determinism tests (#2125) Translates the diagrammed pattern into 5 prefix-replay tests that run the same workflow against progressively longer prefixes of a 5-event log (hook_created, wait_created, hook_received A, wait_completed, hook_received B). Each test asserts the consumer takes the same deterministic path: suspending at the right intermediate point with the right invocationsQueue state, or completing with race winners [hookA, sleep]. The full-log test verifies that the trailing hook_received B is consumed by the dangling race-2 hook awaiter without producing an unconsumed-event error. Runs in both sync and async deserialization modes via the existing defineTests harness. --- .../test-promise-race-prefix-determinism.md | 2 + .../core/src/hook-sleep-interaction.test.ts | 245 ++++++++++++++++++ 2 files changed, 247 insertions(+) create mode 100644 .changeset/test-promise-race-prefix-determinism.md diff --git a/.changeset/test-promise-race-prefix-determinism.md b/.changeset/test-promise-race-prefix-determinism.md new file mode 100644 index 0000000000..a845151cc8 --- /dev/null +++ b/.changeset/test-promise-race-prefix-determinism.md @@ -0,0 +1,2 @@ +--- +--- diff --git a/packages/core/src/hook-sleep-interaction.test.ts b/packages/core/src/hook-sleep-interaction.test.ts index 31c18ec40c..7c03534875 100644 --- a/packages/core/src/hook-sleep-interaction.test.ts +++ b/packages/core/src/hook-sleep-interaction.test.ts @@ -914,6 +914,251 @@ function defineTests(mode: 'sync' | 'async') { ); }); }); + + // ─── Prefix replay determinism: Promise.race([sleep, hook]) twice ──── + // + // Pattern from the diagram: + // + // const s = sleep('1d'); // wait_created + // const r1 = Promise.race([s, hook]); // hook_received A → hookA wins + // // "winner: hookA" + // // wait_completed (s resolves) + // const r2 = Promise.race([s, hook]); // hook_received B → sleep wins + // // (s already resolved, beats new hook await) + // // "winner: sleep" + // + // Full event log (in order): + // evnt_0: hook_created + // evnt_1: wait_created + // evnt_2: hook_received A + // evnt_3: wait_completed + // evnt_4: hook_received B + // + // We assert that the consumer goes down the same deterministic path no + // matter where replay stops: at every prefix, the workflow either suspends + // cleanly at the right point with the right invocationsQueue state, or + // completes with the right race winners. No prefix should ever produce an + // unconsumed-event error. + describe(`Promise.race([sleep, hook]) prefix determinism ${label}`, () => { + type RaceResult = { kind: 'hook'; value: unknown } | { kind: 'sleep' }; + + // Build the canonical 5-event log used by every prefix test. Helper takes + // pre-dehydrated payloads so each test can construct them once. + function buildFullEventLog(payloadA: unknown, payloadB: unknown): Event[] { + return [ + { + eventId: 'evnt_0', + runId: 'wrun_test', + eventType: 'hook_created', + correlationId: `hook_${CORR_IDS[0]}`, + eventData: { + token: 'test-token', + isWebhook: false, + }, + createdAt: new Date(), + }, + { + eventId: 'evnt_1', + runId: 'wrun_test', + eventType: 'wait_created', + correlationId: `wait_${CORR_IDS[1]}`, + eventData: { resumeAt: new Date('2099-01-01') }, + createdAt: new Date(), + }, + { + eventId: 'evnt_2', + runId: 'wrun_test', + eventType: 'hook_received', + correlationId: `hook_${CORR_IDS[0]}`, + eventData: { + token: 'test-token', + payload: payloadA, + }, + createdAt: new Date(), + }, + { + eventId: 'evnt_3', + runId: 'wrun_test', + eventType: 'wait_completed', + correlationId: `wait_${CORR_IDS[1]}`, + eventData: { resumeAt: new Date('2099-01-01') }, + createdAt: new Date(), + }, + { + eventId: 'evnt_4', + runId: 'wrun_test', + eventType: 'hook_received', + correlationId: `hook_${CORR_IDS[0]}`, + eventData: { + token: 'test-token', + payload: payloadB, + }, + createdAt: new Date(), + }, + ]; + } + + // The workflow body is identical across every prefix test. Returned + // results are wrapped in discriminated unions so the test can tell hook + // and sleep winners apart. + function makeWorkflowFn(ctx: WorkflowOrchestratorContext) { + const createHook = createCreateHook(ctx); + const sleep = createSleep(ctx); + + return async () => { + const hook = createHook({ token: 'test-token' }); + const s = sleep('1d'); + + const r1: RaceResult = await Promise.race([ + s.then(() => ({ kind: 'sleep' as const })), + (hook as Promise).then((value) => ({ + kind: 'hook' as const, + value, + })), + ]); + + const r2: RaceResult = await Promise.race([ + s.then(() => ({ kind: 'sleep' as const })), + (hook as Promise).then((value) => ({ + kind: 'hook' as const, + value, + })), + ]); + + return [r1, r2]; + }; + } + + async function buildPayloads() { + const ops: Promise[] = []; + const [payloadA, payloadB] = await Promise.all([ + dehydrateStepReturnValue('A', 'wrun_test', undefined, ops), + dehydrateStepReturnValue('B', 'wrun_test', undefined, ops), + ]); + return { payloadA, payloadB }; + } + + it('prefix [hook_created]: registers hook, then suspends with wait+hook pending', async () => { + await setupHydrateMock(); + const { payloadA, payloadB } = await buildPayloads(); + const fullLog = buildFullEventLog(payloadA, payloadB); + const ctx = setupWorkflowContext(fullLog.slice(0, 1)); + + const { error } = await runWithDiscontinuation(ctx, makeWorkflowFn(ctx)); + + expect(error).toBeDefined(); + expect(WorkflowSuspension.is(error)).toBe(true); + + // The wait was created in user code (sleep('1d')) but never saw its + // wait_created event — it sits in invocationsQueue without + // hasCreatedEvent set. The hook is registered but isn't an + // invocationsQueue entry (hooks are only queued when an awaiter is + // pending in some implementations — here, no hook payload arrives so + // the queue snapshot just shows the wait). + const pendingWaits = [...ctx.invocationsQueue.values()].filter( + (i) => i.type === 'wait' + ); + expect(pendingWaits).toHaveLength(1); + expect( + pendingWaits[0].type === 'wait' && pendingWaits[0].hasCreatedEvent + ).toBeFalsy(); + }); + + it('prefix [hook_created, wait_created]: registers wait too, then suspends with neither race resolved', async () => { + await setupHydrateMock(); + const { payloadA, payloadB } = await buildPayloads(); + const fullLog = buildFullEventLog(payloadA, payloadB); + const ctx = setupWorkflowContext(fullLog.slice(0, 2)); + + const { error } = await runWithDiscontinuation(ctx, makeWorkflowFn(ctx)); + + expect(error).toBeDefined(); + expect(WorkflowSuspension.is(error)).toBe(true); + + // The wait_created was consumed: the wait item should now be flagged. + const pendingWaits = [...ctx.invocationsQueue.values()].filter( + (i) => i.type === 'wait' + ); + expect(pendingWaits).toHaveLength(1); + expect( + pendingWaits[0].type === 'wait' && pendingWaits[0].hasCreatedEvent + ).toBe(true); + }); + + it('prefix [..., hook_received A]: race1 resolves with hookA, then suspends before race2 can resolve', async () => { + await setupHydrateMock(); + const { payloadA, payloadB } = await buildPayloads(); + const fullLog = buildFullEventLog(payloadA, payloadB); + const ctx = setupWorkflowContext(fullLog.slice(0, 3)); + + const { error } = await runWithDiscontinuation(ctx, makeWorkflowFn(ctx)); + + // Race 1 resolved with hookA, race 2 is now awaiting both s (not + // resolved — no wait_completed) and a fresh `await hook` (no more + // hook_received). With nothing left, the workflow must suspend. + expect(error).toBeDefined(); + expect(WorkflowSuspension.is(error)).toBe(true); + + // Wait should still be in the queue with hasCreatedEvent === true. + const pendingWaits = [...ctx.invocationsQueue.values()].filter( + (i) => i.type === 'wait' + ); + expect(pendingWaits).toHaveLength(1); + expect( + pendingWaits[0].type === 'wait' && pendingWaits[0].hasCreatedEvent + ).toBe(true); + }); + + it('prefix [..., wait_completed]: race1 = hookA, race2 = sleep, workflow returns cleanly', async () => { + await setupHydrateMock(); + const { payloadA, payloadB } = await buildPayloads(); + const fullLog = buildFullEventLog(payloadA, payloadB); + const ctx = setupWorkflowContext(fullLog.slice(0, 4)); + + const { result, error } = await runWithDiscontinuation( + ctx, + makeWorkflowFn(ctx) + ); + + // No error: race 1 resolves with hookA (hook_received A), then s + // resolves (wait_completed), and race 2's fresh `await hook` is beaten + // by the already-resolved s — so sleep wins race 2. + expect(error).toBeUndefined(); + expect(result).toEqual([{ kind: 'hook', value: 'A' }, { kind: 'sleep' }]); + + // After wait_completed, the wait is removed from invocationsQueue. + const pendingWaits = [...ctx.invocationsQueue.values()].filter( + (i) => i.type === 'wait' + ); + expect(pendingWaits).toHaveLength(0); + }); + + it('full event log [..., hook_received B]: race1 = hookA, race2 = sleep; B is consumed by the still-subscribed hook awaiter without orphan error', async () => { + await setupHydrateMock(); + const { payloadA, payloadB } = await buildPayloads(); + const fullLog = buildFullEventLog(payloadA, payloadB); + const ctx = setupWorkflowContext(fullLog); + + const { result, error } = await runWithDiscontinuation( + ctx, + makeWorkflowFn(ctx) + ); + + // The race outcome must match the 4-event prefix exactly — adding + // hook_received B at the end must not change the deterministic path + // the workflow takes. + expect(error).toBeUndefined(); + expect(result).toEqual([{ kind: 'hook', value: 'A' }, { kind: 'sleep' }]); + + // The dangling hook awaiter (the loser of race 2) is still subscribed + // when hook_received B arrives, so the event is consumed and no + // unconsumed-event error fires. + const pendingWaits = [...ctx.invocationsQueue.values()].filter( + (i) => i.type === 'wait' + ); + expect(pendingWaits).toHaveLength(0); + }); + }); } // ─── Run tests in both modes ────────────────────────────