-
Notifications
You must be signed in to change notification settings - Fork 15
feat: add support for skipped steps in client #601
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: 01-22-add_edge_worker_conditional_steps_tests
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,259 @@ | ||||||||||||||||||||||||||||
| import { describe, it, expect } from 'vitest'; | ||||||||||||||||||||||||||||
| import { withPgNoTransaction } from '../helpers/db.js'; | ||||||||||||||||||||||||||||
| import { createTestSupabaseClient } from '../helpers/setup.js'; | ||||||||||||||||||||||||||||
| import { createTestFlow } from '../helpers/fixtures.js'; | ||||||||||||||||||||||||||||
| import { grantMinimalPgflowPermissions } from '../helpers/permissions.js'; | ||||||||||||||||||||||||||||
| import { PgflowClient } from '../../src/lib/PgflowClient.js'; | ||||||||||||||||||||||||||||
| import { FlowStepStatus } from '../../src/lib/types.js'; | ||||||||||||||||||||||||||||
| import { PgflowSqlClient } from '@pgflow/core'; | ||||||||||||||||||||||||||||
| import { readAndStart } from '../helpers/polling.js'; | ||||||||||||||||||||||||||||
| import { cleanupFlow } from '../helpers/cleanup.js'; | ||||||||||||||||||||||||||||
| import { createEventTracker } from '../helpers/test-utils.js'; | ||||||||||||||||||||||||||||
| import { skipStep } from '../helpers/skip-step.js'; | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||
| * Tests for skipped step event handling in the client. | ||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||
| * Skipped steps can occur when: | ||||||||||||||||||||||||||||
| * - A step's condition evaluates to false (condition_unmet) | ||||||||||||||||||||||||||||
| * - A dependency was skipped, causing cascading skips (dependency_skipped) | ||||||||||||||||||||||||||||
| * - A handler fails during evaluation (handler_failed) | ||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||
| * These tests verify the client correctly: | ||||||||||||||||||||||||||||
| * - Receives and processes skipped broadcast events | ||||||||||||||||||||||||||||
| * - Updates step state with skipped_at and skip_reason | ||||||||||||||||||||||||||||
| * - Treats skipped as a terminal state | ||||||||||||||||||||||||||||
| * - Handles waitForStatus(Skipped) correctly | ||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||
| describe('Skipped Step Handling', () => { | ||||||||||||||||||||||||||||
| it( | ||||||||||||||||||||||||||||
| 'client handles skipped step state from database snapshot', | ||||||||||||||||||||||||||||
| withPgNoTransaction(async (sql) => { | ||||||||||||||||||||||||||||
| // This test verifies the client correctly handles skipped step state | ||||||||||||||||||||||||||||
| // when fetched from the database (e.g., on reconnect or late join) | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| const testFlow = createTestFlow('skip_snap'); | ||||||||||||||||||||||||||||
| await cleanupFlow(sql, testFlow.slug); | ||||||||||||||||||||||||||||
| await grantMinimalPgflowPermissions(sql); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| await sql`SELECT pgflow.create_flow(${testFlow.slug})`; | ||||||||||||||||||||||||||||
| await sql`SELECT pgflow.add_step(${testFlow.slug}, 'will_skip_step')`; | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| const supabaseClient = createTestSupabaseClient(); | ||||||||||||||||||||||||||||
| const pgflowClient = new PgflowClient(supabaseClient, { | ||||||||||||||||||||||||||||
| realtimeStabilizationDelayMs: 1000, | ||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| // Start the flow | ||||||||||||||||||||||||||||
| const run = await pgflowClient.startFlow(testFlow.slug, { test: 'data' }); | ||||||||||||||||||||||||||||
| const step = run.step('will_skip_step'); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| // Verify initial state is Started (root step) | ||||||||||||||||||||||||||||
| expect(step.status).toBe(FlowStepStatus.Started); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| // Directly call pgflow.skip_step to simulate the step being skipped | ||||||||||||||||||||||||||||
| // This mimics what would happen when a condition evaluates to false | ||||||||||||||||||||||||||||
| await skipStep(sql, run.run_id, 'will_skip_step', 'condition_unmet'); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| // Wait for the skipped event to be received | ||||||||||||||||||||||||||||
| await step.waitForStatus(FlowStepStatus.Skipped, { timeoutMs: 10000 }); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| // Verify skipped state | ||||||||||||||||||||||||||||
| expect(step.status).toBe(FlowStepStatus.Skipped); | ||||||||||||||||||||||||||||
| expect(step.skipped_at).toBeInstanceOf(Date); | ||||||||||||||||||||||||||||
| expect(step.skip_reason).toBe('condition_unmet'); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| // Verify output is null for skipped steps (per design decision Q1) | ||||||||||||||||||||||||||||
| expect(step.output).toBeNull(); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| await supabaseClient.removeAllChannels(); | ||||||||||||||||||||||||||||
| }), | ||||||||||||||||||||||||||||
| { timeout: 15000 } | ||||||||||||||||||||||||||||
| ); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| it( | ||||||||||||||||||||||||||||
| 'receives skipped broadcast event and updates step state', | ||||||||||||||||||||||||||||
| withPgNoTransaction(async (sql) => { | ||||||||||||||||||||||||||||
| // This test verifies the client receives and processes skipped events | ||||||||||||||||||||||||||||
| // broadcast via Supabase realtime | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| const testFlow = createTestFlow('skip_broadcast'); | ||||||||||||||||||||||||||||
| await cleanupFlow(sql, testFlow.slug); | ||||||||||||||||||||||||||||
| await grantMinimalPgflowPermissions(sql); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| await sql`SELECT pgflow.create_flow(${testFlow.slug})`; | ||||||||||||||||||||||||||||
| await sql`SELECT pgflow.add_step(${testFlow.slug}, 'skipped_step')`; | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| const supabaseClient = createTestSupabaseClient(); | ||||||||||||||||||||||||||||
| const pgflowClient = new PgflowClient(supabaseClient, { | ||||||||||||||||||||||||||||
| realtimeStabilizationDelayMs: 1000, | ||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| const run = await pgflowClient.startFlow(testFlow.slug, { test: 'data' }); | ||||||||||||||||||||||||||||
| const step = run.step('skipped_step'); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| // Set up event tracking BEFORE the skip happens | ||||||||||||||||||||||||||||
| const tracker = createEventTracker(); | ||||||||||||||||||||||||||||
| step.on('*', tracker.callback); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| // Skip the step | ||||||||||||||||||||||||||||
| await skipStep(sql, run.run_id, 'skipped_step', 'handler_failed'); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| // Wait for the skipped status | ||||||||||||||||||||||||||||
| await step.waitForStatus(FlowStepStatus.Skipped, { timeoutMs: 10000 }); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| // Verify we received the skipped event | ||||||||||||||||||||||||||||
| expect(tracker).toHaveReceivedEvent('step:skipped'); | ||||||||||||||||||||||||||||
| expect(tracker).toHaveReceivedEvent('step:skipped', { | ||||||||||||||||||||||||||||
| run_id: run.run_id, | ||||||||||||||||||||||||||||
| step_slug: 'skipped_step', | ||||||||||||||||||||||||||||
| status: FlowStepStatus.Skipped, | ||||||||||||||||||||||||||||
| skip_reason: 'handler_failed', | ||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| // Verify step state | ||||||||||||||||||||||||||||
| expect(step.status).toBe(FlowStepStatus.Skipped); | ||||||||||||||||||||||||||||
| expect(step.skip_reason).toBe('handler_failed'); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| await supabaseClient.removeAllChannels(); | ||||||||||||||||||||||||||||
| }), | ||||||||||||||||||||||||||||
| { timeout: 15000 } | ||||||||||||||||||||||||||||
| ); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| it( | ||||||||||||||||||||||||||||
| 'skipped is a terminal state - no further status changes', | ||||||||||||||||||||||||||||
| withPgNoTransaction(async (sql) => { | ||||||||||||||||||||||||||||
| // Verify that once a step is skipped, it cannot transition to other states | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| const testFlow = createTestFlow('skip_terminal'); | ||||||||||||||||||||||||||||
| await cleanupFlow(sql, testFlow.slug); | ||||||||||||||||||||||||||||
| await grantMinimalPgflowPermissions(sql); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| await sql`SELECT pgflow.create_flow(${testFlow.slug})`; | ||||||||||||||||||||||||||||
| await sql`SELECT pgflow.add_step(${testFlow.slug}, 'terminal_step')`; | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| const supabaseClient = createTestSupabaseClient(); | ||||||||||||||||||||||||||||
| const pgflowClient = new PgflowClient(supabaseClient, { | ||||||||||||||||||||||||||||
| realtimeStabilizationDelayMs: 1000, | ||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| const run = await pgflowClient.startFlow(testFlow.slug, { test: 'data' }); | ||||||||||||||||||||||||||||
| const step = run.step('terminal_step'); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| // Skip the step | ||||||||||||||||||||||||||||
| await skipStep(sql, run.run_id, 'terminal_step', 'dependency_skipped'); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| await step.waitForStatus(FlowStepStatus.Skipped, { timeoutMs: 10000 }); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| // Store original skipped_at | ||||||||||||||||||||||||||||
| const originalSkippedAt = step.skipped_at; | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| // Set up tracker for any subsequent events | ||||||||||||||||||||||||||||
| const tracker = createEventTracker(); | ||||||||||||||||||||||||||||
| step.on('*', tracker.callback); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| // Verify skipped steps don't produce tasks (nothing to read from queue) | ||||||||||||||||||||||||||||
| const sqlClient = new PgflowSqlClient(sql); | ||||||||||||||||||||||||||||
| const tasks = await readAndStart(sql, sqlClient, testFlow.slug, 0.1, 1); | ||||||||||||||||||||||||||||
| expect(tasks).toHaveLength(0); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| // Give time for any potential events | ||||||||||||||||||||||||||||
| await new Promise((resolve) => setTimeout(resolve, 1000)); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| // Verify step is still skipped with same timestamp | ||||||||||||||||||||||||||||
| expect(step.status).toBe(FlowStepStatus.Skipped); | ||||||||||||||||||||||||||||
| expect(step.skipped_at).toEqual(originalSkippedAt); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| // Verify no additional events were processed | ||||||||||||||||||||||||||||
| expect(tracker).toHaveReceivedTotalEvents(0); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| await supabaseClient.removeAllChannels(); | ||||||||||||||||||||||||||||
| }), | ||||||||||||||||||||||||||||
| { timeout: 15000 } | ||||||||||||||||||||||||||||
| ); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| it( | ||||||||||||||||||||||||||||
| 'waitForStatus(Skipped) resolves when step is skipped', | ||||||||||||||||||||||||||||
| withPgNoTransaction(async (sql) => { | ||||||||||||||||||||||||||||
| // Verify waitForStatus works correctly with Skipped status | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| const testFlow = createTestFlow('wait_skip'); | ||||||||||||||||||||||||||||
| await cleanupFlow(sql, testFlow.slug); | ||||||||||||||||||||||||||||
| await grantMinimalPgflowPermissions(sql); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| await sql`SELECT pgflow.create_flow(${testFlow.slug})`; | ||||||||||||||||||||||||||||
| await sql`SELECT pgflow.add_step(${testFlow.slug}, 'wait_step')`; | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| const supabaseClient = createTestSupabaseClient(); | ||||||||||||||||||||||||||||
| const pgflowClient = new PgflowClient(supabaseClient, { | ||||||||||||||||||||||||||||
| realtimeStabilizationDelayMs: 1000, | ||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| const run = await pgflowClient.startFlow(testFlow.slug, { test: 'data' }); | ||||||||||||||||||||||||||||
| const step = run.step('wait_step'); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| // Start waiting for skipped BEFORE the skip happens | ||||||||||||||||||||||||||||
| const waitPromise = step.waitForStatus(FlowStepStatus.Skipped, { | ||||||||||||||||||||||||||||
| timeoutMs: 10000, | ||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| // Skip the step after a small delay | ||||||||||||||||||||||||||||
| setTimeout(async () => { | ||||||||||||||||||||||||||||
| await skipStep(sql, run.run_id, 'wait_step', 'condition_unmet'); | ||||||||||||||||||||||||||||
| }, 100); | ||||||||||||||||||||||||||||
|
Comment on lines
+201
to
+203
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The Fix by removing setTimeout(() => {
skipStep(sql, run.run_id, 'wait_step', 'condition_unmet').catch(err => {
// Handle or log error
});
}, 100);Or if errors should fail the test, handle them explicitly: const skipPromise = new Promise((resolve, reject) => {
setTimeout(async () => {
try {
await skipStep(sql, run.run_id, 'wait_step', 'condition_unmet');
resolve(null);
} catch (err) {
reject(err);
}
}, 100);
});
Suggested change
Spotted by Graphite Agent |
||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| // Wait should resolve with the step | ||||||||||||||||||||||||||||
| const result = await waitPromise; | ||||||||||||||||||||||||||||
| expect(result).toBe(step); | ||||||||||||||||||||||||||||
| expect(result.status).toBe(FlowStepStatus.Skipped); | ||||||||||||||||||||||||||||
| expect(result.skip_reason).toBe('condition_unmet'); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| await supabaseClient.removeAllChannels(); | ||||||||||||||||||||||||||||
| }), | ||||||||||||||||||||||||||||
| { timeout: 15000 } | ||||||||||||||||||||||||||||
| ); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| it( | ||||||||||||||||||||||||||||
| 'handles all skip reasons correctly', | ||||||||||||||||||||||||||||
| withPgNoTransaction(async (sql) => { | ||||||||||||||||||||||||||||
| // Verify all three skip reasons are handled correctly | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| const skipReasons = [ | ||||||||||||||||||||||||||||
| 'condition_unmet', | ||||||||||||||||||||||||||||
| 'handler_failed', | ||||||||||||||||||||||||||||
| 'dependency_skipped', | ||||||||||||||||||||||||||||
| ] as const; | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| for (const skipReason of skipReasons) { | ||||||||||||||||||||||||||||
| const testFlow = createTestFlow(`skip_${skipReason}`); | ||||||||||||||||||||||||||||
| await cleanupFlow(sql, testFlow.slug); | ||||||||||||||||||||||||||||
| await grantMinimalPgflowPermissions(sql); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| await sql`SELECT pgflow.create_flow(${testFlow.slug})`; | ||||||||||||||||||||||||||||
| await sql`SELECT pgflow.add_step(${testFlow.slug}, 'reason_step')`; | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| const supabaseClient = createTestSupabaseClient(); | ||||||||||||||||||||||||||||
| const pgflowClient = new PgflowClient(supabaseClient, { | ||||||||||||||||||||||||||||
| realtimeStabilizationDelayMs: 1000, | ||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| const run = await pgflowClient.startFlow(testFlow.slug, { | ||||||||||||||||||||||||||||
| test: 'data', | ||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||
| const step = run.step('reason_step'); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| // Skip with specific reason | ||||||||||||||||||||||||||||
| await skipStep(sql, run.run_id, 'reason_step', skipReason); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| await step.waitForStatus(FlowStepStatus.Skipped, { timeoutMs: 10000 }); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| // Verify the skip reason was captured correctly | ||||||||||||||||||||||||||||
| expect(step.status).toBe(FlowStepStatus.Skipped); | ||||||||||||||||||||||||||||
| expect(step.skip_reason).toBe(skipReason); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| await supabaseClient.removeAllChannels(); | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| }), | ||||||||||||||||||||||||||||
| { timeout: 45000 } | ||||||||||||||||||||||||||||
| ); | ||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,19 @@ | ||
| import type postgres from 'postgres'; | ||
|
|
||
| /** | ||
| * Skip a step using the internal _cascade_force_skip_steps function. | ||
| * This is a test helper that wraps the internal function. | ||
| * If pgflow.skip_step() is exposed publicly later, swap implementation here. | ||
| */ | ||
| export async function skipStep( | ||
| sql: postgres.Sql, | ||
| runId: string, | ||
| stepSlug: string, | ||
| skipReason: 'condition_unmet' | 'handler_failed' | 'dependency_skipped' | ||
| ): Promise<void> { | ||
| await sql`SELECT pgflow._cascade_force_skip_steps( | ||
| ${runId}::uuid, | ||
| ${stepSlug}::text, | ||
| ${skipReason}::text | ||
| )`; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Change the decimal value 0.1 to an integer value (e.g., 1) to avoid type conversion errors when calling PostgreSQL's pgmq.read_with_poll function which expects an integer parameter.
Spotted by Graphite Agent (based on CI logs)

Is this helpful? React 👍 or 👎 to let us know.