From 992df42b3270d28443168859c02a66e6ae968de1 Mon Sep 17 00:00:00 2001 From: Garrett Beatty Date: Mon, 11 May 2026 23:02:53 -0400 Subject: [PATCH] add design doc stack-info: PR: https://github.com/aws/aws-lambda-dotnet/pull/2361, branch: GarrettBeatty/stack/3 --- Docs/durable-execution-design.md | 2074 ++++++++++++++++++++++++++++++ 1 file changed, 2074 insertions(+) create mode 100644 Docs/durable-execution-design.md diff --git a/Docs/durable-execution-design.md b/Docs/durable-execution-design.md new file mode 100644 index 000000000..efaa41589 --- /dev/null +++ b/Docs/durable-execution-design.md @@ -0,0 +1,2074 @@ +# .NET Lambda Durable Execution SDK Design + +## Table of Contents + +- [Overview](#overview) +- [Motivation](#motivation) +- [How Durable Execution Works](#how-durable-execution-works) +- [User Experience](#user-experience) + - [Quick Start](#quick-start) + - [Steps](#steps) + - [Wait Operations](#wait-operations) + - [Callbacks](#callbacks) + - [Invoke (Chained Functions)](#invoke-chained-functions) + - [Parallel Execution](#parallel-execution) + - [Map Operations](#map-operations) + - [Child Contexts](#child-contexts) + - [Error Handling & Retry](#error-handling--retry) + - [Logging](#logging) +- [Internals](#internals) +- [API Reference](#api-reference) + - [IDurableContext](#idurablecontext) + - [Configuration Types](#configuration-types) + - [Result Types](#result-types) + - [Exception Types](#exception-types) +- [Serialization](#serialization) +- [Integration with Existing Libraries](#integration-with-existing-libraries) +- [Testing](#testing) +- [Local development (Test Tool v2 and Aspire)](#local-development-test-tool-v2-and-aspire) +- [Requirements & Constraints](#requirements--constraints) +- [Package Structure](#package-structure) +- [Implementation plan](#implementation-plan) +- [Cross-SDK API comparison](#cross-sdk-api-comparison) +- [Common Patterns](#common-patterns) + +--- + +## Overview + +Lambda Durable Functions let you write multi-step workflows that persist state automatically. They can run for days or months, survive failures, and you only pay for actual compute time. + +This doc covers the **.NET Durable Execution SDK** (`Amazon.Lambda.DurableExecution`). SDKs already exist for [Python](https://github.com/aws/aws-durable-execution-sdk-python) and [JavaScript/TypeScript](https://github.com/aws/aws-durable-execution-sdk-js). + +Related: [GitHub Issue #2216](https://github.com/aws/aws-lambda-dotnet/issues/2216) + +--- + +## Motivation + +### The problem + +Today, building multi-step Lambda workflows in .NET requires one of: + +1. **Step Functions** -- a separate service with its own state machine language (ASL), adding latency between steps and forcing you to learn a second programming model. +2. **Manual state management** -- rolling your own checkpointing with DynamoDB or S3, plus retry logic, idempotency keys, and resumption code. +3. **Event-driven choreography** -- chaining functions through SQS/SNS/EventBridge, scattering a single workflow's logic across half a dozen Lambda functions. + +All three push infrastructure concerns into your business logic. The code gets harder to read and test, and nobody wants to inherit it. + +### What durable functions do instead + +With this SDK, you write sequential code and the runtime handles persistence: +- Checkpoints each step's result +- Suspends when waiting (no compute charges while idle) +- Resumes from the last checkpoint on the next invocation +- Retries failed steps with configurable backoff +- Waits for callbacks from external systems + +Your function reads like a normal async method. The SDK deals with state, replay, and recovery. + +### Why build a .NET SDK + +.NET has a large Lambda user base, especially in enterprise shops running order processing, document pipelines, and (increasingly) AI agent workflows. Today those teams either use Step Functions or build custom state machines. A native .NET SDK removes that tradeoff. + +--- + +## How Durable Execution Works + +### The replay model + +Durable functions use a replay-based execution model. Every invocation runs your code from the top, but previously completed steps return their cached result instead of re-executing. + +1. Lambda invokes your function with a `DurableExecutionInvocationInput` containing: + - `DurableExecutionArn` -- unique execution identifier + - `CheckpointToken` -- for optimistic concurrency + - `InitialExecutionState` -- previously checkpointed operations + +2. Your function code runs **from the beginning** on every invocation. + +3. When a **step** is encountered: + - Previously completed → return cached result (no re-execution) + - New → execute it, checkpoint the result, continue + +4. When a **wait** is encountered: + - Already elapsed → continue + - Still pending → return `PENDING`, Lambda terminates, service re-invokes later + +5. The function returns one of: + - `SUCCEEDED` -- workflow completed + - `FAILED` -- workflow failed + - `PENDING` -- workflow suspended (waiting for time or callback) + +``` +┌─────────────────────────────────────────────────────────────────┐ +│ First Invocation (t=0s) │ +│ │ +│ handler(event, context) │ +│ │ │ +│ ├─► context.StepAsync(FetchData) → executes, checkpoints │ +│ │ │ +│ ├─► context.WaitAsync(30 seconds) → returns PENDING │ +│ │ │ +│ └── (Lambda terminates, environment recyclable) │ +└─────────────────────────────────────────────────────────────────┘ + +┌─────────────────────────────────────────────────────────────────┐ +│ Second Invocation (t=30s) │ +│ │ +│ handler(event, context) │ +│ │ │ +│ ├─► context.StepAsync(FetchData) → returns cached result │ +│ │ │ +│ ├─► context.WaitAsync(30 seconds) → already elapsed, skip │ +│ │ │ +│ ├─► context.StepAsync(ProcessData) → executes, checkpoints │ +│ │ │ +│ └── return result → SUCCEEDED │ +└─────────────────────────────────────────────────────────────────┘ +``` + +--- + +## User Experience + +### Quick Start + +#### Installation + +```shell +dotnet add package Amazon.Lambda.DurableExecution +``` + +#### Minimal Example + +```csharp +using Amazon.Lambda.Annotations; +using Amazon.Lambda.Core; +using Amazon.Lambda.DurableExecution; + +[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] + +namespace MyDurableFunction; + +public class Function +{ + [LambdaFunction] + [DurableExecution] + public async Task Handler(OrderEvent input, IDurableContext context) + { + // Step 1: Validate the order (checkpointed automatically) + var validation = await context.StepAsync( + async () => await ValidateOrder(input.OrderId), + name: "validate_order"); + + if (!validation.IsValid) + return new OrderResult { Status = "rejected" }; + + // Step 2: Wait for processing (Lambda is NOT running during this time) + await context.WaitAsync(TimeSpan.FromSeconds(30), name: "processing_delay"); + + // Step 3: Process the order + var result = await context.StepAsync( + async () => await ProcessOrder(input.OrderId), + name: "process_order"); + + return new OrderResult { Status = "approved", OrderId = result.OrderId }; + } + + private async Task ValidateOrder(string orderId) { /* ... */ } + private async Task ProcessOrder(string orderId) { /* ... */ } +} +``` + +Things to notice: +- `[LambdaFunction]` + `[DurableExecution]` triggers source generation, so you don't wire up the handler yourself +- Each `StepAsync` call checkpoints its result automatically +- `WaitAsync` suspends the function -- Lambda is not running (or billing you) during the wait +- On replay, completed steps return their cached result without re-executing +- The generated wrapper handles checkpoint batching and cleanup + +#### Manual Handler (Without Annotations) + +If you don't use `Amazon.Lambda.Annotations`, use `DurableFunction.WrapAsync` — a static helper (inspired by [OpenTelemetry's `AWSLambdaWrapper.TraceAsync`](https://github.com/open-telemetry/opentelemetry-dotnet-contrib/tree/main/src/OpenTelemetry.Instrumentation.AWSLambda#lambda-function)) that handles the entire durable execution envelope for you: + +```csharp +using Amazon.Lambda.Core; +using Amazon.Lambda.DurableExecution; + +[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] + +namespace MyDurableFunction; + +public class Function +{ + public Task FunctionHandler( + DurableExecutionInvocationInput invocationInput, ILambdaContext context) + => DurableFunction.WrapAsync(MyWorkflow, invocationInput, context); + + private async Task MyWorkflow(OrderEvent input, IDurableContext context) + { + var validation = await context.StepAsync( + async () => await ValidateOrder(input.OrderId), + name: "validate_order"); + + if (!validation.IsValid) + return new OrderResult { Status = "rejected" }; + + await context.WaitAsync(TimeSpan.FromSeconds(30), name: "processing_delay"); + + var result = await context.StepAsync( + async () => await ProcessOrder(input.OrderId), + name: "process_order"); + + return new OrderResult { Status = "approved", OrderId = result.OrderId }; + } + + private async Task ValidateOrder(string orderId) { /* ... */ } + private async Task ProcessOrder(string orderId) { /* ... */ } +} +``` + +`DurableFunction.WrapAsync` handles all the plumbing: +- Hydrates `ExecutionState` from `invocationInput.InitialExecutionState` +- Extracts the user payload from the service envelope +- Runs the workflow through `DurableExecutionHandler.RunAsync` +- Constructs and returns the `DurableExecutionInvocationOutput` envelope (status mapping, JSON serialization) +- Sets execution environment tracking + +For workflows that return no value, use the single-type-parameter overload: + +```csharp +public Task FunctionHandler( + DurableExecutionInvocationInput invocationInput, ILambdaContext context) + => DurableFunction.WrapAsync(MyWorkflow, invocationInput, context); + +private async Task MyWorkflow(OrderEvent input, IDurableContext context) +{ + await context.StepAsync(async () => await SendNotification(input.UserId), name: "notify"); + await context.WaitAsync(TimeSpan.FromHours(1), name: "cooldown"); + await context.StepAsync(async () => await Cleanup(input.UserId), name: "cleanup"); +} +``` + +You'd also need to manually configure the CloudFormation template with `DurableConfig` and managed policies: + +```json +{ + "Resources": { + "MyFunction": { + "Type": "AWS::Serverless::Function", + "Properties": { + "Handler": "MyDurableFunction::MyDurableFunction.Function::FunctionHandler", + "Policies": [ + "AWSLambdaBasicExecutionRole", + "AWSLambdaBasicDurableExecutionRolePolicy" + ], + "DurableConfig": { + "Enabled": true + } + } + } + } +} +``` + +##### What WrapAsync does internally + +For reference, here's the expanded version of what `DurableFunction.WrapAsync` eliminates — this is effectively what the source generator produces for the Annotations path: + +```csharp +public async Task FunctionHandler( + DurableExecutionInvocationInput invocationInput, + ILambdaContext lambdaContext) +{ + // 1. Hydrate execution state from previously checkpointed operations + var state = new ExecutionState(); + state.LoadFromCheckpoint(invocationInput.InitialExecutionState); + + // 2. Extract user payload from the service envelope (internal) + var userPayload = ExtractUserPayload(invocationInput); + + // 3. Run the user's workflow via DurableExecutionHandler.RunAsync + var result = await DurableExecutionHandler.RunAsync( + state, + async (durableContext) => await MyWorkflow(userPayload, durableContext), + invocationInput.DurableExecutionArn); + + // 4. Construct and return the service output envelope + return new DurableExecutionInvocationOutput + { + Status = result.Status, + Result = result.Status == InvocationStatus.Succeeded + ? JsonSerializer.Serialize(result.Result) + : null, + ErrorMessage = result.Message + }; +} +``` + +Key differences between `WrapAsync` and the Annotations approach: +- `WrapAsync` still requires you to define the Lambda entry point signature (`DurableExecutionInvocationInput` → `DurableExecutionInvocationOutput`) +- You configure `DurableConfig` + managed policies in your CloudFormation template manually (not generated) +- No `[LambdaFunction]` or `[DurableExecution]` attributes needed + +With `[LambdaFunction] + [DurableExecution]`, even the entry point and CloudFormation config are generated at compile time — you just write the workflow method. + +--- + +### Steps + +> **Implementations:** [Python](https://github.com/aws/aws-durable-execution-sdk-python/blob/main/src/aws_durable_execution_sdk_python/operation/step.py) | [JavaScript](https://github.com/aws/aws-durable-execution-sdk-js/blob/main/packages/aws-durable-execution-sdk-js/src/handlers/step-handler/step-handler.ts) + +A step runs your code and checkpoints the result. On replay, the cached result comes back without re-executing. Each step function receives an `IStepContext` with a step-scoped logger and attempt metadata. + +```csharp +// Basic step +var result = await context.StepAsync(async (step) => await CallExternalApi()); + +// Named step (recommended for debugging/testing) +var user = await context.StepAsync( + async (step) => await FetchUser(userId), + name: "fetch_user"); + +// Using the step-scoped logger (includes step name, attempt number, operation ID) +var order = await context.StepAsync( + async (step) => + { + step.Logger.LogInformation("Fetching order {OrderId}", orderId); + return await orderService.GetOrder(orderId); + }, + name: "get_order"); + +// Step with configuration +var payment = await context.StepAsync( + async (step) => await chargeCard(amount), + name: "charge_card", + config: new StepConfig + { + Semantics = StepSemantics.AtMostOncePerRetry, + RetryStrategy = RetryStrategy.Exponential(maxAttempts: 3, initialDelay: TimeSpan.FromSeconds(1)) + }); +``` + +#### Step Semantics + +| Semantics | Behavior | Use Case | +|-----------|----------|----------| +| `AtLeastOncePerRetry` (default) | Step re-executes on each retry | Idempotent operations (calculations, reads) | +| `AtMostOncePerRetry` | Step executes at most once per retry | Side effects (payments, emails, writes) | + +--- + +### Wait Operations + +> **Implementations:** [Python](https://github.com/aws/aws-durable-execution-sdk-python/blob/main/src/aws_durable_execution_sdk_python/operation/wait.py) | [JavaScript](https://github.com/aws/aws-durable-execution-sdk-js/blob/main/packages/aws-durable-execution-sdk-js/src/handlers/wait-handler/wait-handler.ts) + +Waits suspend the function without consuming compute time. Lambda can recycle the execution environment. + +```csharp +// Wait for a specific duration +await context.WaitAsync(TimeSpan.FromSeconds(30)); +await context.WaitAsync(TimeSpan.FromMinutes(5), name: "cooldown"); +await context.WaitAsync(TimeSpan.FromHours(24), name: "daily_check"); +await context.WaitAsync(TimeSpan.FromDays(7), name: "weekly_reminder"); +``` + +> **Validation:** The duration must be at least 1 second. Values less than 1 second throw `ArgumentOutOfRangeException`. Sub-second precision is truncated to whole seconds (the underlying service operates at second granularity). + +--- + +### Callbacks + +> **Implementations:** [Python](https://github.com/aws/aws-durable-execution-sdk-python/blob/main/src/aws_durable_execution_sdk_python/operation/callback.py) | [JavaScript](https://github.com/aws/aws-durable-execution-sdk-js/blob/main/packages/aws-durable-execution-sdk-js/src/handlers/callback-handler/callback.ts) + +Callbacks let your workflow pause until an external system responds (human approval, a webhook, a third-party API). + +#### Create a Callback (Advanced) + +```csharp +// Create a callback and get the callback ID +var callback = await context.CreateCallbackAsync( + name: "approval_callback", + config: new CallbackConfig + { + Timeout = TimeSpan.FromHours(24), + HeartbeatTimeout = TimeSpan.FromHours(2) + }); + +// Send the callback ID to an external system +await context.StepAsync( + async () => await SendApprovalEmail(callback.CallbackId, recipientEmail), + name: "send_approval_email"); + +// Wait for the external system to respond +var result = await callback.GetResultAsync(); +``` + +#### Wait For Callback (Simple) + +```csharp +// Combined pattern: create callback, submit to external system, wait for result +var approval = await context.WaitForCallbackAsync( + async (callbackId, ctx) => + { + await SendApprovalEmail(callbackId, managerEmail); + }, + name: "wait_for_approval", + config: new WaitForCallbackConfig + { + Timeout = TimeSpan.FromHours(24), + RetryStrategy = RetryStrategy.Exponential(maxAttempts: 3) + }); + +if (approval.Approved) +{ + await context.StepAsync(async () => await ExecutePlan(), name: "execute"); +} +``` + +**Example `SendApprovalEmail` stub:** +```csharp +private async Task SendApprovalEmail(string callbackId, string recipientEmail) +{ + // Include the callbackId in the approval link so the external system + // can complete the callback via the AWS API + var approvalLink = $"https://my-app.example.com/approve?callbackId={callbackId}"; + await emailService.SendAsync(recipientEmail, "Approval Required", $"Please approve: {approvalLink}"); +} +``` + +**External system completes the callback via AWS API:** +```bash +aws lambda send-durable-execution-callback-success \ + --function-name my-function:1 \ + --callback-id "cb-12345" \ + --payload '{"approved": true, "approver": "jane@example.com"}' +``` + +--- + +### Invoke (Chained Functions) + +> **Implementations:** [Python](https://github.com/aws/aws-durable-execution-sdk-python/blob/main/src/aws_durable_execution_sdk_python/operation/invoke.py) | [JavaScript](https://github.com/aws/aws-durable-execution-sdk-js/blob/main/packages/aws-durable-execution-sdk-js/src/handlers/invoke-handler/invoke-handler.ts) + +Call another durable function. The invocation is checkpointed, so it survives failures and won't double-fire. + +```csharp +// Invoke another durable function +var paymentResult = await context.InvokeAsync( + functionName: "arn:aws:lambda:us-east-1:123456789012:function:payment-processor:prod", + payload: new PaymentRequest { Amount = 100, Currency = "USD" }, + name: "process_payment", + config: new InvokeConfig + { + Timeout = TimeSpan.FromMinutes(5) + }); +``` + +> **Note:** Durable function invocations require **qualified identifiers** — include a version number, alias, or `$LATEST`: +> - ✅ `arn:aws:lambda:us-east-1:123456789012:function:payment-processor:prod` (alias) +> - ✅ `arn:aws:lambda:us-east-1:123456789012:function:payment-processor:42` (version) +> - ✅ `arn:aws:lambda:us-east-1:123456789012:function:payment-processor:$LATEST` +> - ❌ `arn:aws:lambda:us-east-1:123456789012:function:payment-processor` (unqualified — not supported) + +--- + +### Parallel Execution + +> **Implementations:** [Python](https://github.com/aws/aws-durable-execution-sdk-python/blob/main/src/aws_durable_execution_sdk_python/operation/parallel.py) | [JavaScript](https://github.com/aws/aws-durable-execution-sdk-js/blob/main/packages/aws-durable-execution-sdk-js/src/handlers/parallel-handler/parallel-handler.ts) + +Run independent operations concurrently. The JS SDK uses a `DurablePromise` pattern where operations are deferred until awaited; in .NET that isn't necessary because `ParallelAsync` and `MapAsync` cover the same use case idiomatically. `Task`-returning methods start immediately and `await` retrieves the result, so there's no gap to fill with a lazy wrapper. + +> **Prefer `ParallelAsync` over `Task.WhenAll`:** While `Task.WhenAll` works correctly with durable operations (operation IDs are allocated deterministically), it bypasses completion policies, concurrency limits, branch naming, and `IBatchResult` structured output. Always use `ParallelAsync` or `MapAsync` for concurrent durable operations. A future Roslyn analyzer (DE004) will flag `Task.WhenAll` usage with durable tasks and suggest `ParallelAsync` as a replacement. + +```csharp +// Run multiple operations in parallel +var results = await context.ParallelAsync( + new Func>[] + { + async (ctx) => await ctx.StepAsync(async () => await FetchUserData(userId), name: "fetch_user"), + async (ctx) => await ctx.StepAsync(async () => await FetchOrderHistory(userId), name: "fetch_orders"), + async (ctx) => await ctx.StepAsync(async () => await FetchPreferences(userId), name: "fetch_prefs"), + }, + name: "parallel_fetch", + config: new ParallelConfig + { + MaxConcurrency = 3, + CompletionConfig = CompletionConfig.AllSuccessful() + }); + +// Access individual results +var userData = results.GetResults()[0]; +var orderHistory = results.GetResults()[1]; +var preferences = results.GetResults()[2]; +``` + +#### Named Parallel Branches + +For better observability, you can name individual branches (matching the JS SDK pattern): + +```csharp +// Named branches for easier debugging and testing +var results = await context.ParallelAsync( + new NamedBranch[] + { + new("fetch_user", async (ctx) => await ctx.StepAsync(async () => await FetchUserData(userId))), + new("fetch_orders", async (ctx) => await ctx.StepAsync(async () => await FetchOrderHistory(userId))), + new("fetch_prefs", async (ctx) => await ctx.StepAsync(async () => await FetchPreferences(userId))), + }, + name: "parallel_fetch"); + +// In tests, you can find specific branches by name +var fetchUserBranch = result.GetOperation("fetch_user"); +``` + +#### Completion Configurations + +`ParallelAsync` and `MapAsync` accept a `CompletionConfig` to control when the overall operation is considered complete: + +```csharp +// All must succeed (default) +CompletionConfig.AllSuccessful() + +// Complete when any one succeeds +CompletionConfig.FirstSuccessful() + +// Complete when all finish (regardless of success/failure) +CompletionConfig.AllCompleted() + +// Custom: succeed if at least 3 succeed, tolerate up to 2 failures +new CompletionConfig +{ + MinSuccessful = 3, + ToleratedFailureCount = 2 +} +``` + +--- + +### Map Operations + +> **Implementations:** [Python](https://github.com/aws/aws-durable-execution-sdk-python/blob/main/src/aws_durable_execution_sdk_python/operation/map.py) | [JavaScript](https://github.com/aws/aws-durable-execution-sdk-js/blob/main/packages/aws-durable-execution-sdk-js/src/handlers/map-handler/map-handler.ts) + +Process a collection in parallel with configurable concurrency. The `items` parameter accepts any `IReadOnlyList` (arrays, lists, etc.). + +```csharp +var orders = new[] { "order-1", "order-2", "order-3", "order-4", "order-5" }; + +var results = await context.MapAsync( + items: orders, // IReadOnlyList + func: async (ctx, orderId, index, allItems) => + { + return await ctx.StepAsync( + async () => await ProcessOrder(orderId), + name: $"process_order_{index}"); + }, + name: "process_all_orders", + config: new MapConfig + { + MaxConcurrency = 3, + CompletionConfig = CompletionConfig.AllSuccessful(), + ItemNamer = (orderId, index) => $"Order-{orderId}" // Readable names for observability + }); + +// Check results +results.ThrowIfError(); // Throws if any item failed +var processedOrders = results.GetResults(); +``` + +--- + +### Child Contexts + +> **Implementations:** [Python](https://github.com/aws/aws-durable-execution-sdk-python/blob/main/src/aws_durable_execution_sdk_python/operation/child.py) | [JavaScript](https://github.com/aws/aws-durable-execution-sdk-js/blob/main/packages/aws-durable-execution-sdk-js/src/handlers/run-in-child-context-handler/run-in-child-context-handler.ts) + +Child contexts group related durable operations into a sub-workflow. Use them when you need waits or multiple steps inside a logical unit (you cannot nest durable calls inside a step directly). + +```csharp +// Group operations into a child context +var enrichedData = await context.RunInChildContextAsync( + async (childCtx) => + { + var validated = await childCtx.StepAsync( + async () => await Validate(data), + name: "validate"); + + await childCtx.WaitAsync(TimeSpan.FromSeconds(1), name: "rate_limit"); + + var enriched = await childCtx.StepAsync( + async () => await Enrich(validated), + name: "enrich"); + + return enriched; + }, + name: "validation_phase"); + +// Use the enriched data in a subsequent step +var finalResult = await context.StepAsync( + async () => await SubmitEnrichedData(enrichedData), + name: "submit"); +``` + +> **Why child contexts?** You cannot nest durable operations inside a step. Steps are leaf operations. If you need multiple durable operations grouped together, use a child context. + +--- + +### Error Handling & Retry + +> **Implementations:** [Python](https://github.com/aws/aws-durable-execution-sdk-python/blob/main/src/aws_durable_execution_sdk_python/retries.py) | [JavaScript](https://github.com/aws/aws-durable-execution-sdk-js/blob/main/packages/aws-durable-execution-sdk-js/src/utils/retry/retry-config/index.ts) + +#### Retry Strategies + +```csharp +// Exponential backoff with jitter +var result = await context.StepAsync( + async () => await CallUnreliableApi(), + name: "api_call", + config: new StepConfig + { + RetryStrategy = RetryStrategy.Exponential( + maxAttempts: 5, + initialDelay: TimeSpan.FromSeconds(1), + maxDelay: TimeSpan.FromSeconds(60), + backoffRate: 2.0, + jitter: JitterStrategy.Full) + }); + +// Using presets +var result = await context.StepAsync( + async () => await CallApi(), + name: "api_call", + config: new StepConfig + { + RetryStrategy = RetryStrategy.Default // 6 attempts, 2x backoff, 5s initial, Full jitter + }); + +// Available presets: +// RetryStrategy.None — maxAttempts: 1 (no retry) +// RetryStrategy.Default — 6 attempts, 2x backoff, 5s initial delay, Full jitter +// RetryStrategy.Transient — 3 attempts, 2x backoff, 1s initial delay, Full jitter + +// Custom retry strategy +var result = await context.StepAsync( + async () => await CallApi(), + name: "api_call", + config: new StepConfig + { + RetryStrategy = new CustomRetryStrategy((exception, attemptCount) => + { + // Only retry transient errors + if (exception is HttpRequestException httpEx && httpEx.StatusCode >= 500) + return RetryDecision.RetryAfter(TimeSpan.FromSeconds(Math.Pow(2, attemptCount))); + + return RetryDecision.DoNotRetry(); + }) + }); + +// Retry with specific exception types +var result = await context.StepAsync( + async () => await CallApi(), + name: "api_call", + config: new StepConfig + { + RetryStrategy = RetryStrategy.Exponential( + maxAttempts: 3, + retryableExceptions: new[] { typeof(TimeoutException), typeof(HttpRequestException) }) + }); + +// Retry with message pattern matching (regex) +var result = await context.StepAsync( + async () => await CallApi(), + name: "api_call", + config: new StepConfig + { + RetryStrategy = RetryStrategy.Exponential( + maxAttempts: 3, + retryableExceptions: new[] { typeof(HttpRequestException) }, + retryableMessagePatterns: new[] { "timeout", "throttl", "5\\d{2}" }) + }); +``` + +#### Jitter Strategies + +Jitter prevents thundering-herd scenarios where multiple retrying clients converge on the same backoff schedule. The SDK supports three jitter strategies: + +```csharp +public enum JitterStrategy +{ + /// No randomization — delay is exactly the calculated backoff value. + None, + + /// Random delay between 0 and the calculated backoff value (recommended). + Full, + + /// Random delay between 50% and 100% of the calculated backoff value. + Half +} +``` + +The default jitter for `RetryStrategy.Exponential()` is `JitterStrategy.Full`. All built-in presets (`RetryStrategy.Default`, `RetryStrategy.Transient`) also use `JitterStrategy.Full`. Use `JitterStrategy.None` only when you need deterministic retry timing (e.g., for testing). + +#### Retry Strategy Interface + +```csharp +public interface IRetryStrategy +{ + RetryDecision ShouldRetry(Exception exception, int attemptNumber); +} + +public record RetryDecision +{ + public bool ShouldRetry { get; } + public TimeSpan Delay { get; } + + public static RetryDecision DoNotRetry() => new() { ShouldRetry = false }; + public static RetryDecision RetryAfter(TimeSpan delay) => new() { ShouldRetry = true, Delay = delay }; +} +``` + +`IRetryStrategy` supports implicit conversion from `Func`, enabling inline lambdas: + +```csharp +config: new StepConfig +{ + RetryStrategy = (ex, attempt) => + attempt < 3 && ex is HttpRequestException + ? RetryDecision.RetryAfter(TimeSpan.FromSeconds(Math.Pow(2, attempt))) + : RetryDecision.DoNotRetry() +} +``` + +#### Saga Pattern (Compensating Transactions) + +```csharp +[DurableExecution] +public async Task Handler(BookingRequest input, IDurableContext context) +{ + var compensations = new List<(string Name, Func Action)>(); + + try + { + var flight = await context.StepAsync( + async () => await BookFlight(input), + name: "book_flight"); + compensations.Add(("cancel_flight", async () => await CancelFlight(flight.Id))); + + var hotel = await context.StepAsync( + async () => await BookHotel(input), + name: "book_hotel"); + compensations.Add(("cancel_hotel", async () => await CancelHotel(hotel.Id))); + + var car = await context.StepAsync( + async () => await BookCar(input), + name: "book_car"); + compensations.Add(("cancel_car", async () => await CancelCar(car.Id))); + + return new BookingResult { Status = "confirmed" }; + } + catch (Exception ex) + { + // Execute compensations in reverse order + foreach (var (name, action) in compensations.AsEnumerable().Reverse()) + { + await context.StepAsync(action, name: name); + } + return new BookingResult { Status = "cancelled", Error = ex.Message }; + } +} +``` + +--- + +### Logging + +> **Implementations:** [Python](https://github.com/aws/aws-durable-execution-sdk-python/blob/main/src/aws_durable_execution_sdk_python/logger.py) | [JavaScript](https://github.com/aws/aws-durable-execution-sdk-js/blob/main/packages/aws-durable-execution-sdk-js/src/utils/logger/logger.ts) + +`context.Logger` is replay-aware: it suppresses duplicate messages that would otherwise repeat on every invocation. Use it instead of `Console.WriteLine`. + +> **Implementation note:** The replay-aware logger is implemented entirely in the durable execution SDK. During replay, the SDK tracks which operations are being restored from checkpoint state vs. executing for the first time, and suppresses log output for replayed operations. No changes to `Amazon.Lambda.RuntimeSupport` or the Lambda Runtime API are required. + +```csharp +[DurableExecution] +public async Task Handler(MyEvent input, IDurableContext context) +{ + // ✅ Replay-safe: only logs once even during replay + context.Logger.LogInformation("Starting workflow for {OrderId}", input.OrderId); + + var result = await context.StepAsync( + async () => await ProcessData(input.Data), + name: "process_data"); + + // ✅ Replay-safe + context.Logger.LogInformation("Processing complete: {Result}", result); + + // ❌ NOT replay-safe: will log on every replay + Console.WriteLine("This will repeat!"); + + return result; +} +``` + +The logger integrates with `Microsoft.Extensions.Logging`: + +```csharp +// context.Logger implements ILogger +context.Logger.LogDebug("Debug info"); +context.Logger.LogInformation("Info message"); +context.Logger.LogWarning("Warning: {Detail}", detail); +context.Logger.LogError(exception, "Error occurred"); +``` + +#### Custom Logger Configuration + +You can swap the logger or disable replay-aware filtering (e.g., to see logs during replay for debugging): + +```csharp +// Use a custom logger (e.g., Serilog, AWS Lambda Powertools) +context.ConfigureLogger(new LoggerConfig +{ + CustomLogger = myCustomLogger, + ModeAware = true // true = suppress during replay (default), false = always log +}); + +// Disable replay-aware filtering to see ALL logs (useful for debugging) +context.ConfigureLogger(new LoggerConfig { ModeAware = false }); +``` + +--- + +## Internals + +### AWS APIs used + +| API | Purpose | +|-----|---------| +| `CheckpointDurableExecution` | Persist operation state (step results, waits, etc.) | +| `GetDurableExecutionState` | Retrieve previously checkpointed state on replay | +| `SendDurableExecutionCallbackSuccess` | External systems signal callback completion | +| `SendDurableExecutionCallbackFailure` | External systems signal callback failure | +| `SendDurableExecutionCallbackHeartbeat` | External systems send heartbeat signals | + +### How suspension works internally + +This follows the same pattern as the JavaScript SDK's `Promise.race`. The .NET equivalent is `Task.WhenAny`. + +When `RunAsync` starts, it kicks off two tasks in parallel: user code and a termination signal (a `TaskCompletionSource` that starts unresolved). Whoever finishes first wins: + +``` +┌─────────────────────────────────────────────────────────────────────┐ +│ DurableExecutionHandler.RunAsync │ +│ │ +│ var userTask = userHandler(context); │ +│ var terminationTask = terminationManager.TerminationTask; │ +│ │ +│ var winner = await Task.WhenAny(userTask, terminationTask); │ +│ │ +│ ┌─── userTask ───────────────────┐ ┌─── terminationTask ────────┐ │ +│ │ StepAsync("fetch") → execute │ │ (unresolved TCS - waiting) │ │ +│ │ WaitAsync("delay") → ... │ │ │ │ +│ │ calls Terminate() ──────────────► SetResult() → resolves! │ │ +│ │ awaits forever (blocked) │ │ │ │ +│ └────────────────────────────────┘ └────────────────────────────┘ │ +│ │ +│ winner == terminationTask → return PENDING │ +│ (userTask is abandoned, GC collects it) │ +└─────────────────────────────────────────────────────────────────────┘ +``` + +The `TerminationManager` is a thin wrapper around `TaskCompletionSource`: +- `TerminationTask` -- a Task that hangs forever until `Terminate()` is called +- `Terminate(reason)` -- resolves the TCS, causing the race to pick termination + +When user code hits a pending wait or callback: +1. It checkpoints the operation state +2. Calls `terminationManager.Terminate(WaitScheduled)` +3. Awaits a new never-completing `TaskCompletionSource` (blocks itself permanently) +4. `Task.WhenAny` sees the termination task resolved and picks it as the winner +5. `RunAsync` returns PENDING; Lambda terminates; the abandoned user task is GC'd + +### Lifecycle and cleanup + +`RunAsync` manages the full lifecycle internally. When the handler completes (SUCCEEDED/FAILED) or suspends (PENDING), `RunAsync` stops the background checkpoint batcher, flushes any pending checkpoint operations, and disposes internal state. Users never call `Dispose` or wrap anything in `await using`. + +--- + +## API Reference + +### DurableFunction + +Static helper for the non-Annotations handler path. Wraps a workflow function, handling all envelope translation between `DurableExecutionInvocationInput`/`DurableExecutionInvocationOutput` and user types. + +```csharp +/// +/// Static helper that wraps a durable workflow function, handling all envelope +/// translation between DurableExecutionInvocationInput/Output and user types. +/// Inspired by OpenTelemetry.Instrumentation.AWSLambda's AWSLambdaWrapper.TraceAsync pattern. +/// +public static class DurableFunction +{ + /// + /// Wrap a workflow that takes typed input and returns typed output. + /// + public static Task WrapAsync( + Func> workflow, + DurableExecutionInvocationInput invocationInput, + ILambdaContext lambdaContext); + + /// + /// Wrap a workflow that takes typed input and returns no value. + /// + public static Task WrapAsync( + Func workflow, + DurableExecutionInvocationInput invocationInput, + ILambdaContext lambdaContext); +} +``` + +### IDurableContext + +> **Implementations:** [Python](https://github.com/aws/aws-durable-execution-sdk-python/blob/main/src/aws_durable_execution_sdk_python/context.py) | [JavaScript](https://github.com/aws/aws-durable-execution-sdk-js/blob/main/packages/aws-durable-execution-sdk-js/src/types/durable-context.ts) + +The primary interface developers interact with: + +```csharp +public interface IDurableContext +{ + /// + /// Replay-safe logger. Messages are de-duplicated during replay. + /// + ILogger Logger { get; } + + /// + /// Metadata about the current durable execution. + /// + IExecutionContext ExecutionContext { get; } + + /// + /// The underlying Lambda context. + /// + ILambdaContext LambdaContext { get; } + + /// + /// Execute a step with automatic checkpointing. + /// The IStepContext provides a step-scoped logger with operation metadata + /// (step name, attempt number, operation ID) and the current attempt number. + /// + Task StepAsync( + Func> func, + string? name = null, + StepConfig? config = null, + CancellationToken cancellationToken = default); + + /// + /// Execute a step that returns no value. + /// + Task StepAsync( + Func func, + string? name = null, + StepConfig? config = null, + CancellationToken cancellationToken = default); + + /// + /// Suspend execution for the specified duration. + /// Throws ArgumentOutOfRangeException if duration is less than 1 second. + /// + Task WaitAsync( + TimeSpan duration, + string? name = null, + CancellationToken cancellationToken = default); + + /// + /// Create a callback for external system integration. + /// + Task> CreateCallbackAsync( + string? name = null, + CallbackConfig? config = null, + CancellationToken cancellationToken = default); + + /// + /// Wait for an external system to respond via callback. + /// + Task WaitForCallbackAsync( + Func submitter, + string? name = null, + WaitForCallbackConfig? config = null, + CancellationToken cancellationToken = default); + + /// + /// Invoke another durable function. + /// + Task InvokeAsync( + string functionName, + TPayload payload, + string? name = null, + InvokeConfig? config = null, + CancellationToken cancellationToken = default); + + /// + /// Execute multiple operations in parallel (unnamed branches). + /// + Task> ParallelAsync( + IReadOnlyList>> functions, + string? name = null, + ParallelConfig? config = null, + CancellationToken cancellationToken = default); + + /// + /// Execute multiple named operations in parallel. Named branches appear in + /// execution traces and can be inspected by name in tests. + /// + Task> ParallelAsync( + IReadOnlyList> branches, + string? name = null, + ParallelConfig? config = null, + CancellationToken cancellationToken = default); + + /// + /// Process a collection of items in parallel. + /// + Task> MapAsync( + IReadOnlyList items, + Func, Task> func, + string? name = null, + MapConfig? config = null, + CancellationToken cancellationToken = default); + + /// + /// Run operations in an isolated child context. + /// + Task RunInChildContextAsync( + Func> func, + string? name = null, + ChildContextConfig? config = null, + CancellationToken cancellationToken = default); + + /// + /// Poll until a condition is met. + /// + Task WaitForConditionAsync( + Func> check, + WaitForConditionConfig config, + string? name = null, + CancellationToken cancellationToken = default); +} +``` + +#### Supporting Types + +```csharp +/// +/// Context passed to step functions. Provides step-scoped logging and metadata. +/// +public interface IStepContext +{ + /// + /// Logger scoped to this step. Includes step name, operation ID, and attempt + /// number in structured log metadata automatically. + /// + ILogger Logger { get; } + + /// + /// The current retry attempt number (1-based). + /// + int AttemptNumber { get; } + + /// + /// The deterministic operation ID for this step. + /// + string OperationId { get; } +} + +/// +/// A named branch for parallel execution. Named branches appear in execution +/// traces and can be inspected by name in the test runner. +/// +public record DurableBranch(string Name, Func> Func); +``` + +#### CancellationToken behavior + +All methods accept a `CancellationToken` that follows standard .NET semantics: cancellation throws `OperationCanceledException` and the execution fails. Cancellation does **not** trigger suspension — those are separate concepts. The durable execution service handles timeout scenarios automatically: if Lambda terminates mid-execution, the next invocation simply replays from the last checkpoint. For advanced users who want to suspend gracefully before timeout, check `context.LambdaContext.RemainingTime` and return early. + +### Configuration Types + +> **Implementations:** [Python](https://github.com/aws/aws-durable-execution-sdk-python/blob/main/src/aws_durable_execution_sdk_python/config.py) | JavaScript: [step](https://github.com/aws/aws-durable-execution-sdk-js/blob/main/packages/aws-durable-execution-sdk-js/src/types/step.ts) | [batch](https://github.com/aws/aws-durable-execution-sdk-js/blob/main/packages/aws-durable-execution-sdk-js/src/types/batch.ts) + +```csharp +/// +/// Configuration for step execution. +/// +public class StepConfig +{ + /// + /// Retry strategy for failed steps. Default is no retry. + /// Accepts IRetryStrategy implementations (RetryStrategy.Exponential, etc.) + /// or an inline function via implicit conversion from + /// Func<Exception, int, RetryDecision>. + /// + public IRetryStrategy? RetryStrategy { get; set; } + + /// + /// Execution semantics. Default is AtLeastOncePerRetry. + /// + public StepSemantics Semantics { get; set; } = StepSemantics.AtLeastOncePerRetry; + + /// + /// Custom serializer for the step result. Default is System.Text.Json. + /// + public ICheckpointSerializer? Serializer { get; set; } +} + +public enum StepSemantics +{ + /// + /// Step re-executes on each retry attempt. Safe for idempotent operations. + /// + AtLeastOncePerRetry, + + /// + /// Step executes at most once per retry attempt. Use for side effects. + /// + AtMostOncePerRetry +} + +/// +/// Configuration for callback operations. +/// +public class CallbackConfig +{ + /// + /// Maximum time to wait for callback response. Default (TimeSpan.Zero) means no timeout. + /// + public TimeSpan Timeout { get; set; } = TimeSpan.Zero; + + /// + /// Maximum time between heartbeat signals before timeout. Default (TimeSpan.Zero) means no heartbeat timeout. + /// + public TimeSpan HeartbeatTimeout { get; set; } = TimeSpan.Zero; + + /// + /// Custom serializer for callback result. + /// + public ICheckpointSerializer? Serializer { get; set; } +} + +/// +/// Configuration for wait-for-callback operations. +/// +public class WaitForCallbackConfig : CallbackConfig +{ + /// + /// Retry strategy for the submitter function. + /// + public IRetryStrategy? RetryStrategy { get; set; } +} + +/// +/// Configuration for invoke operations. +/// +public class InvokeConfig +{ + /// + /// Maximum time to wait for the invoked function. Default (TimeSpan.Zero) means no timeout. + /// + public TimeSpan Timeout { get; set; } = TimeSpan.Zero; + + /// + /// Custom serializer for the payload. + /// + public ICheckpointSerializer? PayloadSerializer { get; set; } + + /// + /// Custom serializer for the result. + /// + public ICheckpointSerializer? ResultSerializer { get; set; } +} + +/// +/// Controls how branches are represented in the checkpoint graph. +/// +public enum NestingType +{ + /// + /// Each branch creates a full isolated CONTEXT operation. Higher observability + /// in execution traces but more checkpoint operations (default). + /// + Nested, + + /// + /// Branches use virtual contexts sharing the parent. Reduces checkpoint cost + /// by ~30% at the expense of less granular execution traces. + /// + Flat +} + +/// +/// Configuration for parallel execution. +/// +public class ParallelConfig +{ + /// + /// Maximum concurrent branches. Null = unlimited. + /// + public int? MaxConcurrency { get; set; } + + /// + /// When to consider the operation complete. + /// + public CompletionConfig CompletionConfig { get; set; } = CompletionConfig.AllSuccessful(); + + /// + /// How branches are represented in the checkpoint graph. + /// Nested = full isolated context per branch (default). + /// Flat = virtual contexts sharing parent (~30% fewer checkpoint operations). + /// + public NestingType NestingType { get; set; } = NestingType.Nested; +} + +/// +/// Configuration for map operations. +/// +public class MapConfig +{ + /// + /// Maximum concurrent items. Null = unlimited. + /// + public int? MaxConcurrency { get; set; } + + /// + /// When to consider the operation complete. + /// + public CompletionConfig CompletionConfig { get; set; } = CompletionConfig.AllSuccessful(); + + /// + /// How item branches are represented in the checkpoint graph. + /// + public NestingType NestingType { get; set; } = NestingType.Nested; + + /// + /// Optional batching configuration for grouping items before processing. + /// When set, items are grouped into batches and each batch is processed as a unit. + /// Reduces checkpoint overhead for large collections. + /// + public ItemBatcher? Batcher { get; set; } + + /// + /// Optional function to generate a custom name for each item's branch. + /// Improves observability in execution traces. Receives the item and its index. + /// If null, branches are named by index (e.g., "0", "1", "2"). + /// + public Func? ItemNamer { get; set; } +} + +/// +/// Groups items into batches for map operations to reduce checkpoint overhead. +/// At least one of MaxItemsPerBatch or MaxBytesPerBatch must be set. +/// +public class ItemBatcher +{ + /// + /// Maximum number of items per batch. Null = no count limit. + /// + public int? MaxItemsPerBatch { get; set; } + + /// + /// Maximum serialized size (bytes) per batch. Null = no size limit. + /// + public int? MaxBytesPerBatch { get; set; } +} + +/// +/// Defines completion criteria for parallel/map operations. +/// +public class CompletionConfig +{ + public int? MinSuccessful { get; set; } + public int? ToleratedFailureCount { get; set; } + public double? ToleratedFailurePercentage { get; set; } + + public static CompletionConfig AllSuccessful() => new() { ToleratedFailureCount = 0 }; + public static CompletionConfig FirstSuccessful() => new() { MinSuccessful = 1 }; + public static CompletionConfig AllCompleted() => new(); +} + +/// +/// Configuration for child context operations. +/// +public class ChildContextConfig +{ + /// + /// Custom serializer for the child context's return value. + /// + public ICheckpointSerializer? Serializer { get; set; } + + /// + /// Operation sub-type label for observability (e.g., in test runner output). + /// + public string? SubType { get; set; } + + /// + /// Optional function to transform exceptions from the child context before + /// surfacing them to the parent. Useful for wrapping low-level errors into + /// domain-specific exceptions. + /// + public Func? ErrorMapping { get; set; } +} + +/// +/// Configuration for wait-for-condition (polling). +/// +public class WaitForConditionConfig +{ + /// + /// Initial state passed to the first check invocation. + /// + public required TState InitialState { get; set; } + + /// + /// Strategy controlling how long to wait between checks. + /// + public required IWaitStrategy WaitStrategy { get; set; } +} +``` + +### Result Types + +```csharp +/// +/// Result of a parallel or map operation. +/// +public interface IBatchResult +{ + /// + /// All items (succeeded and failed). + /// + IReadOnlyList> All { get; } + + /// + /// Only successful items. + /// + IReadOnlyList> Succeeded { get; } + + /// + /// Only failed items. + /// + IReadOnlyList> Failed { get; } + + /// + /// Get all successful results. Throws if any failed. + /// + IReadOnlyList GetResults(); + + /// + /// Throw an exception if any item failed. + /// + void ThrowIfError(); + + /// + /// Why the operation completed. + /// + CompletionReason CompletionReason { get; } +} + +public interface IBatchItem +{ + int Index { get; } + BatchItemStatus Status { get; } + T? Result { get; } + DurableExecutionException? Error { get; } +} + +public enum BatchItemStatus { Succeeded, Failed, Cancelled } +public enum CompletionReason { AllCompleted, MinSuccessfulReached, FailureToleranceExceeded } + +/// +/// Represents a pending callback. +/// +public interface ICallback +{ + /// + /// The callback ID to send to external systems. + /// + string CallbackId { get; } + + /// + /// Wait for and return the callback result. + /// Suspends execution until the result is available. + /// + Task GetResultAsync(CancellationToken cancellationToken = default); +} + +/// +/// Metadata about the current execution. +/// +public interface IExecutionContext +{ + /// + /// The ARN of the current durable execution. + /// + string DurableExecutionArn { get; } +} +``` + +### Exception Types + +> **Implementations:** [Python](https://github.com/aws/aws-durable-execution-sdk-python/blob/main/src/aws_durable_execution_sdk_python/exceptions.py) | [JavaScript](https://github.com/aws/aws-durable-execution-sdk-js/blob/main/packages/aws-durable-execution-sdk-js/src/errors/durable-error/durable-error.ts) + +```csharp +/// +/// Base exception for all durable execution errors. +/// +public class DurableExecutionException : Exception { } + +/// +/// Thrown when user code inside a step fails (after retries exhausted). +/// Contains the original error details from the checkpoint. +/// +public class StepException : DurableExecutionException +{ + public string? ErrorType { get; } + public string? ErrorData { get; } + public IReadOnlyList? StackTrace { get; } +} + +/// +/// Thrown when a callback fails or times out. +/// +public class CallbackException : DurableExecutionException +{ + public string? CallbackId { get; } + public bool IsTimeout { get; } +} + +/// +/// Thrown when an invoked function fails. +/// +public class InvokeException : DurableExecutionException +{ + public string? FunctionName { get; } + public string? ErrorType { get; } + public string? ErrorData { get; } +} + +/// +/// Thrown when a child context operation fails. +/// +public class ChildContextException : DurableExecutionException +{ + public string? SubType { get; } +} + +/// +/// Thrown when a wait-for-condition operation exhausts all attempts +/// without the condition being met. +/// +public class WaitForConditionException : DurableExecutionException +{ + public int AttemptsExhausted { get; } +} + +/// +/// Thrown when the operation sequence during replay does not match +/// the previously checkpointed history. Indicates non-deterministic code. +/// +public class NonDeterministicException : DurableExecutionException +{ + public string? ExpectedOperationId { get; } + public string? ActualOperationId { get; } +} + +/// +/// Thrown when a step is interrupted mid-execution (e.g., Lambda timeout or +/// runtime termination). The step did not complete and its result was not +/// checkpointed. On the next invocation, the step will re-execute from scratch. +/// +public class StepInterruptedException : DurableExecutionException +{ + public string? StepName { get; } + public int AttemptNumber { get; } +} + +/// +/// Thrown when checkpoint serialization or deserialization fails. +/// +public class SerializationException : DurableExecutionException { } + +/// +/// Thrown when input validation fails. +/// +public class DurableValidationException : DurableExecutionException { } + +/// +/// Thrown when the checkpoint API call fails. +/// +public class CheckpointException : DurableExecutionException +{ + public bool IsRetriable { get; } +} +``` + +--- + +## Serialization + +> **Implementations:** [Python](https://github.com/aws/aws-durable-execution-sdk-python/blob/main/src/aws_durable_execution_sdk_python/serdes.py) | [JavaScript](https://github.com/aws/aws-durable-execution-sdk-js/blob/main/packages/aws-durable-execution-sdk-js/src/utils/serdes/serdes.ts) + +### Default behavior + +Step results are serialized to JSON (via `System.Text.Json`) before checkpointing. Your return types need to be JSON-serializable. + +```csharp +// ✅ GOOD: JSON-serializable types +public record OrderResult(string OrderId, decimal Total, bool IsCompleted); + +// ❌ BAD: Non-serializable types +public class BadResult +{ + public Stream DataStream { get; set; } // Not serializable + public HttpClient Client { get; set; } // Not serializable +} +``` + +### Custom Serialization + +Implement `ICheckpointSerializer` for custom serialization: + +```csharp +public interface ICheckpointSerializer +{ + string Serialize(T value, SerializationContext context); + T Deserialize(string data, SerializationContext context); +} + +public record SerializationContext(string OperationId, string DurableExecutionArn); +``` + +Usage: + +```csharp +var result = await context.StepAsync( + async () => await GetLargeData(), + name: "get_data", + config: new StepConfig + { + Serializer = new CompressedJsonSerializer() + }); +``` + +### Class library vs. executable output + +All samples in this doc use the class library pattern (no `Main` method). This is the default for Lambda functions. To turn a durable function project into an executable (required for NativeAOT or custom runtimes): + +**With Annotations** — add the global attribute to auto-generate a `Main` method: +```csharp +[assembly: LambdaGlobalProperties(GenerateMain = true)] +``` + +**Without Annotations** — provide your own `Main` method: +```csharp +public static async Task Main(string[] args) +{ + using var bootstrap = new LambdaBootstrap( + new Function().FunctionHandler, + new DefaultLambdaJsonSerializer()); + await bootstrap.RunAsync(); +} +``` + +Both approaches produce a self-contained executable that the Lambda custom runtime can invoke. + +### NativeAOT compatibility + +The SDK is AOT-friendly but does not require AOT. The default JSON serialization uses reflection (standard `System.Text.Json` behavior), which works in JIT mode. For NativeAOT deployments, provide a `JsonSerializerContext` via the `ICheckpointSerializer` interface — this avoids all runtime reflection and is fully trim-safe. The SDK itself avoids `Activator.CreateInstance`, `Type.GetType()`, and other reflection patterns, and uses `[DynamicallyAccessedMembers]` trimming annotations where needed. + +```csharp +// Default: works with reflection (JIT mode) +var result = await context.StepAsync(async () => await GetOrder()); + +// AOT mode: user provides serialization context +var result = await context.StepAsync( + async () => await GetOrder(), + config: new StepConfig { Serializer = new JsonCheckpointSerializer(MyJsonContext.Default.Order) }); +``` + +### Large payload and checkpoint overflow + +The durable execution service imposes size limits: + +- **256 KB** per individual operation checkpoint +- **6 MB** maximum Lambda response payload + +The SDK handles overflow transparently: + +**Step results exceeding 256 KB:** When a step's serialized result exceeds the checkpoint size limit, the SDK splits the checkpoint into a START operation (before execution) and a separate result checkpoint (after execution). On replay, the SDK fetches the result via the paginated `GetDurableExecutionState` API rather than reading it inline from the operation record. + +**Batch results (map/parallel) exceeding limits:** For large map/parallel operations, the SDK generates a compact summary for the parent operation's checkpoint. The summary includes item count, success/failure counts, and completion reason — but not individual item results. During replay, the SDK sets `ReplayChildren = true` on the state request, which causes the service to return child operation records so full results can be reconstructed. + +**Lambda response exceeding 6 MB:** If the final orchestration result exceeds the response payload limit, the SDK checkpoints the result before returning the `DurableExecutionInvocationOutput`. The service reads the result from the checkpoint rather than from the response body. + +**Guidance for very large results:** For results that are inherently large (multi-MB payloads), use a custom `ICheckpointSerializer` that offloads to external storage (S3, DynamoDB) and returns a reference. This keeps checkpoint sizes small and avoids pagination overhead: + +```csharp +public class S3BackedSerializer : ICheckpointSerializer +{ + public string Serialize(T value, SerializationContext context) + { + var key = $"results/{context.DurableExecutionArn}/{context.OperationId}"; + // Upload to S3, return the key as the checkpoint value + _s3Client.PutObject(new PutObjectRequest { BucketName = _bucket, Key = key, ... }); + return key; + } + + public T Deserialize(string data, SerializationContext context) + { + // Download from S3 using the stored key + var response = _s3Client.GetObject(new GetObjectRequest { BucketName = _bucket, Key = data }); + return JsonSerializer.Deserialize(response.ResponseStream); + } +} +``` + +--- + +## Integration with Existing Libraries + +### Amazon.Lambda.Core + +The SDK uses existing Lambda core interfaces: +- `ILambdaContext` -- available via `context.LambdaContext` +- `ILambdaSerializer` -- used for event deserialization + +### Amazon.Lambda.RuntimeSupport + +The durable execution handler integrates with the existing runtime support bootstrap: + +```csharp +// The [DurableExecution] attribute signals that the handler +// receives DurableExecutionInvocationInput and returns DurableExecutionInvocationOutput +// The SDK handles the translation to/from the user's handler signature +``` + +### Amazon.Lambda.Annotations (optional) + +`Amazon.Lambda.Annotations` is an **optional** dependency. Users can write durable functions without it (see [Manual Handler](#manual-handler-without-annotations) above), but adding Annotations to the project reduces boilerplate significantly. + +When both packages are referenced, the Annotations source generator detects `[DurableExecution]` by fully-qualified name and at compile time: + +1. Generates a handler wrapper that translates `DurableExecutionInvocationInput` to/from your types +2. Manages context lifecycle (creation, checkpoint batching, cleanup) +3. Adds `DurableConfig` to the CloudFormation template +4. Adds the `AWSLambdaBasicDurableExecutionRolePolicy` managed policy + +```csharp +public class Functions +{ + [LambdaFunction] + [DurableExecution(ExecutionTimeout = 3600, RetentionPeriodInDays = 7)] + public async Task ProcessOrder( + [FromBody] OrderRequest request, + IDurableContext context) + { + var validated = await context.StepAsync( + async (step) => await Validate(request), + name: "validate"); + // ... + } +} +``` + +#### Custom Lambda Client + +For VPC endpoints, custom retry policies, or testing with mocked clients, inject a custom `IAmazonLambda` client via the `[DurableExecution]` attribute: + +```csharp +public class Functions +{ + private readonly IAmazonLambda _lambdaClient; + + public Functions(IAmazonLambda lambdaClient) + { + _lambdaClient = lambdaClient; + } + + [LambdaFunction] + [DurableExecution(LambdaClientFactory = nameof(_lambdaClient))] + public async Task ProcessOrder( + [FromBody] OrderRequest request, + IDurableContext context) + { + // ... + } +} +``` + +When no `LambdaClientFactory` is specified, the generated code creates a default `AmazonLambdaClient`. For the manual handler path, pass the client directly to `DurableExecutionHandler.RunAsync`. + +> **Dependency boundaries:** `Amazon.Lambda.Annotations` has **no dependency** on the AWS SDK or on `Amazon.Lambda.DurableExecution`. The Annotations source generator references durable execution types by fully-qualified name strings only — it never takes a compile-time dependency on the durable package. The `[DurableExecution]` attribute is defined in `Amazon.Lambda.DurableExecution`, and the generated code resolves against the user's project references. There is only one source generator (Annotations) — no coordination between multiple generators is needed. + +### AWSSDK.Lambda + +The `Amazon.Lambda.DurableExecution` package depends on the AWS SDK for .NET Lambda client to make checkpoint API calls. This dependency is confined to the durable execution package — `Amazon.Lambda.Annotations` does not depend on the AWS SDK. + + +- `CheckpointDurableExecutionAsync` +- `GetDurableExecutionStateAsync` + +--- + +## Testing (customer-facing package) + +> **Implementations:** [JavaScript (local runner)](https://github.com/aws/aws-durable-execution-sdk-js/blob/main/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/local-durable-test-runner.ts) | [JavaScript (cloud runner)](https://github.com/aws/aws-durable-execution-sdk-js/blob/main/packages/aws-durable-execution-sdk-js-testing/src/test-runner/cloud/cloud-durable-test-runner.ts) + +We ship a separate NuGet package (`Amazon.Lambda.DurableExecution.Testing`) that lets developers test their durable functions locally without deploying to AWS. + +**Why this needs to exist:** A durable function requires multiple Lambda invocations to complete (invoke → PENDING → wait → re-invoke → SUCCEEDED). You can't test that with a normal unit test because there's no Lambda service orchestrating the re-invocations. The test runner simulates this loop in-process: it calls your handler, gets PENDING, marks waits as elapsed, calls your handler again with the prior checkpoint state, and repeats until the workflow completes. + +```csharp +var runner = new DurableTestRunner( + handler: new Function().Handler, + options: new TestRunnerOptions + { + SkipTime = true, // Waits complete instantly (no real delays) + MaxInvocations = 10 // Safety limit to prevent infinite loops + }); + +var result = await runner.RunAsync( + input: new OrderEvent { OrderId = "order-123" }, + timeout: TimeSpan.FromSeconds(30)); + +Assert.Equal(InvocationStatus.Succeeded, result.Status); +Assert.Equal("approved", result.Result.Status); + +// Inspect individual steps +var validateStep = result.GetStep("validate_order"); +Assert.True(validateStep.GetResult().IsValid); +``` + +The Python and JS SDKs both ship equivalent test runner packages. + +### Cloud Test Runner + +For integration testing against deployed functions, the testing package also ships a `CloudDurableTestRunner` with the same API as the local runner. This lets developers run the exact same assertions against a real Lambda function: + +```csharp +var runner = new CloudDurableTestRunner( + functionArn: "arn:aws:lambda:us-east-1:123456789012:function:process-order:$LATEST"); + +var result = await runner.RunAsync( + input: new OrderEvent { OrderId = "order-123" }, + timeout: TimeSpan.FromSeconds(60)); + +Assert.Equal(InvocationStatus.Succeeded, result.Status); +var validateStep = result.GetStep("validate_order"); +Assert.True(validateStep.GetResult().IsValid); +``` + +The cloud runner invokes the deployed function and polls `GetDurableExecutionState` until the execution reaches a terminal state, then reconstructs the same `TestResult` structure as the local runner. + +### Function Registration for Invoke Testing + +To test workflows that use `InvokeAsync` without deploying, register sibling functions with the local test runner: + +```csharp +var paymentHandler = new PaymentFunction().Handler; + +var runner = new DurableTestRunner( + handler: new OrderFunction().Handler, + options: new TestRunnerOptions { SkipTime = true }); + +runner.RegisterFunction("process-payment", paymentHandler); +runner.RegisterFunction( + "arn:aws:lambda:us-east-1:123:function:process-payment:$LATEST", + paymentHandler); + +var result = await runner.RunAsync(input: new OrderEvent { OrderId = "123" }); +``` + +When the workflow calls `context.InvokeAsync("process-payment", payload)`, the test runner routes to the registered handler instead of making an AWS API call. + +--- + +## Local development (Test Tool v2 and Aspire) + +The Lambda Test Tool v2 and the Aspire Lambda integration currently emulate single-invocation Lambda functions. Durable functions require a multi-invocation loop that neither tool supports today. To add support, the local emulator needs three things: + +### Checkpoint API endpoints + +The SDK calls these during execution. The emulator would serve them locally with in-memory storage: + +- `POST /checkpoint-durable-execution` -- store step results, wait records +- `GET /durable-execution-state` -- return accumulated state for replay + +### An orchestration loop + +When the function returns `PENDING`, the emulator needs to: +- Parse the checkpoint to determine what's pending (timer, callback, retry) +- Wait for that condition (or skip it in fast mode) +- Re-invoke the function with the accumulated `DurableExecutionInvocationInput` +- Repeat until `SUCCEEDED` or `FAILED` + +### Callback delivery + +An endpoint that external tools (or the developer via the UI) can call to deliver callback results: + +- `POST /send-durable-execution-callback-success` +- This triggers a re-invocation of the waiting execution + +### How this relates to the testing SDK + +The `DurableTestRunner` in the testing package implements the same orchestration loop programmatically. The test tool / Aspire enhancement would reuse this engine and wrap it in a web UI or Aspire dashboard, giving developers a visual way to see execution state, deliver callbacks manually, skip timers, and inspect checkpoint history. + +### Priority + +This is post-v1 work. For the initial release, developers test durable functions using the programmatic `DurableTestRunner` or by deploying to AWS. Test tool and Aspire support are a fast-follow once the core SDK is stable. + +--- + +## Requirements & Constraints + +- **Target framework:** `net8.0` only. .NET 6 is EOL and not supported. Durable functions are a new feature — adopters will be on the latest managed runtime. Targeting .NET 8 gives access to `required` properties, improved `System.Text.Json` source generation, and better NativeAOT support. +- **Lambda runtime:** Requires the managed .NET 8 runtime or a custom runtime (`provided.al2023`) for NativeAOT deployments. +- **Durable execution service:** The function must be configured with `DurableConfig` (handled automatically by the `[DurableExecution]` source generator). +- **Qualified function identifiers:** `InvokeAsync` requires a version number, alias, or `$LATEST` — unqualified ARNs are not supported for durable invocations. +- **Serializable results:** All step return types must be JSON-serializable (or use a custom `ICheckpointSerializer`). + +--- + +## Package Structure + +### Amazon.Lambda.DurableExecution (Runtime) + +The core SDK that runs in Lambda. Minimal dependencies. + +**Dependencies:** +- `Amazon.Lambda.Core` (existing) +- `AWSSDK.Lambda` (for checkpoint/state APIs) +- `Microsoft.Extensions.Logging.Abstractions` (for ILogger) + +### Amazon.Lambda.DurableExecution.Testing (Dev-only) + +Test runner and helpers for local/cloud testing. + +**Dependencies:** +- `Amazon.Lambda.DurableExecution` +- `Amazon.Lambda.TestUtilities` (existing) + +### Blueprints (`dotnet new` Templates) + +New `dotnet new` templates ship as part of the existing `Amazon.Lambda.Templates` NuGet package (same as all other Lambda blueprints in this repo under `Blueprints/BlueprintDefinitions/`). + +**Templates to ship:** + +| Template short name | Description | +|---------------------|-------------| +| `lambda.DurableFunction` | Minimal durable function with a single step and wait. Includes test project with `DurableTestRunner`. | +| `lambda.DurableFunction.Agentic` | GenAI agentic loop pattern (invoke model → check tool call → execute tool → repeat). | +| `lambda.DurableFunction.HumanInTheLoop` | Callback-based human approval workflow. | + +Each template includes: +- `.csproj` with correct NuGet references (`Amazon.Lambda.DurableExecution`, `Amazon.Lambda.Annotations`) +- Handler class with `[LambdaFunction]` + `[DurableExecution]` attributes +- `serverless.template` (auto-generated by source generator on build) +- Test project with `DurableTestRunner` and a passing test +- `aws-lambda-tools-defaults.json` for deployment via `dotnet lambda deploy-function` + +Running `dotnet new lambda.DurableFunction` should produce a buildable, testable, deployable project in under 30 seconds. + +--- + +## Implementation plan + +| Workstream | Scope | Estimate | +|------------|-------|----------| +| **Durable execution runtime** | Core SDK: replay engine, all context operations (step, wait, callback, invoke, parallel, map), checkpoint batching, retry, logging | ~5-6 weeks | +| **Annotations / source generator** | `[DurableExecution]` attribute, handler wrapper codegen, CloudFormation DurableConfig + IAM policy generation | ~2 weeks | +| **Testing SDK** | Local test runner (in-memory, time-skipping), cloud test runner, step inspection API | ~1.5 weeks | +| **Blueprints, docs, examples** | `dotnet new` project templates, developer guide, API reference, sample projects | ~2 weeks | +| **Roslyn analyzers** (P1 follow-up) | Static analysis detecting non-determinism, nesting violations, closure mutations | ~2 weeks | + +**Total: ~10-11 weeks (1 engineer familiar with the Python/JS SDKs)** + Roslyn analyzers as follow-up + +### Roslyn Analyzers (P1 Follow-up) + +> **Reference implementation:** JavaScript ESLint plugin — [no-non-deterministic-outside-step](https://github.com/aws/aws-durable-execution-sdk-js/blob/main/packages/aws-durable-execution-sdk-js-eslint-plugin/src/rules/no-non-deterministic-outside-step/no-non-deterministic-outside-step.ts) | [no-nested-durable-operations](https://github.com/aws/aws-durable-execution-sdk-js/blob/main/packages/aws-durable-execution-sdk-js-eslint-plugin/src/rules/no-nested-durable-operations/no-nested-durable-operations.ts) | [no-closure-in-durable-operations](https://github.com/aws/aws-durable-execution-sdk-js/blob/main/packages/aws-durable-execution-sdk-js-eslint-plugin/src/rules/no-closure-in-durable-operations/no-closure-in-durable-operations.ts) + +Ship as a separate NuGet package: `Amazon.Lambda.DurableExecution.Analyzers` + +The JavaScript SDK ships an ESLint plugin (`@aws/durable-execution-sdk-js-eslint-plugin`) with three rules that catch the most common durable execution mistakes at author time. The .NET equivalent uses Roslyn diagnostic analyzers: + +| Diagnostic ID | Severity | Rule | Rationale | +|---------------|----------|------|-----------| +| DE001 | Warning | `DateTime.Now`, `DateTime.UtcNow`, `Guid.NewGuid()`, `Random.Next()`, `Random.Shared`, `Environment.TickCount` used outside a `StepAsync` body | Non-deterministic values produce different results on replay, breaking checkpoint consistency | +| DE002 | Error | Calling `context.StepAsync`, `WaitAsync`, `ParallelAsync`, `MapAsync`, `InvokeAsync`, `RunInChildContextAsync`, `CreateCallbackAsync`, or `WaitForCallbackAsync` inside a `StepAsync` lambda | Steps are leaf operations — nesting durable operations inside a step produces unpredictable behavior | +| DE003 | Warning | Mutable variable captured by a `StepAsync` lambda and written to inside the lambda body | On replay the step returns cached result without executing, so the write never happens — the outer variable has stale state | +| DE004 | Info | `Task.WhenAll` or `Task.WhenAny` called with tasks returned by durable context methods | Suggest using `ParallelAsync` for completion policies, nesting control, and observability | + +These analyzers run at compile time in the IDE (IntelliSense squiggles) and during `dotnet build`, preventing the most confusing class of runtime failures. + +--- + +## Cross-SDK API comparison + +All three SDKs expose the same core operations. The differences are naming conventions, parameter ordering, and concurrency model. + +| Operation | .NET | Python | JavaScript | +|-----------|------|--------|------------| +| Step | `context.StepAsync(func, name?, config?)` | `context.step(func, name?, config?)` | `context.step(name?, fn, config?)` → `DurablePromise` | +| Wait | `context.WaitAsync(duration, name?)` | `context.wait(duration, name?)` | `context.wait(name?, duration)` → `DurablePromise` | +| Create callback | `context.CreateCallbackAsync(name?, config?)` | `context.create_callback(name?, config?)` | `context.createCallback(name?, config?)` | +| Wait for callback | `context.WaitForCallbackAsync(submitter, name?, config?)` | `context.wait_for_callback(submitter, name?, config?)` | `context.waitForCallback(name?, submitter, config?)` | +| Invoke | `context.InvokeAsync(funcName, payload, name?, config?)` | `context.invoke(func_name, payload, name?, config?)` | `context.invoke(name?, funcId, input, config?)` → `DurablePromise` | +| Parallel | `context.ParallelAsync(functions, name?, config?)` | `context.parallel(functions, name?, config?)` | `context.parallel(name?, branches, config?)` | +| Map | `context.MapAsync(items, func, name?, config?)` | `context.map(inputs, func, name?, config?)` | `context.map(name?, items, mapFunc, config?)` | +| Child context | `context.RunInChildContextAsync(func, name?, config?)` | `context.run_in_child_context(func, name?, config?)` | `context.runInChildContext(name?, fn, config?)` | +| Wait for condition | `context.WaitForConditionAsync(check, config, name?)` | `context.wait_for_condition(check, config, name?)` | `context.waitForCondition(name?, checkFunc, config?)` | +| Logger | `context.Logger` (ILogger) | `context.logger` (Logger) | `context.logger` (DurableContextLogger) | +| Lambda context | `context.LambdaContext` | `context.lambda_context` | `context.lambdaContext` | +| Execution context | `context.ExecutionContext` | `context.execution_context` | *(via logger metadata)* | +| Promise combinators | `CompletionConfig` on `ParallelAsync` | `CompletionConfig` on `parallel`/`map` | `context.promise.all/allSettled/any/race` | +| Configure logger | `context.ConfigureLogger(config)` | `context.set_logger(logger)` | `context.configureLogger(config)` | +| Cancellation | `CancellationToken` on all methods | *(N/A)* | *(N/A)* | +| Jitter strategy | `JitterStrategy` enum on `Exponential()` | `jitter_strategy` on `RetryStrategyConfig` | `jitter` on `createRetryStrategy()` | +| Retry presets | `RetryStrategy.None/Default/Transient` | `RetryPresets.none()/default()/transient()` | `retryPresets.default/linear/noRetry` | +| Nesting type | `NestingType` on `ParallelConfig`/`MapConfig` | `NestingType` on parallel/map config | `NestingType` on parallel/map config | +| Item batching | `ItemBatcher` on `MapConfig` | `ItemBatcher` on `MapConfig` | *(checkpoint manager handles batching)* | +| Item namer | `ItemNamer` on `MapConfig` | Item naming function on `MapConfig` | `itemNamer` on `MapConfig` | +| Error mapping | `ErrorMapping` on `ChildContextConfig` | *(typed exception wrapping)* | `errorMapping` on child context config | +| Message-based retry filter | `retryableMessagePatterns` (regex) | `retryable_errors` (regex) | `retryableErrors` (RegExp[]) | +| Step context / scoped logger | `IStepContext` with `Logger`, `AttemptNumber` | `StepContext` with `logger` | `ctx` with `logger` in step callback | +| Named parallel branches | `DurableBranch(name, func)` | Function `__name__` | `{ name, func }` objects | +| Inline retry lambda | `Func` | `Callable[[Exception, int], RetryDecision]` | `(error, attempt) => RetryDecision` | +| Static analysis | Roslyn analyzers (P1 follow-up) | *(N/A)* | ESLint plugin (3 rules) | +| Cloud test runner | `CloudDurableTestRunner` | `pytest --runner-mode=cloud` | `CloudDurableTestRunner` | + +**Key differences:** + +- **Concurrency model:** JS returns `DurablePromise` (lazy, deferred until awaited). Python is synchronous (blocks the thread). .NET returns `Task` (standard async/await). Note: `Task.WhenAll` works with durable operations but `ParallelAsync`/`MapAsync` are preferred for completion policies and observability. +- **Name parameter position:** JS puts `name` first; Python and .NET put it after the function/duration. +- **Parallel semantics in JS:** JS uses `context.promise.all/any/race/allSettled` to combine DurablePromises. .NET and Python use `CompletionConfig` on the `Parallel`/`Map` operations instead. +- **.NET-only:** `CancellationToken` on every method (standard .NET pattern). +- **Jitter default:** All three SDKs default to full jitter on retry strategies. + +--- + +## Common Patterns + +### GenAI Agentic Loop + +```csharp +[DurableExecution] +public async Task AgentHandler(AgentRequest input, IDurableContext context) +{ + var messages = new List + { + new Message { Role = "user", Content = input.Prompt } + }; + + while (true) + { + var response = await context.StepAsync( + async (step) => await InvokeModel(messages), + name: "invoke_model"); + + if (response.ToolCall == null) + return response.Content; + + var toolResult = await context.StepAsync( + async (step) => await ExecuteTool(response.ToolCall), + name: $"tool_{response.ToolCall.Name}"); + + messages.Add(new Message { Role = "assistant", Content = toolResult }); + } +} +``` + +### Human-in-the-Loop + +```csharp +[DurableExecution] +public async Task ReviewHandler(ReviewRequest input, IDurableContext context) +{ + var analysis = await context.StepAsync( + async (step) => await AnalyzeDocument(input.DocumentUrl), + name: "analyze_document"); + + context.Logger.LogInformation("Analysis complete, requesting human review"); + + var review = await context.WaitForCallbackAsync( + async (callbackId, ctx) => + { + await NotifyReviewer(input.ReviewerEmail, callbackId, analysis); + }, + name: "human_review", + config: new WaitForCallbackConfig + { + Timeout = TimeSpan.FromDays(7), + HeartbeatTimeout = TimeSpan.FromHours(24) + }); + + if (review.Approved) + { + await context.StepAsync( + async (step) => await PublishDocument(input.DocumentUrl), + name: "publish"); + } + + return new ReviewResult { Status = review.Approved ? "published" : "rejected" }; +} +``` + +### Scheduled Pipeline with Retries + +```csharp +[DurableExecution] +public async Task DataPipeline(PipelineInput input, IDurableContext context) +{ + // Extract + var rawData = await context.StepAsync( + async (step) => await ExtractFromSource(input.SourceId), + name: "extract", + config: new StepConfig + { + RetryStrategy = RetryStrategy.Exponential(maxAttempts: 5, initialDelay: TimeSpan.FromSeconds(2)) + }); + + // Transform (fan-out) + var transformed = await context.MapAsync( + items: rawData.Chunks, + func: async (ctx, chunk, index, _) => + { + return await ctx.StepAsync( + async (step) => await TransformChunk(chunk), + name: $"transform_{index}"); + }, + name: "transform_all", + config: new MapConfig { MaxConcurrency = 10 }); + + transformed.ThrowIfError(); + + // Load + var loadResult = await context.StepAsync( + async (step) => await LoadToDestination(transformed.GetResults()), + name: "load", + config: new StepConfig + { + Semantics = StepSemantics.AtMostOncePerRetry + }); + + // Wait before next run + await context.WaitAsync(TimeSpan.FromHours(1), name: "schedule_delay"); + + return new PipelineResult { RecordsProcessed = loadResult.Count }; +} +``` + +--- + +## References + +- [AWS Blog: Build multi-step applications and AI workflows with AWS Lambda durable functions](https://aws.amazon.com/blogs/aws/build-multi-step-applications-and-ai-workflows-with-aws-lambda-durable-functions/) +- [AWS Documentation: Lambda Durable Functions](https://docs.aws.amazon.com/lambda/latest/dg/durable-functions.html) +- [Python SDK Repository](https://github.com/aws/aws-durable-execution-sdk-python) +- [JavaScript/TypeScript SDK Repository](https://github.com/aws/aws-durable-execution-sdk-js) +- [GitHub Issue #2216: .NET Durable Functions Support](https://github.com/aws/aws-lambda-dotnet/issues/2216) +- [Existing .NET Annotations Design Doc](lambda-annotations-design.md)