Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 108 additions & 2 deletions Docs/durable-execution-design.md
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,31 @@ await context.WaitAsync(TimeSpan.FromDays(7), name: "weekly_reminder");

> **Validation:** The duration must be at least 1 second. Values less than 1 second throw `ArgumentOutOfRangeException`. Sub-second precision is truncated to whole seconds (the underlying service operates at second granularity).

#### Wait For Condition

`WaitForConditionAsync` polls a user-supplied check function until a configured `IWaitStrategy<TState>` decides to stop. Between polls the workflow is suspended (no compute charge); the service re-invokes when the strategy's chosen delay elapses. The check function receives the state from the previous iteration, so users can carry per-poll bookkeeping inside the state itself.

```csharp
// Poll until an order's status reaches a terminal value.
var finalStatus = await context.WaitForConditionAsync<OrderStatus>(
check: async (state, ctx) =>
{
ctx.Logger.LogInformation("Polling order on attempt {Attempt}", ctx.AttemptNumber);
return await orderService.GetStatusAsync(orderId);
},
config: new WaitForConditionConfig<OrderStatus>
{
InitialState = OrderStatus.Unknown,
WaitStrategy = WaitStrategy.Exponential<OrderStatus>(
isDone: s => s == OrderStatus.Completed || s == OrderStatus.Cancelled)
},
name: "wait_for_order_settle");
```

Built-in strategies live on the `WaitStrategy` factory (`Exponential`, `Linear`, `Fixed`, plus `FromDelegate`) and all accept an optional `isDone` predicate so the common case stays declarative. When the strategy hits its `maxAttempts` limit it throws `WaitForConditionException` (carrying `AttemptsExhausted` and `LastState`); when the check function itself throws, the operation surfaces a `StepException` with the original error type. State is checkpointed per-iteration in the operation's payload so polling survives Lambda re-invocations deterministically.

> **Cross-SDK note (Python migration):** .NET (and Java + JS) treat `maxAttempts` exhaustion as a failure — the operation throws `WaitForConditionException` with the last observed state attached. **Python** instead returns `WaitDecision.no_wait()` from its built-in strategies, so a Python workflow at max-attempts *succeeds* with the last state as its result. The .NET behavior was chosen to match the majority of SDKs and to give callers an idiomatic typed-exception path; if you are porting a workflow from Python and want the "succeed-with-last-state" semantic, write a custom `IWaitStrategy<TState>` (or a `WaitStrategy.FromDelegate(...)` lambda) that returns `WaitDecision.Stop()` instead of throwing when the attempt counter is exhausted.

---

### Callbacks
Expand Down Expand Up @@ -1173,13 +1198,30 @@ public interface IDurableContext
CancellationToken cancellationToken = default);

/// <summary>
/// Poll until a condition is met.
/// Poll until a condition is met. The check function returns the next
/// state on each invocation; the configured <c>IWaitStrategy&lt;TState&gt;</c>
/// decides whether to keep polling and how long to wait between calls.
/// Reflection-based JSON — not AOT-safe.
/// </summary>
[RequiresUnreferencedCode("Reflection-based JSON for TState. Use the ICheckpointSerializer<TState> overload for AOT/trimmed deployments.")]
[RequiresDynamicCode("Reflection-based JSON for TState. Use the ICheckpointSerializer<TState> overload for AOT/trimmed deployments.")]
Task<TState> WaitForConditionAsync<TState>(
Func<TState, IConditionCheckContext, Task<TState>> check,
WaitForConditionConfig<TState> config,
string? name = null,
CancellationToken cancellationToken = default);

/// <summary>
/// Poll until a condition is met. AOT-safe — the supplied
/// <c>ICheckpointSerializer&lt;TState&gt;</c> is used in place of
/// reflection-based JSON for the per-iteration state checkpoint.
/// </summary>
Task<TState> WaitForConditionAsync<TState>(
Func<TState, IConditionCheckContext, Task<TState>> check,
WaitForConditionConfig<TState> config,
ICheckpointSerializer<TState> serializer,
string? name = null,
CancellationToken cancellationToken = default);
}
```

Expand Down Expand Up @@ -1213,6 +1255,63 @@ public interface IStepContext
/// traces and can be inspected by name in the test runner.
/// </summary>
public record DurableBranch<T>(string Name, Func<IDurableContext, Task<T>> Func);

/// <summary>
/// Context passed to a WaitForCondition check function on every polling
/// iteration. Mirrors IStepContext minus OperationId (every iteration of a
/// wait-for-condition operation shares the same operation ID, so exposing
/// it here would be misleading).
/// </summary>
public interface IConditionCheckContext
{
/// <summary>Logger scoped to this condition-check attempt.</summary>
ILogger Logger { get; }

/// <summary>The current 1-based attempt number.</summary>
int AttemptNumber { get; }
}

/// <summary>
/// Decides, per polling iteration, whether a WaitForConditionAsync operation
/// should keep polling and how long to wait. Implementations are typically
/// obtained via the <c>WaitStrategy</c> factory; users may also implement
/// directly. Built-in implementations throw <c>WaitForConditionException</c>
/// when their max-attempts limit is reached so the operation can produce a
/// failure with the last observed state.
/// </summary>
public interface IWaitStrategy<TState>
{
WaitDecision Decide(TState state, int attemptNumber);
}

/// <summary>
/// Decision returned by IWaitStrategy on each polling iteration. Stop()
/// indicates the condition has been met (the operation SUCCEEDs and returns
/// the latest state); ContinueAfter(delay) schedules the next poll.
/// </summary>
public readonly record struct WaitDecision
{
public bool ShouldContinue { get; }
public TimeSpan Delay { get; }
public static WaitDecision Stop();
public static WaitDecision ContinueAfter(TimeSpan delay);
}

/// <summary>
/// Factory for built-in IWaitStrategy implementations. Each accepts an
/// optional isDone predicate so users can terminate polling declaratively
/// when the latest state satisfies a condition (e.g. state =&gt; state.IsReady)
/// without implementing IWaitStrategy themselves. Defaults are intentionally
/// tuned for polling, NOT retry-on-exception: 60 attempts / 5s initial /
/// 300s max / 1.5x backoff / Full jitter (matches Python+JS+Java SDKs).
/// </summary>
public static class WaitStrategy
{
public static IWaitStrategy<TState> Exponential<TState>(...);
public static IWaitStrategy<TState> Linear<TState>(...);
public static IWaitStrategy<TState> Fixed<TState>(TimeSpan delay, ...);
public static IWaitStrategy<TState> FromDelegate<TState>(Func<TState, int, WaitDecision> strategy);
}
```

#### CancellationToken behavior
Expand Down Expand Up @@ -1633,11 +1732,18 @@ public class ChildContextException : DurableExecutionException

/// <summary>
/// Thrown when a wait-for-condition operation exhausts all attempts
/// without the condition being met.
/// without the condition being met. Subclassable: future failure modes
/// (e.g. timeout) should add derived exceptions rather than discriminator
/// flags so callers can catch by static type.
/// </summary>
public class WaitForConditionException : DurableExecutionException
{
public int AttemptsExhausted { get; }

/// <summary>The most recent state observed by the check function before
/// the strategy gave up. Boxed because the exception type is not generic;
/// callers cast to the workflow's known state type.</summary>
public object? LastState { get; }
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
namespace Amazon.Lambda.DurableExecution;

/// <summary>
/// Decides, per polling iteration, whether a <c>WaitForConditionAsync</c>
/// operation should keep polling and how long to wait before the next attempt.
/// </summary>
/// <remarks>
/// Distinct from <see cref="IRetryStrategy"/>: that interface decides
/// retry-on-exception (input is the thrown <see cref="Exception"/>); this one
/// decides poll-until-condition (input is the latest <typeparamref name="TState"/>
/// observed by the check function). Implementations are typically obtained
/// via the <see cref="WaitStrategy"/> factory; users who need richer logic
/// (e.g. wall-clock-time budgets, conditional jitter) can implement this
/// interface directly.
/// </remarks>
/// <typeparam name="TState">The state type produced by the check function.</typeparam>
public interface IWaitStrategy<TState>
{
/// <summary>
/// Evaluates the latest <paramref name="state"/> from the check function
/// and the 1-based <paramref name="attemptNumber"/> just executed, and
/// returns either <see cref="WaitDecision.Stop"/> (terminate) or
/// <see cref="WaitDecision.ContinueAfter(TimeSpan)"/> (poll again after
/// the given delay).
/// </summary>
WaitDecision Decide(TState state, int attemptNumber);
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Text.RegularExpressions;
using Amazon.Lambda.DurableExecution.Internal;

namespace Amazon.Lambda.DurableExecution;

Expand Down Expand Up @@ -97,10 +98,6 @@ internal sealed class ExponentialRetryStrategy : IRetryStrategy
private readonly Type[]? _retryableExceptions;
private readonly Regex[]? _retryableMessagePatterns;

[ThreadStatic]
private static Random? t_random;
private static Random Random => t_random ??= new Random();

public ExponentialRetryStrategy(
int maxAttempts,
TimeSpan initialDelay,
Expand Down Expand Up @@ -156,19 +153,7 @@ private bool IsRetryable(Exception exception)
}

internal TimeSpan CalculateDelay(int attemptNumber)
{
var baseDelay = _initialDelay.TotalSeconds * Math.Pow(_backoffRate, attemptNumber - 1);
var cappedDelay = Math.Min(baseDelay, _maxDelay.TotalSeconds);

var finalDelay = _jitter switch
{
JitterStrategy.Full => Random.NextDouble() * cappedDelay,
JitterStrategy.Half => cappedDelay * (0.5 + 0.5 * Random.NextDouble()),
_ => cappedDelay
};

return TimeSpan.FromSeconds(Math.Max(1, Math.Ceiling(finalDelay)));
}
=> ExponentialBackoff.CalculateDelay(attemptNumber, _initialDelay, _maxDelay, _backoffRate, _jitter);
}

internal sealed class DelegateRetryStrategy : IRetryStrategy
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
namespace Amazon.Lambda.DurableExecution;

/// <summary>
/// Decision returned by an <see cref="IWaitStrategy{TState}"/> on each polling
/// iteration: either stop polling (the condition has been met or attempts
/// have been exhausted) or continue after the given delay.
/// </summary>
public readonly record struct WaitDecision
{
/// <summary>
/// True when the strategy wants the operation to keep polling; false when
/// the operation should terminate (condition satisfied or limit reached).
/// </summary>
public bool ShouldContinue { get; }

/// <summary>
/// Delay before the next poll. Only meaningful when
/// <see cref="ShouldContinue"/> is <c>true</c>; otherwise
/// <see cref="TimeSpan.Zero"/>. The wire-level timer floors this at 1
/// second.
/// </summary>
public TimeSpan Delay { get; }

private WaitDecision(bool shouldContinue, TimeSpan delay)
{
ShouldContinue = shouldContinue;
Delay = delay;
}

/// <summary>
/// Stop polling. The current state is treated as the final result of the
/// wait-for-condition operation and returned to the caller.
/// </summary>
public static WaitDecision Stop() => new(false, TimeSpan.Zero);

/// <summary>
/// Continue polling after the given delay. The Lambda is suspended until
/// the delay elapses, at which point the service re-invokes and the
/// condition is re-evaluated.
/// </summary>
public static WaitDecision ContinueAfter(TimeSpan delay) => new(true, delay);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
namespace Amazon.Lambda.DurableExecution;

/// <summary>
/// Configuration for a <c>WaitForConditionAsync</c> polling operation.
/// </summary>
/// <remarks>
/// Both properties are required: the strategy decides "continue or stop"
/// (per-call) and the initial state seeds the very first check invocation.
/// On replay, the latest checkpointed state is restored from the previous
/// RETRY checkpoint and used in place of <see cref="InitialState"/>; this
/// is what makes the polling loop survive Lambda re-invocations
/// deterministically.
/// </remarks>
/// <typeparam name="TState">The state type produced by the check function.</typeparam>
public sealed class WaitForConditionConfig<TState>
{
/// <summary>
/// Initial state passed to the very first invocation of the check
/// function. Subsequent invocations receive the state returned by the
/// previous call.
/// </summary>
public required TState InitialState { get; set; }

/// <summary>
/// Strategy that decides, after each check invocation, whether to keep
/// polling and how long to wait before the next attempt.
/// </summary>
public required IWaitStrategy<TState> WaitStrategy { get; set; }
}
Loading
Loading