diff --git a/.changeset/add-when-failed-option.md b/.changeset/add-when-failed-option.md deleted file mode 100644 index 254800c84..000000000 --- a/.changeset/add-when-failed-option.md +++ /dev/null @@ -1,6 +0,0 @@ ---- -'@pgflow/core': patch -'@pgflow/dsl': patch ---- - -Add whenFailed option for error handling after retries exhausted (fail, skip, skip-cascade) diff --git a/.changeset/conditional-step-execution.md b/.changeset/conditional-step-execution.md new file mode 100644 index 000000000..c94909304 --- /dev/null +++ b/.changeset/conditional-step-execution.md @@ -0,0 +1,25 @@ +--- +'@pgflow/core': minor +'@pgflow/dsl': minor +--- + +Add conditional step execution with skip infrastructure + +**New DSL Options:** + +- `if` - Run step only when input contains specified pattern +- `ifNot` - Run step only when input does NOT contain pattern +- `whenUnmet` - Control behavior when condition not met (fail/skip/skip-cascade) +- `retriesExhausted` - Control behavior after all retries fail (fail/skip/skip-cascade) + +**New Types:** + +- `ContainmentPattern` - Type-safe JSON containment patterns for conditions +- `StepMeta` - Track skippable dependencies for proper type inference + +**Schema Changes:** + +- New columns: required_input_pattern, forbidden_input_pattern, when_unmet, when_failed, skip_reason, skipped_at +- New step status: 'skipped' +- New function: cascade_skip_steps() for skip propagation +- FlowShape condition fields for auto-compilation drift detection diff --git a/.changeset/skip-infrastructure-schema.md b/.changeset/skip-infrastructure-schema.md deleted file mode 100644 index 5fd952320..000000000 --- a/.changeset/skip-infrastructure-schema.md +++ /dev/null @@ -1,5 +0,0 @@ ---- -'@pgflow/core': patch ---- - -Add skip infrastructure schema for conditional execution - new columns (condition_pattern, when_unmet, when_failed, skip_reason, skipped_at), 'skipped' status, and cascade_skip_steps function diff --git a/pkgs/website/astro.config.mjs b/pkgs/website/astro.config.mjs index 3da4cebf5..d6f7dd8c6 100644 --- a/pkgs/website/astro.config.mjs +++ b/pkgs/website/astro.config.mjs @@ -253,6 +253,10 @@ export default defineConfig({ label: 'Retrying steps', link: '/build/retrying-steps/', }, + { + label: 'Graceful Failure', + link: '/build/graceful-failure/', + }, { label: 'Validation steps', link: '/build/validation-steps/', @@ -271,6 +275,10 @@ export default defineConfig({ }, ], }, + { + label: 'Conditional Steps', + autogenerate: { directory: 'build/conditional-steps/' }, + }, { label: 'Starting Flows', autogenerate: { directory: 'build/starting-flows/' }, diff --git a/pkgs/website/src/assets/cover-images/pgflow-0-14-0-conditional-step-execution.png b/pkgs/website/src/assets/cover-images/pgflow-0-14-0-conditional-step-execution.png new file mode 100644 index 000000000..8867dc9c6 Binary files /dev/null and b/pkgs/website/src/assets/cover-images/pgflow-0-14-0-conditional-step-execution.png differ diff --git a/pkgs/website/src/assets/pgflow-theme.d2 b/pkgs/website/src/assets/pgflow-theme.d2 index c4794e52b..65fc375b8 100644 --- a/pkgs/website/src/assets/pgflow-theme.d2 +++ b/pkgs/website/src/assets/pgflow-theme.d2 @@ -69,7 +69,7 @@ classes: { style.stroke: "#e85c5c" } - # Step state classes (created, started, completed, failed) + # Step state classes (created, started, completed, failed, skipped) step_created: { style.fill: "#95a0a3" style.stroke: "#4a5759" @@ -86,6 +86,11 @@ classes: { style.fill: "#a33636" style.stroke: "#e85c5c" } + step_skipped: { + style.fill: "#4a5759" + style.stroke: "#6b7a7d" + style.stroke-dash: 3 + } # Task state classes (queued, completed, failed) task_queued: { diff --git a/pkgs/website/src/content/docs/build/conditional-steps/examples.mdx b/pkgs/website/src/content/docs/build/conditional-steps/examples.mdx new file mode 100644 index 000000000..15a92d3d2 --- /dev/null +++ b/pkgs/website/src/content/docs/build/conditional-steps/examples.mdx @@ -0,0 +1,300 @@ +--- +title: Examples +description: AI/LLM workflow patterns using conditional execution. +sidebar: + order: 4 +--- + +import { Aside } from '@astrojs/starlight/components'; + +This page shows AI/LLM workflow patterns that benefit from conditional execution. Each example includes a diagram and condensed flow code. + + + +## Query Routing + +Route to different handlers based on input. Simple questions go to a fast model, complex reasoning to a powerful model, and code questions to a code-specialized model. + +```d2 width="700" pad="20" +...@../../../../assets/pgflow-theme.d2 + +direction: right + +input: "Query" { class: neutral } +classify: "Classify" { class: step_completed } +simple: "Simple" { class: step_skipped } +complex: "Complex" { class: step_skipped } +code: "Code" { class: step_started } +respond: "Respond" { class: step_created } + +input -> classify +classify -> simple { style.stroke-dash: 3 } +classify -> complex { style.stroke-dash: 3 } +classify -> code: "intent=code" +simple -> respond { style.stroke-dash: 3 } +complex -> respond { style.stroke-dash: 3 } +code -> respond +``` + +```typescript {7,15,23} +new Flow<{ query: string }>({ slug: 'query_router' }) + .step({ slug: 'classify' }, (flowInput) => classifyIntent(flowInput.query)) + .step( + { + slug: 'simple', + dependsOn: ['classify'], + if: { classify: { intent: 'simple' } }, + }, + async (_, ctx) => callFastModel((await ctx.flowInput).query) + ) + .step( + { + slug: 'complex', + dependsOn: ['classify'], + if: { classify: { intent: 'complex' } }, + }, + async (_, ctx) => callReasoningModel((await ctx.flowInput).query) + ) + .step( + { + slug: 'code', + dependsOn: ['classify'], + if: { classify: { intent: 'code' } }, + }, + async (_, ctx) => callCodeModel((await ctx.flowInput).query) + ) + .step( + { + slug: 'respond', + dependsOn: ['simple', 'complex', 'code'], + }, + (deps) => format(deps.simple ?? deps.complex ?? deps.code) + ); +``` + +**Key points:** + +- Intent classification determines which model handles the query +- Only ONE model runs per query - others are skipped +- `respond` uses `??` to coalesce the single defined output + +--- + +## Conditional Fallback + +Enrich only when the primary source is insufficient. If retrieval returns low-confidence results, fall back to web search for current information. + +```d2 width="600" pad="20" +...@../../../../assets/pgflow-theme.d2 + +direction: right + +query: "Query" { class: neutral } +retrieve: "Retrieve" { class: step_completed } +web: "Web Search" { class: step_started } +generate: "Generate" { class: step_created } + +query -> retrieve +retrieve -> web: "low confidence" +retrieve -> generate +web -> generate +``` + +```typescript {7} +new Flow<{ query: string }>({ slug: 'rag_fallback' }) + .step({ slug: 'retrieve' }, (flowInput) => vectorSearch(flowInput.query)) // embedding happens inside + .step( + { + slug: 'web', + dependsOn: ['retrieve'], + if: { retrieve: { confidence: 'low' } }, + retriesExhausted: 'skip', // Continue if web search fails + }, + async (_, ctx) => searchWeb((await ctx.flowInput).query) + ) + .step( + { + slug: 'generate', + dependsOn: ['retrieve', 'web'], + }, + async (deps, ctx) => { + const docs = [...deps.retrieve.docs, ...(deps.web ?? [])]; + return generateAnswer((await ctx.flowInput).query, docs); + } + ); +``` + + + +**Key points:** + +- Retrieval always runs first to check knowledge base +- Web search is conditional on low confidence scores +- `retriesExhausted: 'skip'` ensures graceful degradation if web search fails + +--- + +## Graceful Failure Handling + +Continue execution when steps fail. Search multiple sources in parallel - if any source fails, continue with the others. + +```d2 width="700" pad="20" +...@../../../../assets/pgflow-theme.d2 + +direction: right + +query: "Query" { class: neutral } +embed: "Embed" { class: step_completed } +vector: "Vector" { class: step_completed } +keyword: "Keyword" { class: step_completed } +graph: "Graph" { class: step_skipped } +rerank: "Rerank" { class: step_started } + +query -> embed +embed -> vector +embed -> keyword +embed -> graph { style.stroke-dash: 3 } +vector -> rerank +keyword -> rerank +graph -> rerank { style.stroke-dash: 3 } +``` + +```typescript {7,15,23} +new Flow<{ query: string }>({ slug: 'multi_retrieval' }) + .step({ slug: 'embed' }, (flowInput) => createEmbedding(flowInput.query)) + .step( + { + slug: 'vector', + dependsOn: ['embed'], + retriesExhausted: 'skip', + }, + (deps) => searchPinecone(deps.embed.vector) + ) + .step( + { + slug: 'keyword', + dependsOn: ['embed'], + retriesExhausted: 'skip', + }, + async (_, ctx) => searchElastic((await ctx.flowInput).query) + ) + .step( + { + slug: 'graph', + dependsOn: ['embed'], + retriesExhausted: 'skip', + }, + async (_, ctx) => searchNeo4j((await ctx.flowInput).query) + ) + .step( + { + slug: 'rerank', + dependsOn: ['vector', 'keyword', 'graph'], + }, + async (deps, ctx) => { + const all = [ + ...(deps.vector ?? []), + ...(deps.keyword ?? []), + ...(deps.graph ?? []), + ]; + return rerankResults((await ctx.flowInput).query, all); + } + ); +``` + +**Key points:** + +- Three retrieval sources run **in parallel** after embedding +- Each source has `retriesExhausted: 'skip'` for resilience +- `rerank` combines available results - handles undefined sources gracefully + +--- + +## Layered Conditions + +Combine `skip` and `skip-cascade` for nested conditionals. If tool use is needed, validate with guardrails before execution. Skip the entire tool branch if no tool is needed. + +```d2 width="650" pad="20" +...@../../../../assets/pgflow-theme.d2 + +direction: right + +input: "Message" { class: neutral } +plan: "Plan" { class: step_completed } +validate: "Guardrails" { class: step_completed } +execute: "Execute" { class: step_started } +respond: "Respond" { class: step_created } + +input -> plan +plan -> validate: "needsTool" +plan -> respond +validate -> execute: "approved" +validate -> respond { style.stroke-dash: 3 } +execute -> respond +``` + +```typescript {7-8,16-17} +new Flow<{ message: string }>({ slug: 'agent_guardrails' }) + .step({ slug: 'plan' }, (flowInput) => planAction(flowInput.message)) + .step( + { + slug: 'validate', + dependsOn: ['plan'], + if: { plan: { needsTool: true } }, + whenUnmet: 'skip-cascade', // No tool needed = skip validation AND execution + }, + (deps) => validateWithGuardrails(deps.plan.toolName, deps.plan.toolArgs) + ) + .step( + { + slug: 'execute', + dependsOn: ['plan', 'validate'], + if: { validate: { approved: true } }, + whenUnmet: 'skip', // Rejected = skip execution, still respond + }, + (deps) => executeTool(deps.plan.toolName!, deps.plan.toolArgs!) + ) + .step( + { + slug: 'respond', + dependsOn: ['plan', 'execute'], + }, + async (deps, ctx) => + generateResponse((await ctx.flowInput).message, deps.execute) + ); +``` + + + +**Key points:** + +- `skip-cascade` on validation skips the entire tool branch when no tool is needed +- `skip` on execution allows responding even when guardrails reject +- Layered conditions: tool needed → guardrails approved → execute + +--- + +## Pattern Comparison + +| Pattern | Use Case | Skip Mode | Output Type | +| -------------------- | --------------------------- | -------------- | ------------------------ | +| Query Routing | Mutually exclusive branches | `skip` | `T` or `undefined` | +| Conditional Fallback | Enrich only when needed | `skip` | `T` or `undefined` | +| Graceful Failure | Continue when steps fail | `skip` | `T` or `undefined` | +| Layered Conditions | Nested skip + skip-cascade | `skip-cascade` | `T` (guaranteed if runs) | + + diff --git a/pkgs/website/src/content/docs/build/conditional-steps/index.mdx b/pkgs/website/src/content/docs/build/conditional-steps/index.mdx new file mode 100644 index 000000000..c9883acbe --- /dev/null +++ b/pkgs/website/src/content/docs/build/conditional-steps/index.mdx @@ -0,0 +1,134 @@ +--- +title: Conditional Steps +description: Control which steps execute based on input patterns and handle failures gracefully. +sidebar: + label: Overview + order: 0 +--- + +import { Aside, CardGrid, LinkCard } from '@astrojs/starlight/components'; + + + +pgflow lets you skip steps based on input patterns or when handlers fail. + +## Pattern Matching and Failure Handling + +pgflow provides two ways to skip steps: + +| Feature | When Evaluated | Purpose | +| ------------------------- | ---------------- | -------------------------------- | +| `if`/`ifNot` conditions | Before step runs | Route based on input data | +| `retriesExhausted` option | After step fails | Recover gracefully from failures | + +Both use the same three modes: `fail`, `skip`, and `skip-cascade`. + + + +## Behavior Modes + +When a condition is unmet or a step fails, you control what happens: + +| Mode | Behavior | +| -------------- | ----------------------------------------------------------------------------------------------- | +| `fail` | Step fails, entire run fails (default for `retriesExhausted`) | +| `skip` | Step marked as skipped, run continues, dependents receive `undefined` (default for `whenUnmet`) | +| `skip-cascade` | Step AND all downstream dependents skipped, run continues | + +## Quick Examples + +### Conditional Execution + +Run premium-only features based on input: + +```typescript +new Flow<{ userId: string; plan: 'free' | 'premium' }>({ + slug: 'userOnboarding', +}) + .step({ slug: 'createAccount' }, async (input) => { + return { accountId: await createUser(input.run.userId) }; + }) + .step( + { + slug: 'setupPremiumFeatures', + dependsOn: ['createAccount'], + if: { plan: 'premium' }, // Only run for premium users + whenUnmet: 'skip', // Skip (don't fail) for free users + }, + async (input) => { + return await enablePremium(input.createAccount.accountId); + } + ); +``` + +### Graceful Failure Handling + +Continue the workflow even if an optional step fails: + +```typescript +.step({ + slug: 'sendWelcomeEmail', + dependsOn: ['createAccount'], + maxAttempts: 3, + retriesExhausted: 'skip', // If email fails, continue anyway +}, async (input) => { + return await sendEmail(input.createAccount.accountId); +}) +``` + + + +## Type Safety + +pgflow's type system tracks which steps may be skipped: + +- **`skip` mode**: Dependent steps receive `T | undefined` - you must handle the missing case +- **`skip-cascade` mode**: Dependents are also skipped, so if they run, output is guaranteed + +```typescript +.step({ + slug: 'processResults', + dependsOn: ['optionalEnrichment'], +}, async (input) => { + // TypeScript knows this may be undefined + if (input.optionalEnrichment) { + return processWithEnrichment(input.optionalEnrichment); + } + return processBasic(input.run); +}) +``` + +## Learn More + + + + + + + diff --git a/pkgs/website/src/content/docs/build/conditional-steps/pattern-matching.mdx b/pkgs/website/src/content/docs/build/conditional-steps/pattern-matching.mdx new file mode 100644 index 000000000..e6b2d418f --- /dev/null +++ b/pkgs/website/src/content/docs/build/conditional-steps/pattern-matching.mdx @@ -0,0 +1,235 @@ +--- +title: Pattern Matching +description: Use if/ifNot conditions to control step execution based on input patterns. +sidebar: + order: 1 +--- + +import { Aside, Code } from '@astrojs/starlight/components'; + +Pattern matching lets you conditionally execute steps based on input data. pgflow uses PostgreSQL's `@>` JSON containment operator for pattern matching. + +## Basic Syntax + +Use `if` to run a step only when input contains a pattern: + +```typescript +.step({ + slug: 'premiumFeature', + if: { plan: 'premium' }, // Run if input contains plan: 'premium' + whenUnmet: 'skip', // Skip if condition not met +}, handler) +``` + +Use `ifNot` to run a step only when input does NOT contain a pattern: + +```typescript +.step({ + slug: 'trialReminder', + ifNot: { plan: 'premium' }, // Run if input does NOT contain plan: 'premium' + whenUnmet: 'skip', +}, handler) +``` + + + +## What Gets Checked + +The pattern is checked against different data depending on the step type: + +| Step Type | Pattern Checked Against | +| -------------------------- | ------------------------------------------------------------ | +| Root step (no `dependsOn`) | Flow input | +| Dependent step | Aggregated dependency outputs: `{ depSlug: depOutput, ... }` | + +### Root Step Example + +For root steps, the pattern matches against the flow input: + +```typescript +type Input = { userId: string; plan: 'free' | 'premium' }; + +new Flow({ slug: 'onboarding' }).step( + { + slug: 'setupPremium', + if: { plan: 'premium' }, // Checks flow input.plan + whenUnmet: 'skip', + }, + async (input) => { + // input.run.plan is guaranteed to be 'premium' here + return await enablePremiumFeatures(input.run.userId); + } +); +``` + +### Dependent Step Example + +For dependent steps, the pattern matches against an object containing all dependency outputs: + +```typescript +new Flow<{ url: string }>({ slug: 'contentPipeline' }) + .step({ slug: 'analyze' }, async (input) => { + const result = await analyzeContent(input.run.url); + return { needsModeration: result.flagged, content: result.text }; + }) + .step( + { + slug: 'moderate', + dependsOn: ['analyze'], + if: { analyze: { needsModeration: true } }, // Check analyze output + whenUnmet: 'skip', + }, + async (input) => { + return await moderateContent(input.analyze.content); + } + ); +``` + +The pattern `{ analyze: { needsModeration: true } }` matches the object `{ analyze: }`. + +## JSON Containment Semantics + +pgflow uses PostgreSQL's `@>` containment operator. Understanding its behavior helps write correct patterns. + +### Key Rules + +1. **Partial matching**: Your pattern only needs to include the fields you care about +2. **Nested matching**: Patterns can match nested objects recursively +3. **Array containment**: Array patterns check if elements exist (order doesn't matter) + +### Examples + + + +### Practical Pattern Examples + +```typescript +// Match a specific value +if: { status: 'active' } + +// Match nested object +if: { user: { verified: true } } + +// Match array containing element +if: { tags: ['priority'] } + +// Match multiple conditions (AND) +if: { status: 'active', type: 'premium' } +``` + +## Combining if and ifNot + +You can use both `if` and `ifNot` on the same step. Both conditions must be satisfied: + +```typescript +.step({ + slug: 'standardUserFeature', + if: { status: 'active' }, // Must be active + ifNot: { role: 'admin' }, // Must NOT be admin + whenUnmet: 'skip', +}, handler) +``` + +This step runs only for active non-admin users. + +## Branching Patterns + +Create mutually exclusive branches using opposite conditions: + +```typescript +new Flow<{ userType: 'individual' | 'business' }>({ slug: 'pricing' }) + .step( + { + slug: 'individualPricing', + if: { userType: 'individual' }, + whenUnmet: 'skip', + }, + calculateIndividualPrice + ) + .step( + { + slug: 'businessPricing', + if: { userType: 'business' }, + whenUnmet: 'skip', + }, + calculateBusinessPrice + ) + .step( + { + slug: 'finalize', + dependsOn: ['individualPricing', 'businessPricing'], + }, + async (input) => { + // Exactly one will have a value + const price = input.individualPricing ?? input.businessPricing; + return { finalPrice: price }; + } + ); +``` + + + +### Visual: Branching Execution + +With `userType: 'business'` as input: + +```d2 width="700" pad="20" +...@../../../../assets/pgflow-theme.d2 + +direction: right + +input: "Input" { class: neutral } +individual: "individual" { class: step_skipped } +business: "business" { class: step_started } +finalize: "finalize" { class: step_created } + +input -> individual: "NOT met" { style.stroke-dash: 3 } +input -> business: "MET" +individual -> finalize { style.stroke-dash: 3 } +business -> finalize +``` + +## Detecting Skipped Dependencies + +Use `ifNot` with an empty object to detect when a dependency was skipped: + +```typescript +.step({ + slug: 'primaryAction', + if: { someCondition: true }, + whenUnmet: 'skip', +}, primaryHandler) +.step({ + slug: 'fallbackAction', + dependsOn: ['primaryAction'], + ifNot: { primaryAction: {} }, // Run if primaryAction was skipped + whenUnmet: 'skip', +}, fallbackHandler) +``` + +The pattern `{ primaryAction: {} }` matches any non-null output. Using `ifNot` inverts it to match when the dependency is absent (skipped). + + diff --git a/pkgs/website/src/content/docs/build/conditional-steps/skip-modes.mdx b/pkgs/website/src/content/docs/build/conditional-steps/skip-modes.mdx new file mode 100644 index 000000000..9a4573d2e --- /dev/null +++ b/pkgs/website/src/content/docs/build/conditional-steps/skip-modes.mdx @@ -0,0 +1,299 @@ +--- +title: Skip Modes +description: Understand fail, skip, and skip-cascade behaviors when conditions aren't met. +sidebar: + order: 2 +--- + +import { Aside, Code } from '@astrojs/starlight/components'; + +When a step's condition isn't met (or a step fails after retries), the `whenUnmet` option controls what happens next. + + + +## The Three Modes + +| Mode | Step | Dependents | Run | +| -------------- | ------- | ------------------------ | --------- | +| `fail` | Fails | Not executed | Fails | +| `skip` | Skipped | Execute with `undefined` | Continues | +| `skip-cascade` | Skipped | All skipped | Continues | + +## fail Mode + +When a condition isn't met, the step fails and the entire run fails. + +```typescript +.step({ + slug: 'requirePremium', + if: { plan: 'premium' }, + whenUnmet: 'fail', // Explicit - must fail if not premium +}, handler) +``` + +**Use when:** The condition is a hard requirement. If not met, the workflow cannot proceed meaningfully. + +```d2 width="600" pad="20" +...@../../../../assets/pgflow-theme.d2 + +direction: right + +validate: "validate" +validate.class: step_completed + +premium: "requirePremium" +premium.class: step_failed + +notify: "notify" +notify.class: step_created + +validate -> premium +premium -> notify + +run: "Run: failed" { + style.stroke: "#a33636" + style.stroke-dash: 3 +} +``` + +## skip Mode + +The default behavior for `whenUnmet`. The step is marked as skipped, but the run continues. Downstream steps receive `undefined` for this dependency. + +```typescript +.step({ + slug: 'enrichData', + if: { includeEnrichment: true }, + // whenUnmet: 'skip' is the default +}, async (input) => { + return await fetchEnrichment(input.run.id); +}) +.step({ + slug: 'processResults', + dependsOn: ['enrichData'], +}, async (input) => { + // TypeScript knows enrichData may be undefined + if (input.enrichData) { + return processWithEnrichment(input.enrichData); + } + return processBasic(input.run); +}) +``` + +**Use when:** The step is optional and downstream steps can handle its absence. + +```d2 width="600" pad="20" +...@../../../../assets/pgflow-theme.d2 + +direction: right + +validate: "validate" +validate.class: step_completed + +enrich: "enrichData" +enrich.class: step_skipped + +process: "processResults" +process.class: step_completed + +validate -> enrich: "if: { includeEnrichment: true }" +validate -> process +enrich -> process: "undefined" { + style.stroke-dash: 3 +} + +run: "Run: completed" { + style.stroke: "#247056" + style.stroke-dash: 3 +} +``` + +## skip-cascade Mode + +The step is skipped AND all downstream dependents are automatically skipped too. The run continues with whatever steps don't depend on the skipped step. + +```typescript +.step({ + slug: 'loadPremiumData', + if: { plan: 'premium' }, + whenUnmet: 'skip-cascade', // Skip this AND all dependents +}, loadPremiumHandler) +.step({ + slug: 'processPremiumData', + dependsOn: ['loadPremiumData'], +}, processHandler) // Also skipped if loadPremiumData skips +.step({ + slug: 'sendBasicReport', + // No dependency on premium steps - always runs +}, basicReportHandler) +``` + +**Use when:** A group of steps only makes sense together. If the first can't run, the rest shouldn't either. + +```d2 width="700" pad="20" +...@../../../../assets/pgflow-theme.d2 + +direction: right + +validate: "validate" { + class: step_completed +} + +load: "loadPremiumData\n(condition_unmet)" { + class: step_skipped +} + +process: "processPremiumData\n(dependency_skipped)" { + class: step_skipped +} + +basic: "sendBasicReport" { + class: step_completed +} + +validate -> load { + style.stroke-dash: 3 +} +validate -> basic +load -> process { + style.stroke-dash: 3 +} + +run: "Run: completed" { + style.stroke: "#247056" + style.stroke-dash: 3 +} +``` + +## Type Safety + +The mode you choose affects TypeScript types in dependent steps: + +| Mode | Dependent Input Type | Reason | +| -------------- | -------------------- | ---------------------------------------------- | +| `fail` | `T` (required) | Run fails if step fails, so always has value | +| `skip` | `T \| undefined` | Step may be skipped, must handle missing case | +| `skip-cascade` | `T` (required) | If dependent runs, parent definitely completed | + +```typescript +// With skip mode - must handle undefined +.step({ + slug: 'optional', + if: { feature: 'enabled' }, + whenUnmet: 'skip', +}, () => ({ data: 'enriched' })) + +.step({ + slug: 'consumer', + dependsOn: ['optional'], +}, async (input) => { + // Type is: { data: string } | undefined + if (input.optional) { + return input.optional.data; + } + return 'default'; +}) + +// With skip-cascade - output guaranteed +.step({ + slug: 'required', + if: { enabled: true }, + whenUnmet: 'skip-cascade', +}, () => ({ items: [1, 2, 3] })) + +.step({ + slug: 'processor', + dependsOn: ['required'], +}, async (input) => { + // Type is: { items: number[] } - NOT optional + // If required was skipped, processor is also skipped + return input.required.items.map(x => x * 2); +}) +``` + + + +## Choosing the Right Mode + +| Scenario | Recommended Mode | +| ------------------------------------------- | --------------------- | +| Hard requirement - can't proceed without it | `fail` | +| Optional enrichment - nice to have | `skip` | +| Feature flag - entire feature branch | `skip-cascade` | +| Premium features - all or nothing | `skip-cascade` | +| Fallback handling - try A, fallback to B | `skip` with detection | + +## Skip Reasons + +When a step is skipped, pgflow records why in the `skip_reason` field: + +| Skip Reason | Meaning | +| -------------------- | ------------------------------------------------- | +| `condition_unmet` | Step's `if` or `ifNot` condition wasn't satisfied | +| `dependency_skipped` | A dependency was skipped with `skip-cascade` | +| `handler_failed` | Handler failed with `retriesExhausted: 'skip'` | + +Query skipped steps: + +```sql +SELECT step_slug, status, skip_reason +FROM pgflow.step_states +WHERE run_id = 'your-run-id' + AND status = 'skipped'; +``` + +## Multi-Level Cascades + +Skip cascades propagate through the entire dependency chain: + +```typescript +new Flow({ slug: 'pipeline' }) + .step( + { + slug: 'step_a', + if: { enabled: true }, + whenUnmet: 'skip-cascade', + }, + handlerA + ) + .step({ slug: 'step_b', dependsOn: ['step_a'] }, handlerB) + .step({ slug: 'step_c', dependsOn: ['step_b'] }, handlerC); +// If step_a skips, step_b AND step_c are also skipped +``` + +```d2 width="600" pad="20" +...@../../../../assets/pgflow-theme.d2 + +direction: right + +a: "step_a" +a.class: step_skipped + +b: "step_b" +b.class: step_skipped + +c: "step_c" +c.class: step_skipped + +d: "step_d" +d.class: step_completed + +a -> b: "cascade" {style.stroke-dash: 3} +b -> c: "cascade" {style.stroke-dash: 3} + +note: "step_d has no dependency\non A/B/C, so it runs" { + style.fill: transparent + style.stroke: transparent +} +``` + + diff --git a/pkgs/website/src/content/docs/build/graceful-failure.mdx b/pkgs/website/src/content/docs/build/graceful-failure.mdx new file mode 100644 index 000000000..59675a6d0 --- /dev/null +++ b/pkgs/website/src/content/docs/build/graceful-failure.mdx @@ -0,0 +1,135 @@ +--- +title: Graceful Failure +description: Handle step failures gracefully with retriesExhausted option. +sidebar: + order: 26 +--- + +import { Aside, CardGrid, LinkCard } from '@astrojs/starlight/components'; + +The `retriesExhausted` option controls what happens when a step fails after exhausting all retry attempts. Instead of failing the entire run, you can skip the step and continue. + +## Quick Example + +```typescript +.step({ + slug: 'sendWelcomeEmail', + maxAttempts: 3, + retriesExhausted: 'skip', // If email fails 3x, skip and continue +}, async (input) => { + return await sendEmail(input.run.email); +}) +.step({ + slug: 'createAccount', + dependsOn: ['sendWelcomeEmail'], +}, async (input) => { + // Runs even if email failed - input.sendWelcomeEmail may be undefined + const emailSent = input.sendWelcomeEmail !== undefined; + return { accountId: await createUser(input.run), emailSent }; +}) +``` + +## Options + +| Mode | Behavior | +| -------------- | ------------------------------------------ | +| `fail` | Step fails, run fails (default) | +| `skip` | Step skipped, dependents get `undefined` | +| `skip-cascade` | Step AND all downstream dependents skipped | + + + +## TYPE_VIOLATION Exception + +Programming errors always fail the run, regardless of `retriesExhausted`: + +```typescript +// This ALWAYS fails the run, even with retriesExhausted: 'skip' +.step({ + slug: 'fetchItems', + retriesExhausted: 'skip', +}, () => "not an array") // Returns string instead of array! +.map({ + slug: 'processItems', + array: 'fetchItems', // Expects array, gets string +}, (item) => item * 2) +``` + + + +## Skip Reason + +When `retriesExhausted: 'skip'` triggers, the step gets `skip_reason: 'handler_failed'` with the original error preserved in `error_message`. + +```sql +SELECT step_slug, error_message, skip_reason +FROM pgflow.step_states +WHERE run_id = 'your-run-id' + AND skip_reason = 'handler_failed'; +``` + +## Combining with Conditions + +You can use `retriesExhausted` together with `if` conditions for maximum flexibility: + +```typescript +.step({ + slug: 'enrichFromAPI', + if: { includeEnrichment: true }, // Only attempt if requested + maxAttempts: 3, + retriesExhausted: 'skip', // If API fails, continue anyway +}, async (input) => { + return await externalAPI.enrich(input.run.id); +}) +``` + +This step: + +1. Skips immediately if `includeEnrichment` is false (condition unmet) +2. Retries up to 3 times if the API fails +3. Skips gracefully if all retries fail (instead of failing the run) + +## Best Practices + +**Do use `retriesExhausted: 'skip'` for:** + +- Notification steps (email, SMS, push) +- Analytics and tracking events +- Optional enrichment from external APIs +- Logging and audit trails + +**Don't use for:** + +- Core business logic that must succeed +- Steps that produce data required by downstream steps +- Payment processing or other critical operations +- Steps where silent failure would cause data inconsistency + + + +## Learn More + +For comprehensive coverage of skip behavior, type safety implications, and when to use each mode, see the [Skip Modes](/build/conditional-steps/skip-modes/) guide. + + + + + diff --git a/pkgs/website/src/content/docs/build/index.mdx b/pkgs/website/src/content/docs/build/index.mdx index c3d8a7f3e..79fe692de 100644 --- a/pkgs/website/src/content/docs/build/index.mdx +++ b/pkgs/website/src/content/docs/build/index.mdx @@ -35,6 +35,11 @@ Now that you've created your first flow, learn how to structure your code, integ href="/build/delaying-steps/" description="Schedule steps to run after specified delays for multi-day flows" /> + + + For detailed guidance on validation patterns, see [Validation Steps](/build/validation-steps/). ## Guiding Principle @@ -125,6 +130,11 @@ new Flow({ ## Learn More + ` containment operator. Your pattern only needs the fields you care about: + +| Input | Pattern | Match? | +| ------------------------------------------ | ----------------------------- | ------ | +| `{ plan: 'premium', name: 'Alice' }` | `{ plan: 'premium' }` | Yes | +| `{ plan: 'free' }` | `{ plan: 'premium' }` | No | +| `{ user: { role: 'admin', name: 'Bob' } }` | `{ user: { role: 'admin' } }` | Yes | +| `{ tags: ['urgent', 'support'] }` | `{ tags: ['urgent'] }` | Yes | + + + +See [Pattern Matching](/build/conditional-steps/pattern-matching/) for complete semantics. + +## Behavior Modes + +The `whenUnmet` option controls what happens when a condition isn't met: + +| Mode | Step | Dependents | Run | +| -------------- | ------- | ------------------- | --------- | +| `fail` | Fails | Not executed | Fails | +| `skip` | Skipped | Receive `undefined` | Continues | +| `skip-cascade` | Skipped | All skipped | Continues | + +```typescript +// Skip entire feature branch if condition not met +.step({ + slug: 'loadPremiumData', + if: { plan: 'premium' }, + whenUnmet: 'skip-cascade', +}, handler) +``` + +See [Skip Modes](/build/conditional-steps/skip-modes/) for detailed guidance on choosing modes. + +## Graceful Failure Handling + +The new `retriesExhausted` option controls what happens after all retries fail: + +```typescript +.step({ + slug: 'sendWelcomeEmail', + dependsOn: ['createAccount'], + maxAttempts: 3, + retriesExhausted: 'skip', // Continue even if email fails +}, handler) +``` + +Use `retriesExhausted: 'skip'` for non-critical steps like notifications, analytics, or enrichment that shouldn't block the main workflow. + +See [Graceful Failure](/build/graceful-failure/) for patterns. + +## Type Safety + +pgflow's type system tracks which steps may be skipped: + +```typescript +.step({ + slug: 'processResults', + dependsOn: ['optionalEnrichment'], +}, async (input) => { + // TypeScript knows this may be undefined + if (input.optionalEnrichment) { + return processWithEnrichment(input.optionalEnrichment); + } + return processBasic(input.run); +}) +``` + +With `skip-cascade`, dependent types remain required - if the step runs, the dependency is guaranteed. + +## Migration + +This release includes schema changes. After upgrading, run: + +```bash frame="none" +npx pgflow@latest install +``` + +Then apply the new migration: + +```bash frame="none" +npx supabase db push +``` + + + +See [Install pgflow](/get-started/installation/) for upgrade details. + +## Learn More + +- [Conditional Steps Overview](/build/conditional-steps/) - Introduction to the feature +- [Pattern Matching](/build/conditional-steps/pattern-matching/) - JSON containment patterns +- [Skip Modes](/build/conditional-steps/skip-modes/) - Detailed mode comparison +- [Graceful Failure](/build/graceful-failure/) - Handling failures with `retriesExhausted` +- [Examples](/build/conditional-steps/examples/) - AI/LLM patterns like query routing and RAG fallback diff --git a/pkgs/website/src/content/docs/reference/configuration/step-execution.mdx b/pkgs/website/src/content/docs/reference/configuration/step-execution.mdx index d2f886257..0b7745e22 100644 --- a/pkgs/website/src/content/docs/reference/configuration/step-execution.mdx +++ b/pkgs/website/src/content/docs/reference/configuration/step-execution.mdx @@ -67,14 +67,12 @@ The visibility timeout (in seconds) - how long a task remains invisible to other Set `timeout` higher than your task's maximum processing time.
- -Here's why: - -- When a worker picks up a task, it becomes invisible for `timeout` seconds -- If processing takes longer than `timeout`, the task becomes visible again -- Other workers can then pick up and process the same task -- This leads to duplicate processing -- For example: with `timeout: 30` and a task that takes 45 seconds, the task could be processed twice + Here's why:- When a worker picks up a task, it becomes + invisible for `timeout` seconds - If processing takes longer than `timeout`, + the task becomes visible again - Other workers can then pick up and process + the same task - This leads to duplicate processing - For example: with + `timeout: 30` and a task that takes 45 seconds, the task could be processed + twice
Currently, pgflow uses timeout only for visibility. In the future, the Edge Worker will also use it to terminate tasks that exceed their timeout. @@ -120,13 +118,14 @@ Time 40: Step C starts (waits 10s after B completes) This results in 40+ seconds of delays, not the expected 10s. + + **Better alternatives:** - **Need uniform delays?** Use a constant as shown below - **Rate limiting?** Use worker's `maxConcurrent` setting - **Debug delays?** Add only to specific steps you're debugging - **Compliance delays?** Make them explicit on relevant steps - To apply the same delay to multiple steps, use a constant: @@ -139,6 +138,132 @@ flow +## Conditional Execution Options + +These options control which steps execute based on input patterns and how failures are handled. See [Conditional Steps](/build/conditional-steps/) for detailed explanations and examples. + +### `if` + +**Type:** `ContainmentPattern` +**Default:** Not applicable (must be explicitly set) + +Run the step only if input contains the specified pattern. pgflow uses PostgreSQL's `@>` containment operator for matching. + +```typescript +// Root step - checks flow input +.step({ + slug: 'premiumFeature', + if: { plan: 'premium' }, // Only run for premium users +}, handler) + +// Dependent step - checks dependency output +.step({ + slug: 'notify', + dependsOn: ['analyze'], + if: { analyze: { needsAlert: true } }, // Check analyze output +}, handler) +``` + + + +### `ifNot` + +**Type:** `ContainmentPattern` +**Default:** Not applicable (must be explicitly set) + +Run the step only if input does NOT contain the specified pattern. + +```typescript +.step({ + slug: 'standardUserFlow', + ifNot: { role: 'admin' }, // Skip admin users +}, handler) +``` + +You can combine `if` and `ifNot` - both conditions must be satisfied: + +```typescript +.step({ + slug: 'targetedNotification', + if: { status: 'active' }, // Must be active + ifNot: { role: 'admin' }, // AND must not be admin +}, handler) +``` + +### `whenUnmet` + +**Type:** `'fail' | 'skip' | 'skip-cascade'` +**Default:** `'skip'` + +Controls what happens when `if` or `ifNot` condition is not met. + +| Mode | Behavior | +| ---------------- | ------------------------------------------------------------------------------- | +| `'fail'` | Step fails, entire run fails | +| `'skip'` | Step marked as skipped, run continues, dependents receive `undefined` (default) | +| `'skip-cascade'` | Step AND all downstream dependents skipped, run continues | + +```typescript +.step({ + slug: 'enrichData', + if: { includeEnrichment: true }, + whenUnmet: 'skip', // Default - could be omitted +}, handler) + +.step({ + slug: 'criticalPath', + if: { plan: 'premium' }, + whenUnmet: 'fail', // Explicit - fail if not premium +}, handler) +``` + + + +### `retriesExhausted` + +**Type:** `'fail' | 'skip' | 'skip-cascade'` +**Default:** `'fail'` + +Controls what happens when a step fails after exhausting all `maxAttempts` retry attempts. + +| Mode | Behavior | +| ---------------- | --------------------------------------------------------------------- | +| `'fail'` | Step fails, entire run fails (default) | +| `'skip'` | Step marked as skipped, run continues, dependents receive `undefined` | +| `'skip-cascade'` | Step AND all downstream dependents skipped, run continues | + +```typescript +.step({ + slug: 'sendEmail', + maxAttempts: 3, + retriesExhausted: 'skip', // Don't fail run if email service is down +}, handler) + +.step({ + slug: 'criticalOperation', + maxAttempts: 5, + retriesExhausted: 'fail', // Default - fail if operation fails +}, handler) +``` + + + + + ## Configuration Examples ### Flow with Defaults Only