From 3fa06ce3085bbb50d265f6570a3f92f9286918e9 Mon Sep 17 00:00:00 2001 From: Garrett Beatty Date: Thu, 14 May 2026 17:39:21 -0400 Subject: [PATCH] Add WaitForConditionAsync polling primitive (DOTNET-8665) Adds service-mediated polling to the .NET Durable Execution SDK. WaitForConditionAsync repeatedly evaluates a check function with configurable wait strategy between attempts; each iteration is its own Lambda invocation (suspended via STEP+RETRY checkpoints carrying NextAttemptDelaySeconds), so polling does not consume compute time. Public surface: - IDurableContext.WaitForConditionAsync (reflection + AOT-safe overloads taking ICheckpointSerializer) - IConditionCheckContext (Logger + AttemptNumber) - WaitForConditionConfig (required InitialState + WaitStrategy) - IWaitStrategy with Decide(state, attempt) returning WaitDecision - WaitDecision (readonly record struct, ShouldContinue + Delay, Stop() / ContinueAfter(TimeSpan) factories) - WaitStrategy factories: Exponential / Linear / Fixed / FromDelegate, each accepting an optional Func isDone predicate - WaitForConditionException with AttemptsExhausted and LastState (preserved across both live execution and replay) Internal: - WaitForConditionOperation wire format = STEP + SubType "WaitForCondition". Each polling iteration emits Action=RETRY with the new state in payload and NextAttemptDelaySeconds for the service to schedule the next invocation. - Strategies signal max-attempts exhausted by throwing WaitForConditionException directly from Decide(); the operation enriches with LastState before checkpointing FAIL. - LastState survives FAIL replay: serialized into FAIL payload at write time, deserialized in BuildFailureException with warning-logged fallback for legacy/corrupt data. - ExponentialBackoff helper extracted for sharing with ExponentialRetryStrategy. Math is byte-for-byte identical. - Reuses OperationSubTypes.WaitForCondition from Wave 0. Defaults: 60 attempts / 5s initial / 300s max / 1.5x rate / Full jitter - distinct from RetryStrategy.Default and matching Python/JS/Java reference SDKs. (Note: Python returns success on max-attempts; .NET/Java/JS throw - documented in design doc.) Adds 41 unit tests + 5 integration tests covering each wait strategy, isDone predicate paths, max-attempts exhaustion, user-check exceptions, replay determinism, exponential backoff bounds, and corrupt-payload fallback logging. Co-Authored-By: Claude Opus 4.7 (1M context) --- Docs/durable-execution-design.md | 110 +- .../Config/IWaitStrategy.cs | 27 + .../Config/RetryStrategy.cs | 19 +- .../Config/WaitDecision.cs | 42 + .../Config/WaitForConditionConfig.cs | 29 + .../Config/WaitStrategy.cs | 222 ++++ .../DurableContext.cs | 36 + .../Exceptions/WaitForConditionException.cs | 42 + .../IConditionCheckContext.cs | 26 + .../IDurableContext.cs | 40 + .../Internal/ExponentialBackoff.cs | 41 + .../Internal/WaitForConditionOperation.cs | 365 ++++++ .../Dockerfile | 7 + .../Function.cs | 66 ++ ...WaitForConditionExponentialFunction.csproj | 18 + .../Dockerfile | 7 + .../Function.cs | 61 + .../WaitForConditionHappyPathFunction.csproj | 18 + .../Dockerfile | 7 + .../Function.cs | 62 + ...WaitForConditionMaxAttemptsFunction.csproj | 18 + .../Dockerfile | 7 + .../Function.cs | 63 + ...rConditionReplayDeterminismFunction.csproj | 18 + .../Dockerfile | 7 + .../Function.cs | 66 ++ ...ForConditionUserCheckThrowsFunction.csproj | 18 + .../WaitForConditionExponentialTest.cs | 66 ++ .../WaitForConditionHappyPathTest.cs | 73 ++ .../WaitForConditionMaxAttemptsTest.cs | 68 ++ .../WaitForConditionReplayDeterminismTest.cs | 89 ++ .../WaitForConditionUserCheckThrowsTest.cs | 71 ++ .../WaitForConditionOperationTests.cs | 1049 +++++++++++++++++ .../WaitStrategyTests.cs | 226 ++++ 34 files changed, 3065 insertions(+), 19 deletions(-) create mode 100644 Libraries/src/Amazon.Lambda.DurableExecution/Config/IWaitStrategy.cs create mode 100644 Libraries/src/Amazon.Lambda.DurableExecution/Config/WaitDecision.cs create mode 100644 Libraries/src/Amazon.Lambda.DurableExecution/Config/WaitForConditionConfig.cs create mode 100644 Libraries/src/Amazon.Lambda.DurableExecution/Config/WaitStrategy.cs create mode 100644 Libraries/src/Amazon.Lambda.DurableExecution/Exceptions/WaitForConditionException.cs create mode 100644 Libraries/src/Amazon.Lambda.DurableExecution/IConditionCheckContext.cs create mode 100644 Libraries/src/Amazon.Lambda.DurableExecution/Internal/ExponentialBackoff.cs create mode 100644 Libraries/src/Amazon.Lambda.DurableExecution/Internal/WaitForConditionOperation.cs create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionExponentialFunction/Dockerfile create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionExponentialFunction/Function.cs create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionExponentialFunction/WaitForConditionExponentialFunction.csproj create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionHappyPathFunction/Dockerfile create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionHappyPathFunction/Function.cs create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionHappyPathFunction/WaitForConditionHappyPathFunction.csproj create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionMaxAttemptsFunction/Dockerfile create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionMaxAttemptsFunction/Function.cs create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionMaxAttemptsFunction/WaitForConditionMaxAttemptsFunction.csproj create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionReplayDeterminismFunction/Dockerfile create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionReplayDeterminismFunction/Function.cs create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionReplayDeterminismFunction/WaitForConditionReplayDeterminismFunction.csproj create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionUserCheckThrowsFunction/Dockerfile create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionUserCheckThrowsFunction/Function.cs create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionUserCheckThrowsFunction/WaitForConditionUserCheckThrowsFunction.csproj create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/WaitForConditionExponentialTest.cs create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/WaitForConditionHappyPathTest.cs create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/WaitForConditionMaxAttemptsTest.cs create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/WaitForConditionReplayDeterminismTest.cs create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/WaitForConditionUserCheckThrowsTest.cs create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.Tests/WaitForConditionOperationTests.cs create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.Tests/WaitStrategyTests.cs diff --git a/Docs/durable-execution-design.md b/Docs/durable-execution-design.md index 402d689af..bc6ae23a0 100644 --- a/Docs/durable-execution-design.md +++ b/Docs/durable-execution-design.md @@ -413,6 +413,31 @@ await context.WaitAsync(TimeSpan.FromDays(7), name: "weekly_reminder"); > **Validation:** The duration must be at least 1 second. Values less than 1 second throw `ArgumentOutOfRangeException`. Sub-second precision is truncated to whole seconds (the underlying service operates at second granularity). +#### Wait For Condition + +`WaitForConditionAsync` polls a user-supplied check function until a configured `IWaitStrategy` decides to stop. Between polls the workflow is suspended (no compute charge); the service re-invokes when the strategy's chosen delay elapses. The check function receives the state from the previous iteration, so users can carry per-poll bookkeeping inside the state itself. + +```csharp +// Poll until an order's status reaches a terminal value. +var finalStatus = await context.WaitForConditionAsync( + check: async (state, ctx) => + { + ctx.Logger.LogInformation("Polling order on attempt {Attempt}", ctx.AttemptNumber); + return await orderService.GetStatusAsync(orderId); + }, + config: new WaitForConditionConfig + { + InitialState = OrderStatus.Unknown, + WaitStrategy = WaitStrategy.Exponential( + isDone: s => s == OrderStatus.Completed || s == OrderStatus.Cancelled) + }, + name: "wait_for_order_settle"); +``` + +Built-in strategies live on the `WaitStrategy` factory (`Exponential`, `Linear`, `Fixed`, plus `FromDelegate`) and all accept an optional `isDone` predicate so the common case stays declarative. When the strategy hits its `maxAttempts` limit it throws `WaitForConditionException` (carrying `AttemptsExhausted` and `LastState`); when the check function itself throws, the operation surfaces a `StepException` with the original error type. State is checkpointed per-iteration in the operation's payload so polling survives Lambda re-invocations deterministically. + +> **Cross-SDK note (Python migration):** .NET (and Java + JS) treat `maxAttempts` exhaustion as a failure — the operation throws `WaitForConditionException` with the last observed state attached. **Python** instead returns `WaitDecision.no_wait()` from its built-in strategies, so a Python workflow at max-attempts *succeeds* with the last state as its result. The .NET behavior was chosen to match the majority of SDKs and to give callers an idiomatic typed-exception path; if you are porting a workflow from Python and want the "succeed-with-last-state" semantic, write a custom `IWaitStrategy` (or a `WaitStrategy.FromDelegate(...)` lambda) that returns `WaitDecision.Stop()` instead of throwing when the attempt counter is exhausted. + --- ### Callbacks @@ -1173,13 +1198,30 @@ public interface IDurableContext CancellationToken cancellationToken = default); /// - /// Poll until a condition is met. + /// Poll until a condition is met. The check function returns the next + /// state on each invocation; the configured IWaitStrategy<TState> + /// decides whether to keep polling and how long to wait between calls. + /// Reflection-based JSON — not AOT-safe. /// + [RequiresUnreferencedCode("Reflection-based JSON for TState. Use the ICheckpointSerializer overload for AOT/trimmed deployments.")] + [RequiresDynamicCode("Reflection-based JSON for TState. Use the ICheckpointSerializer overload for AOT/trimmed deployments.")] Task WaitForConditionAsync( Func> check, WaitForConditionConfig config, string? name = null, CancellationToken cancellationToken = default); + + /// + /// Poll until a condition is met. AOT-safe — the supplied + /// ICheckpointSerializer<TState> is used in place of + /// reflection-based JSON for the per-iteration state checkpoint. + /// + Task WaitForConditionAsync( + Func> check, + WaitForConditionConfig config, + ICheckpointSerializer serializer, + string? name = null, + CancellationToken cancellationToken = default); } ``` @@ -1213,6 +1255,63 @@ public interface IStepContext /// traces and can be inspected by name in the test runner. /// public record DurableBranch(string Name, Func> Func); + +/// +/// Context passed to a WaitForCondition check function on every polling +/// iteration. Mirrors IStepContext minus OperationId (every iteration of a +/// wait-for-condition operation shares the same operation ID, so exposing +/// it here would be misleading). +/// +public interface IConditionCheckContext +{ + /// Logger scoped to this condition-check attempt. + ILogger Logger { get; } + + /// The current 1-based attempt number. + int AttemptNumber { get; } +} + +/// +/// Decides, per polling iteration, whether a WaitForConditionAsync operation +/// should keep polling and how long to wait. Implementations are typically +/// obtained via the WaitStrategy factory; users may also implement +/// directly. Built-in implementations throw WaitForConditionException +/// when their max-attempts limit is reached so the operation can produce a +/// failure with the last observed state. +/// +public interface IWaitStrategy +{ + WaitDecision Decide(TState state, int attemptNumber); +} + +/// +/// Decision returned by IWaitStrategy on each polling iteration. Stop() +/// indicates the condition has been met (the operation SUCCEEDs and returns +/// the latest state); ContinueAfter(delay) schedules the next poll. +/// +public readonly record struct WaitDecision +{ + public bool ShouldContinue { get; } + public TimeSpan Delay { get; } + public static WaitDecision Stop(); + public static WaitDecision ContinueAfter(TimeSpan delay); +} + +/// +/// Factory for built-in IWaitStrategy implementations. Each accepts an +/// optional isDone predicate so users can terminate polling declaratively +/// when the latest state satisfies a condition (e.g. state => state.IsReady) +/// without implementing IWaitStrategy themselves. Defaults are intentionally +/// tuned for polling, NOT retry-on-exception: 60 attempts / 5s initial / +/// 300s max / 1.5x backoff / Full jitter (matches Python+JS+Java SDKs). +/// +public static class WaitStrategy +{ + public static IWaitStrategy Exponential(...); + public static IWaitStrategy Linear(...); + public static IWaitStrategy Fixed(TimeSpan delay, ...); + public static IWaitStrategy FromDelegate(Func strategy); +} ``` #### CancellationToken behavior @@ -1633,11 +1732,18 @@ public class ChildContextException : DurableExecutionException /// /// Thrown when a wait-for-condition operation exhausts all attempts -/// without the condition being met. +/// without the condition being met. Subclassable: future failure modes +/// (e.g. timeout) should add derived exceptions rather than discriminator +/// flags so callers can catch by static type. /// public class WaitForConditionException : DurableExecutionException { public int AttemptsExhausted { get; } + + /// The most recent state observed by the check function before + /// the strategy gave up. Boxed because the exception type is not generic; + /// callers cast to the workflow's known state type. + public object? LastState { get; } } /// diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Config/IWaitStrategy.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Config/IWaitStrategy.cs new file mode 100644 index 000000000..7ca26964a --- /dev/null +++ b/Libraries/src/Amazon.Lambda.DurableExecution/Config/IWaitStrategy.cs @@ -0,0 +1,27 @@ +namespace Amazon.Lambda.DurableExecution; + +/// +/// Decides, per polling iteration, whether a WaitForConditionAsync +/// operation should keep polling and how long to wait before the next attempt. +/// +/// +/// Distinct from : that interface decides +/// retry-on-exception (input is the thrown ); this one +/// decides poll-until-condition (input is the latest +/// observed by the check function). Implementations are typically obtained +/// via the factory; users who need richer logic +/// (e.g. wall-clock-time budgets, conditional jitter) can implement this +/// interface directly. +/// +/// The state type produced by the check function. +public interface IWaitStrategy +{ + /// + /// Evaluates the latest from the check function + /// and the 1-based just executed, and + /// returns either (terminate) or + /// (poll again after + /// the given delay). + /// + WaitDecision Decide(TState state, int attemptNumber); +} diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Config/RetryStrategy.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Config/RetryStrategy.cs index b8688ca0c..658f003b2 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/Config/RetryStrategy.cs +++ b/Libraries/src/Amazon.Lambda.DurableExecution/Config/RetryStrategy.cs @@ -1,4 +1,5 @@ using System.Text.RegularExpressions; +using Amazon.Lambda.DurableExecution.Internal; namespace Amazon.Lambda.DurableExecution; @@ -97,10 +98,6 @@ internal sealed class ExponentialRetryStrategy : IRetryStrategy private readonly Type[]? _retryableExceptions; private readonly Regex[]? _retryableMessagePatterns; - [ThreadStatic] - private static Random? t_random; - private static Random Random => t_random ??= new Random(); - public ExponentialRetryStrategy( int maxAttempts, TimeSpan initialDelay, @@ -156,19 +153,7 @@ private bool IsRetryable(Exception exception) } internal TimeSpan CalculateDelay(int attemptNumber) - { - var baseDelay = _initialDelay.TotalSeconds * Math.Pow(_backoffRate, attemptNumber - 1); - var cappedDelay = Math.Min(baseDelay, _maxDelay.TotalSeconds); - - var finalDelay = _jitter switch - { - JitterStrategy.Full => Random.NextDouble() * cappedDelay, - JitterStrategy.Half => cappedDelay * (0.5 + 0.5 * Random.NextDouble()), - _ => cappedDelay - }; - - return TimeSpan.FromSeconds(Math.Max(1, Math.Ceiling(finalDelay))); - } + => ExponentialBackoff.CalculateDelay(attemptNumber, _initialDelay, _maxDelay, _backoffRate, _jitter); } internal sealed class DelegateRetryStrategy : IRetryStrategy diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Config/WaitDecision.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Config/WaitDecision.cs new file mode 100644 index 000000000..f2b7dff98 --- /dev/null +++ b/Libraries/src/Amazon.Lambda.DurableExecution/Config/WaitDecision.cs @@ -0,0 +1,42 @@ +namespace Amazon.Lambda.DurableExecution; + +/// +/// Decision returned by an on each polling +/// iteration: either stop polling (the condition has been met or attempts +/// have been exhausted) or continue after the given delay. +/// +public readonly record struct WaitDecision +{ + /// + /// True when the strategy wants the operation to keep polling; false when + /// the operation should terminate (condition satisfied or limit reached). + /// + public bool ShouldContinue { get; } + + /// + /// Delay before the next poll. Only meaningful when + /// is true; otherwise + /// . The wire-level timer floors this at 1 + /// second. + /// + public TimeSpan Delay { get; } + + private WaitDecision(bool shouldContinue, TimeSpan delay) + { + ShouldContinue = shouldContinue; + Delay = delay; + } + + /// + /// Stop polling. The current state is treated as the final result of the + /// wait-for-condition operation and returned to the caller. + /// + public static WaitDecision Stop() => new(false, TimeSpan.Zero); + + /// + /// Continue polling after the given delay. The Lambda is suspended until + /// the delay elapses, at which point the service re-invokes and the + /// condition is re-evaluated. + /// + public static WaitDecision ContinueAfter(TimeSpan delay) => new(true, delay); +} diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Config/WaitForConditionConfig.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Config/WaitForConditionConfig.cs new file mode 100644 index 000000000..ea99a76ef --- /dev/null +++ b/Libraries/src/Amazon.Lambda.DurableExecution/Config/WaitForConditionConfig.cs @@ -0,0 +1,29 @@ +namespace Amazon.Lambda.DurableExecution; + +/// +/// Configuration for a WaitForConditionAsync polling operation. +/// +/// +/// Both properties are required: the strategy decides "continue or stop" +/// (per-call) and the initial state seeds the very first check invocation. +/// On replay, the latest checkpointed state is restored from the previous +/// RETRY checkpoint and used in place of ; this +/// is what makes the polling loop survive Lambda re-invocations +/// deterministically. +/// +/// The state type produced by the check function. +public sealed class WaitForConditionConfig +{ + /// + /// Initial state passed to the very first invocation of the check + /// function. Subsequent invocations receive the state returned by the + /// previous call. + /// + public required TState InitialState { get; set; } + + /// + /// Strategy that decides, after each check invocation, whether to keep + /// polling and how long to wait before the next attempt. + /// + public required IWaitStrategy WaitStrategy { get; set; } +} diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Config/WaitStrategy.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Config/WaitStrategy.cs new file mode 100644 index 000000000..a2aea1d19 --- /dev/null +++ b/Libraries/src/Amazon.Lambda.DurableExecution/Config/WaitStrategy.cs @@ -0,0 +1,222 @@ +using Amazon.Lambda.DurableExecution.Internal; + +namespace Amazon.Lambda.DurableExecution; + +/// +/// Factory methods for built-in +/// implementations used with +/// IDurableContext.WaitForConditionAsync. +/// +/// +/// Each factory accepts an optional isDone predicate so users can +/// terminate polling declaratively when the latest state satisfies a +/// condition (e.g. state => state.IsReady) without implementing +/// themselves. If isDone is +/// null, the strategy polls until maxAttempts is exhausted — +/// at which point a is thrown by +/// the operation. Defaults are intentionally tuned for polling, not for +/// retry-on-exception: 60 attempts / 5s initial / 300s max / 1.5x backoff / +/// Full jitter (matches Python+JS+Java reference SDKs). +/// +public static class WaitStrategy +{ + /// + /// Exponential-backoff wait strategy. Defaults: 60 attempts, 5s initial + /// delay, 5min (300s) max delay, 1.5x backoff, Full jitter — matching + /// the Python, JS, and Java reference SDKs. + /// + /// Maximum polling attempts before the operation throws . + /// Delay before the second attempt; subsequent delays multiply by up to . + /// Cap on the per-attempt delay. + /// Multiplier applied per attempt. + /// Jitter strategy applied to each delay. + /// Optional predicate evaluated against the latest state; when it returns true, polling stops and the state is returned. + public static IWaitStrategy Exponential( + int maxAttempts = 60, + TimeSpan? initialDelay = null, + TimeSpan? maxDelay = null, + double backoffRate = 1.5, + JitterStrategy jitter = JitterStrategy.Full, + Func? isDone = null) + { + return new ExponentialWaitStrategy( + maxAttempts, + initialDelay ?? TimeSpan.FromSeconds(5), + maxDelay ?? TimeSpan.FromSeconds(300), + backoffRate, + jitter, + isDone); + } + + /// + /// Linear-growth wait strategy. The delay starts at + /// and grows by + /// each attempt, up to + /// . + /// + /// Maximum polling attempts before the operation throws . + /// Delay before the second attempt. + /// Amount added to the delay on each subsequent attempt. + /// Cap on the per-attempt delay; null means no cap. + /// Optional predicate evaluated against the latest state; when it returns true, polling stops and the state is returned. + public static IWaitStrategy Linear( + int maxAttempts = 60, + TimeSpan? initialDelay = null, + TimeSpan? increment = null, + TimeSpan? maxDelay = null, + Func? isDone = null) + { + return new LinearWaitStrategy( + maxAttempts, + initialDelay ?? TimeSpan.FromSeconds(5), + increment ?? TimeSpan.FromSeconds(5), + maxDelay, + isDone); + } + + /// + /// Fixed-delay wait strategy. Every poll waits the same + /// . + /// + /// Fixed delay between polls. + /// Maximum polling attempts before the operation throws . + /// Optional predicate evaluated against the latest state; when it returns true, polling stops and the state is returned. + public static IWaitStrategy Fixed( + TimeSpan delay, + int maxAttempts = 60, + Func? isDone = null) + { + return new FixedWaitStrategy(maxAttempts, delay, isDone); + } + + /// + /// Wraps an arbitrary delegate as an . + /// + public static IWaitStrategy FromDelegate(Func strategy) + => new DelegateWaitStrategy(strategy); +} + +internal sealed class ExponentialWaitStrategy : IWaitStrategy +{ + private readonly int _maxAttempts; + private readonly TimeSpan _initialDelay; + private readonly TimeSpan _maxDelay; + private readonly double _backoffRate; + private readonly JitterStrategy _jitter; + private readonly Func? _isDone; + + public ExponentialWaitStrategy( + int maxAttempts, + TimeSpan initialDelay, + TimeSpan maxDelay, + double backoffRate, + JitterStrategy jitter, + Func? isDone) + { + _maxAttempts = maxAttempts; + _initialDelay = initialDelay; + _maxDelay = maxDelay; + _backoffRate = backoffRate; + _jitter = jitter; + _isDone = isDone; + } + + public WaitDecision Decide(TState state, int attemptNumber) + { + // Predicate satisfied → stop normally (operation SUCCEEDs). + if (_isDone != null && _isDone(state)) return WaitDecision.Stop(); + + // Attempts saturated → throw WaitForConditionException directly. + // Matches the JS reference SDK (wait-strategy-config.ts:54-57); lets + // the operation distinguish "condition met" (Stop) from "gave up" + // (exception) without a discriminator on WaitDecision. The operation + // catches, populates LastState (which the strategy doesn't have + // access to), checkpoints FAIL, and rethrows. + if (attemptNumber >= _maxAttempts) + throw new WaitForConditionException( + $"WaitForCondition exceeded maximum attempts ({_maxAttempts}).") + { + AttemptsExhausted = attemptNumber + }; + + var delay = ExponentialBackoff.CalculateDelay( + attemptNumber, _initialDelay, _maxDelay, _backoffRate, _jitter); + return WaitDecision.ContinueAfter(delay); + } +} + +internal sealed class LinearWaitStrategy : IWaitStrategy +{ + private readonly int _maxAttempts; + private readonly TimeSpan _initialDelay; + private readonly TimeSpan _increment; + private readonly TimeSpan? _maxDelay; + private readonly Func? _isDone; + + public LinearWaitStrategy( + int maxAttempts, + TimeSpan initialDelay, + TimeSpan increment, + TimeSpan? maxDelay, + Func? isDone) + { + _maxAttempts = maxAttempts; + _initialDelay = initialDelay; + _increment = increment; + _maxDelay = maxDelay; + _isDone = isDone; + } + + public WaitDecision Decide(TState state, int attemptNumber) + { + if (_isDone != null && _isDone(state)) return WaitDecision.Stop(); + if (attemptNumber >= _maxAttempts) + throw new WaitForConditionException( + $"WaitForCondition exceeded maximum attempts ({_maxAttempts}).") + { + AttemptsExhausted = attemptNumber + }; + + var rawSeconds = _initialDelay.TotalSeconds + _increment.TotalSeconds * (attemptNumber - 1); + if (_maxDelay is { } cap) rawSeconds = Math.Min(rawSeconds, cap.TotalSeconds); + + // Floor at 1 second to match the service timer granularity. + var seconds = Math.Max(1, Math.Ceiling(rawSeconds)); + return WaitDecision.ContinueAfter(TimeSpan.FromSeconds(seconds)); + } +} + +internal sealed class FixedWaitStrategy : IWaitStrategy +{ + private readonly int _maxAttempts; + private readonly TimeSpan _delay; + private readonly Func? _isDone; + + public FixedWaitStrategy(int maxAttempts, TimeSpan delay, Func? isDone) + { + _maxAttempts = maxAttempts; + _delay = delay; + _isDone = isDone; + } + + public WaitDecision Decide(TState state, int attemptNumber) + { + if (_isDone != null && _isDone(state)) return WaitDecision.Stop(); + if (attemptNumber >= _maxAttempts) + throw new WaitForConditionException( + $"WaitForCondition exceeded maximum attempts ({_maxAttempts}).") + { + AttemptsExhausted = attemptNumber + }; + + var seconds = Math.Max(1, Math.Ceiling(_delay.TotalSeconds)); + return WaitDecision.ContinueAfter(TimeSpan.FromSeconds(seconds)); + } +} + +internal sealed class DelegateWaitStrategy : IWaitStrategy +{ + private readonly Func _strategy; + public DelegateWaitStrategy(Func strategy) => _strategy = strategy; + public WaitDecision Decide(TState state, int attemptNumber) => _strategy(state, attemptNumber); +} diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs b/Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs index 69e5e580c..307b3c12c 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs +++ b/Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs @@ -140,6 +140,42 @@ public async Task RunInChildContextAsync( name, config, cancellationToken); } + [RequiresUnreferencedCode("Reflection-based JSON for TState. Use the ICheckpointSerializer overload for AOT/trimmed deployments.")] + [RequiresDynamicCode("Reflection-based JSON for TState. Use the ICheckpointSerializer overload for AOT/trimmed deployments.")] + public Task WaitForConditionAsync( + Func> check, + WaitForConditionConfig config, + string? name = null, + CancellationToken cancellationToken = default) + => RunWaitForCondition(check, config, new ReflectionJsonCheckpointSerializer(), name, cancellationToken); + + public Task WaitForConditionAsync( + Func> check, + WaitForConditionConfig config, + ICheckpointSerializer serializer, + string? name = null, + CancellationToken cancellationToken = default) + => RunWaitForCondition(check, config, serializer, name, cancellationToken); + + private Task RunWaitForCondition( + Func> check, + WaitForConditionConfig config, + ICheckpointSerializer serializer, + string? name, + CancellationToken cancellationToken) + { + ArgumentNullException.ThrowIfNull(check); + ArgumentNullException.ThrowIfNull(config); + ArgumentNullException.ThrowIfNull(config.WaitStrategy); + ArgumentNullException.ThrowIfNull(serializer); + + var operationId = _idGenerator.NextId(); + var op = new WaitForConditionOperation( + operationId, name, check, config, serializer, Logger, + _state, _terminationManager, _durableExecutionArn, _batcher); + return op.ExecuteAsync(cancellationToken); + } + private Task RunChildContext( Func> func, ICheckpointSerializer serializer, diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Exceptions/WaitForConditionException.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Exceptions/WaitForConditionException.cs new file mode 100644 index 000000000..411f63390 --- /dev/null +++ b/Libraries/src/Amazon.Lambda.DurableExecution/Exceptions/WaitForConditionException.cs @@ -0,0 +1,42 @@ +namespace Amazon.Lambda.DurableExecution; + +/// +/// Thrown when a WaitForConditionAsync operation reaches its +/// strategy's max-attempts limit without the condition being met. +/// +/// +/// Designed to be subclassable: future failure modes (e.g. timeout once that's +/// implemented) should be added as derived exceptions rather than discriminator +/// flags on this type, so users can catch them by static type. +/// exposes the most recently observed state so callers +/// can incorporate it into the failure path (logging, partial results, etc.). +/// +public class WaitForConditionException : DurableExecutionException +{ + /// + /// Number of attempts the strategy made before giving up. 1-based. + /// + public int AttemptsExhausted { get; init; } + + /// + /// The most recent state observed by the check function before the + /// strategy decided to stop. Boxed because the exception type is not + /// generic; callers cast to the workflow's known state type. + /// + /// + /// Populated identically on live execution and on replay: the operation + /// serializes the last observed state into the FAIL checkpoint payload, + /// so a re-invocation that hits the cached FAIL reconstructs the same + /// LastState the original execution surfaced. Will be null + /// only if the FAIL checkpoint predates this serialization (legacy data) + /// or if the serializer cannot round-trip the state. + /// + public object? LastState { get; init; } + + /// Creates an empty . + public WaitForConditionException() { } + /// Creates a with the given message. + public WaitForConditionException(string message) : base(message) { } + /// Creates a wrapping an inner exception. + public WaitForConditionException(string message, Exception innerException) : base(message, innerException) { } +} diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/IConditionCheckContext.cs b/Libraries/src/Amazon.Lambda.DurableExecution/IConditionCheckContext.cs new file mode 100644 index 000000000..cd1d605b6 --- /dev/null +++ b/Libraries/src/Amazon.Lambda.DurableExecution/IConditionCheckContext.cs @@ -0,0 +1,26 @@ +using Microsoft.Extensions.Logging; + +namespace Amazon.Lambda.DurableExecution; + +/// +/// Context passed to a WaitForConditionAsync check function on every +/// polling iteration. Provides a logger scoped to the current attempt and the +/// 1-based attempt number, mirroring the surface of +/// (minus OperationId: every iteration of a +/// wait-for-condition operation shares the same operation ID, so exposing it +/// here would be misleading — see DESIGN-QUESTIONS.md#Q6). +/// +public interface IConditionCheckContext +{ + /// + /// Logger scoped to this condition-check attempt. + /// + ILogger Logger { get; } + + /// + /// The current 1-based attempt number. Increments on every polling + /// iteration; on replay, equals the number of attempts already + /// checkpointed plus one. + /// + int AttemptNumber { get; } +} diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/IDurableContext.cs b/Libraries/src/Amazon.Lambda.DurableExecution/IDurableContext.cs index eb10a0ffe..c34db120b 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/IDurableContext.cs +++ b/Libraries/src/Amazon.Lambda.DurableExecution/IDurableContext.cs @@ -119,6 +119,46 @@ Task RunInChildContextAsync( string? name = null, ChildContextConfig? config = null, CancellationToken cancellationToken = default); + + /// + /// Poll a condition by repeatedly invoking until + /// the configured decides to stop. + /// Between polls the workflow is suspended (no compute charge); the + /// service re-invokes the Lambda when the strategy's chosen delay elapses. + /// + /// + /// On every iteration the function receives the + /// state returned by the previous invocation (seeded by + /// on the very + /// first call), so users can carry per-poll bookkeeping (e.g. a cursor or + /// retry counter) inside the state itself. If the strategy stops because + /// of 's max-attempts limit (rather + /// than because the condition is met), a + /// is thrown carrying the last observed state. + /// The check function's return value is serialized to a checkpoint using + /// reflection-based System.Text.Json. For NativeAOT or trimmed + /// deployments, use the overload that takes an + /// . + /// + [RequiresUnreferencedCode("Reflection-based JSON for TState. Use the ICheckpointSerializer overload for AOT/trimmed deployments.")] + [RequiresDynamicCode("Reflection-based JSON for TState. Use the ICheckpointSerializer overload for AOT/trimmed deployments.")] + Task WaitForConditionAsync( + Func> check, + WaitForConditionConfig config, + string? name = null, + CancellationToken cancellationToken = default); + + /// + /// Poll a condition with AOT-safe checkpoint serialization. The supplied + /// is used in place of reflection-based + /// JSON. See the reflection overload for full semantics. + /// + Task WaitForConditionAsync( + Func> check, + WaitForConditionConfig config, + ICheckpointSerializer serializer, + string? name = null, + CancellationToken cancellationToken = default); } /// diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ExponentialBackoff.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ExponentialBackoff.cs new file mode 100644 index 000000000..47da147c9 --- /dev/null +++ b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ExponentialBackoff.cs @@ -0,0 +1,41 @@ +namespace Amazon.Lambda.DurableExecution.Internal; + +/// +/// Shared exponential-backoff math for both +/// (retry-on-exception) and +/// ExponentialWaitStrategy<TState> (wait-for-condition polling). +/// Computes min(initialDelay * backoff^(attempt-1), maxDelay), applies +/// the requested jitter, then ceilings to whole seconds with a 1-second floor +/// (the service timer's smallest unit). +/// +internal static class ExponentialBackoff +{ + [ThreadStatic] + private static Random? t_random; + private static Random Random => t_random ??= new Random(); + + /// + /// Computes the delay for the given (1-based) + /// using exponential backoff with the requested jitter strategy. Returned + /// delay is always at least 1 second (service timer floor). + /// + public static TimeSpan CalculateDelay( + int attemptNumber, + TimeSpan initialDelay, + TimeSpan maxDelay, + double backoffRate, + JitterStrategy jitter) + { + var baseDelay = initialDelay.TotalSeconds * Math.Pow(backoffRate, attemptNumber - 1); + var cappedDelay = Math.Min(baseDelay, maxDelay.TotalSeconds); + + var finalDelay = jitter switch + { + JitterStrategy.Full => Random.NextDouble() * cappedDelay, + JitterStrategy.Half => cappedDelay * (0.5 + 0.5 * Random.NextDouble()), + _ => cappedDelay + }; + + return TimeSpan.FromSeconds(Math.Max(1, Math.Ceiling(finalDelay))); + } +} diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/WaitForConditionOperation.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/WaitForConditionOperation.cs new file mode 100644 index 000000000..92029a67c --- /dev/null +++ b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/WaitForConditionOperation.cs @@ -0,0 +1,365 @@ +using Microsoft.Extensions.Logging; +using SdkErrorObject = Amazon.Lambda.Model.ErrorObject; +using SdkOperationUpdate = Amazon.Lambda.Model.OperationUpdate; +using SdkStepOptions = Amazon.Lambda.Model.StepOptions; + +namespace Amazon.Lambda.DurableExecution.Internal; + +/// +/// Durable wait-for-condition (polling) operation. Repeatedly invokes a +/// user-supplied check function until an +/// decides to stop. Between iterations the workflow is suspended so the +/// Lambda is not billing compute while waiting. +/// +/// +/// Wire format reuses STEP+RETRY exactly (see DESIGN-QUESTIONS.md#Q8): +/// +/// Type=STEP, SubType="WaitForCondition" +/// Each polling iteration emits Action=RETRY with the latest +/// in Payload and the strategy's +/// chosen delay in StepOptions.NextAttemptDelaySeconds. +/// Termination emits Action=SUCCEED with the final state in +/// Payload; check-function exceptions emit Action=FAIL. +/// +/// Replay branches — example: await ctx.WaitForConditionAsync(check, config, "poll") +/// +/// Fresh: sync-flush START → run check with +/// → strategy decides Stop/Continue. +/// SUCCEEDED: return the deserialized cached state; check is NOT re-run. +/// FAILED: re-throw a +/// (or fall back to if the FAIL was caused by +/// the check function throwing — the latter carries the original +/// error type/message). +/// PENDING (RETRY scheduled): if the next-attempt timer hasn't +/// fired yet, re-suspend; otherwise read the prior state from +/// StepDetails.Result, advance the attempt counter, and run the +/// check again. +/// READY: timer fired and the service re-invoked us. Read the +/// prior state, advance the attempt counter, run the check. +/// STARTED: the START checkpoint was written but the very first +/// check attempt didn't complete (Lambda crash / timeout). Re-execute +/// with as +/// the seed. +/// +/// State checkpointing in each RETRY's payload is what makes the polling loop +/// survive Lambda re-invocations deterministically. +/// +internal sealed class WaitForConditionOperation : DurableOperation +{ + private readonly Func> _check; + private readonly WaitForConditionConfig _config; + private readonly ICheckpointSerializer _serializer; + private readonly ILogger _logger; + + public WaitForConditionOperation( + string operationId, + string? name, + Func> check, + WaitForConditionConfig config, + ICheckpointSerializer serializer, + ILogger logger, + ExecutionState state, + TerminationManager termination, + string durableExecutionArn, + CheckpointBatcher? batcher = null) + : base(operationId, name, state, termination, durableExecutionArn, batcher) + { + _check = check; + _config = config; + _serializer = serializer; + _logger = logger; + } + + protected override string OperationType => OperationTypes.Step; + + protected override Task StartAsync(CancellationToken cancellationToken) + => ExecuteIteration(_config.InitialState, attemptNumber: 1, cancellationToken); + + protected override Task ReplayAsync(Operation existing, CancellationToken cancellationToken) + { + switch (existing.Status) + { + case OperationStatuses.Succeeded: + // Polling concluded on a previous invocation; return the + // cached final state without re-running the check. + return Task.FromResult(DeserializeState(existing.StepDetails?.Result)); + + case OperationStatuses.Failed: + throw BuildFailureException(existing); + + case OperationStatuses.Pending: + return ReplayPending(existing, cancellationToken); + + case OperationStatuses.Ready: + return ReplayReady(existing, cancellationToken); + + case OperationStatuses.Started: + // START emitted but no RETRY/SUCCEED yet — the very first + // check attempt was lost. Re-execute with InitialState. Do + // NOT re-emit START (the original is authoritative). + return ExecuteIteration(_config.InitialState, attemptNumber: 1, cancellationToken); + + default: + throw new NonDeterministicExecutionException( + $"WaitForCondition operation '{Name ?? OperationId}' has unexpected status '{existing.Status}' on replay."); + } + } + + /// + /// PENDING means the prior iteration emitted RETRY and the service + /// scheduled a timer. If the timer hasn't fired we re-suspend; once it + /// fires, the next iteration runs against the previously checkpointed + /// state, NOT . + /// + private Task ReplayPending(Operation pending, CancellationToken cancellationToken) + { + var nextAttemptTs = pending.StepDetails?.NextAttemptTimestamp; + if (nextAttemptTs is { } scheduledMs && + DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() < scheduledMs) + { + // Timer still ticking — re-suspend without re-checkpointing. + return Termination.SuspendAndAwait( + TerminationReason.RetryScheduled, $"wait_for_condition:{Name ?? OperationId}"); + } + + var priorState = DeserializeStateOrInitial(pending.StepDetails?.Result); + var attemptNumber = (pending.StepDetails?.Attempt ?? 0) + 1; + return ExecuteIteration(priorState, attemptNumber, cancellationToken); + } + + /// + /// READY means the service has re-invoked us post-PENDING — the next + /// poll is up. Read the latest state from the prior RETRY's payload + /// and advance the attempt counter. + /// + private Task ReplayReady(Operation ready, CancellationToken cancellationToken) + { + var priorState = DeserializeStateOrInitial(ready.StepDetails?.Result); + var attemptNumber = (ready.StepDetails?.Attempt ?? 0) + 1; + return ExecuteIteration(priorState, attemptNumber, cancellationToken); + } + + private async Task ExecuteIteration(TState currentState, int attemptNumber, CancellationToken cancellationToken) + { + cancellationToken.ThrowIfCancellationRequested(); + + // Emit START on the very first attempt only — and sync-flush so the + // service has a record of the polling op even if the check function + // drives termination via, e.g., a wait inside it. Subsequent + // iterations resume from a RETRY/READY/PENDING checkpoint and skip + // START. + if (State.GetOperation(OperationId) == null) + { + await EnqueueAsync(new SdkOperationUpdate + { + Id = OperationId, + Type = OperationTypes.Step, + Action = "START", + SubType = OperationSubTypes.WaitForCondition, + Name = Name + }, cancellationToken); + } + + TState newState; + try + { + var checkContext = new ConditionCheckContext(attemptNumber, _logger); + newState = await _check(currentState, checkContext); + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + throw; + } + catch (Exception ex) + { + // The check threw. WaitForCondition has no per-exception retry + // strategy (Python/JS/Java SDKs all treat check failure as terminal), + // so checkpoint FAIL and surface the original exception via + // StepException — same shape as StepOperation's terminal failure. + await EnqueueAsync(new SdkOperationUpdate + { + Id = OperationId, + Type = OperationTypes.Step, + Action = "FAIL", + SubType = OperationSubTypes.WaitForCondition, + Name = Name, + Error = ToSdkError(ex) + }, cancellationToken); + + throw new StepException(ex.Message, ex) + { + ErrorType = ex.GetType().FullName + }; + } + + WaitDecision decision; + try + { + decision = _config.WaitStrategy.Decide(newState, attemptNumber); + } + catch (WaitForConditionException maxEx) + { + // Strategy is signaling max-attempts reached. The strategy + // didn't have access to LastState; we do — populate it now, + // checkpoint FAIL, and rethrow. + var enriched = new WaitForConditionException( + $"WaitForCondition '{Name ?? OperationId}' exhausted {attemptNumber} attempts without the condition being met.", + maxEx) + { + AttemptsExhausted = attemptNumber, + LastState = newState + }; + + // Persist the last observed state in the FAIL Payload so a replay + // that hits this cached FAIL can reconstruct LastState identically + // to the live throw. Without this, replay surfaces LastState=null. + await EnqueueAsync(new SdkOperationUpdate + { + Id = OperationId, + Type = OperationTypes.Step, + Action = "FAIL", + SubType = OperationSubTypes.WaitForCondition, + Name = Name, + Payload = SerializeState(newState), + Error = new SdkErrorObject + { + ErrorType = typeof(WaitForConditionException).FullName, + ErrorMessage = enriched.Message + } + }, cancellationToken); + + throw enriched; + } + + if (!decision.ShouldContinue) + { + // Stop() means the condition has been met. Persist the final + // state and return it to the caller. + await EnqueueAsync(new SdkOperationUpdate + { + Id = OperationId, + Type = OperationTypes.Step, + Action = "SUCCEED", + SubType = OperationSubTypes.WaitForCondition, + Name = Name, + Payload = SerializeState(newState) + }, cancellationToken); + + return newState; + } + + // Continue polling — emit RETRY with the new state in the payload + // and the next-attempt delay in StepOptions. Sync-flush so the + // service definitely has the new state and timer scheduled before + // we suspend. + var delaySeconds = (int)Math.Max(1, Math.Ceiling(decision.Delay.TotalSeconds)); + await EnqueueAsync(new SdkOperationUpdate + { + Id = OperationId, + Type = OperationTypes.Step, + Action = "RETRY", + SubType = OperationSubTypes.WaitForCondition, + Name = Name, + Payload = SerializeState(newState), + StepOptions = new SdkStepOptions { NextAttemptDelaySeconds = delaySeconds } + }, cancellationToken); + + return await Termination.SuspendAndAwait( + TerminationReason.RetryScheduled, $"wait_for_condition:{Name ?? OperationId}"); + } + + private TState DeserializeState(string? serialized) + { + if (serialized == null) return default!; + return _serializer.Deserialize(serialized, new SerializationContext(OperationId, DurableExecutionArn)); + } + + private TState DeserializeStateOrInitial(string? serialized) + { + if (serialized == null) return _config.InitialState; + try + { + return _serializer.Deserialize(serialized, new SerializationContext(OperationId, DurableExecutionArn)); + } + catch (Exception ex) + { + // If the serializer can't read the prior state, fall back to + // InitialState — matches Python's behavior (waits.py:163-169). + // Log a warning so corrupted payloads / schema migrations are + // observable instead of silently restarting the polling loop. + _logger.LogWarning( + "WaitForCondition operation '{OperationId}' failed to deserialize prior state ({ExceptionType}: {Message}); falling back to InitialState.", + OperationId, ex.GetType().FullName, ex.Message); + return _config.InitialState; + } + } + + private string SerializeState(TState value) + => _serializer.Serialize(value, new SerializationContext(OperationId, DurableExecutionArn)); + + private Exception BuildFailureException(Operation failedOp) + { + var err = failedOp.StepDetails?.Error; + // Distinguish "max attempts exhausted" (we recorded the type as + // WaitForConditionException above) from "check function threw" + // (recorded as the original exception type via StepException). + if (err?.ErrorType == typeof(WaitForConditionException).FullName) + { + // Recover LastState from the FAIL checkpoint's payload. Live + // execution serializes the most recent state alongside the + // error so replay surfaces an identically-populated exception. + // Falls back to null when the payload is absent (legacy data + // pre-dating this serialization) or unreadable. + object? lastState = null; + var payload = failedOp.StepDetails?.Result; + if (payload != null) + { + try + { + lastState = _serializer.Deserialize(payload, new SerializationContext(OperationId, DurableExecutionArn)); + } + catch (Exception deserEx) + { + _logger.LogWarning( + "WaitForCondition operation '{OperationId}' failed to deserialize LastState from FAIL checkpoint payload ({ExceptionType}: {Message}); LastState will be null on the rethrown exception.", + OperationId, deserEx.GetType().FullName, deserEx.Message); + } + } + + return new WaitForConditionException(err?.ErrorMessage ?? $"WaitForCondition '{Name ?? OperationId}' exhausted attempts.") + { + AttemptsExhausted = failedOp.StepDetails?.Attempt ?? 0, + LastState = lastState + }; + } + + return new StepException(err?.ErrorMessage ?? "WaitForCondition check function failed") + { + ErrorType = err?.ErrorType, + ErrorData = err?.ErrorData, + OriginalStackTrace = err?.StackTrace + }; + } + + private static SdkErrorObject ToSdkError(Exception ex) => new() + { + ErrorType = ex.GetType().FullName, + ErrorMessage = ex.Message, + StackTrace = ex.StackTrace?.Split(new[] { '\r', '\n' }, StringSplitOptions.RemoveEmptyEntries).ToList() + }; +} + +/// +/// Internal implementation of . +/// +internal sealed class ConditionCheckContext : IConditionCheckContext +{ + public ConditionCheckContext(int attemptNumber, ILogger logger) + { + AttemptNumber = attemptNumber; + Logger = logger; + } + + public ILogger Logger { get; } + public int AttemptNumber { get; } +} diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionExponentialFunction/Dockerfile b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionExponentialFunction/Dockerfile new file mode 100644 index 000000000..c1913d56a --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionExponentialFunction/Dockerfile @@ -0,0 +1,7 @@ +FROM public.ecr.aws/lambda/provided:al2023 + +RUN dnf install -y libicu + +COPY bin/publish/ ${LAMBDA_TASK_ROOT} + +ENTRYPOINT ["/var/task/bootstrap"] diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionExponentialFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionExponentialFunction/Function.cs new file mode 100644 index 000000000..d73161e60 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionExponentialFunction/Function.cs @@ -0,0 +1,66 @@ +using Amazon.Lambda.Core; +using Amazon.Lambda.DurableExecution; +using Amazon.Lambda.RuntimeSupport; +using Amazon.Lambda.Serialization.SystemTextJson; + +namespace DurableExecutionTestFunction; + +public class Function +{ + public static async Task Main(string[] args) + { + var handler = new Function(); + var serializer = new DefaultLambdaJsonSerializer(); + using var handlerWrapper = HandlerWrapper.GetHandlerWrapper(handler.Handler, serializer); + using var bootstrap = new LambdaBootstrap(handlerWrapper); + await bootstrap.RunAsync(); + } + + public Task Handler( + DurableExecutionInvocationInput input, ILambdaContext context) + => DurableFunction.WrapAsync(Workflow, input, context); + + private async Task Workflow(TestEvent input, IDurableContext context) + { + // Exponential strategy with no jitter so the timing is predictable. + // Done flips on attempt 3 (1-based). With initialDelay=1s, + // backoffRate=1.5, maxDelay=4s, no jitter: delays are 1s, 1.5s + // (which the SDK ceilings to 2s due to 1s timer granularity). + var finalState = await context.WaitForConditionAsync( + check: async (state, ctx) => + { + await Task.CompletedTask; + var done = ctx.AttemptNumber >= 3; + return new State(done, ctx.AttemptNumber); + }, + config: new WaitForConditionConfig + { + InitialState = new State(false, 0), + WaitStrategy = WaitStrategy.Exponential( + maxAttempts: 5, + initialDelay: TimeSpan.FromSeconds(1), + maxDelay: TimeSpan.FromSeconds(4), + backoffRate: 1.5, + jitter: JitterStrategy.None, + isDone: s => s.Done) + }, + name: "exp_poll"); + + return new TestResult + { + Status = "completed", + AttemptsTaken = finalState.AttemptNumber, + Done = finalState.Done + }; + } +} + +public record State(bool Done, int AttemptNumber); + +public class TestEvent { public string? OrderId { get; set; } } +public class TestResult +{ + public string? Status { get; set; } + public int AttemptsTaken { get; set; } + public bool Done { get; set; } +} diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionExponentialFunction/WaitForConditionExponentialFunction.csproj b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionExponentialFunction/WaitForConditionExponentialFunction.csproj new file mode 100644 index 000000000..6f5f657e4 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionExponentialFunction/WaitForConditionExponentialFunction.csproj @@ -0,0 +1,18 @@ + + + + net8.0 + Exe + true + bootstrap + enable + enable + + + + + + + + + diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionHappyPathFunction/Dockerfile b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionHappyPathFunction/Dockerfile new file mode 100644 index 000000000..c1913d56a --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionHappyPathFunction/Dockerfile @@ -0,0 +1,7 @@ +FROM public.ecr.aws/lambda/provided:al2023 + +RUN dnf install -y libicu + +COPY bin/publish/ ${LAMBDA_TASK_ROOT} + +ENTRYPOINT ["/var/task/bootstrap"] diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionHappyPathFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionHappyPathFunction/Function.cs new file mode 100644 index 000000000..086eb6bba --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionHappyPathFunction/Function.cs @@ -0,0 +1,61 @@ +using Amazon.Lambda.Core; +using Amazon.Lambda.DurableExecution; +using Amazon.Lambda.RuntimeSupport; +using Amazon.Lambda.Serialization.SystemTextJson; + +namespace DurableExecutionTestFunction; + +public class Function +{ + public static async Task Main(string[] args) + { + var handler = new Function(); + var serializer = new DefaultLambdaJsonSerializer(); + using var handlerWrapper = HandlerWrapper.GetHandlerWrapper(handler.Handler, serializer); + using var bootstrap = new LambdaBootstrap(handlerWrapper); + await bootstrap.RunAsync(); + } + + public Task Handler( + DurableExecutionInvocationInput input, ILambdaContext context) + => DurableFunction.WrapAsync(Workflow, input, context); + + private async Task Workflow(TestEvent input, IDurableContext context) + { + // Counter increments every poll. isDone fires once it hits 3. + // Each poll iteration is a separate Lambda invocation; the state is + // carried across iterations via the RETRY checkpoint payload. + var finalState = await context.WaitForConditionAsync( + check: async (state, ctx) => + { + await Task.CompletedTask; + return new State(state.Counter + 1, ctx.AttemptNumber); + }, + config: new WaitForConditionConfig + { + InitialState = new State(0, 0), + WaitStrategy = WaitStrategy.Fixed( + delay: TimeSpan.FromSeconds(2), + maxAttempts: 10, + isDone: s => s.Counter >= 3) + }, + name: "happy_poll"); + + return new TestResult + { + Status = "completed", + Counter = finalState.Counter, + AttemptsTaken = finalState.AttemptNumber + }; + } +} + +public record State(int Counter, int AttemptNumber); + +public class TestEvent { public string? OrderId { get; set; } } +public class TestResult +{ + public string? Status { get; set; } + public int Counter { get; set; } + public int AttemptsTaken { get; set; } +} diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionHappyPathFunction/WaitForConditionHappyPathFunction.csproj b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionHappyPathFunction/WaitForConditionHappyPathFunction.csproj new file mode 100644 index 000000000..6f5f657e4 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionHappyPathFunction/WaitForConditionHappyPathFunction.csproj @@ -0,0 +1,18 @@ + + + + net8.0 + Exe + true + bootstrap + enable + enable + + + + + + + + + diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionMaxAttemptsFunction/Dockerfile b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionMaxAttemptsFunction/Dockerfile new file mode 100644 index 000000000..c1913d56a --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionMaxAttemptsFunction/Dockerfile @@ -0,0 +1,7 @@ +FROM public.ecr.aws/lambda/provided:al2023 + +RUN dnf install -y libicu + +COPY bin/publish/ ${LAMBDA_TASK_ROOT} + +ENTRYPOINT ["/var/task/bootstrap"] diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionMaxAttemptsFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionMaxAttemptsFunction/Function.cs new file mode 100644 index 000000000..8f631fe86 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionMaxAttemptsFunction/Function.cs @@ -0,0 +1,62 @@ +using Amazon.Lambda.Core; +using Amazon.Lambda.DurableExecution; +using Amazon.Lambda.RuntimeSupport; +using Amazon.Lambda.Serialization.SystemTextJson; + +namespace DurableExecutionTestFunction; + +public class Function +{ + public static async Task Main(string[] args) + { + var handler = new Function(); + var serializer = new DefaultLambdaJsonSerializer(); + using var handlerWrapper = HandlerWrapper.GetHandlerWrapper(handler.Handler, serializer); + using var bootstrap = new LambdaBootstrap(handlerWrapper); + await bootstrap.RunAsync(); + } + + public Task Handler( + DurableExecutionInvocationInput input, ILambdaContext context) + => DurableFunction.WrapAsync(Workflow, input, context); + + private async Task Workflow(TestEvent input, IDurableContext context) + { + // Condition is never satisfied (isDone is always false), so the + // strategy will eventually exhaust maxAttempts and the operation will + // throw WaitForConditionException. The workflow catches it and + // surfaces AttemptsExhausted in the result so the test can assert on + // it without inspecting the FAILED status. + try + { + await context.WaitForConditionAsync( + check: async (state, _) => + { + await Task.CompletedTask; + return state + 1; + }, + config: new WaitForConditionConfig + { + InitialState = 0, + WaitStrategy = WaitStrategy.Fixed( + delay: TimeSpan.FromSeconds(1), + maxAttempts: 3, + isDone: _ => false) + }, + name: "exhausting_poll"); + + return new TestResult { Status = "should_not_reach", AttemptsExhausted = -1 }; + } + catch (WaitForConditionException ex) + { + return new TestResult { Status = "exhausted", AttemptsExhausted = ex.AttemptsExhausted }; + } + } +} + +public class TestEvent { public string? OrderId { get; set; } } +public class TestResult +{ + public string? Status { get; set; } + public int AttemptsExhausted { get; set; } +} diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionMaxAttemptsFunction/WaitForConditionMaxAttemptsFunction.csproj b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionMaxAttemptsFunction/WaitForConditionMaxAttemptsFunction.csproj new file mode 100644 index 000000000..6f5f657e4 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionMaxAttemptsFunction/WaitForConditionMaxAttemptsFunction.csproj @@ -0,0 +1,18 @@ + + + + net8.0 + Exe + true + bootstrap + enable + enable + + + + + + + + + diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionReplayDeterminismFunction/Dockerfile b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionReplayDeterminismFunction/Dockerfile new file mode 100644 index 000000000..c1913d56a --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionReplayDeterminismFunction/Dockerfile @@ -0,0 +1,7 @@ +FROM public.ecr.aws/lambda/provided:al2023 + +RUN dnf install -y libicu + +COPY bin/publish/ ${LAMBDA_TASK_ROOT} + +ENTRYPOINT ["/var/task/bootstrap"] diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionReplayDeterminismFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionReplayDeterminismFunction/Function.cs new file mode 100644 index 000000000..6300bb6fe --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionReplayDeterminismFunction/Function.cs @@ -0,0 +1,63 @@ +using Amazon.Lambda.Core; +using Amazon.Lambda.DurableExecution; +using Amazon.Lambda.RuntimeSupport; +using Amazon.Lambda.Serialization.SystemTextJson; + +namespace DurableExecutionTestFunction; + +public class Function +{ + public static async Task Main(string[] args) + { + var handler = new Function(); + var serializer = new DefaultLambdaJsonSerializer(); + using var handlerWrapper = HandlerWrapper.GetHandlerWrapper(handler.Handler, serializer); + using var bootstrap = new LambdaBootstrap(handlerWrapper); + await bootstrap.RunAsync(); + } + + public Task Handler( + DurableExecutionInvocationInput input, ILambdaContext context) + => DurableFunction.WrapAsync(Workflow, input, context); + + private async Task Workflow(TestEvent input, IDurableContext context) + { + // Step 1: capture a fresh value. On replay this MUST return the + // checkpointed value rather than re-executing. + var generatedId = await context.StepAsync( + async (_) => { await Task.CompletedTask; return Guid.NewGuid().ToString(); }, + name: "before_poll"); + + // Wait-for-condition with 3 polls. Each poll iteration is a separate + // invocation, and the operation's deterministic ID + RETRY-payload + // state must round-trip across re-invocations. + var pollResult = await context.WaitForConditionAsync( + check: async (state, ctx) => + { + await Task.CompletedTask; + return new Counter(state.Count + 1); + }, + config: new WaitForConditionConfig + { + InitialState = new Counter(0), + WaitStrategy = WaitStrategy.Fixed( + delay: TimeSpan.FromSeconds(2), + maxAttempts: 10, + isDone: c => c.Count >= 3) + }, + name: "determinism_poll"); + + // Step 2: echo the generated ID. After replay, this should see the + // SAME GUID from step 1 — proves replay returned the cached value. + var echoed = await context.StepAsync( + async (_) => { await Task.CompletedTask; return $"echo:{generatedId}:{pollResult.Count}"; }, + name: "after_poll"); + + return new TestResult { Status = "completed", Data = echoed }; + } +} + +public record Counter(int Count); + +public class TestEvent { public string? OrderId { get; set; } } +public class TestResult { public string? Status { get; set; } public string? Data { get; set; } } diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionReplayDeterminismFunction/WaitForConditionReplayDeterminismFunction.csproj b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionReplayDeterminismFunction/WaitForConditionReplayDeterminismFunction.csproj new file mode 100644 index 000000000..6f5f657e4 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionReplayDeterminismFunction/WaitForConditionReplayDeterminismFunction.csproj @@ -0,0 +1,18 @@ + + + + net8.0 + Exe + true + bootstrap + enable + enable + + + + + + + + + diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionUserCheckThrowsFunction/Dockerfile b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionUserCheckThrowsFunction/Dockerfile new file mode 100644 index 000000000..c1913d56a --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionUserCheckThrowsFunction/Dockerfile @@ -0,0 +1,7 @@ +FROM public.ecr.aws/lambda/provided:al2023 + +RUN dnf install -y libicu + +COPY bin/publish/ ${LAMBDA_TASK_ROOT} + +ENTRYPOINT ["/var/task/bootstrap"] diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionUserCheckThrowsFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionUserCheckThrowsFunction/Function.cs new file mode 100644 index 000000000..404114dc4 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionUserCheckThrowsFunction/Function.cs @@ -0,0 +1,66 @@ +using Amazon.Lambda.Core; +using Amazon.Lambda.DurableExecution; +using Amazon.Lambda.RuntimeSupport; +using Amazon.Lambda.Serialization.SystemTextJson; + +namespace DurableExecutionTestFunction; + +public class Function +{ + public static async Task Main(string[] args) + { + var handler = new Function(); + var serializer = new DefaultLambdaJsonSerializer(); + using var handlerWrapper = HandlerWrapper.GetHandlerWrapper(handler.Handler, serializer); + using var bootstrap = new LambdaBootstrap(handlerWrapper); + await bootstrap.RunAsync(); + } + + public Task Handler( + DurableExecutionInvocationInput input, ILambdaContext context) + => DurableFunction.WrapAsync(Workflow, input, context); + + private async Task Workflow(TestEvent input, IDurableContext context) + { + // The check function throws on attempt 2. Per the WaitForCondition + // contract, the check-thrown exception is checkpointed as FAIL and + // surfaced through the SDK as a StepException carrying the original + // exception type ("System.InvalidOperationException"). The workflow + // catches it and reports the captured ErrorType so the test can assert + // without requiring the workflow to FAIL outright. + try + { + await context.WaitForConditionAsync( + check: async (state, ctx) => + { + await Task.CompletedTask; + if (ctx.AttemptNumber == 2) + throw new InvalidOperationException("intentional check failure on attempt 2"); + return state + 1; + }, + config: new WaitForConditionConfig + { + InitialState = 0, + WaitStrategy = WaitStrategy.Fixed( + delay: TimeSpan.FromSeconds(1), + maxAttempts: 10, + isDone: _ => false) + }, + name: "throwing_poll"); + + return new TestResult { Status = "should_not_reach", ErrorType = null }; + } + catch (StepException ex) + { + return new TestResult { Status = "caught_step_exception", ErrorType = ex.ErrorType, ErrorMessage = ex.Message }; + } + } +} + +public class TestEvent { public string? OrderId { get; set; } } +public class TestResult +{ + public string? Status { get; set; } + public string? ErrorType { get; set; } + public string? ErrorMessage { get; set; } +} diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionUserCheckThrowsFunction/WaitForConditionUserCheckThrowsFunction.csproj b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionUserCheckThrowsFunction/WaitForConditionUserCheckThrowsFunction.csproj new file mode 100644 index 000000000..6f5f657e4 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionUserCheckThrowsFunction/WaitForConditionUserCheckThrowsFunction.csproj @@ -0,0 +1,18 @@ + + + + net8.0 + Exe + true + bootstrap + enable + enable + + + + + + + + + diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/WaitForConditionExponentialTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/WaitForConditionExponentialTest.cs new file mode 100644 index 000000000..0bd6b845a --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/WaitForConditionExponentialTest.cs @@ -0,0 +1,66 @@ +using System.Linq; +using System.Text; +using System.Text.Json; +using Amazon.Lambda.Model; +using Xunit; +using Xunit.Abstractions; + +namespace Amazon.Lambda.DurableExecution.IntegrationTests; + +public class WaitForConditionExponentialTest +{ + private readonly ITestOutputHelper _output; + public WaitForConditionExponentialTest(ITestOutputHelper output) => _output = output; + + /// + /// End-to-end exponential-backoff polling. The check function flips + /// Done on attempt 3, so the strategy stops after exactly 3 + /// iterations. Validates that the service honors the per-iteration delay + /// (which grows with each retry) without any in-process Thread.Sleep. + /// Timing is asserted loosely because the service's scheduling latency + /// dominates short delays — we only require the gap to be at least the + /// configured floor. + /// + [Fact] + public async Task WaitForCondition_ExponentialBackoff_CompletesOnExpectedAttempt() + { + await using var deployment = await DurableFunctionDeployment.CreateAsync( + DurableFunctionDeployment.FindTestFunctionDir("WaitForConditionExponentialFunction"), + "wfcexp", _output); + + var (invokeResponse, executionName) = await deployment.InvokeAsync("""{"orderId": "wfc-exp"}"""); + var responsePayload = Encoding.UTF8.GetString(invokeResponse.Payload.ToArray()); + _output.WriteLine($"Response: {responsePayload}"); + + var arn = await deployment.FindDurableExecutionArnByNameAsync(executionName, TimeSpan.FromSeconds(60)); + Assert.NotNull(arn); + + // Total expected wall time: 1s + 2s of timer = ~3s + execution + // overhead. Allow generous headroom for service scheduling latency. + var status = await deployment.PollForCompletionAsync(arn!, TimeSpan.FromSeconds(120)); + Assert.Equal("SUCCEEDED", status, ignoreCase: true); + + var history = await deployment.WaitForHistoryAsync( + arn!, + h => (h.Events?.Any(e => e.StepSucceededDetails != null && e.Name == "exp_poll") ?? false), + TimeSpan.FromSeconds(60)); + var events = history.Events ?? new List(); + + var succeeded = events.SingleOrDefault(e => e.StepSucceededDetails != null && e.Name == "exp_poll"); + Assert.NotNull(succeeded); + + var finalPayload = succeeded!.StepSucceededDetails.Result?.Payload; + Assert.False(string.IsNullOrEmpty(finalPayload)); + + using var doc = JsonDocument.Parse(finalPayload!); + Assert.True(doc.RootElement.GetProperty("Done").GetBoolean()); + Assert.Equal(3, doc.RootElement.GetProperty("AttemptNumber").GetInt32()); + + // The polling caused real suspend/resume cycles — at least 3 + // invocations (one per attempt). + var invocations = events.Where(e => e.InvocationCompletedDetails != null).ToList(); + Assert.True( + invocations.Count >= 3, + $"Expected at least 3 InvocationCompleted events (one per poll), got {invocations.Count}"); + } +} diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/WaitForConditionHappyPathTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/WaitForConditionHappyPathTest.cs new file mode 100644 index 000000000..0c06a88f9 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/WaitForConditionHappyPathTest.cs @@ -0,0 +1,73 @@ +using System.Linq; +using System.Text; +using System.Text.Json; +using Amazon.Lambda.Model; +using Xunit; +using Xunit.Abstractions; + +namespace Amazon.Lambda.DurableExecution.IntegrationTests; + +public class WaitForConditionHappyPathTest +{ + private readonly ITestOutputHelper _output; + public WaitForConditionHappyPathTest(ITestOutputHelper output) => _output = output; + + /// + /// End-to-end happy-path polling. The check function increments a counter + /// every iteration; the strategy's isDone predicate fires once the counter + /// hits 3. Validates that the service honors the RETRY-with-delay pattern, + /// re-invokes the Lambda for each poll iteration, and that state survives + /// across re-invocations via the RETRY payload — none of which the unit + /// tests can prove (they fake state transitions in-memory). + /// + [Fact] + public async Task WaitForCondition_PollsUntilConditionMet() + { + await using var deployment = await DurableFunctionDeployment.CreateAsync( + DurableFunctionDeployment.FindTestFunctionDir("WaitForConditionHappyPathFunction"), + "wfchappy", _output); + + var (invokeResponse, executionName) = await deployment.InvokeAsync("""{"orderId": "wfc-happy"}"""); + var responsePayload = Encoding.UTF8.GetString(invokeResponse.Payload.ToArray()); + _output.WriteLine($"Response: {responsePayload}"); + + var arn = await deployment.FindDurableExecutionArnByNameAsync(executionName, TimeSpan.FromSeconds(60)); + Assert.NotNull(arn); + + // Total expected wall time: 3 attempts with ~2s delay between them = + // ~4s of timer + execution overhead. Allow generous headroom. + var status = await deployment.PollForCompletionAsync(arn!, TimeSpan.FromSeconds(120)); + Assert.Equal("SUCCEEDED", status, ignoreCase: true); + + var history = await deployment.WaitForHistoryAsync( + arn!, + h => (h.Events?.Any(e => e.StepSucceededDetails != null) ?? false), + TimeSpan.FromSeconds(60)); + var events = history.Events ?? new List(); + + // Exactly one START emitted on the first iteration (subsequent + // iterations resume from a RETRY checkpoint and skip START). + Assert.Equal(1, events.Count(e => e.EventType == EventType.StepStarted && e.Name == "happy_poll")); + + // Final SUCCEED carries the terminal state. The polling op was + // checkpointed as a STEP+SubType=WaitForCondition; the success event + // therefore appears as a StepSucceeded. + var succeeded = events.SingleOrDefault(e => e.StepSucceededDetails != null && e.Name == "happy_poll"); + Assert.NotNull(succeeded); + + var finalPayload = succeeded!.StepSucceededDetails.Result?.Payload; + Assert.False(string.IsNullOrEmpty(finalPayload), + "final SUCCEED payload should carry the terminal state"); + + using var doc = JsonDocument.Parse(finalPayload!); + Assert.Equal(3, doc.RootElement.GetProperty("Counter").GetInt32()); + Assert.Equal(3, doc.RootElement.GetProperty("AttemptNumber").GetInt32()); + + // The polling actually caused suspend/resume cycles — at least one + // invocation per iteration (3 polls = 3+ invocations). + var invocations = events.Where(e => e.InvocationCompletedDetails != null).ToList(); + Assert.True( + invocations.Count >= 3, + $"Expected at least 3 InvocationCompleted events (one per poll iteration), got {invocations.Count}"); + } +} diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/WaitForConditionMaxAttemptsTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/WaitForConditionMaxAttemptsTest.cs new file mode 100644 index 000000000..3e65a48a1 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/WaitForConditionMaxAttemptsTest.cs @@ -0,0 +1,68 @@ +using System.Linq; +using System.Text; +using System.Text.Json; +using Amazon.Lambda.Model; +using Xunit; +using Xunit.Abstractions; + +namespace Amazon.Lambda.DurableExecution.IntegrationTests; + +public class WaitForConditionMaxAttemptsTest +{ + private readonly ITestOutputHelper _output; + public WaitForConditionMaxAttemptsTest(ITestOutputHelper output) => _output = output; + + /// + /// Validates that when the strategy's max-attempts limit is reached + /// without isDone being satisfied, the operation throws + /// with the correct + /// AttemptsExhausted count, and the FAILED checkpoint records the + /// exception type. The workflow catches the exception and returns the + /// count, so we expect the workflow itself to SUCCEED. + /// + [Fact] + public async Task WaitForCondition_MaxAttemptsExhausted_ThrowsWithCount() + { + await using var deployment = await DurableFunctionDeployment.CreateAsync( + DurableFunctionDeployment.FindTestFunctionDir("WaitForConditionMaxAttemptsFunction"), + "wfcmax", _output); + + var (invokeResponse, executionName) = await deployment.InvokeAsync("""{"orderId": "wfc-max"}"""); + var responsePayload = Encoding.UTF8.GetString(invokeResponse.Payload.ToArray()); + _output.WriteLine($"Response: {responsePayload}"); + + var arn = await deployment.FindDurableExecutionArnByNameAsync(executionName, TimeSpan.FromSeconds(60)); + Assert.NotNull(arn); + + // 3 attempts at ~1s delay between them = ~2s of timer + execution + // overhead. Allow generous headroom. + var status = await deployment.PollForCompletionAsync(arn!, TimeSpan.FromSeconds(120)); + Assert.Equal("SUCCEEDED", status, ignoreCase: true); + + // The workflow caught the WaitForConditionException and returned a + // result containing AttemptsExhausted. Verify the final payload from + // the workflow itself (parsed from the GetExecution response). + var execution = await deployment.GetExecutionAsync(arn!); + var resultPayload = execution.Result; + Assert.False(string.IsNullOrEmpty(resultPayload), + "workflow result payload should be present"); + + using var doc = JsonDocument.Parse(resultPayload!); + Assert.Equal("exhausted", doc.RootElement.GetProperty("Status").GetString()); + // The exact attempts count is 3 — strategy maxAttempts. + Assert.Equal(3, doc.RootElement.GetProperty("AttemptsExhausted").GetInt32()); + + // Verify the operation itself was checkpointed as FAILED with the + // WaitForConditionException type, even though the workflow recovers. + var history = await deployment.WaitForHistoryAsync( + arn!, + h => (h.Events?.Any(e => e.StepFailedDetails != null && e.Name == "exhausting_poll") ?? false), + TimeSpan.FromSeconds(60)); + var events = history.Events ?? new List(); + + var stepFailed = events.FirstOrDefault(e => e.StepFailedDetails != null && e.Name == "exhausting_poll"); + Assert.NotNull(stepFailed); + Assert.Contains("WaitForConditionException", + stepFailed!.StepFailedDetails.Error?.Payload?.ErrorType ?? string.Empty); + } +} diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/WaitForConditionReplayDeterminismTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/WaitForConditionReplayDeterminismTest.cs new file mode 100644 index 000000000..8e10152e2 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/WaitForConditionReplayDeterminismTest.cs @@ -0,0 +1,89 @@ +using System.Linq; +using System.Text; +using Amazon.Lambda.Model; +using Xunit; +using Xunit.Abstractions; + +namespace Amazon.Lambda.DurableExecution.IntegrationTests; + +public class WaitForConditionReplayDeterminismTest +{ + private readonly ITestOutputHelper _output; + public WaitForConditionReplayDeterminismTest(ITestOutputHelper output) => _output = output; + + /// + /// End-to-end replay-determinism check for a step + wait-for-condition + + /// step workflow. The wait-for-condition triggers multiple suspend/resume + /// cycles (one per polling iteration), so the surrounding steps are + /// replayed multiple times. Verifies that: + /// 1. The leading step is re-replayed (not re-executed) across all + /// iterations — its checkpointed GUID flows through to the trailing + /// step regardless of how many polling iterations happen. + /// 2. The wait-for-condition operation is checkpointed exactly once + /// (one StepStarted), with one terminal SUCCEED carrying the final + /// counter state. + /// 3. Multiple invocations were recorded (proves real replay happened). + /// + [Fact] + public async Task WaitForCondition_ReplayPreservesIdentityAndState() + { + await using var deployment = await DurableFunctionDeployment.CreateAsync( + DurableFunctionDeployment.FindTestFunctionDir("WaitForConditionReplayDeterminismFunction"), + "wfcrep", _output); + + var (invokeResponse, executionName) = await deployment.InvokeAsync("""{"orderId": "wfc-replay"}"""); + var responsePayload = Encoding.UTF8.GetString(invokeResponse.Payload.ToArray()); + _output.WriteLine($"Response: {responsePayload}"); + + var arn = await deployment.FindDurableExecutionArnByNameAsync(executionName, TimeSpan.FromSeconds(60)); + Assert.NotNull(arn); + + // 3 polls with ~2s delay = ~4s of timer + 2 step invocations. + var status = await deployment.PollForCompletionAsync(arn!, TimeSpan.FromSeconds(120)); + Assert.Equal("SUCCEEDED", status, ignoreCase: true); + + // History is eventually consistent — wait until both step-succeeded + // AND the polling op-succeeded events are visible. + var history = await deployment.WaitForHistoryAsync( + arn!, + h => (h.Events?.Count(e => e.StepSucceededDetails != null) ?? 0) >= 3, + TimeSpan.FromSeconds(60)); + var events = history.Events ?? new List(); + + // Each named step / polling op started exactly once. The leading and + // trailing steps each have one StepStarted; the polling op also has + // one (sub-iterations replay from RETRY/READY/PENDING and skip START). + Assert.Single(events.Where(e => e.EventType == EventType.StepStarted && e.Name == "before_poll")); + Assert.Single(events.Where(e => e.EventType == EventType.StepStarted && e.Name == "after_poll")); + Assert.Single(events.Where(e => e.EventType == EventType.StepStarted && e.Name == "determinism_poll")); + + // Each name SUCCEEDed exactly once — replay returned the cached + // values rather than re-executing. + var stepSucceededEvents = events.Where(e => e.StepSucceededDetails != null).ToList(); + Assert.Single(stepSucceededEvents.Where(e => e.Name == "before_poll")); + Assert.Single(stepSucceededEvents.Where(e => e.Name == "after_poll")); + Assert.Single(stepSucceededEvents.Where(e => e.Name == "determinism_poll")); + + // Verify the trailing step received the GUID from the leading step + // verbatim, AND the final counter — proves the cached step value and + // the WaitForCondition's terminal payload both round-tripped through + // replay. + var beforeEvent = stepSucceededEvents.First(e => e.Name == "before_poll"); + var afterEvent = stepSucceededEvents.First(e => e.Name == "after_poll"); + var generatedGuid = beforeEvent.StepSucceededDetails.Result?.Payload?.Trim('"'); + var echoedResult = afterEvent.StepSucceededDetails.Result?.Payload?.Trim('"'); + Assert.NotNull(generatedGuid); + Assert.NotNull(echoedResult); + Assert.True(Guid.TryParse(generatedGuid, out _), + $"before_poll should produce a valid GUID, got: {generatedGuid}"); + Assert.Equal($"echo:{generatedGuid}:3", echoedResult); + + // The wait-for-condition truly drove suspend/resume — one invocation + // per poll iteration plus one for the final continuation. With 3 + // polls we expect at least 3 InvocationCompleted events. + var invocations = events.Where(e => e.InvocationCompletedDetails != null).ToList(); + Assert.True( + invocations.Count >= 3, + $"Expected at least 3 InvocationCompleted events (one per poll iteration), got {invocations.Count}"); + } +} diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/WaitForConditionUserCheckThrowsTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/WaitForConditionUserCheckThrowsTest.cs new file mode 100644 index 000000000..7da2ba87f --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/WaitForConditionUserCheckThrowsTest.cs @@ -0,0 +1,71 @@ +using System.Linq; +using System.Text; +using System.Text.Json; +using Amazon.Lambda.Model; +using Xunit; +using Xunit.Abstractions; + +namespace Amazon.Lambda.DurableExecution.IntegrationTests; + +public class WaitForConditionUserCheckThrowsTest +{ + private readonly ITestOutputHelper _output; + public WaitForConditionUserCheckThrowsTest(ITestOutputHelper output) => _output = output; + + /// + /// Validates the user-check-throws path: when the check function throws + /// on a polling iteration, the operation checkpoints FAIL with the + /// original exception type and the SDK surfaces a + /// carrying that ErrorType. Mirrors the unit test + /// WaitForConditionOperationTests.CheckThrows_CheckpointsFailAndThrows. + /// + [Fact] + public async Task WaitForCondition_UserCheckThrows_SurfacesAsStepException() + { + await using var deployment = await DurableFunctionDeployment.CreateAsync( + DurableFunctionDeployment.FindTestFunctionDir("WaitForConditionUserCheckThrowsFunction"), + "wfcthrow", _output); + + var (invokeResponse, executionName) = await deployment.InvokeAsync("""{"orderId": "wfc-throw"}"""); + var responsePayload = Encoding.UTF8.GetString(invokeResponse.Payload.ToArray()); + _output.WriteLine($"Response: {responsePayload}"); + + var arn = await deployment.FindDurableExecutionArnByNameAsync(executionName, TimeSpan.FromSeconds(60)); + Assert.NotNull(arn); + + // Attempt 1 succeeds (returns state+1=1), strategy schedules ~1s + // delay, then attempt 2 throws. ~2s of timer + execution overhead. + var status = await deployment.PollForCompletionAsync(arn!, TimeSpan.FromSeconds(120)); + Assert.Equal("SUCCEEDED", status, ignoreCase: true); + + // The workflow caught the StepException. Verify it captured the + // expected error type via the workflow's returned payload. + var execution = await deployment.GetExecutionAsync(arn!); + var resultPayload = execution.Result; + Assert.False(string.IsNullOrEmpty(resultPayload), + "workflow result payload should be present"); + + using var doc = JsonDocument.Parse(resultPayload!); + Assert.Equal("caught_step_exception", doc.RootElement.GetProperty("Status").GetString()); + Assert.Equal("System.InvalidOperationException", + doc.RootElement.GetProperty("ErrorType").GetString()); + Assert.Contains("intentional check failure", + doc.RootElement.GetProperty("ErrorMessage").GetString() ?? string.Empty); + + // Verify the polling op itself was checkpointed as FAILED with the + // original exception type (NOT WaitForConditionException — that's + // reserved for max-attempts exhaustion). + var history = await deployment.WaitForHistoryAsync( + arn!, + h => (h.Events?.Any(e => e.StepFailedDetails != null && e.Name == "throwing_poll") ?? false), + TimeSpan.FromSeconds(60)); + var events = history.Events ?? new List(); + + var stepFailed = events.FirstOrDefault(e => e.StepFailedDetails != null && e.Name == "throwing_poll"); + Assert.NotNull(stepFailed); + Assert.Equal("System.InvalidOperationException", + stepFailed!.StepFailedDetails.Error?.Payload?.ErrorType); + Assert.Contains("intentional check failure", + stepFailed.StepFailedDetails.Error?.Payload?.ErrorMessage ?? string.Empty); + } +} diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/WaitForConditionOperationTests.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/WaitForConditionOperationTests.cs new file mode 100644 index 000000000..5aaa5b51c --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/WaitForConditionOperationTests.cs @@ -0,0 +1,1049 @@ +using Amazon.Lambda.DurableExecution; +using Amazon.Lambda.DurableExecution.Internal; +using Amazon.Lambda.TestUtilities; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using Xunit; + +namespace Amazon.Lambda.DurableExecution.Tests; + +public class WaitForConditionOperationTests +{ + /// Reproduces the Id that emits for the n-th root-level operation. + private static string IdAt(int position) => OperationIdGenerator.HashOperationId(position.ToString()); + + private static (DurableContext context, RecordingBatcher recorder, TerminationManager tm, ExecutionState state) + CreateContext(InitialExecutionState? initialState = null) + { + var state = new ExecutionState(); + state.LoadFromCheckpoint(initialState); + var tm = new TerminationManager(); + var idGen = new OperationIdGenerator(); + var lambdaContext = new TestLambdaContext(); + var recorder = new RecordingBatcher(); + var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext, recorder.Batcher); + return (context, recorder, tm, state); + } + + // ── Fresh execution ───────────────────────────────────────────────── + + [Fact] + public async Task FreshExecution_StrategyStopsImmediately_SucceedsWithFinalState() + { + var (context, recorder, tm, _) = CreateContext(); + + // The check function "advances" the state to 42; the strategy's + // isDone predicate matches immediately. This exercises the synchronous + // success path with no polling iterations. + int checkInvocations = 0; + var result = await context.WaitForConditionAsync( + check: async (state, ctx) => + { + checkInvocations++; + Assert.Equal(checkInvocations, ctx.AttemptNumber); + await Task.CompletedTask; + return 42; + }, + config: new WaitForConditionConfig + { + InitialState = 0, + WaitStrategy = WaitStrategy.Exponential(isDone: s => s == 42) + }, + name: "poll"); + + Assert.Equal(42, result); + Assert.Equal(1, checkInvocations); + Assert.False(tm.IsTerminated); + + await recorder.Batcher.DrainAsync(); + + var actions = recorder.Flushed.Select(o => $"{o.Type}:{o.Action}").ToArray(); + Assert.Equal(new[] { "STEP:START", "STEP:SUCCEED" }, actions); + + var succeed = recorder.Flushed.Single(o => o.Action == "SUCCEED"); + Assert.Equal(IdAt(1), succeed.Id); + Assert.Equal("WaitForCondition", succeed.SubType); + Assert.Equal("poll", succeed.Name); + Assert.Equal("42", succeed.Payload); + } + + [Fact] + public async Task FreshExecution_StrategyContinues_EmitsRetryAndSuspends() + { + var (context, recorder, tm, _) = CreateContext(); + + // Strategy says continue → operation must emit RETRY and suspend. + var task = context.WaitForConditionAsync( + check: async (state, _) => { await Task.CompletedTask; return state + 1; }, + config: new WaitForConditionConfig + { + InitialState = 0, + WaitStrategy = WaitStrategy.Fixed(TimeSpan.FromSeconds(3), maxAttempts: 10) + }, + name: "poll"); + + await Task.Delay(50); + + Assert.True(tm.IsTerminated); + Assert.False(task.IsCompleted); + + await recorder.Batcher.DrainAsync(); + + var actions = recorder.Flushed.Select(o => $"{o.Type}:{o.Action}").ToArray(); + Assert.Equal(new[] { "STEP:START", "STEP:RETRY" }, actions); + + var retry = recorder.Flushed.Single(o => o.Action == "RETRY"); + Assert.Equal("WaitForCondition", retry.SubType); + Assert.Equal("1", retry.Payload); // state advanced to 1 + Assert.NotNull(retry.StepOptions); + Assert.Equal(3, retry.StepOptions.NextAttemptDelaySeconds); + } + + [Fact] + public async Task FreshExecution_UsesInitialStateOnFirstCall() + { + var (context, _, _, _) = CreateContext(); + + int? observedInitial = null; + await context.WaitForConditionAsync( + check: async (state, _) => + { + observedInitial ??= state; + await Task.CompletedTask; + return state; + }, + config: new WaitForConditionConfig + { + InitialState = 99, + WaitStrategy = WaitStrategy.Fixed(TimeSpan.FromSeconds(1), maxAttempts: 10, isDone: _ => true) + }, + name: "poll"); + + Assert.Equal(99, observedInitial); + } + + [Fact] + public async Task FreshExecution_AttemptNumberIs1OnFirstCall() + { + var (context, _, _, _) = CreateContext(); + + int observed = -1; + await context.WaitForConditionAsync( + check: async (state, ctx) => + { + observed = ctx.AttemptNumber; + await Task.CompletedTask; + return state; + }, + config: new WaitForConditionConfig + { + InitialState = 0, + WaitStrategy = WaitStrategy.Fixed(TimeSpan.FromSeconds(1), maxAttempts: 5, isDone: _ => true) + }); + + Assert.Equal(1, observed); + } + + // ── Replay paths ──────────────────────────────────────────────────── + + [Fact] + public async Task Replay_Succeeded_ReturnsCachedAndSkipsCheck() + { + var (context, recorder, _, _) = CreateContext(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = IdAt(1), + Type = OperationTypes.Step, + SubType = OperationSubTypes.WaitForCondition, + Status = OperationStatuses.Succeeded, + Name = "poll", + StepDetails = new StepDetails { Result = "7" } + } + } + }); + + var checkInvoked = false; + var result = await context.WaitForConditionAsync( + check: async (_, _) => { checkInvoked = true; await Task.CompletedTask; return 0; }, + config: new WaitForConditionConfig + { + InitialState = 0, + WaitStrategy = WaitStrategy.Fixed(TimeSpan.FromSeconds(1)) + }, + name: "poll"); + + Assert.False(checkInvoked); + Assert.Equal(7, result); + + await recorder.Batcher.DrainAsync(); + Assert.Empty(recorder.Flushed); + } + + [Fact] + public async Task Replay_PendingTimerNotFired_ReSuspends() + { + // NextAttemptTimestamp 1 hour in the future → timer hasn't fired, + // operation must re-suspend without re-checkpointing or re-running. + var futureMs = DateTimeOffset.UtcNow.AddHours(1).ToUnixTimeMilliseconds(); + + var (context, recorder, tm, _) = CreateContext(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = IdAt(1), + Type = OperationTypes.Step, + SubType = OperationSubTypes.WaitForCondition, + Status = OperationStatuses.Pending, + Name = "poll", + StepDetails = new StepDetails + { + Result = "5", + Attempt = 2, + NextAttemptTimestamp = futureMs + } + } + } + }); + + var checkInvoked = false; + var task = context.WaitForConditionAsync( + check: async (_, _) => { checkInvoked = true; await Task.CompletedTask; return 0; }, + config: new WaitForConditionConfig + { + InitialState = 0, + WaitStrategy = WaitStrategy.Fixed(TimeSpan.FromSeconds(1)) + }, + name: "poll"); + + await Task.Delay(50); + + Assert.False(checkInvoked); + Assert.True(tm.IsTerminated); + Assert.False(task.IsCompleted); + + await recorder.Batcher.DrainAsync(); + Assert.Empty(recorder.Flushed); + } + + [Fact] + public async Task Replay_PendingTimerFired_ResumesWithCheckpointedState() + { + // NextAttemptTimestamp 1 hour in the past → timer fired (service + // hasn't yet stamped READY but the deadline is met). Continue. + var pastMs = DateTimeOffset.UtcNow.AddHours(-1).ToUnixTimeMilliseconds(); + + var (context, recorder, _, _) = CreateContext(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = IdAt(1), + Type = OperationTypes.Step, + SubType = OperationSubTypes.WaitForCondition, + Status = OperationStatuses.Pending, + Name = "poll", + StepDetails = new StepDetails + { + Result = "5", + Attempt = 2, + NextAttemptTimestamp = pastMs + } + } + } + }); + + int? observedState = null; + int? observedAttempt = null; + var result = await context.WaitForConditionAsync( + check: async (state, ctx) => + { + observedState = state; + observedAttempt = ctx.AttemptNumber; + await Task.CompletedTask; + return state; // condition met (isDone returns true) + }, + config: new WaitForConditionConfig + { + InitialState = 0, + WaitStrategy = WaitStrategy.Fixed(TimeSpan.FromSeconds(1), isDone: _ => true) + }, + name: "poll"); + + // Critical: state survives across iterations. Check receives the + // PRIOR state (5, from the prior RETRY's payload), not InitialState (0). + Assert.Equal(5, observedState); + Assert.Equal(3, observedAttempt); // prior attempt was 2, this is attempt 3 + Assert.Equal(5, result); + + await recorder.Batcher.DrainAsync(); + + // No new START — original is authoritative. + Assert.DoesNotContain(recorder.Flushed, o => o.Action == "START"); + Assert.Contains(recorder.Flushed, o => o.Action == "SUCCEED"); + } + + [Fact] + public async Task Replay_Ready_ResumesWithCheckpointedState() + { + var (context, recorder, _, _) = CreateContext(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = IdAt(1), + Type = OperationTypes.Step, + SubType = OperationSubTypes.WaitForCondition, + Status = OperationStatuses.Ready, + Name = "poll", + StepDetails = new StepDetails + { + Result = "11", + Attempt = 3 + } + } + } + }); + + int? observedState = null; + int? observedAttempt = null; + var result = await context.WaitForConditionAsync( + check: async (state, ctx) => + { + observedState = state; + observedAttempt = ctx.AttemptNumber; + await Task.CompletedTask; + return state * 2; + }, + config: new WaitForConditionConfig + { + InitialState = 0, + WaitStrategy = WaitStrategy.Fixed(TimeSpan.FromSeconds(1), isDone: _ => true) + }, + name: "poll"); + + Assert.Equal(11, observedState); + Assert.Equal(4, observedAttempt); // prior=3 → next=4 + Assert.Equal(22, result); + + await recorder.Batcher.DrainAsync(); + Assert.DoesNotContain(recorder.Flushed, o => o.Action == "START"); + } + + [Fact] + public async Task Replay_Started_ResumesWithInitialState() + { + // STARTED with no payload means the very first check attempt was + // lost (Lambda crash before RETRY/SUCCEED). Re-execute with + // InitialState since no prior state is available. + var (context, recorder, _, _) = CreateContext(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = IdAt(1), + Type = OperationTypes.Step, + SubType = OperationSubTypes.WaitForCondition, + Status = OperationStatuses.Started, + Name = "poll" + } + } + }); + + int? observedState = null; + int? observedAttempt = null; + var result = await context.WaitForConditionAsync( + check: async (state, ctx) => + { + observedState = state; + observedAttempt = ctx.AttemptNumber; + await Task.CompletedTask; + return state + 100; + }, + config: new WaitForConditionConfig + { + InitialState = 50, + WaitStrategy = WaitStrategy.Fixed(TimeSpan.FromSeconds(1), isDone: _ => true) + }, + name: "poll"); + + Assert.Equal(50, observedState); // InitialState is the seed + Assert.Equal(1, observedAttempt); + Assert.Equal(150, result); + + await recorder.Batcher.DrainAsync(); + // Do NOT re-emit START on STARTED replay. + Assert.DoesNotContain(recorder.Flushed, o => o.Action == "START"); + } + + [Fact] + public async Task Replay_Failed_FromCheckException_ThrowsStepException() + { + var (context, _, _, _) = CreateContext(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = IdAt(1), + Type = OperationTypes.Step, + SubType = OperationSubTypes.WaitForCondition, + Status = OperationStatuses.Failed, + Name = "poll", + StepDetails = new StepDetails + { + Error = new ErrorObject + { + ErrorType = "System.InvalidOperationException", + ErrorMessage = "check went wrong", + StackTrace = new[] { "at A.B()" } + } + } + } + } + }); + + var ex = await Assert.ThrowsAsync(() => + context.WaitForConditionAsync( + check: async (_, _) => { await Task.CompletedTask; return 0; }, + config: new WaitForConditionConfig + { + InitialState = 0, + WaitStrategy = WaitStrategy.Fixed(TimeSpan.FromSeconds(1)) + }, + name: "poll")); + + Assert.Equal("check went wrong", ex.Message); + Assert.Equal("System.InvalidOperationException", ex.ErrorType); + } + + [Fact] + public async Task Replay_Failed_FromMaxAttempts_ThrowsWaitForConditionException() + { + // The FAIL checkpoint records the LastState in StepDetails.Result so + // replay can reconstruct an identically-populated exception. Live + // execution sets the same field in MaxAttemptsExhausted_FreshExecution. + var (context, _, _, _) = CreateContext(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = IdAt(1), + Type = OperationTypes.Step, + SubType = OperationSubTypes.WaitForCondition, + Status = OperationStatuses.Failed, + Name = "poll", + StepDetails = new StepDetails + { + Attempt = 3, + Result = "42", + Error = new ErrorObject + { + ErrorType = typeof(WaitForConditionException).FullName, + ErrorMessage = "exhausted" + } + } + } + } + }); + + var ex = await Assert.ThrowsAsync(() => + context.WaitForConditionAsync( + check: async (_, _) => { await Task.CompletedTask; return 0; }, + config: new WaitForConditionConfig + { + InitialState = 0, + WaitStrategy = WaitStrategy.Fixed(TimeSpan.FromSeconds(1)) + }, + name: "poll")); + + Assert.Equal(3, ex.AttemptsExhausted); + Assert.Equal("exhausted", ex.Message); + Assert.Equal(42, ex.LastState); // round-tripped from FAIL payload + } + + [Fact] + public async Task Replay_Failed_FromMaxAttempts_LastState_MatchesLiveExecution() + { + // Live execution path: exhaust max-attempts and capture the + // exception's LastState. Then construct a FAIL checkpoint mirroring + // what was written, replay, and assert LastState round-trips. + var (liveCtx, liveRecorder, _, _) = CreateContext(); + + var liveEx = await Assert.ThrowsAsync(() => + liveCtx.WaitForConditionAsync( + check: async (state, _) => { await Task.CompletedTask; return state + 1; }, + config: new WaitForConditionConfig + { + InitialState = 5, + WaitStrategy = WaitStrategy.Fixed(TimeSpan.FromSeconds(1), maxAttempts: 1) + }, + name: "poll")); + + await liveRecorder.Batcher.DrainAsync(); + var failUpdate = liveRecorder.Flushed.Single(o => o.Action == "FAIL"); + Assert.Equal("6", failUpdate.Payload); // last state was 5+1=6 in the FAIL payload + + // Reconstruct the operation as the service would echo it back on + // replay (Payload → StepDetails.Result; Error → StepDetails.Error). + var (replayCtx, _, _, _) = CreateContext(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = IdAt(1), + Type = OperationTypes.Step, + SubType = OperationSubTypes.WaitForCondition, + Status = OperationStatuses.Failed, + Name = "poll", + StepDetails = new StepDetails + { + Attempt = liveEx.AttemptsExhausted, + Result = failUpdate.Payload, + Error = new ErrorObject + { + ErrorType = failUpdate.Error?.ErrorType, + ErrorMessage = failUpdate.Error?.ErrorMessage + } + } + } + } + }); + + var replayEx = await Assert.ThrowsAsync(() => + replayCtx.WaitForConditionAsync( + check: async (_, _) => { await Task.CompletedTask; return 0; }, + config: new WaitForConditionConfig + { + InitialState = 0, + WaitStrategy = WaitStrategy.Fixed(TimeSpan.FromSeconds(1), maxAttempts: 1) + }, + name: "poll")); + + Assert.Equal(liveEx.AttemptsExhausted, replayEx.AttemptsExhausted); + Assert.NotNull(replayEx.LastState); + Assert.Equal(liveEx.LastState, replayEx.LastState); + } + + [Fact] + public async Task Replay_Failed_FromMaxAttempts_NullPayload_LeavesLastStateNull() + { + // Backwards-compat: a FAIL checkpoint produced before the LastState + // payload was added (or one that lost its payload) should not blow + // up — LastState falls back to null. + var (context, _, _, _) = CreateContext(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = IdAt(1), + Type = OperationTypes.Step, + SubType = OperationSubTypes.WaitForCondition, + Status = OperationStatuses.Failed, + Name = "poll", + StepDetails = new StepDetails + { + Attempt = 2, + // Result intentionally null (legacy FAIL). + Error = new ErrorObject + { + ErrorType = typeof(WaitForConditionException).FullName, + ErrorMessage = "exhausted" + } + } + } + } + }); + + var ex = await Assert.ThrowsAsync(() => + context.WaitForConditionAsync( + check: async (_, _) => { await Task.CompletedTask; return 0; }, + config: new WaitForConditionConfig + { + InitialState = 0, + WaitStrategy = WaitStrategy.Fixed(TimeSpan.FromSeconds(1)) + }, + name: "poll")); + + Assert.Equal(2, ex.AttemptsExhausted); + Assert.Null(ex.LastState); + } + + // ── Max attempts exhaustion ───────────────────────────────────────── + + [Fact] + public async Task MaxAttemptsExhausted_FreshExecution_ThrowsWaitForConditionException() + { + var (context, recorder, _, _) = CreateContext(); + + // maxAttempts=1 + isDone always false → strategy stops on attempt 1 + // but it's because the counter is saturated, NOT because the + // condition was met. Operation must throw, not SUCCEED. + var ex = await Assert.ThrowsAsync(() => + context.WaitForConditionAsync( + check: async (state, _) => { await Task.CompletedTask; return state + 1; }, + config: new WaitForConditionConfig + { + InitialState = 5, + WaitStrategy = WaitStrategy.Fixed(TimeSpan.FromSeconds(1), maxAttempts: 1) + }, + name: "poll")); + + Assert.Equal(1, ex.AttemptsExhausted); + Assert.Equal(6, ex.LastState); // last state observed was 5+1 + + await recorder.Batcher.DrainAsync(); + var actions = recorder.Flushed.Select(o => $"{o.Type}:{o.Action}").ToArray(); + Assert.Equal(new[] { "STEP:START", "STEP:FAIL" }, actions); + + var fail = recorder.Flushed.Single(o => o.Action == "FAIL"); + Assert.Equal("WaitForCondition", fail.SubType); + Assert.NotNull(fail.Error); + Assert.Equal(typeof(WaitForConditionException).FullName, fail.Error.ErrorType); + // LastState round-trips through the FAIL Payload so replay sees + // identical exception state. See Replay_Failed_FromMaxAttempts_LastState_MatchesLiveExecution. + Assert.Equal("6", fail.Payload); + } + + [Fact] + public async Task MaxAttemptsExhausted_DistinguishesFromConditionMet() + { + var (context, _, _, _) = CreateContext(); + + // The same maxAttempts=1 strategy WITH an isDone that's satisfied + // should SUCCEED, not throw. + var result = await context.WaitForConditionAsync( + check: async (_, _) => { await Task.CompletedTask; return 99; }, + config: new WaitForConditionConfig + { + InitialState = 0, + WaitStrategy = WaitStrategy.Fixed( + TimeSpan.FromSeconds(1), + maxAttempts: 1, + isDone: state => state == 99) + }, + name: "poll"); + + Assert.Equal(99, result); + } + + // ── Check function exception ──────────────────────────────────────── + + [Fact] + public async Task CheckThrows_CheckpointsFailAndThrows() + { + var (context, recorder, _, _) = CreateContext(); + + var ex = await Assert.ThrowsAsync(() => + context.WaitForConditionAsync( + check: async (_, _) => { await Task.CompletedTask; throw new InvalidOperationException("boom"); }, + config: new WaitForConditionConfig + { + InitialState = 0, + WaitStrategy = WaitStrategy.Fixed(TimeSpan.FromSeconds(1)) + }, + name: "poll")); + + Assert.Equal("boom", ex.Message); + Assert.Equal("System.InvalidOperationException", ex.ErrorType); + + await recorder.Batcher.DrainAsync(); + var actions = recorder.Flushed.Select(o => $"{o.Type}:{o.Action}").ToArray(); + Assert.Equal(new[] { "STEP:START", "STEP:FAIL" }, actions); + + var fail = recorder.Flushed.Single(o => o.Action == "FAIL"); + Assert.Equal("WaitForCondition", fail.SubType); + Assert.Equal("System.InvalidOperationException", fail.Error?.ErrorType); + } + + // ── Replay determinism: state survives iterations ─────────────────── + + [Fact] + public async Task ReplayDeterminism_StateIsCarriedAcrossIterations() + { + // Simulate a multi-iteration history: invocation N had advanced the + // state to {Count=3}; invocation N+1 should pick that up and + // continue from there. + var pastMs = DateTimeOffset.UtcNow.AddSeconds(-1).ToUnixTimeMilliseconds(); + + var (context, recorder, _, _) = CreateContext(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = IdAt(1), + Type = OperationTypes.Step, + SubType = OperationSubTypes.WaitForCondition, + Status = OperationStatuses.Ready, + Name = "counter", + StepDetails = new StepDetails + { + Result = """{"Count":3}""", + Attempt = 3, + NextAttemptTimestamp = pastMs + } + } + } + }); + + CounterState? observed = null; + int? observedAttempt = null; + var result = await context.WaitForConditionAsync( + check: async (state, ctx) => + { + observed = state; + observedAttempt = ctx.AttemptNumber; + await Task.CompletedTask; + return new CounterState { Count = state.Count + 1 }; + }, + config: new WaitForConditionConfig + { + InitialState = new CounterState { Count = 0 }, // ignored on replay + WaitStrategy = WaitStrategy.Fixed( + TimeSpan.FromSeconds(1), + maxAttempts: 100, + isDone: c => c.Count >= 4) // stop when we hit 4 + }, + name: "counter"); + + // Started from the checkpointed counter=3 (NOT InitialState=0), + // incremented to 4, isDone returned true, returned 4. + Assert.Equal(3, observed?.Count); + Assert.Equal(4, observedAttempt); + Assert.Equal(4, result.Count); + + await recorder.Batcher.DrainAsync(); + var succeed = recorder.Flushed.Single(o => o.Action == "SUCCEED"); + Assert.Equal("""{"Count":4}""", succeed.Payload); + } + + [Fact] + public async Task ReplayDeterminism_RoundTripsThroughCustomSerializer() + { + var (context, _, _, _) = CreateContext(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = IdAt(1), + Type = OperationTypes.Step, + SubType = OperationSubTypes.WaitForCondition, + Status = OperationStatuses.Succeeded, + Name = "poll", + StepDetails = new StepDetails { Result = "Marie,30" } + } + } + }); + + var serializer = new RecordingPersonSerializer(); + var result = await context.WaitForConditionAsync( + check: async (_, _) => { await Task.CompletedTask; return new TestPerson { Name = "ignored", Age = 0 }; }, + config: new WaitForConditionConfig + { + InitialState = new TestPerson { Name = "init", Age = 0 }, + WaitStrategy = WaitStrategy.Fixed(TimeSpan.FromSeconds(1)) + }, + serializer: serializer, + name: "poll"); + + Assert.True(serializer.DeserializeCalled); + Assert.Equal("Marie", result.Name); + Assert.Equal(30, result.Age); + } + + // ── Sync-flush of START before suspending ─────────────────────────── + + [Fact] + public async Task FreshExecution_FlushesStartBeforeSuspending() + { + // The START checkpoint MUST be persisted before the workflow + // suspends — otherwise the service has no record of the polling op + // and replay can't find it. + var (context, recorder, tm, _) = CreateContext(); + + var task = context.WaitForConditionAsync( + check: async (state, _) => { await Task.CompletedTask; return state + 1; }, + config: new WaitForConditionConfig + { + InitialState = 0, + WaitStrategy = WaitStrategy.Fixed(TimeSpan.FromSeconds(5), maxAttempts: 10) + }, + name: "poll"); + + await Task.Delay(50); + + Assert.True(tm.IsTerminated); + Assert.False(task.IsCompleted); + + // At the moment of suspension, both START and RETRY must already be + // flushed (sync-enqueued ahead of SuspendAndAwait). No drain needed. + var actions = recorder.Flushed.Select(o => $"{o.Type}:{o.Action}").ToArray(); + Assert.Contains("STEP:START", actions); + Assert.Contains("STEP:RETRY", actions); + } + + // ── Replay non-determinism guards ─────────────────────────────────── + + [Fact] + public async Task ReplayUnknownStatus_ThrowsNonDeterministicException() + { + var (context, _, _, _) = CreateContext(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = IdAt(1), + Type = OperationTypes.Step, + SubType = OperationSubTypes.WaitForCondition, + Status = "BOGUS", + Name = "poll" + } + } + }); + + await Assert.ThrowsAsync(() => + context.WaitForConditionAsync( + check: async (_, _) => { await Task.CompletedTask; return 0; }, + config: new WaitForConditionConfig + { + InitialState = 0, + WaitStrategy = WaitStrategy.Fixed(TimeSpan.FromSeconds(1)) + }, + name: "poll")); + } + + [Fact] + public async Task ReplayTypeMismatch_ThrowsNonDeterministicException() + { + // Same Id but a different Type — operation order changed between + // deployments. The base class's ValidateReplayConsistency catches it. + var (context, _, _, _) = CreateContext(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = IdAt(1), + Type = OperationTypes.Wait, + Status = OperationStatuses.Succeeded, + Name = "poll" + } + } + }); + + var ex = await Assert.ThrowsAsync(() => + context.WaitForConditionAsync( + check: async (_, _) => { await Task.CompletedTask; return 0; }, + config: new WaitForConditionConfig + { + InitialState = 0, + WaitStrategy = WaitStrategy.Fixed(TimeSpan.FromSeconds(1)) + }, + name: "poll")); + + Assert.Contains("expected type 'STEP'", ex.Message); + } + + // ── Argument validation ───────────────────────────────────────────── + + [Fact] + public async Task NullCheck_ThrowsArgumentNullException() + { + var (context, _, _, _) = CreateContext(); + await Assert.ThrowsAsync(() => + context.WaitForConditionAsync( + check: null!, + config: new WaitForConditionConfig + { + InitialState = 0, + WaitStrategy = WaitStrategy.Fixed(TimeSpan.FromSeconds(1)) + })); + } + + [Fact] + public async Task NullConfig_ThrowsArgumentNullException() + { + var (context, _, _, _) = CreateContext(); + await Assert.ThrowsAsync(() => + context.WaitForConditionAsync( + check: async (_, _) => { await Task.CompletedTask; return 0; }, + config: null!)); + } + + // ── Observability: warning on payload deserialization failure ────── + + [Fact] + public async Task DeserializeStateOrInitial_CorruptPayload_LogsWarningAndFallsBack() + { + // A READY checkpoint with a payload the serializer cannot read should + // NOT fail the workflow (Python parity); it should fall back to + // InitialState. The recovery should be logged at Warning level so + // corruption / schema-migrations are observable. + var state = new ExecutionState(); + state.LoadFromCheckpoint(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = IdAt(1), + Type = OperationTypes.Step, + SubType = OperationSubTypes.WaitForCondition, + Status = OperationStatuses.Ready, + Name = "poll", + StepDetails = new StepDetails { Result = "this-is-not-valid", Attempt = 2 } + } + } + }); + + var recorder = new RecordingBatcher(); + var logger = new RecordingLogger(); + + var op = new WaitForConditionOperation( + operationId: IdAt(1), + name: "poll", + check: async (s, _) => { await Task.CompletedTask; return s; }, + config: new WaitForConditionConfig + { + InitialState = 999, + WaitStrategy = WaitStrategy.Fixed(TimeSpan.FromSeconds(1), isDone: _ => true) + }, + serializer: new ThrowingDeserializer(), + logger: logger, + state: state, + termination: new TerminationManager(), + durableExecutionArn: "arn:test", + batcher: recorder.Batcher); + + var result = await op.ExecuteAsync(CancellationToken.None); + + Assert.Equal(999, result); // fell back to InitialState + var warning = Assert.Single(logger.Entries.Where(e => e.Level == LogLevel.Warning)); + Assert.Contains("failed to deserialize prior state", warning.Message); + Assert.Contains(IdAt(1), warning.Message); + } + + [Fact] + public async Task ReplayFailed_CorruptLastStatePayload_LogsWarningAndLastStateNull() + { + // FAIL replay's LastState recovery: same observability story — if the + // FAIL Payload can't be deserialized, log a warning and surface + // LastState=null instead of throwing. + var state = new ExecutionState(); + state.LoadFromCheckpoint(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = IdAt(1), + Type = OperationTypes.Step, + SubType = OperationSubTypes.WaitForCondition, + Status = OperationStatuses.Failed, + Name = "poll", + StepDetails = new StepDetails + { + Attempt = 4, + Result = "bogus-payload", + Error = new ErrorObject + { + ErrorType = typeof(WaitForConditionException).FullName, + ErrorMessage = "exhausted" + } + } + } + } + }); + + var recorder = new RecordingBatcher(); + var logger = new RecordingLogger(); + + var op = new WaitForConditionOperation( + operationId: IdAt(1), + name: "poll", + check: async (s, _) => { await Task.CompletedTask; return s; }, + config: new WaitForConditionConfig + { + InitialState = 0, + WaitStrategy = WaitStrategy.Fixed(TimeSpan.FromSeconds(1)) + }, + serializer: new ThrowingDeserializer(), + logger: logger, + state: state, + termination: new TerminationManager(), + durableExecutionArn: "arn:test", + batcher: recorder.Batcher); + + var ex = await Assert.ThrowsAsync(() => op.ExecuteAsync(CancellationToken.None)); + + Assert.Equal(4, ex.AttemptsExhausted); + Assert.Null(ex.LastState); + var warning = Assert.Single(logger.Entries.Where(e => e.Level == LogLevel.Warning)); + Assert.Contains("failed to deserialize LastState", warning.Message); + } + + // ── Test helpers ──────────────────────────────────────────────────── + + private class CounterState + { + public int Count { get; set; } + } + + private class TestPerson + { + public string? Name { get; set; } + public int Age { get; set; } + } + + private class RecordingPersonSerializer : ICheckpointSerializer + { + public bool SerializeCalled { get; private set; } + public bool DeserializeCalled { get; private set; } + + public string Serialize(TestPerson value, SerializationContext context) + { + SerializeCalled = true; + return $"{value.Name},{value.Age}"; + } + + public TestPerson Deserialize(string data, SerializationContext context) + { + DeserializeCalled = true; + var inner = data.Replace("", "").Replace("", ""); + var parts = inner.Split(','); + return new TestPerson { Name = parts[0], Age = int.Parse(parts[1]) }; + } + } + + /// Serializer whose Deserialize always throws — exercises the fallback paths. + private sealed class ThrowingDeserializer : ICheckpointSerializer + { + public string Serialize(int value, SerializationContext context) => value.ToString(); + public int Deserialize(string data, SerializationContext context) + => throw new InvalidOperationException($"cannot deserialize '{data}'"); + } + + /// Captures log calls so tests can assert on level and rendered message. + private sealed class RecordingLogger : ILogger + { + public List<(LogLevel Level, string Message)> Entries { get; } = new(); + + public IDisposable? BeginScope(TState state) where TState : notnull => null; + public bool IsEnabled(LogLevel logLevel) => true; + public void Log(LogLevel logLevel, EventId eventId, TState state, Exception? exception, Func formatter) + => Entries.Add((logLevel, formatter(state, exception))); + } +} diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/WaitStrategyTests.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/WaitStrategyTests.cs new file mode 100644 index 000000000..f03635326 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/WaitStrategyTests.cs @@ -0,0 +1,226 @@ +using Amazon.Lambda.DurableExecution; +using Xunit; + +namespace Amazon.Lambda.DurableExecution.Tests; + +public class WaitStrategyTests +{ + [Fact] + public void Exponential_Defaults_MatchReferenceSDKs() + { + // Reference SDKs (Python, JS, Java) all default to: + // maxAttempts=60, initialDelay=5s, maxDelay=300s, backoff=1.5x, FullJitter. + // Verify by exercising the boundary: an attempt one short of 60 + // continues; the 60th throws (matches the JS SDK pattern of + // signaling max-attempts via exception so the operation can produce + // a WaitForConditionException carrying the last state). + var strategy = WaitStrategy.Exponential(); + + Assert.True(strategy.Decide("any", 1).ShouldContinue); + Assert.True(strategy.Decide("any", 59).ShouldContinue); + + var ex = Assert.Throws(() => strategy.Decide("any", 60)); + Assert.Equal(60, ex.AttemptsExhausted); + } + + [Fact] + public void Exponential_NoIsDone_ThrowsAtMaxAttempts() + { + var strategy = WaitStrategy.Exponential(maxAttempts: 5); + + Assert.True(strategy.Decide(0, 1).ShouldContinue); + Assert.True(strategy.Decide(0, 4).ShouldContinue); + + var ex = Assert.Throws(() => strategy.Decide(0, 5)); + Assert.Equal(5, ex.AttemptsExhausted); + } + + [Fact] + public void Exponential_IsDoneTrue_StopsRegardlessOfAttempt() + { + var strategy = WaitStrategy.Exponential( + maxAttempts: 100, + isDone: state => state >= 10); + + // Predicate is the gate, not the attempt counter. + Assert.True(strategy.Decide(5, 1).ShouldContinue); + Assert.False(strategy.Decide(10, 1).ShouldContinue); + Assert.False(strategy.Decide(15, 1).ShouldContinue); + } + + [Fact] + public void Exponential_DelayGrowsAndCapsAtMax() + { + var strategy = WaitStrategy.Exponential( + maxAttempts: 20, + initialDelay: TimeSpan.FromSeconds(2), + maxDelay: TimeSpan.FromSeconds(20), + backoffRate: 2.0, + jitter: JitterStrategy.None); + + Assert.Equal(TimeSpan.FromSeconds(2), strategy.Decide(0, 1).Delay); + Assert.Equal(TimeSpan.FromSeconds(4), strategy.Decide(0, 2).Delay); + Assert.Equal(TimeSpan.FromSeconds(8), strategy.Decide(0, 3).Delay); + Assert.Equal(TimeSpan.FromSeconds(16), strategy.Decide(0, 4).Delay); + // 2 * 2^4 = 32, capped at 20. + Assert.Equal(TimeSpan.FromSeconds(20), strategy.Decide(0, 5).Delay); + } + + [Fact] + public void Exponential_FullJitter_StaysWithinBounds() + { + var strategy = WaitStrategy.Exponential( + maxAttempts: 20, + initialDelay: TimeSpan.FromSeconds(10), + maxDelay: TimeSpan.FromSeconds(100), + backoffRate: 2.0, + jitter: JitterStrategy.Full); + + for (int i = 0; i < 50; i++) + { + var d = strategy.Decide(0, 1).Delay; + // With Full jitter at attempt 1: between 1 (floor) and 10 inclusive. + Assert.True(d >= TimeSpan.FromSeconds(1)); + Assert.True(d <= TimeSpan.FromSeconds(10)); + } + } + + [Fact] + public void Exponential_HalfJitter_StaysWithinBounds() + { + // Half-jitter formula: cappedDelay * (0.5 + 0.5 * rand) ⇒ output is in + // [cappedDelay/2, cappedDelay], then ceilinged to whole seconds with a + // 1-second floor. At attempt 3 with initialDelay=10s, backoff=2.0: + // cappedDelay = min(10 * 2^2, 100) = 40s ⇒ output ∈ [20, 40] seconds. + var strategy = WaitStrategy.Exponential( + maxAttempts: 20, + initialDelay: TimeSpan.FromSeconds(10), + maxDelay: TimeSpan.FromSeconds(100), + backoffRate: 2.0, + jitter: JitterStrategy.Half); + + for (int i = 0; i < 50; i++) + { + var d = strategy.Decide(0, 3).Delay; + Assert.True(d >= TimeSpan.FromSeconds(20), $"expected >= 20s, got {d}"); + Assert.True(d <= TimeSpan.FromSeconds(40), $"expected <= 40s, got {d}"); + } + } + + [Fact] + public void Linear_DefaultsAreSensible() + { + // Default: 5s initial, +5s per attempt, no cap, 60 attempts. + var strategy = WaitStrategy.Linear(); + + Assert.Equal(TimeSpan.FromSeconds(5), strategy.Decide(0, 1).Delay); + Assert.Equal(TimeSpan.FromSeconds(10), strategy.Decide(0, 2).Delay); + Assert.Equal(TimeSpan.FromSeconds(15), strategy.Decide(0, 3).Delay); + } + + [Fact] + public void Linear_RespectsMaxDelay() + { + var strategy = WaitStrategy.Linear( + maxAttempts: 10, + initialDelay: TimeSpan.FromSeconds(2), + increment: TimeSpan.FromSeconds(3), + maxDelay: TimeSpan.FromSeconds(8)); + + Assert.Equal(TimeSpan.FromSeconds(2), strategy.Decide(0, 1).Delay); + Assert.Equal(TimeSpan.FromSeconds(5), strategy.Decide(0, 2).Delay); + Assert.Equal(TimeSpan.FromSeconds(8), strategy.Decide(0, 3).Delay); + // 2+3*3=11, capped to 8. + Assert.Equal(TimeSpan.FromSeconds(8), strategy.Decide(0, 4).Delay); + } + + [Fact] + public void Linear_ThrowsAtMaxAttempts() + { + var strategy = WaitStrategy.Linear(maxAttempts: 3); + + Assert.True(strategy.Decide(0, 1).ShouldContinue); + Assert.True(strategy.Decide(0, 2).ShouldContinue); + Assert.Throws(() => strategy.Decide(0, 3)); + } + + [Fact] + public void Linear_IsDonePredicate_ShortCircuits() + { + var strategy = WaitStrategy.Linear( + maxAttempts: 100, + isDone: state => state == 42); + + Assert.True(strategy.Decide(1, 1).ShouldContinue); + Assert.False(strategy.Decide(42, 1).ShouldContinue); + } + + [Fact] + public void Fixed_AlwaysReturnsSameDelay() + { + var strategy = WaitStrategy.Fixed(TimeSpan.FromSeconds(7), maxAttempts: 5); + + Assert.Equal(TimeSpan.FromSeconds(7), strategy.Decide(0, 1).Delay); + Assert.Equal(TimeSpan.FromSeconds(7), strategy.Decide(0, 2).Delay); + Assert.Equal(TimeSpan.FromSeconds(7), strategy.Decide(0, 4).Delay); + } + + [Fact] + public void Fixed_ThrowsAtMaxAttempts() + { + var strategy = WaitStrategy.Fixed(TimeSpan.FromSeconds(2), maxAttempts: 3); + + Assert.True(strategy.Decide(0, 1).ShouldContinue); + Assert.True(strategy.Decide(0, 2).ShouldContinue); + Assert.Throws(() => strategy.Decide(0, 3)); + } + + [Fact] + public void Fixed_FloorsDelayAtOneSecond() + { + // Service timer granularity is 1 second; sub-second delays would + // round to 0 if we didn't floor. + var strategy = WaitStrategy.Fixed(TimeSpan.FromMilliseconds(100), maxAttempts: 3); + var decision = strategy.Decide(0, 1); + Assert.True(decision.ShouldContinue); + Assert.Equal(TimeSpan.FromSeconds(1), decision.Delay); + } + + [Fact] + public void Fixed_IsDonePredicate_ShortCircuits() + { + var strategy = WaitStrategy.Fixed( + TimeSpan.FromSeconds(1), + maxAttempts: 50, + isDone: state => state); + + Assert.True(strategy.Decide(false, 1).ShouldContinue); + Assert.False(strategy.Decide(true, 1).ShouldContinue); + } + + [Fact] + public void FromDelegate_UsesProvidedFunction() + { + var strategy = WaitStrategy.FromDelegate((state, attempt) => + state >= 3 || attempt >= 5 + ? WaitDecision.Stop() + : WaitDecision.ContinueAfter(TimeSpan.FromSeconds(state + 1))); + + Assert.True(strategy.Decide(0, 1).ShouldContinue); + Assert.Equal(TimeSpan.FromSeconds(1), strategy.Decide(0, 1).Delay); + Assert.False(strategy.Decide(3, 1).ShouldContinue); + Assert.False(strategy.Decide(0, 5).ShouldContinue); + } + + [Fact] + public void WaitDecision_StopAndContinueAfter_ProduceExpectedShape() + { + var stop = WaitDecision.Stop(); + Assert.False(stop.ShouldContinue); + Assert.Equal(TimeSpan.Zero, stop.Delay); + + var cont = WaitDecision.ContinueAfter(TimeSpan.FromSeconds(3)); + Assert.True(cont.ShouldContinue); + Assert.Equal(TimeSpan.FromSeconds(3), cont.Delay); + } +}