Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion Docs/durable-execution-design.md
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ For better observability, you can name individual branches (matching the JS SDK
```csharp
// Named branches for easier debugging and testing
var results = await context.ParallelAsync(
new NamedBranch<object>[]
new DurableBranch<object>[]
{
new("fetch_user", async (ctx) => await ctx.StepAsync(async (step) => await FetchUserData(userId))),
new("fetch_orders", async (ctx) => await ctx.StepAsync(async (step) => await FetchOrderHistory(userId))),
Expand Down Expand Up @@ -1416,6 +1416,13 @@ public class CompletionConfig
{
public int? MinSuccessful { get; set; }
public int? ToleratedFailureCount { get; set; }
/// <summary>
/// Maximum tolerated failure ratio, expressed as a value in the range
/// <c>0.0</c> to <c>1.0</c> (inclusive). For example, <c>0.25</c> means
/// "tolerate up to 25% failures; fail when the failure ratio strictly
/// exceeds 25%". <c>null</c> = no ratio-based threshold. Validated by the
/// setter; out-of-range values throw <see cref="ArgumentOutOfRangeException"/>.
/// </summary>
public double? ToleratedFailurePercentage { get; set; }

public static CompletionConfig AllSuccessful() => new() { ToleratedFailureCount = 0 };
Expand Down
30 changes: 30 additions & 0 deletions Libraries/src/Amazon.Lambda.DurableExecution/BatchItemStatus.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
namespace Amazon.Lambda.DurableExecution;

/// <summary>
/// Outcome recorded for a single item of an <see cref="IBatchResult{T}"/>.
/// </summary>
/// <remarks>
/// Each value reflects the wire-state of that branch's checkpoint at the
/// instant the batch resolved. A branch that finished is reported as
/// <see cref="Succeeded"/> or <see cref="Failed"/>; a branch that was still
/// running when the batch's <see cref="CompletionConfig"/> short-circuited
/// stays in <see cref="Started"/>.
/// </remarks>
public enum BatchItemStatus
{
    /// <summary>
    /// The branch finished and produced a result.
    /// </summary>
    Succeeded = 0,

    /// <summary>
    /// The branch finished by throwing.
    /// </summary>
    Failed = 1,

    /// <summary>
    /// The branch had not finished when the batch's
    /// <see cref="CompletionConfig"/> resolved — for example,
    /// <see cref="CompletionConfig.FirstSuccessful"/> returned before this
    /// branch completed.
    /// </summary>
    Started = 2,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
namespace Amazon.Lambda.DurableExecution;

/// <summary>
/// The reason a batch operation
/// (<see cref="IDurableContext.ParallelAsync{T}(IReadOnlyList{System.Func{IDurableContext, System.Threading.Tasks.Task{T}}}, string?, ParallelConfig?, System.Threading.CancellationToken)"/>
/// or future Map) resolved.
/// </summary>
public enum CompletionReason
{
    /// <summary>
    /// All branches ran to the end and no <see cref="CompletionConfig"/>
    /// short-circuit fired. The items may be any mix of
    /// <see cref="BatchItemStatus.Succeeded"/> and
    /// <see cref="BatchItemStatus.Failed"/>.
    /// </summary>
    AllCompleted = 0,

    /// <summary>
    /// The number of successful branches reached
    /// <see cref="CompletionConfig.MinSuccessful"/>; branches that had not
    /// finished were left in <see cref="BatchItemStatus.Started"/>.
    /// </summary>
    MinSuccessfulReached = 1,

    /// <summary>
    /// Failures went past <see cref="CompletionConfig.ToleratedFailureCount"/>
    /// or <see cref="CompletionConfig.ToleratedFailurePercentage"/>. The batch
    /// counts as failed and surfaces a <see cref="ParallelException"/> when
    /// awaited.
    /// </summary>
    FailureToleranceExceeded = 2,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
namespace Amazon.Lambda.DurableExecution;

/// <summary>
/// Defines completion criteria for parallel/map operations.
/// </summary>
/// <remarks>
/// Construct via the static factories (<see cref="AllSuccessful"/>,
/// <see cref="AllCompleted"/>, <see cref="FirstSuccessful"/>) or set the
/// individual properties directly. Multiple criteria combine: the operation
/// resolves as soon as any criterion is met (success short-circuit) or violated
/// (failure short-circuit).
/// </remarks>
public sealed class CompletionConfig
{
    // Backing store for ToleratedFailurePercentage; only ever holds null or a
    // value that passed the setter's [0.0, 1.0] range check.
    private double? _toleratedFailurePercentage;

    /// <summary>
    /// Minimum number of <see cref="BatchItemStatus.Succeeded"/> items required
    /// before the operation resolves successfully. <c>null</c> = no minimum.
    /// </summary>
    public int? MinSuccessful { get; set; }

    /// <summary>
    /// Maximum tolerated <see cref="BatchItemStatus.Failed"/> count. When the
    /// failure count <i>strictly exceeds</i> this value, the operation resolves
    /// with <see cref="CompletionReason.FailureToleranceExceeded"/>.
    /// <c>null</c> = no count-based failure threshold.
    /// </summary>
    public int? ToleratedFailureCount { get; set; }

    /// <summary>
    /// Maximum tolerated failure ratio, expressed as a value in the range
    /// <c>0.0</c> to <c>1.0</c> (inclusive). For example, <c>0.25</c> means
    /// "tolerate up to 25% failures; fail when the failure ratio strictly
    /// exceeds 25%". <c>null</c> = no ratio-based failure threshold.
    /// </summary>
    /// <exception cref="System.ArgumentOutOfRangeException">
    /// Thrown by the setter if the value is outside <c>[0.0, 1.0]</c>,
    /// including <see cref="double.NaN"/>.
    /// </exception>
    public double? ToleratedFailurePercentage
    {
        get => _toleratedFailurePercentage;
        set
        {
            // Expressed as "not inside the range" rather than "below or above
            // it": every ordered comparison against NaN is false, so the
            // below/above form would let double.NaN through unnoticed.
            if (value is { } v && !(v >= 0.0 && v <= 1.0))
            {
                throw new ArgumentOutOfRangeException(nameof(value), v,
                    "ToleratedFailurePercentage must be a ratio in [0.0, 1.0].");
            }

            _toleratedFailurePercentage = value;
        }
    }

    /// <summary>
    /// All items must succeed. Equivalent to
    /// <see cref="ToleratedFailureCount"/> = 0. The default for
    /// <see cref="ParallelConfig.CompletionConfig"/>.
    /// </summary>
    /// <returns>A new config with <see cref="ToleratedFailureCount"/> set to 0.</returns>
    public static CompletionConfig AllSuccessful() => new() { ToleratedFailureCount = 0 };

    /// <summary>
    /// Run every branch regardless of failures; surface failures per-item via
    /// <see cref="IBatchResult{T}.Failed"/>. Resolution does not auto-throw —
    /// the caller can inspect the result and call
    /// <see cref="IBatchResult{T}.ThrowIfError"/> if they want strict-success
    /// behavior.
    /// </summary>
    /// <returns>A new config with no criteria set.</returns>
    public static CompletionConfig AllCompleted() => new();

    /// <summary>
    /// Resolve as soon as one branch succeeds. Remaining in-flight branches are
    /// reported as <see cref="BatchItemStatus.Started"/>.
    /// </summary>
    /// <returns>A new config with <see cref="MinSuccessful"/> set to 1.</returns>
    public static CompletionConfig FirstSuccessful() => new() { MinSuccessful = 1 };
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
namespace Amazon.Lambda.DurableExecution;

/// <summary>
/// Options accepted by
/// <see cref="IDurableContext.ParallelAsync{T}(IReadOnlyList{System.Func{IDurableContext, System.Threading.Tasks.Task{T}}}, string?, ParallelConfig?, System.Threading.CancellationToken)"/>.
/// </summary>
/// <remarks>
/// There is no serializer slot on this type: per-branch and aggregate
/// serializers are passed through the AOT-safe <c>ParallelAsync</c> overloads
/// that accept an <see cref="ICheckpointSerializer{T}"/>.
/// </remarks>
public sealed class ParallelConfig
{
    private int? _maxConcurrency;

    /// <summary>
    /// Upper bound on how many branches run at the same time. The default,
    /// <c>null</c>, means unlimited. A non-null value must be 1 or greater.
    /// </summary>
    /// <exception cref="System.ArgumentOutOfRangeException">
    /// Thrown by the setter when a non-null value is zero or negative.
    /// </exception>
    public int? MaxConcurrency
    {
        get => _maxConcurrency;
        set
        {
            // null skips validation entirely and is stored as "unlimited".
            if (value.HasValue && value.Value < 1)
            {
                throw new ArgumentOutOfRangeException(nameof(value), value.Value,
                    "MaxConcurrency must be at least 1, or null for unlimited.");
            }

            _maxConcurrency = value;
        }
    }

    /// <summary>
    /// Criteria deciding when the parallel operation counts as complete. The
    /// initial value is <see cref="CompletionConfig.AllSuccessful"/>, under
    /// which any single branch failure surfaces as a
    /// <see cref="ParallelException"/> when the parallel result is awaited.
    /// </summary>
    public CompletionConfig CompletionConfig { get; set; } = CompletionConfig.AllSuccessful();

    /// <summary>
    /// Representation of branches in the checkpoint graph. The initial value
    /// is <see cref="NestingType.Nested"/>.
    /// </summary>
    /// <remarks>
    /// The .NET SDK does not yet support <see cref="NestingType.Flat"/>;
    /// selecting it makes the parallel operation throw
    /// <see cref="System.NotSupportedException"/> when it is invoked.
    /// </remarks>
    public NestingType NestingType { get; set; } = NestingType.Nested;
}
13 changes: 13 additions & 0 deletions Libraries/src/Amazon.Lambda.DurableExecution/DurableBranch.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
namespace Amazon.Lambda.DurableExecution;

/// <summary>
/// Pairs a branch function with a name, for use with
/// <see cref="IDurableContext.ParallelAsync{T}(IReadOnlyList{DurableBranch{T}}, string?, ParallelConfig?, System.Threading.CancellationToken)"/>.
/// The name shows up in execution traces, in the wire
/// <c>OperationUpdate.Name</c> field, and on <see cref="IBatchItem{T}.Name"/>.
/// </summary>
/// <typeparam name="T">Type of the value the branch produces.</typeparam>
/// <param name="Name">Human-readable name of the branch. Required.</param>
/// <param name="Func">User function that runs inside the branch's child
/// context.</param>
public sealed record DurableBranch<T>(
    string Name,
    Func<IDurableContext, Task<T>> Func);
109 changes: 100 additions & 9 deletions Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -149,19 +149,110 @@ private Task<T> RunChildContext<T>(
{
var operationId = _idGenerator.NextId();

// Capture this DurableContext's collaborators; the child shares state,
// termination, batcher, ARN, and Lambda context — but uses a child
// OperationIdGenerator so its operation IDs are deterministically
// namespaced under the parent op ID.
IDurableContext ChildFactory(string parentOpId) => new DurableContext(
_state, _terminationManager, _idGenerator.CreateChild(parentOpId),
_durableExecutionArn, LambdaContext, _batcher);

var op = new ChildContextOperation<T>(
operationId, name, func, config, serializer, ChildFactory,
operationId, name, func, config, serializer, MakeChildFactory(),
_state, _terminationManager, _durableExecutionArn, _batcher);
return op.ExecuteAsync(cancellationToken);
}

/// <summary>
/// Starts a parallel operation over unnamed branch functions, using the
/// reflection-based JSON serializer for <typeparamref name="T"/>.
/// </summary>
/// <remarks>
/// The functions are wrapped into <see cref="DurableBranch{T}"/> instances by
/// <c>WrapToDurableBranches</c> and the call is then forwarded to
/// <c>RunParallel</c>.
/// </remarks>
[RequiresUnreferencedCode("Reflection-based JSON for T. Use the ICheckpointSerializer<T> overload for AOT/trimmed deployments.")]
[RequiresDynamicCode("Reflection-based JSON for T. Use the ICheckpointSerializer<T> overload for AOT/trimmed deployments.")]
public Task<IBatchResult<T>> ParallelAsync<T>(
    IReadOnlyList<Func<IDurableContext, Task<T>>> branches,
    string? name = null,
    ParallelConfig? config = null,
    CancellationToken cancellationToken = default)
{
    IReadOnlyList<DurableBranch<T>> wrappedBranches = WrapToDurableBranches(branches);
    ICheckpointSerializer<T> reflectionSerializer = new ReflectionJsonCheckpointSerializer<T>();
    return RunParallel(wrappedBranches, reflectionSerializer, name, config, cancellationToken);
}

/// <summary>
/// Starts a parallel operation over caller-named branches, using the
/// reflection-based JSON serializer for <typeparamref name="T"/>.
/// </summary>
/// <remarks>
/// Forwards the branches unchanged to <c>RunParallel</c>.
/// </remarks>
[RequiresUnreferencedCode("Reflection-based JSON for T. Use the ICheckpointSerializer<T> overload for AOT/trimmed deployments.")]
[RequiresDynamicCode("Reflection-based JSON for T. Use the ICheckpointSerializer<T> overload for AOT/trimmed deployments.")]
public Task<IBatchResult<T>> ParallelAsync<T>(
    IReadOnlyList<DurableBranch<T>> branches,
    string? name = null,
    ParallelConfig? config = null,
    CancellationToken cancellationToken = default)
{
    ICheckpointSerializer<T> reflectionSerializer = new ReflectionJsonCheckpointSerializer<T>();
    return RunParallel(branches, reflectionSerializer, name, config, cancellationToken);
}

/// <summary>
/// Starts a parallel operation over unnamed branch functions, using the
/// caller-supplied <paramref name="serializer"/>.
/// </summary>
/// <remarks>
/// The functions are wrapped into <see cref="DurableBranch{T}"/> instances by
/// <c>WrapToDurableBranches</c> and the call is then forwarded to
/// <c>RunParallel</c>.
/// </remarks>
public Task<IBatchResult<T>> ParallelAsync<T>(
    IReadOnlyList<Func<IDurableContext, Task<T>>> branches,
    ICheckpointSerializer<T> serializer,
    string? name = null,
    ParallelConfig? config = null,
    CancellationToken cancellationToken = default)
{
    IReadOnlyList<DurableBranch<T>> wrappedBranches = WrapToDurableBranches(branches);
    return RunParallel(wrappedBranches, serializer, name, config, cancellationToken);
}

/// <summary>
/// Starts a parallel operation over caller-named branches, using the
/// caller-supplied <paramref name="serializer"/>.
/// </summary>
/// <remarks>
/// Forwards every argument unchanged to <c>RunParallel</c>.
/// </remarks>
public Task<IBatchResult<T>> ParallelAsync<T>(
    IReadOnlyList<DurableBranch<T>> branches,
    ICheckpointSerializer<T> serializer,
    string? name = null,
    ParallelConfig? config = null,
    CancellationToken cancellationToken = default)
{
    return RunParallel(branches, serializer, name, config, cancellationToken);
}

/// <summary>
/// Converts bare branch functions into <see cref="DurableBranch{T}"/>
/// instances, naming each one after its position in the list.
/// </summary>
/// <param name="branches">The branch functions to wrap.</param>
/// <returns>An array with one named branch per input function, in the same order.</returns>
/// <exception cref="ArgumentNullException"><paramref name="branches"/> is <c>null</c>.</exception>
/// <exception cref="ArgumentException">An element of <paramref name="branches"/> is <c>null</c>.</exception>
private static IReadOnlyList<DurableBranch<T>> WrapToDurableBranches<T>(
    IReadOnlyList<Func<IDurableContext, Task<T>>> branches)
{
    if (branches is null)
    {
        throw new ArgumentNullException(nameof(branches));
    }

    var wrapped = new DurableBranch<T>[branches.Count];
    var index = 0;
    while (index < branches.Count)
    {
        var branchFunc = branches[index];
        if (branchFunc is null)
        {
            throw new ArgumentException($"Branch at index {index} is null.", nameof(branches));
        }

        // The position doubles as the default name, formatted culture-invariantly.
        // It is what appears in execution traces and on IBatchItem<T>.Name;
        // callers who want their own names go through the DurableBranch<T> overload.
        var defaultName = index.ToString(System.Globalization.CultureInfo.InvariantCulture);
        wrapped[index] = new DurableBranch<T>(defaultName, branchFunc);
        index++;
    }

    return wrapped;
}

/// <summary>
/// Shared implementation behind every <c>ParallelAsync</c> overload: validates
/// the arguments, allocates the operation ID, and hands off to
/// <see cref="Internal.ParallelOperation{T}"/>.
/// </summary>
/// <param name="branches">The named branches to run.</param>
/// <param name="serializer">Serializer passed through to the parallel operation.</param>
/// <param name="name">Optional name passed through to the parallel operation.</param>
/// <param name="config">Optional configuration; a default <see cref="ParallelConfig"/> is used when <c>null</c>.</param>
/// <param name="cancellationToken">Token forwarded to the operation's <c>ExecuteAsync</c>.</param>
/// <returns>The task returned by the operation's <c>ExecuteAsync</c>.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="branches"/> or <paramref name="serializer"/> is <c>null</c>.
/// </exception>
/// <exception cref="ArgumentException">
/// An element of <paramref name="branches"/> is <c>null</c> or has a <c>null</c> <c>Func</c>.
/// </exception>
/// <exception cref="NotSupportedException">
/// The effective configuration selects <see cref="NestingType.Flat"/>.
/// </exception>
private Task<IBatchResult<T>> RunParallel<T>(
    IReadOnlyList<DurableBranch<T>> branches,
    ICheckpointSerializer<T> serializer,
    string? name,
    ParallelConfig? config,
    CancellationToken cancellationToken)
{
    if (branches == null) throw new ArgumentNullException(nameof(branches));
    // The serializer arrives straight from the public serializer-taking
    // overloads, so reject null here with the same eager, argument-named
    // failure that branches gets instead of letting it flow into the operation.
    if (serializer == null) throw new ArgumentNullException(nameof(serializer));

    for (var i = 0; i < branches.Count; i++)
    {
        // Read each element once so both checks look at the same instance.
        var branch = branches[i];
        if (branch == null)
            throw new ArgumentException($"Branch at index {i} is null.", nameof(branches));
        if (branch.Func == null)
            throw new ArgumentException($"Branch at index {i} has a null Func.", nameof(branches));
    }

    var effectiveConfig = config ?? new ParallelConfig();
    if (effectiveConfig.NestingType == NestingType.Flat)
    {
        throw new NotSupportedException(
            "NestingType.Flat is not yet supported in the .NET Durable Execution SDK. " +
            "Use NestingType.Nested (the default) for now.");
    }

    // The ID is taken only after all validation has passed, so a rejected
    // call does not advance the generator.
    var operationId = _idGenerator.NextId();
    var op = new Internal.ParallelOperation<T>(
        operationId, name, branches, effectiveConfig, serializer, MakeChildFactory(),
        _state, _terminationManager, _durableExecutionArn, _batcher);
    return op.ExecuteAsync(cancellationToken);
}

/// <summary>
/// Produces the delegate that <see cref="ChildContextOperation{T}"/> and the
/// branches of <see cref="Internal.ParallelOperation{T}"/> call to build
/// their inner <see cref="IDurableContext"/>. A child context reuses this
/// context's state, termination manager, batcher, ARN, and Lambda context,
/// but receives a child <see cref="OperationIdGenerator"/> so that its
/// operation IDs are deterministically namespaced under the parent op ID.
/// </summary>
/// <returns>A factory that maps a parent operation ID to a new child context.</returns>
private Func<string, IDurableContext> MakeChildFactory()
{
    IDurableContext CreateChildContext(string parentOpId)
    {
        return new DurableContext(
            _state, _terminationManager, _idGenerator.CreateChild(parentOpId),
            _durableExecutionArn, LambdaContext, _batcher);
    }

    return CreateChildContext;
}
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,36 @@ public ChildContextException(string message) : base(message) { }
/// <summary>Creates a <see cref="ChildContextException"/> wrapping an inner exception.</summary>
public ChildContextException(string message, Exception innerException) : base(message, innerException) { }
}

/// <summary>
/// Raised when a parallel operation resolves with
/// <see cref="CompletionReason.FailureToleranceExceeded"/>. The aggregate
/// <see cref="IBatchResult"/> travels with the exception on
/// <see cref="Result"/>, so callers can still examine per-branch outcomes.
/// </summary>
/// <remarks>
/// This type is the base for parallel failures. Future releases may add
/// subclasses (for example, a dedicated
/// <c>ParallelFailureToleranceExceededException</c>); code that catches
/// <see cref="ParallelException"/> remains forward-compatible.
/// </remarks>
public class ParallelException : DurableExecutionException
{
    /// <summary>Creates an empty <see cref="ParallelException"/>.</summary>
    public ParallelException()
    {
    }

    /// <summary>Creates a <see cref="ParallelException"/> with the given message.</summary>
    public ParallelException(string message)
        : base(message)
    {
    }

    /// <summary>Creates a <see cref="ParallelException"/> wrapping an inner exception.</summary>
    public ParallelException(string message, Exception innerException)
        : base(message, innerException)
    {
    }

    /// <summary>
    /// Why the parallel operation resolved.
    /// </summary>
    public CompletionReason CompletionReason { get; init; }

    /// <summary>
    /// Aggregate result of the parallel operation. It is type-erased; cast to
    /// <c>IBatchResult&lt;T&gt;</c> when the per-branch result type is known.
    /// </summary>
    public IBatchResult? Result { get; init; }
}
Loading
Loading