docs: Update outdated design and API documentation #327
Open
ayushiahjolia wants to merge 1 commit into main from docs_update
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,143 +1,121 @@ | ||
| # Parallel Operations Design Plan | ||
| ## parallel() – Concurrent Branch Execution | ||
|
|
||
| ## Overview | ||
|
|
||
| Add parallel execution capability to the AWS Lambda Durable Execution SDK, allowing multiple branches to run concurrently within a single durable function execution. | ||
|
|
||
| ## API Design | ||
|
|
||
| ### User Interface | ||
| `parallel()` runs multiple independent branches concurrently, each in its own child context. Branches are registered via `branch()` and execute immediately (respecting `maxConcurrency`). The operation completes when all branches finish or completion criteria are met. | ||
|
|
||
| ```java | ||
| try (var parallelContext = ctx.parallel(ParallelConfig.builder().build())) { | ||
| DurableFuture<Boolean> task1 = parallelContext.branch("validate", Boolean.class, branchContext -> validate()); | ||
| DurableFuture<String> task2 = parallelContext.branch("process", String.class, branchContext -> process()); | ||
| parallelContext.join(); // Wait for completion based on config | ||
|
|
||
| // Access results | ||
| Boolean validated = task1.get(); | ||
| String processed = task2.get(); | ||
| } | ||
| // Basic parallel execution | ||
| var parallel = ctx.parallel("validate-and-process"); | ||
| DurableFuture<Boolean> task1 = parallel.branch("validate", Boolean.class, branchCtx -> { | ||
| return branchCtx.step("check", Boolean.class, stepCtx -> validate()); | ||
| }); | ||
| DurableFuture<String> task2 = parallel.branch("process", String.class, branchCtx -> { | ||
| return branchCtx.step("work", String.class, stepCtx -> process()); | ||
| }); | ||
|
|
||
| // Wait for all branches and get the aggregate result | ||
| ParallelResult result = parallel.get(); | ||
|
|
||
| // Access individual branch results | ||
| Boolean validated = task1.get(); | ||
| String processed = task2.get(); | ||
| ``` | ||
|
|
||
| ### Core Components | ||
|
|
||
| #### 1. ParallelConfig | ||
| Configuration object controlling parallel execution behavior: | ||
| `ParallelDurableFuture` implements `AutoCloseable` — calling `close()` triggers `get()` if it hasn't been called yet, ensuring all branches complete. | ||
|
|
||
| ```java | ||
| ParallelConfig config = ParallelConfig.builder() | ||
| .maxConcurrency(5) // Max branches running simultaneously | ||
| .minSuccessful(3) // Minimum successful branches required (-1 = all) | ||
| .toleratedFailureCount(2) // Max failures before stopping execution | ||
| .build(); | ||
| // AutoCloseable pattern | ||
| try (var parallel = ctx.parallel("work")) { | ||
| parallel.branch("a", String.class, branchCtx -> branchCtx.step("a1", String.class, stepCtx -> "a")); | ||
| parallel.branch("b", String.class, branchCtx -> branchCtx.step("b1", String.class, stepCtx -> "b")); | ||
| } // close() calls get() automatically | ||
| ``` | ||
|
|
||
| **Configuration Rules:** | ||
| - `maxConcurrency`: Controls resource usage, prevents overwhelming the system | ||
| - `minSuccessful`: Enables "best effort" scenarios where not all branches need to succeed | ||
| - `toleratedFailureCount`: Fail-fast behavior when too many branches fail | ||
| ### ParallelResult | ||
|
|
||
| #### 2. ParallelContext | ||
| Manages the lifecycle of parallel branches: | ||
| `ParallelResult` is a summary of the parallel execution: | ||
|
|
||
| ```java | ||
| public class ParallelContext implements AutoCloseable { | ||
| // Create branches | ||
| public <T> DurableFuture<T> branch(String name, Class<T> resultType, Function<DurableContext, T> func); | ||
| public <T> DurableFuture<T> branch(String name, TypeToken<T> resultType, Function<DurableContext, T> func); | ||
|
|
||
| // Wait for completion | ||
| public void join(); | ||
|
|
||
| // AutoCloseable ensures join() is called | ||
| public void close(); | ||
| } | ||
| ``` | ||
| | Field | Description | | ||
| |-------|-------------| | ||
| | `size()` | Total number of registered branches | | ||
| | `succeeded()` | Number of branches that succeeded | | ||
| | `failed()` | Number of branches that failed | | ||
| | `completionStatus()` | Why the operation completed (`ALL_COMPLETED`, `MIN_SUCCESSFUL_REACHED`, `FAILURE_TOLERANCE_EXCEEDED`) | | ||
|
|
||
| #### 3. DurableContext Integration | ||
| Add single method to existing `DurableContext`: | ||
|
|
||
| ```java | ||
| public ParallelContext parallel(ParallelConfig config); | ||
| ``` | ||
| ### ParallelConfig | ||
|
|
||
| ## Implementation Strategy | ||
| Configure concurrency limits and completion criteria: | ||
|
|
||
| ### 1. Leverage Existing Child Context Infrastructure | ||
|
|
||
| Each parallel branch will be implemented as a `ChildContextOperation`: | ||
| - **Isolation**: Each branch has its own checkpoint log | ||
| - **Replay Safety**: Branches replay independently | ||
| - **Error Handling**: Branch failures don't affect other branches directly | ||
|
|
||
| ### 2. Execution Flow | ||
|
|
||
| 1. **Branch Registration**: `branch()` calls create `ChildContextOperation` instances but don't execute immediately | ||
| 2. **Execution Start**: `join()` triggers execution of branches respecting `maxConcurrency` | ||
| 3. **Concurrency Control**: Use a queue to manage pending branches when `maxConcurrency` is reached | ||
| 4. **Completion Logic**: Monitor success/failure counts against configuration thresholds | ||
| 5. **Result Collection**: Return results via `DurableFuture` instances | ||
| ```java | ||
| var config = ParallelConfig.builder() | ||
| .maxConcurrency(5) // at most 5 branches run at once | ||
| .completionConfig(CompletionConfig.allCompleted()) // default: run all branches | ||
| .build(); | ||
|
|
||
| var parallel = ctx.parallel("work", config); | ||
| ``` | ||
|
|
||
| ### 4. Error Handling Strategy | ||
| | Option | Default | Description | | ||
| |--------|---------|-------------| | ||
| | `maxConcurrency` | Unlimited | Maximum branches running simultaneously (must be ≥ 1) | | ||
| | `completionConfig` | `allCompleted()` | Controls when the operation stops starting new branches | | ||
|
|
||
| **Branch-Level Failures:** | ||
| - Individual branch failures are captured in their respective `DurableFuture` | ||
| - Don't immediately fail the entire parallel operation | ||
| - Count towards `failureCount` for threshold checking | ||
| #### CompletionConfig | ||
|
|
||
| **Parallel-Level Failures:** | ||
| - Exceed `toleratedFailureCount`: Stop starting new branches, wait for running ones | ||
| - Insufficient `minSuccessful`: Throw `ParallelExecutionException` after all branches complete | ||
| - Configuration validation errors: Fail immediately | ||
| `CompletionConfig` controls when the parallel operation stops starting new branches: | ||
|
|
||
| ## Key Design Decisions | ||
| | Factory Method | Behavior | | ||
| |----------------|----------| | ||
| | `allCompleted()` (default) | All branches run regardless of failures | | ||
| | `allSuccessful()` | Stop if any branch fails (zero failures tolerated) | | ||
| | `firstSuccessful()` | Stop after the first branch succeeds | | ||
| | `minSuccessful(n)` | Stop after `n` branches succeed | | ||
| | `toleratedFailureCount(n)` | Stop after more than `n` failures | | ||
|
|
||
| ### 1. Build on Child Contexts | ||
| - **Pros**: Reuses existing isolation and checkpointing logic | ||
| - **Cons**: Each branch has overhead of a separate child context | ||
| - **Decision**: Acceptable trade-off for clean isolation and replay safety | ||
| Note: `toleratedFailurePercentage` is not supported for parallel operations. | ||
|
|
||
| ### 2. Eager vs Lazy Execution | ||
| - **Chosen**: Lazy execution (branches start only on `join()`) | ||
| - **Rationale**: Allows all branches to be registered before execution starts, enabling better concurrency planning | ||
| ### ParallelBranchConfig | ||
|
|
||
| ### 3. AutoCloseable Pattern | ||
| - **Purpose**: Ensures `join()` is called even if user forgets | ||
| - **Behavior**: If `close()` is called before `join()`, automatically call `join()` | ||
| Per-branch configuration can be provided: | ||
|
|
||
| ### 4. Configuration Validation | ||
| - Validate at `ParallelConfig.build()` time: | ||
| - `maxConcurrency > 0` | ||
| - `minSuccessful >= -1` (where -1 means "all") | ||
| - `toleratedFailureCount >= 0` | ||
| - `minSuccessful + toleratedFailureCount <= total branches` (validated at runtime) | ||
| ```java | ||
| parallel.branch("work", String.class, branchCtx -> doWork(), | ||
| ParallelBranchConfig.builder() | ||
| .serDes(customSerDes) | ||
| .build()); | ||
| ``` | ||
|
|
||
| ## Implementation Files | ||
| ### Error Handling | ||
|
|
||
| ### New Files to Create | ||
| 1. `ParallelConfig.java` - Configuration builder | ||
| 2. `ParallelContext.java` - User-facing parallel context | ||
| 3. `operation/ParallelOperation.java` - Core execution logic | ||
| 4. `exception/ParallelExecutionException.java` - Parallel-specific exceptions | ||
| Branch failures are captured individually. A failed branch throws its exception when you call `get()` on its `DurableFuture`: | ||
|
|
||
| ### Files to Modify | ||
| 1. `DurableContext.java` - Add `parallel()` method | ||
| 2. `DurableFuture.java` - Ensure compatibility with parallel results (likely no changes needed) | ||
| ```java | ||
| var parallel = ctx.parallel("work"); | ||
| var risky = parallel.branch("risky", String.class, branchCtx -> { | ||
| throw new RuntimeException("failed"); | ||
| }); | ||
| var safe = parallel.branch("safe", String.class, branchCtx -> { | ||
| return branchCtx.step("ok", String.class, stepCtx -> "done"); | ||
| }); | ||
|
|
||
| ParallelResult result = parallel.get(); | ||
|
|
||
| String safeResult = safe.get(); // "done" | ||
| try { | ||
| risky.get(); // throws | ||
| } catch (ParallelBranchFailedException e) { | ||
| // Branch failed — original exception could not be reconstructed | ||
| } | ||
| ``` | ||
|
|
||
| ## Testing Strategy | ||
| | Exception | When Thrown | | ||
| |-----------|-------------| | ||
| | `ParallelBranchFailedException` | Branch failed and the original exception could not be reconstructed | | ||
| | User's exception | Branch threw a reconstructable exception — propagated through `get()` | | ||
|
|
||
| ### Unit Tests | ||
| - `ParallelConfigTest` - Configuration validation | ||
| - `ParallelOperationTest` - Core execution logic with mocked child contexts | ||
| ### Checkpoint-and-Replay | ||
|
|
||
| ### Integration Tests | ||
| - Success scenarios with various configurations | ||
| - Failure scenarios (exceeding thresholds) | ||
| - Concurrency limits | ||
| - Replay behavior | ||
| Parallel operations are fully durable. On replay after interruption: | ||
|
|
||
| ### Example Implementation | ||
| - `ParallelExample.java` in examples module | ||
| - Demonstrate common patterns and error handling | ||
| - Completed branches return cached results without re-execution | ||
| - Incomplete branches resume from their last checkpoint | ||
| - Branches that never started execute fresh |
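
The `CompletionConfig` factory methods and the `completionStatus()` values listed in the diff can be read as a small decision rule over the succeeded and failed counts. The following is a minimal, self-contained sketch of that reading, not the SDK's implementation: `CompletionSketch`, `Criteria`, and `evaluate` are hypothetical names, only the three status names are taken from the diff, and the order in which the checks run is an assumption.

```java
import java.util.Optional;

// Hypothetical sketch: models the completion criteria described in the diff.
// None of these types belong to the SDK; only the Status names come from the PR text.
public class CompletionSketch {

    public enum Status { ALL_COMPLETED, MIN_SUCCESSFUL_REACHED, FAILURE_TOLERANCE_EXCEEDED }

    /** Hypothetical criteria holder; a null field means "no limit". */
    public record Criteria(Integer minSuccessful, Integer toleratedFailureCount) {
        public static Criteria allCompleted() { return new Criteria(null, null); }
        public static Criteria allSuccessful() { return new Criteria(null, 0); }
        public static Criteria firstSuccessful() { return new Criteria(1, null); }
        public static Criteria minSuccessful(int n) { return new Criteria(n, null); }
        public static Criteria toleratedFailureCount(int n) { return new Criteria(null, n); }
    }

    /**
     * Returns the reason to stop starting new branches, or empty if the
     * operation should keep going. Check order is an assumption.
     */
    public static Optional<Status> evaluate(Criteria c, int total, int succeeded, int failed) {
        // "Stop after more than n failures"
        if (c.toleratedFailureCount() != null && failed > c.toleratedFailureCount()) {
            return Optional.of(Status.FAILURE_TOLERANCE_EXCEEDED);
        }
        // "Stop after n branches succeed"
        if (c.minSuccessful() != null && succeeded >= c.minSuccessful()) {
            return Optional.of(Status.MIN_SUCCESSFUL_REACHED);
        }
        // Every registered branch has finished, one way or the other
        if (succeeded + failed == total) {
            return Optional.of(Status.ALL_COMPLETED);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // One branch of three is still running, no limits configured
        System.out.println(evaluate(Criteria.allCompleted(), 3, 1, 1));
        // A single failure is enough to stop under allSuccessful()
        System.out.println(evaluate(Criteria.allSuccessful(), 3, 0, 1));
        // The first success stops the operation under firstSuccessful()
        System.out.println(evaluate(Criteria.firstSuccessful(), 3, 1, 0));
    }
}
```

Under this reading, `allSuccessful()` behaves like `toleratedFailureCount(0)` and `firstSuccessful()` like `minSuccessful(1)`, which matches the behavior column of the factory-method table. Whether the SDK evaluates failures before successes when both thresholds are configured is not stated in the diff.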
Can we also mention `SuspendExecutionException`?