diff --git a/docs/adr/003-completable-future-based-coordination.md b/docs/adr/003-completable-future-based-coordination.md index 4b571eaf6..6a73fcfdb 100644 --- a/docs/adr/003-completable-future-based-coordination.md +++ b/docs/adr/003-completable-future-based-coordination.md @@ -1,6 +1,6 @@ # ADR-003: CompletableFuture-Based Operation Coordination -**Status:** Review +**Status:** Accepted **Date:** 2026-02-18 ## Context diff --git a/docs/advanced/configuration.md b/docs/advanced/configuration.md index abf2dfbde..bc20f86a3 100644 --- a/docs/advanced/configuration.md +++ b/docs/advanced/configuration.md @@ -35,5 +35,7 @@ public class OrderProcessor extends DurableHandler { | `withSerDes()` | Serializer for step results | Jackson with default settings | | `withExecutorService()` | Thread pool for user-defined operations | Cached daemon thread pool | | `withLoggerConfig()` | Logger behavior configuration | Suppress logs during replay | +| `withPollingStrategy()` | Backend polling strategy | Exponential backoff: 1s base, 2x rate, FULL jitter, 10s max | +| `withCheckpointDelay()` | How often the SDK checkpoints updates | `Duration.ofSeconds(0)` (as soon as possible) | The `withExecutorService()` option configures the thread pool used for running user-defined operations. Internal SDK coordination (checkpoint batching, polling) runs on an SDK-managed thread pool. \ No newline at end of file diff --git a/docs/advanced/error-handling.md b/docs/advanced/error-handling.md index 4285f9e17..08d13b0a7 100644 --- a/docs/advanced/error-handling.md +++ b/docs/advanced/error-handling.md @@ -4,21 +4,26 @@ The SDK throws specific exceptions to help you handle different failure scenario ``` DurableExecutionException - General durable exception -├── NonDeterministicExecutionException - Code changed between original execution and replay. Fix code to maintain determinism; don't change step order/names. ├── SerDesException - Serialization and deserialization exception. +├── UnrecoverableDurableExecutionException - Execution cannot be recovered. The durable execution will be immediately terminated. +│ ├── NonDeterministicExecutionException - Code changed between original execution and replay. Fix code to maintain determinism; don't change step order/names. +│ └── IllegalDurableOperationException - An illegal operation was detected. The execution will be immediately terminated. └── DurableOperationException - General operation exception ├── StepException - General Step exception - │ ├── StepFailedException - Step exhausted all retry attempts.Catch to implement fallback logic or let execution fail. + │ ├── StepFailedException - Step exhausted all retry attempts. Catch to implement fallback logic or let execution fail. │ └── StepInterruptedException - `AT_MOST_ONCE` step was interrupted before completion. Implement manual recovery (check if operation completed externally) ├── InvokeException - General chained invocation exception │ ├── InvokeFailedException - Chained invocation failed. Handle the error or propagate failure. - │ ├── InvokeTimedoutException - Chained invocation timed out. Handle the error or propagate failure. + │ ├── InvokeTimedOutException - Chained invocation timed out. Handle the error or propagate failure. │ └── InvokeStoppedException - Chained invocation stopped. Handle the error or propagate failure. ├── CallbackException - General callback exception │ ├── CallbackFailedException - External system sent an error response to the callback. Handle the error or propagate failure - │ └── CallbackTimeoutException - Callback exceeded its timeout duration. Handle the error or propagate the failure + │ ├── CallbackTimeoutException - Callback exceeded its timeout duration. Handle the error or propagate the failure + │ └── CallbackSubmitterException - Submitter step failed to submit the callback. Handle the error or propagate failure ├── WaitForConditionFailedException- waitForCondition exceeded max polling attempts or failed. Catch to implement fallback logic. - └── ChildContextFailedException - Child context failed and the original exception could not be reconstructed + ├── ChildContextFailedException - Child context failed and the original exception could not be reconstructed + ├── MapIterationFailedException - Map iteration failed and the original exception could not be reconstructed + └── ParallelBranchFailedException - Parallel branch failed and the original exception could not be reconstructed ``` ```java diff --git a/docs/core/callbacks.md b/docs/core/callbacks.md index cab6dc305..1243fa0e5 100644 --- a/docs/core/callbacks.md +++ b/docs/core/callbacks.md @@ -45,7 +45,7 @@ var waitForCallbackConfig = WaitForCallbackConfig.builder() .callbackConfig(config) .stepConfig(StepConfig.builder().retryStrategy(...).build()) .build(); -ctx.waitForCallback("approval", String.class, callbackId -> sendApprovalRequest(callbackId), waitForCallbackConfig); +ctx.waitForCallback("approval", String.class, (callbackId, stepCtx) -> sendApprovalRequest(callbackId), waitForCallbackConfig); ``` | Option | Description | diff --git a/docs/core/invoke.md b/docs/core/invoke.md index 73dfe3d95..9e474cfad 100644 --- a/docs/core/invoke.md +++ b/docs/core/invoke.md @@ -9,8 +9,7 @@ var result = ctx.invoke("invoke-function", Result.class, InvokeConfig.builder() .payloadSerDes(...) // payload serializer - .resultSerDes(...) // result deserializer - .timeout(Duration.of(...)) // wait timeout + .serDes(...) // result deserializer .tenantId(...) // Lambda tenantId .build() ); diff --git a/docs/core/map.md b/docs/core/map.md index 544838d17..5b8453e3d 100644 --- a/docs/core/map.md +++ b/docs/core/map.md @@ -57,9 +57,9 @@ Each `MapResultItem` contains: | Field | Description | |-------|-------------| -| `status()` | `SUCCEEDED`, `FAILED`, or `NOT_STARTED` | -| `result()` | The result value, or `null` if failed/not started | -| `error()` | The error details as `MapError`, or `null` if succeeded/not started | +| `status()` | `SUCCEEDED`, `FAILED`, or `SKIPPED` | +| `result()` | The result value, or `null` if failed/skipped | +| `error()` | The error details as `MapError`, or `null` if succeeded/skipped | ### MapError @@ -135,10 +135,10 @@ var config = MapConfig.builder() .build(); var result = ctx.map("find-two", items, String.class, fn, config); -assertEquals(CompletionReason.MIN_SUCCESSFUL_REACHED, result.completionReason()); +assertEquals(ConcurrencyCompletionStatus.MIN_SUCCESSFUL_REACHED, result.completionReason()); ``` -When early termination triggers, items that were never started have `NOT_STARTED` status with `null` for both result and error in the `MapResult`. +When early termination triggers, items that were never started have `SKIPPED` status with `null` for both result and error in the `MapResult`. ### Checkpoint-and-Replay diff --git a/docs/core/parallel.md b/docs/core/parallel.md index 509b69548..4fd344e95 100644 --- a/docs/core/parallel.md +++ b/docs/core/parallel.md @@ -1,143 +1,121 @@ -# Parallel Operations Design Plan +## parallel() – Concurrent Branch Execution -## Overview - -Add parallel execution capability to the AWS Lambda Durable Execution SDK, allowing multiple branches to run concurrently within a single durable function execution. - -## API Design - -### User Interface +`parallel()` runs multiple independent branches concurrently, each in its own child context. Branches are registered via `branch()` and execute immediately (respecting `maxConcurrency`). The operation completes when all branches finish or completion criteria are met. ```java -try (var parallelContext = ctx.parallel(ParallelConfig.builder().build())) { - DurableFuture task1 = parallelContext.branch("validate", Boolean.class, branchContext -> validate()); - DurableFuture task2 = parallelContext.branch("process", String.class, branchContext -> process()); - parallelContext.join(); // Wait for completion based on config - - // Access results - Boolean validated = task1.get(); - String processed = task2.get(); -} +// Basic parallel execution +var parallel = ctx.parallel("validate-and-process"); +DurableFuture task1 = parallel.branch("validate", Boolean.class, branchCtx -> { + return branchCtx.step("check", Boolean.class, stepCtx -> validate()); +}); +DurableFuture task2 = parallel.branch("process", String.class, branchCtx -> { + return branchCtx.step("work", String.class, stepCtx -> process()); +}); + +// Wait for all branches and get the aggregate result +ParallelResult result = parallel.get(); + +// Access individual branch results +Boolean validated = task1.get(); +String processed = task2.get(); ``` -### Core Components - -#### 1. ParallelConfig -Configuration object controlling parallel execution behavior: +`ParallelDurableFuture` implements `AutoCloseable` — calling `close()` triggers `get()` if it hasn't been called yet, ensuring all branches complete. ```java -ParallelConfig config = ParallelConfig.builder() - .maxConcurrency(5) // Max branches running simultaneously - .minSuccessful(3) // Minimum successful branches required (-1 = all) - .toleratedFailureCount(2) // Max failures before stopping execution - .build(); +// AutoCloseable pattern +try (var parallel = ctx.parallel("work")) { + parallel.branch("a", String.class, branchCtx -> branchCtx.step("a1", String.class, stepCtx -> "a")); + parallel.branch("b", String.class, branchCtx -> branchCtx.step("b1", String.class, stepCtx -> "b")); +} // close() calls get() automatically ``` -**Configuration Rules:** -- `maxConcurrency`: Controls resource usage, prevents overwhelming the system -- `minSuccessful`: Enables "best effort" scenarios where not all branches need to succeed -- `toleratedFailureCount`: Fail-fast behavior when too many branches fail +### ParallelResult -#### 2. ParallelContext -Manages the lifecycle of parallel branches: +`ParallelResult` is a summary of the parallel execution: -```java -public class ParallelContext implements AutoCloseable { - // Create branches - public DurableFuture branch(String name, Class resultType, Function func); - public DurableFuture branch(String name, TypeToken resultType, Function func); - - // Wait for completion - public void join(); - - // AutoCloseable ensures join() is called - public void close(); -} -``` +| Field | Description | +|-------|-------------| +| `size()` | Total number of registered branches | +| `succeeded()` | Number of branches that succeeded | +| `failed()` | Number of branches that failed | +| `completionStatus()` | Why the operation completed (`ALL_COMPLETED`, `MIN_SUCCESSFUL_REACHED`, `FAILURE_TOLERANCE_EXCEEDED`) | -#### 3. DurableContext Integration -Add single method to existing `DurableContext`: - -```java -public ParallelContext parallel(ParallelConfig config); -``` +### ParallelConfig -## Implementation Strategy +Configure concurrency limits and completion criteria: -### 1. Leverage Existing Child Context Infrastructure - -Each parallel branch will be implemented as a `ChildContextOperation`: -- **Isolation**: Each branch has its own checkpoint log -- **Replay Safety**: Branches replay independently -- **Error Handling**: Branch failures don't affect other branches directly - -### 2. Execution Flow - -1. **Branch Registration**: `branch()` calls create `ChildContextOperation` instances but don't execute immediately -2. **Execution Start**: `join()` triggers execution of branches respecting `maxConcurrency` -3. **Concurrency Control**: Use a queue to manage pending branches when `maxConcurrency` is reached -4. **Completion Logic**: Monitor success/failure counts against configuration thresholds -5. **Result Collection**: Return results via `DurableFuture` instances +```java +var config = ParallelConfig.builder() + .maxConcurrency(5) // at most 5 branches run at once + .completionConfig(CompletionConfig.allCompleted()) // default: run all branches + .build(); +var parallel = ctx.parallel("work", config); +``` -### 4. Error Handling Strategy +| Option | Default | Description | +|--------|---------|-------------| +| `maxConcurrency` | Unlimited | Maximum branches running simultaneously (must be ≥ 1) | +| `completionConfig` | `allCompleted()` | Controls when the operation stops starting new branches | -**Branch-Level Failures:** -- Individual branch failures are captured in their respective `DurableFuture` -- Don't immediately fail the entire parallel operation -- Count towards `failureCount` for threshold checking +#### CompletionConfig -**Parallel-Level Failures:** -- Exceed `toleratedFailureCount`: Stop starting new branches, wait for running ones -- Insufficient `minSuccessful`: Throw `ParallelExecutionException` after all branches complete -- Configuration validation errors: Fail immediately +`CompletionConfig` controls when the parallel operation stops starting new branches: -## Key Design Decisions +| Factory Method | Behavior | +|----------------|----------| +| `allCompleted()` (default) | All branches run regardless of failures | +| `allSuccessful()` | Stop if any branch fails (zero failures tolerated) | +| `firstSuccessful()` | Stop after the first branch succeeds | +| `minSuccessful(n)` | Stop after `n` branches succeed | +| `toleratedFailureCount(n)` | Stop after more than `n` failures | -### 1. Build on Child Contexts -- **Pros**: Reuses existing isolation and checkpointing logic -- **Cons**: Each branch has overhead of a separate child context -- **Decision**: Acceptable trade-off for clean isolation and replay safety +Note: `toleratedFailurePercentage` is not supported for parallel operations. -### 2. Eager vs Lazy Execution -- **Chosen**: Lazy execution (branches start only on `join()`) -- **Rationale**: Allows all branches to be registered before execution starts, enabling better concurrency planning +### ParallelBranchConfig -### 3. AutoCloseable Pattern -- **Purpose**: Ensures `join()` is called even if user forgets -- **Behavior**: If `close()` is called before `join()`, automatically call `join()` +Per-branch configuration can be provided: -### 4. Configuration Validation -- Validate at `ParallelConfig.build()` time: - - `maxConcurrency > 0` - - `minSuccessful >= -1` (where -1 means "all") - - `toleratedFailureCount >= 0` - - `minSuccessful + toleratedFailureCount <= total branches` (validated at runtime) +```java +parallel.branch("work", String.class, branchCtx -> doWork(), + ParallelBranchConfig.builder() + .serDes(customSerDes) + .build()); +``` -## Implementation Files +### Error Handling -### New Files to Create -1. `ParallelConfig.java` - Configuration builder -2. `ParallelContext.java` - User-facing parallel context -3. `operation/ParallelOperation.java` - Core execution logic -4. `exception/ParallelExecutionException.java` - Parallel-specific exceptions +Branch failures are captured individually. A failed branch throws its exception when you call `get()` on its `DurableFuture`: -### Files to Modify -1. `DurableContext.java` - Add `parallel()` method -2. `DurableFuture.java` - Ensure compatibility with parallel results (likely no changes needed) +```java +var parallel = ctx.parallel("work"); +var risky = parallel.branch("risky", String.class, branchCtx -> { + throw new RuntimeException("failed"); +}); +var safe = parallel.branch("safe", String.class, branchCtx -> { + return branchCtx.step("ok", String.class, stepCtx -> "done"); +}); + +ParallelResult result = parallel.get(); + +String safeResult = safe.get(); // "done" +try { + risky.get(); // throws +} catch (ParallelBranchFailedException e) { + // Branch failed — original exception could not be reconstructed +} +``` -## Testing Strategy +| Exception | When Thrown | +|-----------|-------------| +| `ParallelBranchFailedException` | Branch failed and the original exception could not be reconstructed | +| User's exception | Branch threw a reconstructable exception — propagated through `get()` | -### Unit Tests -- `ParallelConfigTest` - Configuration validation -- `ParallelOperationTest` - Core execution logic with mocked child contexts +### Checkpoint-and-Replay -### Integration Tests -- Success scenarios with various configurations -- Failure scenarios (exceeding thresholds) -- Concurrency limits -- Replay behavior +Parallel operations are fully durable. On replay after interruption: -### Example Implementation -- `ParallelExample.java` in examples module -- Demonstrate common patterns and error handling +- Completed branches return cached results without re-execution +- Incomplete branches resume from their last checkpoint +- Branches that never started execute fresh diff --git a/docs/core/steps.md b/docs/core/steps.md index 5bf06a053..770f631bb 100644 --- a/docs/core/steps.md +++ b/docs/core/steps.md @@ -119,4 +119,4 @@ var orderMap = ctx.step("fetch-orders", new TypeToken>() {}, stepCtx -> orderService.getOrdersByCustomer()); ``` -This is needed for the SDK to deserialize a checkpointed result and get the exact type to reconstruct. See [TypeToken and Type Erasure](docs/internal-design.md#typetoken-and-type-erasure) for technical details. \ No newline at end of file +This is needed for the SDK to deserialize a checkpointed result and get the exact type to reconstruct. See [TypeToken and Type Erasure](../design.md#custom-serdes-and-typetoken) for technical details. \ No newline at end of file diff --git a/docs/core/wait-for-condition.md b/docs/core/wait-for-condition.md index 66750055d..f173a6eba 100644 --- a/docs/core/wait-for-condition.md +++ b/docs/core/wait-for-condition.md @@ -4,6 +4,10 @@ ```java // Poll an order status until it ships +var config = WaitForConditionConfig.builder() + .initialState("PENDING") + .build(); + var status = ctx.waitForCondition( "wait-for-shipment", String.class, @@ -13,14 +17,14 @@ var status = ctx.waitForCondition( ? WaitForConditionResult.stopPolling(latest) : WaitForConditionResult.continuePolling(latest); }, - "PENDING"); + config); ``` The check function receives the current state and a `StepContext`, and returns a `WaitForConditionResult`: - `WaitForConditionResult.stopPolling(value)` — condition met, return `value` as the final result - `WaitForConditionResult.continuePolling(value)` — keep polling, pass `value` to the next check -The `initialState` parameter (`"PENDING"` above) is passed to the first check invocation. +The `initialState` is configured via `WaitForConditionConfig` (`"PENDING"` above) and is passed to the first check invocation. ## waitForConditionAsync() – Non-Blocking Polling @@ -36,7 +40,9 @@ DurableFuture shipmentFuture = ctx.waitForConditionAsync( ? WaitForConditionResult.stopPolling(latest) : WaitForConditionResult.continuePolling(latest); }, - "PENDING"); + WaitForConditionConfig.builder() + .initialState("PENDING") + .build()); // Do other work while polling runs var invoice = ctx.step("generate-invoice", String.class, stepCtx -> generateInvoice(orderId)); @@ -55,9 +61,10 @@ Use `WaitStrategies` to configure a different strategy: // Fixed 30-second delay, up to 10 attempts var config = WaitForConditionConfig.builder() .waitStrategy(WaitStrategies.fixedDelay(10, Duration.ofSeconds(30))) + .initialState("PENDING") .build(); -var result = ctx.waitForCondition("poll-status", String.class, checkFunc, "PENDING", config); +var result = ctx.waitForCondition("poll-status", String.class, checkFunc, config); ``` ```java @@ -94,19 +101,20 @@ var config = WaitForConditionConfig.builder() |--------|---------|-------------| | `waitStrategy()` | Exponential backoff (see above) | Controls delay between polls and max attempts | | `serDes()` | Handler default | Custom serialization for checkpointing state | +| `initialState()` | `null` | Initial state passed to the first check invocation | ## Error Handling | Exception | When Thrown | |-----------|-------------| -| `WaitForConditionException` | Max attempts exceeded (thrown by the wait strategy) | +| `WaitForConditionFailedException` | Max attempts exceeded (thrown by the wait strategy) | | `SerDesException` | Checkpointed state fails to deserialize on replay | | User's exception | Check function throws — propagated through `get()` | ```java try { - var result = ctx.waitForCondition("poll", String.class, checkFunc, "initial"); -} catch (WaitForConditionException e) { + var result = ctx.waitForCondition("poll", String.class, checkFunc); +} catch (WaitForConditionFailedException e) { // Max attempts exceeded — condition was never met } catch (IllegalStateException e) { // Check function threw this — handle accordingly @@ -127,4 +135,4 @@ WaitForConditionWaitStrategy customStrategy = (state, attempt) -> { }; ``` -The strategy receives the current state and attempt number, and returns a `Duration`. Throw `WaitForConditionException` to stop polling with an error. +The strategy receives the current state and attempt number, and returns a `Duration`. Throw `WaitForConditionFailedException` to stop polling with an error. diff --git a/docs/design.md b/docs/design.md index e5100d024..69d6b79de 100644 --- a/docs/design.md +++ b/docs/design.md @@ -30,20 +30,20 @@ aws-durable-execution-sdk-java/ ### User-Facing (DurableContext) ```java -// Synchronous step -T step(String name, Class type, Supplier func) -T step(String name, Class type, Supplier func, StepConfig config) -T step(String name, TypeToken type, Supplier func) -T step(String name, TypeToken type, Supplier func, StepConfig config) +// Synchronous step (func receives a StepContext) +T step(String name, Class type, Function func) +T step(String name, Class type, Function func, StepConfig config) +T step(String name, TypeToken type, Function func) +T step(String name, TypeToken type, Function func, StepConfig config) // Asynchronous step -DurableFuture stepAsync(String name, Class type, Supplier func) -DurableFuture stepAsync(String name, Class type, Supplier func, StepConfig config) -DurableFuture stepAsync(String name, TypeToken type, Supplier func) -DurableFuture stepAsync(String name, TypeToken type, Supplier func, StepConfig config) +DurableFuture stepAsync(String name, Class type, Function func) +DurableFuture stepAsync(String name, Class type, Function func, StepConfig config) +DurableFuture stepAsync(String name, TypeToken type, Function func) +DurableFuture stepAsync(String name, TypeToken type, Function func, StepConfig config) // Wait -void wait(String name, Duration duration) +Void wait(String name, Duration duration) // Asynchronous wait DurableFuture waitAsync(String name, Duration duration) @@ -59,6 +59,34 @@ DurableFuture invokeAsync(String name, String functionName, U payload, Class< DurableFuture invokeAsync(String name, String functionName, U payload, TypeToken resultType) DurableFuture invokeAsync(String name, String functionName, U payload, TypeToken resultType, InvokeConfig config) +// Callback +DurableCallbackFuture createCallback(String name, Class resultType) +DurableCallbackFuture createCallback(String name, Class resultType, CallbackConfig config) +DurableCallbackFuture createCallback(String name, TypeToken resultType) +DurableCallbackFuture createCallback(String name, TypeToken resultType, CallbackConfig config) + +// Wait for callback (combines callback creation + submitter step) +T waitForCallback(String name, Class resultType, BiConsumer func) +T waitForCallback(String name, TypeToken resultType, BiConsumer func) +T waitForCallback(String name, Class resultType, BiConsumer func, WaitForCallbackConfig config) +T waitForCallback(String name, TypeToken resultType, BiConsumer func, WaitForCallbackConfig config) + +DurableFuture waitForCallbackAsync(String name, Class resultType, BiConsumer func) +DurableFuture waitForCallbackAsync(String name, TypeToken resultType, BiConsumer func) +DurableFuture waitForCallbackAsync(String name, Class resultType, BiConsumer func, WaitForCallbackConfig config) +DurableFuture waitForCallbackAsync(String name, TypeToken resultType, BiConsumer func, WaitForCallbackConfig config) + +// Child context +T runInChildContext(String name, Class resultType, Function func) +T runInChildContext(String name, TypeToken resultType, Function func) +T runInChildContext(String name, Class resultType, Function func, RunInChildContextConfig config) +T runInChildContext(String name, TypeToken resultType, Function func, RunInChildContextConfig config) + +DurableFuture runInChildContextAsync(String name, Class resultType, Function func) +DurableFuture runInChildContextAsync(String name, TypeToken resultType, Function func) +DurableFuture runInChildContextAsync(String name, Class resultType, Function func, RunInChildContextConfig config) +DurableFuture runInChildContextAsync(String name, TypeToken resultType, Function func, RunInChildContextConfig config) + // Map MapResult map(String name, Collection items, Class resultType, MapFunction function) MapResult map(String name, Collection items, Class resultType, MapFunction function, MapConfig config) @@ -70,6 +98,21 @@ DurableFuture> mapAsync(String name, Collection items, Class DurableFuture> mapAsync(String name, Collection items, TypeToken resultType, MapFunction function) DurableFuture> mapAsync(String name, Collection items, TypeToken resultType, MapFunction function, MapConfig config) +// Parallel +ParallelDurableFuture parallel(String name) +ParallelDurableFuture parallel(String name, ParallelConfig config) + +// Wait for condition +T waitForCondition(String name, Class resultType, BiFunction> checkFunc) +T waitForCondition(String name, Class resultType, BiFunction> checkFunc, WaitForConditionConfig config) +T waitForCondition(String name, TypeToken resultType, BiFunction> checkFunc) +T waitForCondition(String name, TypeToken resultType, BiFunction> checkFunc, WaitForConditionConfig config) + +DurableFuture waitForConditionAsync(String name, Class resultType, BiFunction> checkFunc) +DurableFuture waitForConditionAsync(String name, Class resultType, BiFunction> checkFunc, WaitForConditionConfig config) +DurableFuture waitForConditionAsync(String name, TypeToken resultType, BiFunction> checkFunc) +DurableFuture waitForConditionAsync(String name, TypeToken resultType, BiFunction> checkFunc, WaitForConditionConfig config) + // Lambda context access Context getLambdaContext() ``` @@ -77,7 +120,9 @@ Context getLambdaContext() ### DurableFuture ```java -T get() // Blocks until complete, may suspend +T get() // Blocks until complete, may suspend +static List allOf(DurableFuture... futures) // Collect all results in order +static Object anyOf(DurableFuture... futures) // Return first completed result ``` ### Handler Configuration @@ -95,12 +140,14 @@ public class MyHandler extends DurableHandler { } ``` -| Option | Default | -|-----------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| `lambdaClientBuilder` | Auto-created `LambdaClient` for current region, primed for performance (see [`DurableConfig.java`](../sdk/src/main/java/com/amazonaws/lambda/durable/DurableConfig.java)) | -| `serDes` | `JacksonSerDes` | -| `executorService` | `Executors.newCachedThreadPool()` (for user-defined operations only) | -| `loggerConfig` | `LoggerConfig.defaults()` (suppress replay logs) | +| Option | Default | +|-----------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `lambdaClientBuilder` | Auto-created `LambdaClient` for current region, primed for performance (see [`DurableConfig.java`](../sdk/src/main/java/software/amazon/lambda/durable/DurableConfig.java)) | +| `serDes` | `JacksonSerDes` | +| `executorService` | `Executors.newCachedThreadPool()` (for user-defined operations only) | +| `loggerConfig` | `LoggerConfig.defaults()` (suppress replay logs) | +| `pollingStrategy` | Exponential backoff: 1s base, 2x rate, FULL jitter, 10s max | +| `checkpointDelay` | `Duration.ofSeconds(0)` (checkpoint as soon as possible) | ### Thread Pool Architecture @@ -148,7 +195,7 @@ protected DurableConfig createConfiguration() { ### Step Configuration ```java -context.step("name", Type.class, supplier, +context.step("name", Type.class, stepCtx -> doWork(), StepConfig.builder() .serDes(stepSpecificSerDes) .retryStrategy(RetryStrategies.exponentialBackoff(3, Duration.ofSeconds(1))) @@ -187,20 +234,28 @@ context.step("name", Type.class, supplier, ┌──────────────────────────────┐ ┌─────────────────────────────────┐ │ DurableContext │ │ ExecutionManager │ │ - User-facing API │ │ - State (ops, token) │ -│ - step(), stepAsync(), etc │ │ - Thread coordination │ +│ - step(), stepAsync() │ │ - Thread coordination │ │ - wait(), waitAsync() │ │ - Checkpoint batching │ -│ - waitForCondition() │ │ - Checkpoint response handling │ -│ - Operation ID counter │ │ - Polling │ -└──────────────────────────────┘ └─────────────────────────────────┘ +│ - invoke(), invokeAsync() │ │ - Checkpoint response handling │ +│ - createCallback() │ │ - Polling │ +│ - waitForCallback() │ └─────────────────────────────────┘ +│ - runInChildContext() │ +│ - map(), mapAsync() │ +│ - parallel() │ +│ - waitForCondition() │ +│ - Operation ID counter │ +└──────────────────────────────┘ │ │ ▼ ▼ ┌──────────────────────────────┐ ┌──────────────────────────────┐ │ Operations │ │ CheckpointBatcher │ │ - StepOperation │ │ - Queues requests │ │ - WaitOperation │ │ - Batches API calls (750KB) │ -│ - WaitForConditionOperation │ │ │ -│ - ConcurrencyOperation │ │ - Notifies via callback │ -│ - MapOperation │ └──────────────────────────────┘ +│ - InvokeOperation │ │ │ +│ - CallbackOperation │ │ - Notifies via callback │ +│ - WaitForConditionOperation │ └──────────────────────────────┘ +│ - ConcurrencyOperation │ +│ - MapOperation │ │ - ParallelOperation │ │ - ChildContextOperation │ │ - execute() / get() │ @@ -220,11 +275,28 @@ context.step("name", Type.class, supplier, software.amazon.lambda.durable ├── DurableHandler # Entry point ├── DurableExecutor # Lifecycle orchestration -├── DurableContext # User API +├── DurableContext # User API (interface) ├── DurableFuture # Async handle -├── StepConfig # Step configuration +├── DurableCallbackFuture # Callback future with callbackId +├── ParallelDurableFuture # Parallel branch registration + AutoCloseable +├── StepContext # Context passed to step functions ├── TypeToken # Generic type capture │ +├── config/ +│ ├── StepConfig # Step configuration (retry, semantics, serDes) +│ ├── InvokeConfig # Invoke configuration (payload/result serDes, tenantId) +│ ├── CallbackConfig # Callback configuration (timeout, heartbeat, serDes) +│ ├── WaitForCallbackConfig # Composite callback + step config +│ ├── MapConfig # Map configuration (concurrency, completion, serDes) +│ ├── ParallelConfig # Parallel configuration (concurrency, completion) +│ ├── ParallelBranchConfig # Per-branch configuration +│ ├── RunInChildContextConfig # Child context configuration +│ ├── WaitForConditionConfig # Polling configuration (wait strategy, serDes, initialState) +│ └── CompletionConfig # Completion criteria for map/parallel +│ +├── context/ +│ └── BaseContext # Base interface for DurableContext +│ ├── execution/ │ ├── ExecutionManager # Central coordinator │ ├── ExecutionMode # REPLAY or EXECUTION state @@ -254,6 +326,8 @@ software.amazon.lambda.durable │ ├── RetryStrategies # Presets │ ├── RetryDecision # shouldRetry + delay │ ├── JitterStrategy # Jitter options +│ ├── PollingStrategy # Backend polling interface +│ ├── PollingStrategies # Backend polling presets │ ├── WaitForConditionWaitStrategy # Polling delay interface │ └── WaitStrategies # Polling strategy factory + Presets │ @@ -262,9 +336,15 @@ software.amazon.lambda.durable │ └── LambdaDurableFunctionsClient # AWS SDK impl │ ├── model/ -│ ├── DurableExecutionInput # Lambda input -│ ├── DurableExecutionOutput # Lambda output -│ └── ExecutionStatus # SUCCEEDED/PENDING/FAILED +│ ├── DurableExecutionInput # Lambda input +│ ├── DurableExecutionOutput # Lambda output +│ ├── ExecutionStatus # SUCCEEDED/PENDING/FAILED +│ ├── MapResult # Map operation result container +│ ├── MapResult.MapResultItem # Per-item result (status, result, error) +│ ├── MapResult.MapError # Serializable error details +│ ├── ParallelResult # Parallel operation summary +│ ├── ConcurrencyCompletionStatus # ALL_COMPLETED/MIN_SUCCESSFUL_REACHED/FAILURE_TOLERANCE_EXCEEDED +│ └── WaitForConditionResult # Check function return type (value + isDone) │ ├── serde/ │ ├── SerDes # Interface @@ -273,10 +353,25 @@ software.amazon.lambda.durable │ └── exception/ ├── DurableExecutionException + ├── UnrecoverableDurableExecutionException ├── NonDeterministicExecutionException + ├── IllegalDurableOperationException + ├── DurableOperationException + ├── StepException ├── StepFailedException ├── StepInterruptedException - ├── WaitForConditionException + ├── InvokeException + ├── InvokeFailedException + ├── InvokeTimedOutException + ├── InvokeStoppedException + ├── CallbackException + ├── CallbackFailedException + ├── CallbackTimeoutException + ├── CallbackSubmitterException + ├── WaitForConditionFailedException + ├── ChildContextFailedException + ├── MapIterationFailedException + ├── ParallelBranchFailedException └── SerDesException ``` @@ -294,13 +389,13 @@ sequenceDiagram participant EM as ExecutionManager participant Backend - UC->>DC: step("name", Type.class, func) + UC->>DC: step("name", Type.class, stepCtx -> doWork()) DC->>SO: new StepOperation(...) DC->>SO: execute() SO->>EM: sendOperationUpdate(START) EM->>Backend: checkpoint(START) - SO->>SO: func.get() [execute user code] + SO->>SO: func.apply(stepContext) [execute user code] SO->>EM: sendOperationUpdate(SUCCEED) EM->>Backend: checkpoint(SUCCEED) @@ -367,21 +462,46 @@ sequenceDiagram ``` DurableExecutionException (base) -├── StepFailedException # Step failed after all retries -├── StepInterruptedException # Step interrupted (AT_MOST_ONCE) -├── WaitForConditionException # Polling exceeded max attempts -├── NonDeterministicExecutionException # Replay mismatch -└── SerDesException # Serialization error - -SuspendExecutionException # Internal: triggers suspension (not user-facing) +├── SerDesException # Serialization error +├── UnrecoverableDurableExecutionException # Execution cannot be recovered +│ ├── NonDeterministicExecutionException # Replay mismatch +│ └── IllegalDurableOperationException # Illegal operation detected +└── DurableOperationException # Operation-specific error + ├── StepException # Step operation base + │ ├── StepFailedException # Step failed after all retries + │ └── StepInterruptedException # Step interrupted (AT_MOST_ONCE) + ├── InvokeException # Invoke operation base + │ ├── InvokeFailedException # Invoked function returned error + │ ├── InvokeTimedOutException # Invoke exceeded timeout + │ └── InvokeStoppedException # Invoke stopped before completion + ├── CallbackException # Callback operation base + │ ├── CallbackFailedException # External system sent error + │ ├── CallbackTimeoutException # Callback exceeded timeout + │ └── CallbackSubmitterException # Submitter step failed + ├── WaitForConditionFailedException # Polling exceeded max attempts or failed + ├── ChildContextFailedException # Child context failed (original exception not reconstructable) + ├── MapIterationFailedException # Map iteration failed (original exception not reconstructable) + └── ParallelBranchFailedException # Parallel branch failed (original exception not reconstructable) + +SuspendExecutionException # Internal: triggers suspension (not user-facing) ``` | Exception | Trigger | Recovery | |-----------|---------|----------| | `StepFailedException` | Step throws after exhausting retries | Catch in handler or let fail | | `StepInterruptedException` | AT_MOST_ONCE step interrupted mid-execution | Treat as failure | -| `WaitForConditionException` | waitForCondition exceeded max polling attempts | Catch in handler or let fail | +| `InvokeFailedException` | Invoked function returned an error | Catch in handler or let fail | +| `InvokeTimedOutException` | Invoke exceeded its timeout | Catch in handler or let fail | +| `InvokeStoppedException` | Invoke stopped before completion | Catch in handler or let fail | +| `CallbackFailedException` | External system sent an error response | Catch in handler or let fail | +| `CallbackTimeoutException` | Callback exceeded its timeout | Catch in handler or let fail | +| `CallbackSubmitterException` | Submitter step failed to submit callback | Catch in handler or let fail | +| `WaitForConditionFailedException` | waitForCondition exceeded max polling attempts or check function threw | Catch in handler or let fail | +| `ChildContextFailedException` | Child context failed and original exception not reconstructable | Catch in handler or let fail | +| `MapIterationFailedException` | Map iteration failed and original exception not reconstructable | Catch in handler or let fail | +| `ParallelBranchFailedException` | Parallel branch failed and original exception not reconstructable | Catch in handler or let fail | | `NonDeterministicExecutionException` | Replay finds different operation than expected | Bug in handler (non-deterministic code) | +| `IllegalDurableOperationException` | Illegal operation detected | Bug in handler | | `SerDesException` | Jackson fails to serialize/deserialize | Fix data model or custom SerDes | --- @@ -446,7 +566,7 @@ If result > 6MB Lambda limit: Multiple concurrent operations may checkpoint simultaneously. `CheckpointBatcher` batches these into single API calls to reduce latency and stay within the 750KB request limit. -Currently uses micro-batching: batches only what accumulates during the polling thread scheduling overhead. Early tests suggest this window may be too short for effective batching—an artificial delay might need to be introduced. +The `checkpointDelay` configuration option (default: 0) controls how long the batcher waits before flushing, allowing more operations to accumulate in a single batch. For functions with many concurrent operations, setting a small delay (e.g., 10ms) can significantly reduce the number of API calls. ``` StepOperation 1 ──┐