2 changes: 1 addition & 1 deletion docs/adr/003-completable-future-based-coordination.md
@@ -1,6 +1,6 @@
# ADR-003: CompletableFuture-Based Operation Coordination

**Status:** Review
**Status:** Accepted
**Date:** 2026-02-18

## Context
2 changes: 2 additions & 0 deletions docs/advanced/configuration.md
@@ -35,5 +35,7 @@ public class OrderProcessor extends DurableHandler<Order, OrderResult> {
| `withSerDes()` | Serializer for step results | Jackson with default settings |
| `withExecutorService()` | Thread pool for user-defined operations | Cached daemon thread pool |
| `withLoggerConfig()` | Logger behavior configuration | Suppress logs during replay |
| `withPollingStrategy()` | Backend polling strategy | Exponential backoff: 1s base, 2x rate, FULL jitter, 10s max |
| `withCheckpointDelay()` | How often the SDK checkpoints updates | `Duration.ofSeconds(0)` (as soon as possible) |

The `withExecutorService()` option configures the thread pool used for running user-defined operations. Internal SDK coordination (checkpoint batching, polling) runs on an SDK-managed thread pool.
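
The default polling strategy listed above (1s base, 2x rate, FULL jitter, 10s max) can be sketched in plain Java. This is only an illustration, assuming "FULL jitter" follows the common definition of picking a uniformly random delay between zero and the capped exponential ceiling. `FullJitterBackoff` and its methods are hypothetical names, not SDK classes.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper for illustration only; not part of the SDK.
final class FullJitterBackoff {
    private final long baseMillis;
    private final double rate;
    private final long maxMillis;

    FullJitterBackoff(long baseMillis, double rate, long maxMillis) {
        this.baseMillis = baseMillis;
        this.rate = rate;
        this.maxMillis = maxMillis;
    }

    // Upper bound for a 0-based attempt: min(max, base * rate^attempt).
    long ceilingMillis(int attempt) {
        double raw = baseMillis * Math.pow(rate, attempt);
        return (long) Math.min((double) maxMillis, raw);
    }

    // Full jitter (assumed definition): a uniformly random delay in [0, ceiling].
    long nextDelayMillis(int attempt) {
        return ThreadLocalRandom.current().nextLong(ceilingMillis(attempt) + 1);
    }

    public static void main(String[] args) {
        // Values taken from the documented default: 1s base, 2x rate, 10s max.
        FullJitterBackoff backoff = new FullJitterBackoff(1_000, 2.0, 10_000);
        for (int attempt = 0; attempt < 6; attempt++) {
            System.out.printf("attempt %d: ceiling=%d ms, delay=%d ms%n",
                    attempt, backoff.ceilingMillis(attempt), backoff.nextDelayMillis(attempt));
        }
    }
}
```

Under these assumptions the ceiling grows 1s, 2s, 4s, 8s and then stays at the 10s maximum, while the actual delay is randomized below that ceiling so that concurrent pollers do not synchronize.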
15 changes: 10 additions & 5 deletions docs/advanced/error-handling.md
@@ -4,21 +4,26 @@ The SDK throws specific exceptions to help you handle different failure scenario

```
DurableExecutionException - General durable exception
├── NonDeterministicExecutionException - Code changed between original execution and replay. Fix code to maintain determinism; don't change step order/names.
├── SerDesException - Serialization and deserialization exception.
├── UnrecoverableDurableExecutionException - Execution cannot be recovered. The durable execution will be immediately terminated.
│ ├── NonDeterministicExecutionException - Code changed between original execution and replay. Fix code to maintain determinism; don't change step order/names.
│ └── IllegalDurableOperationException - An illegal operation was detected. The execution will be immediately terminated.
└── DurableOperationException - General operation exception
├── StepException - General Step exception
│ ├── StepFailedException - Step exhausted all retry attempts.Catch to implement fallback logic or let execution fail.
│ ├── StepFailedException - Step exhausted all retry attempts. Catch to implement fallback logic or let execution fail.
│ └── StepInterruptedException - `AT_MOST_ONCE` step was interrupted before completion. Implement manual recovery (check if operation completed externally)
├── InvokeException - General chained invocation exception
│ ├── InvokeFailedException - Chained invocation failed. Handle the error or propagate failure.
│ ├── InvokeTimedoutException - Chained invocation timed out. Handle the error or propagate failure.
│ ├── InvokeTimedOutException - Chained invocation timed out. Handle the error or propagate failure.
│ └── InvokeStoppedException - Chained invocation stopped. Handle the error or propagate failure.
├── CallbackException - General callback exception
│ ├── CallbackFailedException - External system sent an error response to the callback. Handle the error or propagate failure
│ └── CallbackTimeoutException - Callback exceeded its timeout duration. Handle the error or propagate the failure
│ ├── CallbackTimeoutException - Callback exceeded its timeout duration. Handle the error or propagate the failure
│ └── CallbackSubmitterException - Submitter step failed to submit the callback. Handle the error or propagate failure
├── WaitForConditionFailedException - waitForCondition exceeded max polling attempts or failed. Catch to implement fallback logic.
└── ChildContextFailedException - Child context failed and the original exception could not be reconstructed
├── ChildContextFailedException - Child context failed and the original exception could not be reconstructed
├── MapIterationFailedException - Map iteration failed and the original exception could not be reconstructed
└── ParallelBranchFailedException - Parallel branch failed and the original exception could not be reconstructed
Review comment (Contributor): Can we also mention SuspendExecutionException?

```

```java
2 changes: 1 addition & 1 deletion docs/core/callbacks.md
@@ -45,7 +45,7 @@ var waitForCallbackConfig = WaitForCallbackConfig.builder()
.callbackConfig(config)
.stepConfig(StepConfig.builder().retryStrategy(...).build())
.build();
ctx.waitForCallback("approval", String.class, callbackId -> sendApprovalRequest(callbackId), waitForCallbackConfig);
ctx.waitForCallback("approval", String.class, (callbackId, stepCtx) -> sendApprovalRequest(callbackId), waitForCallbackConfig);
```

| Option | Description |
3 changes: 1 addition & 2 deletions docs/core/invoke.md
@@ -9,8 +9,7 @@ var result = ctx.invoke("invoke-function",
Result.class,
InvokeConfig.builder()
.payloadSerDes(...) // payload serializer
.resultSerDes(...) // result deserializer
.timeout(Duration.of(...)) // wait timeout
.serDes(...) // result deserializer
.tenantId(...) // Lambda tenantId
.build()
);
10 changes: 5 additions & 5 deletions docs/core/map.md
@@ -57,9 +57,9 @@ Each `MapResultItem<T>` contains:

| Field | Description |
|-------|-------------|
| `status()` | `SUCCEEDED`, `FAILED`, or `NOT_STARTED` |
| `result()` | The result value, or `null` if failed/not started |
| `error()` | The error details as `MapError`, or `null` if succeeded/not started |
| `status()` | `SUCCEEDED`, `FAILED`, or `SKIPPED` |
| `result()` | The result value, or `null` if failed/skipped |
| `error()` | The error details as `MapError`, or `null` if succeeded/skipped |

### MapError

@@ -135,10 +135,10 @@ var config = MapConfig.builder()
.build();

var result = ctx.map("find-two", items, String.class, fn, config);
assertEquals(CompletionReason.MIN_SUCCESSFUL_REACHED, result.completionReason());
assertEquals(ConcurrencyCompletionStatus.MIN_SUCCESSFUL_REACHED, result.completionReason());
```

When early termination triggers, items that were never started have `NOT_STARTED` status with `null` for both result and error in the `MapResult`.
When early termination triggers, items that were never started have `SKIPPED` status with `null` for both result and error in the `MapResult`.

### Checkpoint-and-Replay

204 changes: 91 additions & 113 deletions docs/core/parallel.md
@@ -1,143 +1,121 @@
# Parallel Operations Design Plan
## parallel() – Concurrent Branch Execution

## Overview

Add parallel execution capability to the AWS Lambda Durable Execution SDK, allowing multiple branches to run concurrently within a single durable function execution.

## API Design

### User Interface
`parallel()` runs multiple independent branches concurrently, each in its own child context. Branches are registered via `branch()` and execute immediately (respecting `maxConcurrency`). The operation completes when all branches finish or completion criteria are met.

```java
try (var parallelContext = ctx.parallel(ParallelConfig.builder().build())) {
DurableFuture<Boolean> task1 = parallelContext.branch("validate", Boolean.class, branchContext -> validate());
DurableFuture<String> task2 = parallelContext.branch("process", String.class, branchContext -> process());
parallelContext.join(); // Wait for completion based on config

// Access results
Boolean validated = task1.get();
String processed = task2.get();
}
// Basic parallel execution
var parallel = ctx.parallel("validate-and-process");
DurableFuture<Boolean> task1 = parallel.branch("validate", Boolean.class, branchCtx -> {
return branchCtx.step("check", Boolean.class, stepCtx -> validate());
});
DurableFuture<String> task2 = parallel.branch("process", String.class, branchCtx -> {
return branchCtx.step("work", String.class, stepCtx -> process());
});

// Wait for all branches and get the aggregate result
ParallelResult result = parallel.get();

// Access individual branch results
Boolean validated = task1.get();
String processed = task2.get();
```

### Core Components

#### 1. ParallelConfig
Configuration object controlling parallel execution behavior:
`ParallelDurableFuture` implements `AutoCloseable` — calling `close()` triggers `get()` if it hasn't been called yet, ensuring all branches complete.

```java
ParallelConfig config = ParallelConfig.builder()
.maxConcurrency(5) // Max branches running simultaneously
.minSuccessful(3) // Minimum successful branches required (-1 = all)
.toleratedFailureCount(2) // Max failures before stopping execution
.build();
// AutoCloseable pattern
try (var parallel = ctx.parallel("work")) {
parallel.branch("a", String.class, branchCtx -> branchCtx.step("a1", String.class, stepCtx -> "a"));
parallel.branch("b", String.class, branchCtx -> branchCtx.step("b1", String.class, stepCtx -> "b"));
} // close() calls get() automatically
```
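
The "close() calls get()" behavior can be illustrated with a small stand-alone sketch built on `CompletableFuture`. `JoinOnClose` is a hypothetical stand-in written for this example and is not the SDK's `ParallelDurableFuture`; it only shows why leaving a try-with-resources block is enough to wait for every registered branch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical stand-in for illustration; not the SDK's ParallelDurableFuture.
final class JoinOnClose implements AutoCloseable {
    private final List<CompletableFuture<?>> branches = new ArrayList<>();
    private boolean joined = false;

    // Registers a branch and starts it immediately on the common pool.
    <T> CompletableFuture<T> branch(Supplier<T> work) {
        CompletableFuture<T> future = CompletableFuture.supplyAsync(work);
        branches.add(future);
        return future;
    }

    // Waits for every registered branch once; returns the number of branches.
    int get() {
        if (!joined) {
            CompletableFuture.allOf(branches.toArray(new CompletableFuture[0])).join();
            joined = true;
        }
        return branches.size();
    }

    // close() delegates to get(), so try-with-resources always joins.
    @Override
    public void close() {
        get();
    }

    public static void main(String[] args) {
        try (JoinOnClose parallel = new JoinOnClose()) {
            parallel.branch(() -> "a");
            parallel.branch(() -> "b");
        } // close() blocks here until both branches have finished
        System.out.println("all branches finished");
    }
}
```

In this sketch a failing branch would make `get()` (and therefore `close()`) throw; how the SDK surfaces failures from `close()` is not covered by this example.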

**Configuration Rules:**
- `maxConcurrency`: Controls resource usage, prevents overwhelming the system
- `minSuccessful`: Enables "best effort" scenarios where not all branches need to succeed
- `toleratedFailureCount`: Fail-fast behavior when too many branches fail
### ParallelResult

#### 2. ParallelContext
Manages the lifecycle of parallel branches:
`ParallelResult` is a summary of the parallel execution:

```java
public class ParallelContext implements AutoCloseable {
// Create branches
public <T> DurableFuture<T> branch(String name, Class<T> resultType, Function<DurableContext, T> func);
public <T> DurableFuture<T> branch(String name, TypeToken<T> resultType, Function<DurableContext, T> func);

// Wait for completion
public void join();

// AutoCloseable ensures join() is called
public void close();
}
```
| Field | Description |
|-------|-------------|
| `size()` | Total number of registered branches |
| `succeeded()` | Number of branches that succeeded |
| `failed()` | Number of branches that failed |
| `completionStatus()` | Why the operation completed (`ALL_COMPLETED`, `MIN_SUCCESSFUL_REACHED`, `FAILURE_TOLERANCE_EXCEEDED`) |

#### 3. DurableContext Integration
Add single method to existing `DurableContext`:

```java
public ParallelContext parallel(ParallelConfig config);
```
### ParallelConfig

## Implementation Strategy
Configure concurrency limits and completion criteria:

### 1. Leverage Existing Child Context Infrastructure

Each parallel branch will be implemented as a `ChildContextOperation`:
- **Isolation**: Each branch has its own checkpoint log
- **Replay Safety**: Branches replay independently
- **Error Handling**: Branch failures don't affect other branches directly

### 2. Execution Flow

1. **Branch Registration**: `branch()` calls create `ChildContextOperation` instances but don't execute immediately
2. **Execution Start**: `join()` triggers execution of branches respecting `maxConcurrency`
3. **Concurrency Control**: Use a queue to manage pending branches when `maxConcurrency` is reached
4. **Completion Logic**: Monitor success/failure counts against configuration thresholds
5. **Result Collection**: Return results via `DurableFuture` instances
```java
var config = ParallelConfig.builder()
.maxConcurrency(5) // at most 5 branches run at once
.completionConfig(CompletionConfig.allCompleted()) // default: run all branches
.build();

var parallel = ctx.parallel("work", config);
```

### 4. Error Handling Strategy
| Option | Default | Description |
|--------|---------|-------------|
| `maxConcurrency` | Unlimited | Maximum branches running simultaneously (must be ≥ 1) |
| `completionConfig` | `allCompleted()` | Controls when the operation stops starting new branches |
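
To make the effect of `maxConcurrency` concrete, here is a minimal plain-Java sketch of a concurrency cap using a `Semaphore`. It is an assumption-laden illustration of the concept, not the SDK's scheduler: `BoundedRunner`, `runAll`, and `sleepQuietly` are hypothetical names, and the real implementation may queue branches differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of a concurrency cap; not the SDK's scheduler.
final class BoundedRunner {
    // Runs all tasks with at most maxConcurrency in flight; returns the peak observed.
    static int runAll(List<Runnable> tasks, int maxConcurrency) {
        if (maxConcurrency < 1) {
            throw new IllegalArgumentException("maxConcurrency must be >= 1");
        }
        Semaphore permits = new Semaphore(maxConcurrency);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (Runnable task : tasks) {
                permits.acquire(); // wait until a slot is free before starting the next task
                futures.add(pool.submit(() -> {
                    try {
                        peak.accumulateAndGet(running.incrementAndGet(), Math::max);
                        task.run();
                    } finally {
                        running.decrementAndGet();
                        permits.release(); // free the slot even if the task failed
                    }
                }));
            }
            for (Future<?> future : futures) {
                future.get(); // wait for completion and surface failures
            }
            return peak.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while running tasks", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a task failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        List<Runnable> tasks = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            tasks.add(() -> sleepQuietly(50));
        }
        System.out.println("peak concurrency: " + runAll(tasks, 5));
    }
}
```

The reported peak never exceeds the cap because a permit is acquired before each task is submitted and released only after it finishes.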

**Branch-Level Failures:**
- Individual branch failures are captured in their respective `DurableFuture`
- Don't immediately fail the entire parallel operation
- Count towards `failureCount` for threshold checking
#### CompletionConfig

**Parallel-Level Failures:**
- Exceed `toleratedFailureCount`: Stop starting new branches, wait for running ones
- Insufficient `minSuccessful`: Throw `ParallelExecutionException` after all branches complete
- Configuration validation errors: Fail immediately
`CompletionConfig` controls when the parallel operation stops starting new branches:

## Key Design Decisions
| Factory Method | Behavior |
|----------------|----------|
| `allCompleted()` (default) | All branches run regardless of failures |
| `allSuccessful()` | Stop if any branch fails (zero failures tolerated) |
| `firstSuccessful()` | Stop after the first branch succeeds |
| `minSuccessful(n)` | Stop after `n` branches succeed |
| `toleratedFailureCount(n)` | Stop after more than `n` failures |

### 1. Build on Child Contexts
- **Pros**: Reuses existing isolation and checkpointing logic
- **Cons**: Each branch has overhead of a separate child context
- **Decision**: Acceptable trade-off for clean isolation and replay safety
Note: `toleratedFailurePercentage` is not supported for parallel operations.
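
The factory methods in the table above can be read as two thresholds, a success count and a failure count. The sketch below models that reading in plain Java. It is not SDK code: `CompletionRules`, `evaluate`, and the extra `RUNNING` value are hypothetical, `-1` is used here to mean "threshold disabled", and the order in which the two thresholds are checked is an assumption.

```java
// Hypothetical model of the completion rules in the table; not SDK code.
final class CompletionRules {
    // RUNNING is added for this sketch; the other names come from the docs.
    enum Status { ALL_COMPLETED, MIN_SUCCESSFUL_REACHED, FAILURE_TOLERANCE_EXCEEDED, RUNNING }

    // minSuccessful < 1 disables the success threshold;
    // toleratedFailures < 0 disables the failure threshold.
    static Status evaluate(int total, int succeeded, int failed,
                           int minSuccessful, int toleratedFailures) {
        if (toleratedFailures >= 0 && failed > toleratedFailures) {
            return Status.FAILURE_TOLERANCE_EXCEEDED; // more than n failures
        }
        if (minSuccessful >= 1 && succeeded >= minSuccessful) {
            return Status.MIN_SUCCESSFUL_REACHED; // enough branches succeeded
        }
        if (succeeded + failed == total) {
            return Status.ALL_COMPLETED; // every branch has finished
        }
        return Status.RUNNING; // keep starting branches
    }

    public static void main(String[] args) {
        // allCompleted(): no thresholds, 3 of 3 branches finished.
        System.out.println(evaluate(3, 2, 1, -1, -1));
        // firstSuccessful(): modeled as minSuccessful = 1.
        System.out.println(evaluate(3, 1, 0, 1, -1));
        // allSuccessful(): modeled as zero tolerated failures.
        System.out.println(evaluate(3, 0, 1, -1, 0));
    }
}
```

Under this model, `allSuccessful()` corresponds to `toleratedFailureCount(0)` and `firstSuccessful()` to `minSuccessful(1)`, which matches the behavior column of the table.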

### 2. Eager vs Lazy Execution
- **Chosen**: Lazy execution (branches start only on `join()`)
- **Rationale**: Allows all branches to be registered before execution starts, enabling better concurrency planning
### ParallelBranchConfig

### 3. AutoCloseable Pattern
- **Purpose**: Ensures `join()` is called even if user forgets
- **Behavior**: If `close()` is called before `join()`, automatically call `join()`
Per-branch configuration can be provided:

### 4. Configuration Validation
- Validate at `ParallelConfig.build()` time:
- `maxConcurrency > 0`
- `minSuccessful >= -1` (where -1 means "all")
- `toleratedFailureCount >= 0`
- `minSuccessful + toleratedFailureCount <= total branches` (validated at runtime)
```java
parallel.branch("work", String.class, branchCtx -> doWork(),
ParallelBranchConfig.builder()
.serDes(customSerDes)
.build());
```

## Implementation Files
### Error Handling

### New Files to Create
1. `ParallelConfig.java` - Configuration builder
2. `ParallelContext.java` - User-facing parallel context
3. `operation/ParallelOperation.java` - Core execution logic
4. `exception/ParallelExecutionException.java` - Parallel-specific exceptions
Branch failures are captured individually. A failed branch throws its exception when you call `get()` on its `DurableFuture`:

### Files to Modify
1. `DurableContext.java` - Add `parallel()` method
2. `DurableFuture.java` - Ensure compatibility with parallel results (likely no changes needed)
```java
var parallel = ctx.parallel("work");
var risky = parallel.branch("risky", String.class, branchCtx -> {
throw new RuntimeException("failed");
});
var safe = parallel.branch("safe", String.class, branchCtx -> {
return branchCtx.step("ok", String.class, stepCtx -> "done");
});

ParallelResult result = parallel.get();

String safeResult = safe.get(); // "done"
try {
risky.get(); // throws
} catch (ParallelBranchFailedException e) {
// Branch failed — original exception could not be reconstructed
}
```

## Testing Strategy
| Exception | When Thrown |
|-----------|-------------|
| `ParallelBranchFailedException` | Branch failed and the original exception could not be reconstructed |
| User's exception | Branch threw a reconstructable exception — propagated through `get()` |

### Unit Tests
- `ParallelConfigTest` - Configuration validation
- `ParallelOperationTest` - Core execution logic with mocked child contexts
### Checkpoint-and-Replay

### Integration Tests
- Success scenarios with various configurations
- Failure scenarios (exceeding thresholds)
- Concurrency limits
- Replay behavior
Parallel operations are fully durable. On replay after interruption:

### Example Implementation
- `ParallelExample.java` in examples module
- Demonstrate common patterns and error handling
- Completed branches return cached results without re-execution
- Incomplete branches resume from their last checkpoint
- Branches that never started execute fresh
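
The replay rules above can be sketched with an in-memory checkpoint map. This is a simplified, hypothetical model (`ReplayLog` and `runOnce` are not SDK names, and the real SDK persists checkpoints in its backend rather than in a `HashMap`); it only shows why a completed operation is not executed a second time and why operation names need to stay stable between runs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical in-memory checkpoint log for illustration; not SDK code.
final class ReplayLog {
    private final Map<String, Object> checkpoints = new HashMap<>();
    int executions = 0; // how many times user code actually ran

    // Returns the checkpointed result if the named operation already completed;
    // otherwise runs the work and records its result under that name.
    @SuppressWarnings("unchecked")
    <T> T runOnce(String name, Supplier<T> work) {
        if (checkpoints.containsKey(name)) {
            return (T) checkpoints.get(name); // replay: no re-execution
        }
        executions++;
        T result = work.get();
        checkpoints.put(name, result);
        return result;
    }

    public static void main(String[] args) {
        ReplayLog log = new ReplayLog();
        // First run: both branches execute and are checkpointed.
        log.runOnce("validate", () -> Boolean.TRUE);
        log.runOnce("process", () -> "done");
        // Simulated replay: the same names are looked up instead of re-run.
        String replayed = log.runOnce("process", () -> "should not run");
        System.out.println(replayed + ", executions=" + log.executions);
    }
}
```

Because results are keyed by name in this model, renaming or reordering operations between the original run and a replay would miss the cached entry, which is the situation `NonDeterministicExecutionException` is described as guarding against.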
2 changes: 1 addition & 1 deletion docs/core/steps.md
@@ -119,4 +119,4 @@ var orderMap = ctx.step("fetch-orders", new TypeToken<Map<String, Order>>() {},
stepCtx -> orderService.getOrdersByCustomer());
```

This is needed for the SDK to deserialize a checkpointed result and get the exact type to reconstruct. See [TypeToken and Type Erasure](docs/internal-design.md#typetoken-and-type-erasure) for technical details.
This is needed for the SDK to deserialize a checkpointed result and get the exact type to reconstruct. See [TypeToken and Type Erasure](../design.md#custom-serdes-and-typetoken) for technical details.