diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Config/IRetryStrategy.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Config/IRetryStrategy.cs
new file mode 100644
index 000000000..f291bed1e
--- /dev/null
+++ b/Libraries/src/Amazon.Lambda.DurableExecution/Config/IRetryStrategy.cs
@@ -0,0 +1,39 @@
+namespace Amazon.Lambda.DurableExecution;
+
+/// <summary>
+/// Determines whether a failed step should be retried and with what delay.
+/// </summary>
+public interface IRetryStrategy
+{
+    /// <summary>
+    /// Evaluates whether the given exception warrants a retry.
+    /// </summary>
+    /// <param name="exception">The exception that caused the step to fail.</param>
+    /// <param name="attemptNumber">The 1-based attempt number that just failed.</param>
+    /// <returns>A decision indicating whether to retry and the delay before the next attempt.</returns>
+    RetryDecision ShouldRetry(Exception exception, int attemptNumber);
+}
+
+/// <summary>
+/// The outcome of a retry evaluation.
+/// </summary>
+public readonly struct RetryDecision
+{
+    /// <summary>Whether the step should be retried.</summary>
+    public bool ShouldRetry { get; }
+
+    /// <summary>The delay before the next retry attempt.</summary>
+    public TimeSpan Delay { get; }
+
+    private RetryDecision(bool shouldRetry, TimeSpan delay)
+    {
+        ShouldRetry = shouldRetry;
+        Delay = delay;
+    }
+
+    /// <summary>Indicates the step should not be retried.</summary>
+    public static RetryDecision DoNotRetry() => new(false, TimeSpan.Zero);
+
+    /// <summary>Indicates the step should be retried after the specified delay.</summary>
+    public static RetryDecision RetryAfter(TimeSpan delay) => new(true, delay);
+}
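The interface is deliberately small, so a policy the exponential factory cannot express can be implemented directly. A minimal sketch, assuming nothing beyond the two types above (the class is illustrative, not part of this diff):

public sealed class TimeoutOnlyRetryStrategy : IRetryStrategy
{
    // Retry only timeouts, with a flat 10s delay, for up to 4 total attempts.
    public RetryDecision ShouldRetry(Exception exception, int attemptNumber)
    {
        if (attemptNumber >= 4 || exception is not TimeoutException)
            return RetryDecision.DoNotRetry();

        return RetryDecision.RetryAfter(TimeSpan.FromSeconds(10));
    }
}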
diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Config/RetryStrategy.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Config/RetryStrategy.cs
new file mode 100644
index 000000000..b8688ca0c
--- /dev/null
+++ b/Libraries/src/Amazon.Lambda.DurableExecution/Config/RetryStrategy.cs
@@ -0,0 +1,185 @@
+using System.Text.RegularExpressions;
+
+namespace Amazon.Lambda.DurableExecution;
+
+/// <summary>
+/// Jitter strategy for exponential backoff to prevent thundering-herd scenarios.
+/// </summary>
+public enum JitterStrategy
+{
+    /// <summary>No randomization — delay is exactly the calculated backoff value.</summary>
+    None,
+    /// <summary>Random delay between 0 and the calculated backoff value (recommended).</summary>
+    Full,
+    /// <summary>Random delay between 50% and 100% of the calculated backoff value.</summary>
+    Half
+}
+
+/// <summary>
+/// Controls whether a step re-executes if the Lambda is re-invoked mid-attempt.
+/// </summary>
+public enum StepSemantics
+{
+    /// <summary>
+    /// Default. The step may re-execute if the Lambda is re-invoked during execution.
+    /// Use for idempotent operations.
+    /// </summary>
+    AtLeastOncePerRetry,
+
+    /// <summary>
+    /// The step executes at most once per retry attempt. A START checkpoint is written
+    /// before execution; on replay with an existing START, the SDK skips re-execution
+    /// and proceeds to the retry handler.
+    /// </summary>
+    AtMostOncePerRetry
+}
+
+/// <summary>
+/// Factory methods for common retry strategies.
+/// </summary>
+public static class RetryStrategy
+{
+    /// <summary>6 attempts, 2x backoff, 5s initial delay, 60s max, Full jitter.</summary>
+    public static IRetryStrategy Default { get; } = Exponential(
+        maxAttempts: 6,
+        initialDelay: TimeSpan.FromSeconds(5),
+        maxDelay: TimeSpan.FromSeconds(60),
+        backoffRate: 2.0,
+        jitter: JitterStrategy.Full);
+
+    /// <summary>3 attempts, 2x backoff, 1s initial delay, 5s max, Half jitter.</summary>
+    public static IRetryStrategy Transient { get; } = Exponential(
+        maxAttempts: 3,
+        initialDelay: TimeSpan.FromSeconds(1),
+        maxDelay: TimeSpan.FromSeconds(5),
+        backoffRate: 2.0,
+        jitter: JitterStrategy.Half);
+
+    /// <summary>No retry — 1 attempt only.</summary>
+    public static IRetryStrategy None { get; } = Exponential(maxAttempts: 1);
+
+    /// <summary>
+    /// Creates an exponential backoff retry strategy.
+    /// </summary>
+    public static IRetryStrategy Exponential(
+        int maxAttempts = 3,
+        TimeSpan? initialDelay = null,
+        TimeSpan? maxDelay = null,
+        double backoffRate = 2.0,
+        JitterStrategy jitter = JitterStrategy.Full,
+        Type[]? retryableExceptions = null,
+        string[]? retryableMessagePatterns = null)
+    {
+        return new ExponentialRetryStrategy(
+            maxAttempts,
+            initialDelay ?? TimeSpan.FromSeconds(5),
+            maxDelay ?? TimeSpan.FromSeconds(300),
+            backoffRate,
+            jitter,
+            retryableExceptions,
+            retryableMessagePatterns);
+    }
+
+    /// <summary>
+    /// Creates a retry strategy from a delegate.
+    /// </summary>
+    public static IRetryStrategy FromDelegate(Func<Exception, int, RetryDecision> strategy)
+        => new DelegateRetryStrategy(strategy);
+}
+
+internal sealed class ExponentialRetryStrategy : IRetryStrategy
+{
+    private readonly int _maxAttempts;
+    private readonly TimeSpan _initialDelay;
+    private readonly TimeSpan _maxDelay;
+    private readonly double _backoffRate;
+    private readonly JitterStrategy _jitter;
+    private readonly Type[]? _retryableExceptions;
+    private readonly Regex[]? _retryableMessagePatterns;
+
+    [ThreadStatic]
+    private static Random? t_random;
+    private static Random Random => t_random ??= new Random();
+
+    public ExponentialRetryStrategy(
+        int maxAttempts,
+        TimeSpan initialDelay,
+        TimeSpan maxDelay,
+        double backoffRate,
+        JitterStrategy jitter,
+        Type[]? retryableExceptions,
+        string[]? retryableMessagePatterns)
+    {
+        _maxAttempts = maxAttempts;
+        _initialDelay = initialDelay;
+        _maxDelay = maxDelay;
+        _backoffRate = backoffRate;
+        _jitter = jitter;
+        _retryableExceptions = retryableExceptions;
+        _retryableMessagePatterns = retryableMessagePatterns?
+            .Select(p => new Regex(p, RegexOptions.Compiled))
+            .ToArray();
+    }
+
+    public RetryDecision ShouldRetry(Exception exception, int attemptNumber)
+    {
+        if (attemptNumber >= _maxAttempts)
+            return RetryDecision.DoNotRetry();
+
+        if (!IsRetryable(exception))
+            return RetryDecision.DoNotRetry();
+
+        var delay = CalculateDelay(attemptNumber);
+        return RetryDecision.RetryAfter(delay);
+    }
+
+    private bool IsRetryable(Exception exception)
+    {
+        if (_retryableExceptions == null && _retryableMessagePatterns == null)
+            return true;
+
+        if (_retryableExceptions != null)
+        {
+            var exType = exception.GetType();
+            if (_retryableExceptions.Any(t => t.IsAssignableFrom(exType)))
+                return true;
+        }
+
+        if (_retryableMessagePatterns != null)
+        {
+            var message = exception.Message;
+            if (_retryableMessagePatterns.Any(p => p.IsMatch(message)))
+                return true;
+        }
+
+        return false;
+    }
+
+    internal TimeSpan CalculateDelay(int attemptNumber)
+    {
+        var baseDelay = _initialDelay.TotalSeconds * Math.Pow(_backoffRate, attemptNumber - 1);
+        var cappedDelay = Math.Min(baseDelay, _maxDelay.TotalSeconds);
+
+        var finalDelay = _jitter switch
+        {
+            JitterStrategy.Full => Random.NextDouble() * cappedDelay,
+            JitterStrategy.Half => cappedDelay * (0.5 + 0.5 * Random.NextDouble()),
+            _ => cappedDelay
+        };
+
+        return TimeSpan.FromSeconds(Math.Max(1, Math.Ceiling(finalDelay)));
+    }
+}
+
+internal sealed class DelegateRetryStrategy : IRetryStrategy
+{
+    private readonly Func<Exception, int, RetryDecision> _strategy;
+
+    public DelegateRetryStrategy(Func<Exception, int, RetryDecision> strategy)
+    {
+        _strategy = strategy;
+    }
+
+    public RetryDecision ShouldRetry(Exception exception, int attemptNumber)
+        => _strategy(exception, attemptNumber);
+}
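With JitterStrategy.None the schedule is easy to verify by hand: the delay for attempt n is initialDelay * backoffRate^(n-1), capped at maxDelay and rounded up to whole seconds. For the Default parameters (5s initial, 2.0 rate, 60s cap) that gives 5s, 10s, 20s, 40s, 60s. A quick illustrative check against the factory (hypothetical usage, not part of the diff):

var strategy = RetryStrategy.Exponential(
    maxAttempts: 6,
    initialDelay: TimeSpan.FromSeconds(5),
    maxDelay: TimeSpan.FromSeconds(60),
    backoffRate: 2.0,
    jitter: JitterStrategy.None);

for (int attempt = 1; attempt < 6; attempt++)
{
    var decision = strategy.ShouldRetry(new TimeoutException(), attempt);
    Console.WriteLine($"attempt {attempt}: retry in {decision.Delay.TotalSeconds}s"); // 5, 10, 20, 40, 60
}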
diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Config/StepConfig.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Config/StepConfig.cs
index 2380967de..362867c09 100644
--- a/Libraries/src/Amazon.Lambda.DurableExecution/Config/StepConfig.cs
+++ b/Libraries/src/Amazon.Lambda.DurableExecution/Config/StepConfig.cs
@@ -5,9 +5,14 @@ namespace Amazon.Lambda.DurableExecution;
 /// </summary>
 public sealed class StepConfig
 {
-    // TODO: Retry support is deferred to a follow-up PR. When added, this is
-    // where RetryStrategy and Semantics (AtLeastOncePerRetry / AtMostOncePerRetry)
-    // will live. The follow-up needs to use service-mediated retries (checkpoint
-    // a RETRY operation + suspend the Lambda) rather than an in-process Task.Delay
-    // loop, to avoid billing Lambda compute time during retry backoff.
+    /// <summary>
+    /// Retry strategy for failed steps. When null (default), failures are not retried.
+    /// </summary>
+    public IRetryStrategy? RetryStrategy { get; set; }
+
+    /// <summary>
+    /// Controls whether a step may re-execute if the Lambda is re-invoked mid-attempt.
+    /// Default is <see cref="StepSemantics.AtLeastOncePerRetry"/>.
+    /// </summary>
+    public StepSemantics Semantics { get; set; } = StepSemantics.AtLeastOncePerRetry;
 }
diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/CheckpointBatcher.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/CheckpointBatcher.cs
index 8039e7c56..b800ef55d 100644
--- a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/CheckpointBatcher.cs
+++ b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/CheckpointBatcher.cs
@@ -11,13 +11,13 @@ namespace Amazon.Lambda.DurableExecution.Internal;
 /// call awaits the flush of its containing batch (sync semantics).
 /// </summary>
 /// <remarks>
-/// TODO: when Map / Parallel / ChildContext / WaitForCondition land — or when
-/// AtLeastOncePerRetry step START gets a non-blocking variant — they will need
-/// a fire-and-forget overload like
-/// Task EnqueueAsync(SdkOperationUpdate update, bool sync) where
-/// sync=false returns as soon as the item is queued. Java's
-/// sendOperationUpdate vs sendOperationUpdateAsync is the model.
-/// Today every call site is sync, so the API stays minimal.
+/// Fire-and-forget semantics are achieved by simply not awaiting the returned
+/// Task — matching the Java/Python/JS SDKs, which use the same one-method pattern.
+/// Errors still surface deterministically via _terminalError: the next
+/// sync EnqueueAsync or DrainAsync rethrows.
+/// Callers using fire-and-forget should observe the discarded Task's exception
+/// (see StepOperation.FireAndForget) so it doesn't trip the runtime's
+/// UnobservedTaskException event.
 /// </remarks>
 internal sealed class CheckpointBatcher : IAsyncDisposable
 {
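At a call site the two StepConfig knobs compose like this (a sketch based on the StepAsync overloads exercised in the tests below; ChargeCardAsync and order are illustrative, not part of the PR):

// Non-idempotent call: never run twice within an attempt; retry timeouts only.
var receipt = await context.StepAsync(
    async ctx => await ChargeCardAsync(order),
    name: "charge",
    config: new StepConfig
    {
        Semantics = StepSemantics.AtMostOncePerRetry,
        RetryStrategy = RetryStrategy.Exponential(
            maxAttempts: 4,
            retryableExceptions: new[] { typeof(TimeoutException) })
    });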
diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/StepOperation.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/StepOperation.cs
index 2decdb309..54e52005d 100644
--- a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/StepOperation.cs
+++ b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/StepOperation.cs
@@ -1,20 +1,27 @@
 using Microsoft.Extensions.Logging;
 using SdkErrorObject = Amazon.Lambda.Model.ErrorObject;
 using SdkOperationUpdate = Amazon.Lambda.Model.OperationUpdate;
+using SdkStepOptions = Amazon.Lambda.Model.StepOptions;
 
 namespace Amazon.Lambda.DurableExecution.Internal;
 
 /// <summary>
-/// Durable step operation. Runs the user's function once across the lifetime
-/// of a durable execution, persisting its result so subsequent invocations
-/// replay the cached value without re-executing.
+/// Durable step operation. Runs the user's function (with retry support),
+/// persisting its result so subsequent invocations replay the cached value
+/// without re-executing.
 /// </summary>
 /// <remarks>
-/// Replay semantics — example: <c>await ctx.StepAsync(ChargeCard, "charge")</c>
+/// Replay branches — example: <c>await ctx.StepAsync(ChargeCard, "charge")</c>
 /// <list type="bullet">
-/// <item>Fresh: no prior state → run func → emit SUCCEED → return result.</item>
-/// <item>Replay (SUCCEEDED): return cached result; func is NOT re-executed.</item>
-/// <item>Replay (FAILED): re-throw the recorded exception.</item>
+/// <item>Fresh: no prior state → run func → emit SUCCEED → return.</item>
+/// <item>SUCCEEDED: return cached result; func is NOT re-executed.</item>
+/// <item>FAILED: re-throw the recorded exception.</item>
+/// <item>PENDING (retry timer not yet fired): re-suspend without
+/// running func; service re-invokes once NextAttemptTimestamp elapses.</item>
+/// <item>STARTED + AtMostOncePerRetry: crash recovery — treat as a
+/// failed attempt, route through retry strategy.</item>
+/// <item>READY: service has post-PENDING re-invoked us; the retry
+/// timer fired and the next attempt is up. Run it.</item>
 /// </list>
 /// Serialization is delegated to the supplied serializer;
 /// the AOT-safe overloads of IDurableContext.StepAsync wire in a
@@ -50,7 +57,7 @@ public StepOperation(
     protected override string OperationType => OperationTypes.Step;
 
     protected override Task<T> StartAsync(CancellationToken cancellationToken)
-        => ExecuteFunc(cancellationToken);
+        => ExecuteFunc(attemptNumber: 1, cancellationToken);
 
     protected override Task<T> ReplayAsync(Operation existing, CancellationToken cancellationToken)
     {
@@ -66,31 +73,122 @@ protected override Task<T> ReplayAsync(Operation existing, CancellationToken can
                 // user's catch-block flow matches the original execution.
                 throw CreateStepException(existing);
 
+            case OperationStatuses.Pending:
+                return ReplayPending(existing, cancellationToken);
+
+            case OperationStatuses.Started:
+                return ReplayStarted(existing, cancellationToken);
+
+            case OperationStatuses.Ready:
+                return ReplayReady(existing, cancellationToken);
+
             default:
-                // STARTED/READY/PENDING from a prior invocation — no retry logic
-                // in this commit, so fall through and execute fresh. (Future work
-                // on retries will replace this default with explicit arms.)
-                return ExecuteFunc(cancellationToken);
+                // Unknown status — treat as fresh.
+                return ExecuteFunc(attemptNumber: 1, cancellationToken);
         }
     }
 
+    /// <summary>
+    /// READY means the service has post-PENDING re-invoked us — the retry
+    /// timer fired and the step is eligible to run its next attempt. No
+    /// timer check is needed (the service has already decided we're up);
+    /// just advance the attempt counter and execute. Matches Java's
+    /// <c>case READY -> executeStepLogic(attempt)</c>.
+    /// </summary>
+    private Task<T> ReplayReady(Operation ready, CancellationToken cancellationToken)
+    {
+        var attemptNumber = (ready.StepDetails?.Attempt ?? 0) + 1;
+        return ExecuteFunc(attemptNumber, cancellationToken);
+    }
+
+    /// <summary>
+    /// PENDING means a retry was scheduled (RETRY checkpoint). If
+    /// NextAttemptTimestamp is in the future, re-suspend; otherwise the timer
+    /// has fired and we run the next attempt.
+    /// </summary>
+    private Task<T> ReplayPending(Operation pending, CancellationToken cancellationToken)
+    {
+        var nextAttemptTs = pending.StepDetails?.NextAttemptTimestamp;
+        var attemptNumber = (pending.StepDetails?.Attempt ?? 0) + 1;
+
+        if (nextAttemptTs is { } scheduledMs &&
+            DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() < scheduledMs)
+        {
+            // Retry timer hasn't fired yet — re-suspend so we don't bill compute
+            // while the timer ticks. Service re-invokes once the timer elapses.
+            return Termination.SuspendAndAwait<T>(
+                TerminationReason.RetryScheduled, $"retry:{Name ?? OperationId}");
+        }
+
+        return ExecuteFunc(attemptNumber, cancellationToken);
+    }
+
+    /// <summary>
+    /// STARTED means a START checkpoint was written but no SUCCEED/FAIL exists.
+    /// For AtMostOncePerRetry this signals a crash mid-step — treat as failure
+    /// and route through retry. For AtLeastOncePerRetry just re-execute.
+    /// </summary>
+    private Task<T> ReplayStarted(Operation started, CancellationToken cancellationToken)
+    {
+        var attemptNumber = (started.StepDetails?.Attempt ?? 0) + 1;
+
+        if (_config?.Semantics == StepSemantics.AtMostOncePerRetry)
+        {
+            // Re-running func would risk a duplicate side effect (e.g. double
+            // charge). Treat the lost result as a failure; let the retry
+            // strategy decide whether to try again or give up.
+            var error = started.StepDetails?.Error;
+            var ex = error != null
+                ? new StepException(error.ErrorMessage ?? "Step failed on previous attempt") { ErrorType = error.ErrorType }
+                : new StepException("Step result lost during AtMostOncePerRetry replay");
+            return HandleStepFailureAsync(ex, attemptNumber, cancellationToken);
+        }
+
+        return ExecuteFunc(attemptNumber, cancellationToken);
+    }
 
-    private async Task<T> ExecuteFunc(CancellationToken cancellationToken)
+    private async Task<T> ExecuteFunc(int attemptNumber, CancellationToken cancellationToken)
     {
         cancellationToken.ThrowIfCancellationRequested();
 
-        // TODO: emit a STEP_STARTED checkpoint (action = "START") here when retries
-        // and/or AtMostOncePerRetry semantics land. AtMostOncePerRetry needs the
-        // START to be sync-flushed before user code runs (so replay can detect
-        // "we already attempted this and must not re-run"). AtLeastOncePerRetry
-        // wants it fire-and-forget for telemetry (attempt timing, retry count in
-        // history). Both require the async-flush overload in CheckpointBatcher
-        // (see TODO in CheckpointBatcher.cs). Today neither feature is wired up,
-        // so the START is intentionally omitted — SUCCEED alone is sufficient
-        // for replay correctness in the AtLeastOncePerRetry-only world this PR
-        // ships. Java SDK precedent: StepOperation.checkpointStarted().
+        // Emit a START checkpoint before running user code, unless we're already
+        // resuming a STARTED record (which means an earlier attempt wrote it).
+        //
+        // AtMostOncePerRetry: SYNC flush. If Lambda crashes before SUCCEED is
+        // flushed, ReplayStarted routes through retry instead of re-executing.
+        // A queued-but-unflushed START is indistinguishable from "never ran" if
+        // we die, so the sync flush is correctness-load-bearing here.
+        //
+        // AtLeastOncePerRetry (default): FIRE-AND-FORGET. Replay correctness
+        // doesn't depend on the START — SUCCEED alone is sufficient — so this
+        // is purely telemetry (attempt timing, retry count visible in history).
+        // Java/Python/JS SDKs all use the same pattern: one enqueue API, sync
+        // for AtMostOnce, async for AtLeastOnce.
+        if (State.GetOperation(OperationId)?.Status != OperationStatuses.Started)
+        {
+            var startUpdate = new SdkOperationUpdate
+            {
+                Id = OperationId,
+                Type = OperationTypes.Step,
+                Action = "START",
+                SubType = "Step",
+                Name = Name
+            };
+
+            if (_config?.Semantics == StepSemantics.AtMostOncePerRetry)
+            {
+                await EnqueueAsync(startUpdate, cancellationToken);
+            }
+            else
+            {
+                FireAndForget(EnqueueAsync(startUpdate, cancellationToken));
+            }
+        }
+
         try
         {
-            var stepContext = new StepContext(OperationId, attemptNumber: 1, _logger);
+            var stepContext = new StepContext(OperationId, attemptNumber, _logger);
             var result = await _func(stepContext);
 
             await EnqueueAsync(new SdkOperationUpdate
@@ -111,24 +209,54 @@ await EnqueueAsync(new SdkOperationUpdate
         }
         catch (Exception ex)
         {
-            // No retry logic in this commit: any thrown exception becomes a
-            // FAIL checkpoint and is re-thrown as a StepException. On replay,
-            // the FAILED branch above will re-throw without re-executing.
-            await EnqueueAsync(new SdkOperationUpdate
-            {
-                Id = OperationId,
-                Type = OperationTypes.Step,
-                Action = "FAIL",
-                SubType = "Step",
-                Name = Name,
-                Error = ToSdkError(ex)
-            }, cancellationToken);
+            // Funnel into the retry/fail decision tree. May checkpoint RETRY and
+            // suspend (Pending), or checkpoint FAIL and rethrow to user.
+            return await HandleStepFailureAsync(ex, attemptNumber, cancellationToken);
+        }
+    }
 
-            throw new StepException(ex.Message, ex)
+    /// <summary>
+    /// Funnels a step failure into the retry/fail decision. May checkpoint
+    /// RETRY and suspend (Pending), or checkpoint FAIL and rethrow.
+    /// </summary>
+    private async Task<T> HandleStepFailureAsync(Exception ex, int attemptNumber, CancellationToken cancellationToken)
+    {
+        var retryStrategy = _config?.RetryStrategy;
+        if (retryStrategy != null)
+        {
+            var decision = retryStrategy.ShouldRetry(ex, attemptNumber);
+            if (decision.ShouldRetry)
             {
-                ErrorType = ex.GetType().FullName
-            };
+                var delaySeconds = (int)Math.Max(1, Math.Ceiling(decision.Delay.TotalSeconds));
+                await EnqueueAsync(new SdkOperationUpdate
+                {
+                    Id = OperationId,
+                    Type = OperationTypes.Step,
+                    Action = "RETRY",
+                    SubType = "Step",
+                    Name = Name,
+                    Error = ToSdkError(ex),
+                    StepOptions = new SdkStepOptions { NextAttemptDelaySeconds = delaySeconds }
+                }, cancellationToken);
+                return await Termination.SuspendAndAwait<T>(
+                    TerminationReason.RetryScheduled, $"retry:{Name ?? OperationId}");
+            }
         }
+
+        await EnqueueAsync(new SdkOperationUpdate
+        {
+            Id = OperationId,
+            Type = OperationTypes.Step,
+            Action = "FAIL",
+            SubType = "Step",
+            Name = Name,
+            Error = ToSdkError(ex)
+        }, cancellationToken);
+
+        throw new StepException(ex.Message, ex)
+        {
+            ErrorType = ex.GetType().FullName
+        };
     }
 
     private T DeserializeResult(string? serialized)
@@ -157,4 +285,20 @@ private static StepException CreateStepException(Operation failedOp)
             ErrorMessage = ex.Message,
             StackTrace = ex.StackTrace?.Split(new[] { '\r', '\n' }, StringSplitOptions.RemoveEmptyEntries).ToList()
         };
+
+    /// <summary>
+    /// Discards a Task but observes any exception so it doesn't surface as an
+    /// UnobservedTaskException. Used for fire-and-forget START checkpoints
+    /// under AtLeastOncePerRetry semantics. The actual error still propagates
+    /// via CheckpointBatcher._terminalError: the next sync EnqueueAsync
+    /// or DrainAsync will rethrow with the original cause.
+    /// </summary>
+    private static void FireAndForget(Task task)
+    {
+        _ = task.ContinueWith(
+            static t => _ = t.Exception,
+            CancellationToken.None,
+            TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously,
+            TaskScheduler.Default);
+    }
 }
diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/TerminationManager.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/TerminationManager.cs
index 1350c3d70..5d61e611b 100644
--- a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/TerminationManager.cs
+++ b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/TerminationManager.cs
@@ -6,6 +6,7 @@ namespace Amazon.Lambda.DurableExecution.Internal;
 internal enum TerminationReason
 {
     WaitScheduled,
+    RetryScheduled,
     CallbackPending,
     InvokePending,
     CheckpointFailed
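The ContinueWith in FireAndForget exists because a faulted Task that is discarded without anyone reading its Exception raises TaskScheduler.UnobservedTaskException when the task is finalized. A standalone illustration of the failure mode it prevents (timing is GC-dependent, so this is a sketch rather than a deterministic repro):

TaskScheduler.UnobservedTaskException += (_, e) =>
    Console.WriteLine($"unobserved: {e.Exception.InnerException?.Message}");

// Discard a faulted task without observing its Exception property...
_ = Task.FromException(new InvalidOperationException("lost checkpoint error"));

// ...after the task is collected and finalized, the handler above fires.
GC.Collect();
GC.WaitForPendingFinalizers();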
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/DurableFunctionDeployment.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/DurableFunctionDeployment.cs
index 8b5bb2e1b..b2ba4bb1a 100644
--- a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/DurableFunctionDeployment.cs
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/DurableFunctionDeployment.cs
@@ -263,8 +263,16 @@ public async Task<GetDurableExecutionHistoryResponse> WaitForHistoryAsync(
         {
             last = await GetHistoryAsync(durableExecutionArn, includeExecutionData);
             var eventCount = last.Events?.Count ?? 0;
-            _output.WriteLine($"[WaitForHistory] attempt {attempt}: {eventCount} events");
-            if (predicate(last)) return last;
+            var typeCounts = last.Events?
+                .GroupBy(e => e.EventType?.Value ?? "")
+                .Select(g => $"{g.Key}:{g.Count()}")
+                .OrderBy(s => s);
+            _output.WriteLine($"[WaitForHistory] attempt {attempt}: {eventCount} events [{string.Join(",", typeCounts ?? Enumerable.Empty<string>())}]");
+            if (predicate(last))
+            {
+                DumpEvents(last);
+                return last;
+            }
         }
         catch (Exception ex)
         {
@@ -274,9 +282,21 @@ public async Task<GetDurableExecutionHistoryResponse> WaitForHistoryAsync(
         }
 
         _output.WriteLine($"[WaitForHistory] gave up after {attempt} attempts; returning last response with {last?.Events?.Count ?? 0} events");
+        if (last != null) DumpEvents(last);
         return last ?? throw new TimeoutException($"GetDurableExecutionHistory never succeeded within {timeout.TotalSeconds}s");
     }
 
+    private void DumpEvents(GetDurableExecutionHistoryResponse history)
+    {
+        var events = history.Events ?? new List<Event>();
+        _output.WriteLine($"[WaitForHistory] event dump ({events.Count} total):");
+        for (int i = 0; i < events.Count; i++)
+        {
+            var e = events[i];
+            _output.WriteLine($"  [{i}] type={e.EventType?.Value ?? ""} name={e.Name ?? ""} ts={e.EventTimestamp:O}");
+        }
+    }
+
     public string? ExtractDurableExecutionArn(string responsePayload)
     {
         try
@@ -375,14 +395,18 @@ await Task.WhenAny(
         var stdout = await stdoutTask;
         var stderr = await stderrTask;
 
-        if (!string.IsNullOrWhiteSpace(stdout))
-            _output.WriteLine($"stdout: {stdout[..Math.Min(stdout.Length, 1000)]}");
-
         if (process.ExitCode != 0)
         {
+            // Dump the FULL streams on failure — diagnosing build errors with
+            // truncated output is painful, and these only fire on test failure.
+            _output.WriteLine($"stdout: {stdout}");
             _output.WriteLine($"stderr: {stderr}");
-            throw new Exception($"{fileName} failed (exit {process.ExitCode}): {stderr}");
+            var detail = !string.IsNullOrWhiteSpace(stderr) ? stderr : stdout;
+            throw new Exception($"{fileName} failed (exit {process.ExitCode}): {detail}");
         }
+
+        if (!string.IsNullOrWhiteSpace(stdout))
+            _output.WriteLine($"stdout: {stdout[..Math.Min(stdout.Length, 1000)]}");
     }
 
     public async ValueTask DisposeAsync()
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/LongerWaitTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/LongerWaitTest.cs
index 0592d0d44..bfc2913ed 100644
--- a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/LongerWaitTest.cs
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/LongerWaitTest.cs
@@ -30,11 +30,14 @@ public async Task LongerWait_ExpiresAndCompletes()
 
         var history = await deployment.WaitForHistoryAsync(
             arn!,
-            h => (h.Events?.Count(e => e.StepSucceededDetails != null) ?? 0) >= 2
+            h => (h.Events?.Count(e => e.EventType == EventType.StepStarted) ?? 0) >= 2
+                 && (h.Events?.Count(e => e.StepSucceededDetails != null) ?? 0) >= 2
                  && (h.Events?.Any(e => e.WaitSucceededDetails != null) ?? false),
             TimeSpan.FromSeconds(60));
 
         var events = history.Events ?? new List<Event>();
+        Assert.Equal(2, events.Count(e => e.EventType == EventType.StepStarted));
+
         // Steps before and after the wait both ran, with the post-wait step seeing
         // the pre-wait step's value via replay.
         var stepResults = events
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/MultipleStepsTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/MultipleStepsTest.cs
index 573ecc082..6b0ae0bc7 100644
--- a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/MultipleStepsTest.cs
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/MultipleStepsTest.cs
@@ -32,10 +32,13 @@ public async Task MultipleSteps_AllCheckpointed()
         // all events are indexed. Wait until we see all 5 step-succeeded events.
         var history = await deployment.WaitForHistoryAsync(
             arn!,
-            h => (h.Events?.Count(e => e.StepSucceededDetails != null) ?? 0) >= 5,
+            h => (h.Events?.Count(e => e.EventType == EventType.StepStarted) ?? 0) >= 5
+                 && (h.Events?.Count(e => e.StepSucceededDetails != null) ?? 0) >= 5,
             TimeSpan.FromSeconds(60));
 
         var events = history.Events ?? new List<Event>();
+        Assert.Equal(5, events.Count(e => e.EventType == EventType.StepStarted));
+
         // Each step ran exactly once (no replay-induced duplicates) in declaration order,
         // and each step's output chained from the previous one.
         var stepResults = events
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/ReplayDeterminismTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/ReplayDeterminismTest.cs
index 0fd7aa569..137bb28b8 100644
--- a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/ReplayDeterminismTest.cs
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/ReplayDeterminismTest.cs
@@ -31,10 +31,13 @@ public async Task ReplayDeterminism_SameGuidAcrossInvocations()
 
         // History is eventually consistent — wait until both step-succeeded events are visible.
         var history = await deployment.WaitForHistoryAsync(
             arn!,
-            h => (h.Events?.Count(e => e.StepSucceededDetails != null) ?? 0) >= 2,
+            h => (h.Events?.Count(e => e.EventType == EventType.StepStarted) ?? 0) >= 2
+                 && (h.Events?.Count(e => e.StepSucceededDetails != null) ?? 0) >= 2,
             TimeSpan.FromSeconds(60));
 
         var events = history.Events ?? new List<Event>();
+        Assert.Equal(2, events.Count(e => e.EventType == EventType.StepStarted));
+
         // Each step succeeded exactly once — generate_id was NOT re-executed on replay
         // (a duplicate would show up as two succeeded events for the same name).
         var stepSucceededEvents = events.Where(e => e.StepSucceededDetails != null).ToList();
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/RetryTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/RetryTest.cs
new file mode 100644
index 000000000..82be3d105
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/RetryTest.cs
@@ -0,0 +1,78 @@
+using System.Linq;
+using System.Text;
+using Amazon.Lambda.Model;
+using Xunit;
+using Xunit.Abstractions;
+
+namespace Amazon.Lambda.DurableExecution.IntegrationTests;
+
+public class RetryTest
+{
+    private readonly ITestOutputHelper _output;
+    public RetryTest(ITestOutputHelper output) => _output = output;
+
+    /// <summary>
+    /// End-to-end retry: step throws on attempts 1 and 2, succeeds on attempt 3.
+    /// Validates that the service honors the RETRY checkpoint, schedules the
+    /// requested delay, and re-invokes the Lambda — none of which the unit
+    /// tests can prove (they fake state transitions in-memory).
+    /// </summary>
+    [Fact]
+    public async Task FlakyStep_RetriesAndSucceedsOnThirdAttempt()
+    {
+        await using var deployment = await DurableFunctionDeployment.CreateAsync(
+            DurableFunctionDeployment.FindTestFunctionDir("RetryFunction"),
+            "retry", _output);
+
+        var (invokeResponse, executionName) = await deployment.InvokeAsync("""{"orderId": "x"}""");
+        var responsePayload = Encoding.UTF8.GetString(invokeResponse.Payload.ToArray());
+        _output.WriteLine($"Response: {responsePayload}");
+
+        // Initial invoke returns when the SDK suspends after the first failure.
+        // The execution continues asynchronously via service-driven re-invokes.
+        var arn = await deployment.FindDurableExecutionArnByNameAsync(executionName, TimeSpan.FromSeconds(60));
+        Assert.NotNull(arn);
+
+        // Total expected wall time: 2s + 4s of retry delay + execution overhead.
+        // Allow generous headroom for service scheduling latency.
+        var status = await deployment.PollForCompletionAsync(arn!, TimeSpan.FromSeconds(120));
+        Assert.Equal("SUCCEEDED", status, ignoreCase: true);
+
+        var history = await deployment.WaitForHistoryAsync(
+            arn!,
+            h => (h.Events?.Count(e => e.EventType == EventType.StepStarted) ?? 0) >= 3
+                 && (h.Events?.Any(e => e.StepSucceededDetails != null) ?? false),
+            TimeSpan.FromSeconds(60));
+        var events = history.Events ?? new List<Event>();
+
+        // Three attempts ran (attempts 1, 2, 3).
+        Assert.Equal(3, events.Count(e => e.EventType == EventType.StepStarted));
+
+        // Two failed attempts recorded retry metadata; the final attempt succeeded.
+        Assert.Equal(2, events.Count(e => e.StepFailedDetails != null && e.Name == "flaky_step"));
+        var succeeded = events.SingleOrDefault(e => e.StepSucceededDetails != null && e.Name == "flaky_step");
+        Assert.NotNull(succeeded);
+        Assert.Equal("\"ok on attempt 3\"", succeeded!.StepSucceededDetails.Result?.Payload);
+
+        // The two recorded failure messages reflect the per-attempt exception.
+        var failures = events
+            .Where(e => e.StepFailedDetails != null && e.Name == "flaky_step")
+            .Select(e => e.StepFailedDetails.Error?.Payload?.ErrorMessage ?? string.Empty)
+            .ToList();
+        Assert.Contains(failures, m => m.Contains("attempt 1"));
+        Assert.Contains(failures, m => m.Contains("attempt 2"));
+
+        // Timing check: the service must have actually waited between attempts.
+        // With initialDelay=2s, backoffRate=2.0, no jitter: delays are 2s and 4s.
+        // The gap between the first and last StepStarted should be >= 6s.
+        var startedTimestamps = events
+            .Where(e => e.EventType == EventType.StepStarted && e.EventTimestamp.HasValue)
+            .OrderBy(e => e.EventTimestamp!.Value)
+            .Select(e => e.EventTimestamp!.Value)
+            .ToList();
+        var totalGap = startedTimestamps[^1] - startedTimestamps[0];
+        _output.WriteLine($"Time between first and last attempt: {totalGap.TotalSeconds:F1}s");
+        Assert.True(totalGap >= TimeSpan.FromSeconds(6),
+            $"Service did not honor retry delays: {totalGap.TotalSeconds:F1}s gap (expected >= 6s)");
+    }
+}
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/StepFailsTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/StepFailsTest.cs
index 7b2afd427..b51e26b2d 100644
--- a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/StepFailsTest.cs
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/StepFailsTest.cs
@@ -36,10 +36,13 @@ public async Task StepFails_PropagatesAsFailedStatus()
 
         var history = await deployment.WaitForHistoryAsync(
             arn!,
-            h => h.Events?.Any(e => e.StepFailedDetails != null) ?? false,
+            h => (h.Events?.Any(e => e.EventType == EventType.StepStarted) ?? false)
+                 && (h.Events?.Any(e => e.StepFailedDetails != null) ?? false),
             TimeSpan.FromSeconds(60));
 
         var events = history.Events ?? new List<Event>();
+        Assert.Equal(1, events.Count(e => e.EventType == EventType.StepStarted));
+
         // The failing step recorded a StepFailed event with the exception message.
         var stepFailed = events.FirstOrDefault(e => e.StepFailedDetails != null && e.Name == "fail_step");
         Assert.NotNull(stepFailed);
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/StepWaitStepTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/StepWaitStepTest.cs
index 684486dd9..05e2bfc72 100644
--- a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/StepWaitStepTest.cs
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/StepWaitStepTest.cs
@@ -32,11 +32,14 @@ public async Task StepWaitStep_CompletesViaService()
 
         var history = await deployment.WaitForHistoryAsync(
             arn!,
-            h => (h.Events?.Count(e => e.StepSucceededDetails != null) ?? 0) >= 2
+            h => (h.Events?.Count(e => e.EventType == EventType.StepStarted) ?? 0) >= 2
+                 && (h.Events?.Count(e => e.StepSucceededDetails != null) ?? 0) >= 2
                  && (h.Events?.Any(e => e.WaitSucceededDetails != null) ?? false),
             TimeSpan.FromSeconds(60));
 
         var events = history.Events ?? new List<Event>();
+        Assert.Equal(2, events.Count(e => e.EventType == EventType.StepStarted));
+
         // Both steps ran in order and produced the expected chained outputs.
         var stepResults = events
             .Where(e => e.StepSucceededDetails != null)
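The StepStarted-count guard now repeated in every test's predicate could be captured once; an illustrative helper (not in the PR) built only from the APIs shown above:

// Wait until history shows at least n started and n succeeded steps.
static Func<GetDurableExecutionHistoryResponse, bool> StepsComplete(int n) =>
    h => (h.Events?.Count(e => e.EventType == EventType.StepStarted) ?? 0) >= n
      && (h.Events?.Count(e => e.StepSucceededDetails != null) ?? 0) >= n;

// usage:
// var history = await deployment.WaitForHistoryAsync(arn!, StepsComplete(2), TimeSpan.FromSeconds(60));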
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryFunction/Dockerfile b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryFunction/Dockerfile
new file mode 100644
index 000000000..c1913d56a
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryFunction/Dockerfile
@@ -0,0 +1,7 @@
+FROM public.ecr.aws/lambda/provided:al2023
+
+RUN dnf install -y libicu
+
+COPY bin/publish/ ${LAMBDA_TASK_ROOT}
+
+ENTRYPOINT ["/var/task/bootstrap"]
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryFunction/Function.cs
new file mode 100644
index 000000000..9ebffdf11
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryFunction/Function.cs
@@ -0,0 +1,49 @@
+using Amazon.Lambda.Core;
+using Amazon.Lambda.DurableExecution;
+using Amazon.Lambda.RuntimeSupport;
+using Amazon.Lambda.Serialization.SystemTextJson;
+
+namespace DurableExecutionTestFunction;
+
+public class Function
+{
+    public static async Task Main(string[] args)
+    {
+        var handler = new Function();
+        var serializer = new DefaultLambdaJsonSerializer();
+        using var handlerWrapper = HandlerWrapper.GetHandlerWrapper(handler.Handler, serializer);
+        using var bootstrap = new LambdaBootstrap(handlerWrapper);
+        await bootstrap.RunAsync();
+    }
+
+    public Task Handler(
+        DurableExecutionInvocationInput input, ILambdaContext context)
+        => DurableFunction.WrapAsync(Workflow, input, context);
+
+    private async Task<TestResult> Workflow(TestEvent input, IDurableContext context)
+    {
+        var result = await context.StepAsync(
+            async (ctx) =>
+            {
+                await Task.CompletedTask;
+                if (ctx.AttemptNumber < 3)
+                    throw new InvalidOperationException($"flake on attempt {ctx.AttemptNumber}");
+                return $"ok on attempt {ctx.AttemptNumber}";
+            },
+            name: "flaky_step",
+            config: new StepConfig
+            {
+                RetryStrategy = RetryStrategy.Exponential(
+                    maxAttempts: 3,
+                    initialDelay: TimeSpan.FromSeconds(2),
+                    maxDelay: TimeSpan.FromSeconds(10),
+                    backoffRate: 2.0,
+                    jitter: JitterStrategy.None)
+            });
+
+        return new TestResult { Status = "completed", Data = result };
+    }
+}
+
+public class TestEvent { public string? OrderId { get; set; } }
+public class TestResult { public string? Status { get; set; } public string? Data { get; set; } }
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryFunction/RetryFunction.csproj b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryFunction/RetryFunction.csproj
new file mode 100644
index 000000000..6f5f657e4
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryFunction/RetryFunction.csproj
@@ -0,0 +1,18 @@
+<Project Sdk="Microsoft.NET.Sdk">
+
+  <PropertyGroup>
+    <TargetFramework>net8.0</TargetFramework>
+    <OutputType>Exe</OutputType>
+    <SelfContained>true</SelfContained>
+    <AssemblyName>bootstrap</AssemblyName>
+    <Nullable>enable</Nullable>
+    <ImplicitUsings>enable</ImplicitUsings>
+  </PropertyGroup>
+
+  <ItemGroup>
+    <!-- ProjectReference entries elided -->
+  </ItemGroup>
+
+</Project>
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableContextTests.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableContextTests.cs
index 806ebd844..cc3b9a460 100644
--- a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableContextTests.cs
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableContextTests.cs
@@ -666,4 +666,306 @@ public TestPerson Deserialize(string data, SerializationContext context)
             return new TestPerson { Name = parts[0], Age = int.Parse(parts[1]) };
         }
     }
+
+    #region StepAsync Retry Tests
+
+    [Fact]
+    public async Task StepAsync_FailsWithRetryStrategy_CheckpointsRetryAndSuspends()
+    {
+        var tm = new TerminationManager();
+        var state = new ExecutionState();
+        state.LoadFromCheckpoint(null);
+        var idGen = new OperationIdGenerator();
+        var lambdaContext = new TestLambdaContext();
+        var recorder = new RecordingBatcher();
+        var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext, recorder.Batcher);
+
+        var stepTask = context.StepAsync<string>(
+            async (_) => { await Task.CompletedTask; throw new InvalidOperationException("transient"); },
+            name: "flaky_step",
+            config: new StepConfig
+            {
+                RetryStrategy = RetryStrategy.Exponential(
+                    maxAttempts: 3,
+                    initialDelay: TimeSpan.FromSeconds(5),
+                    jitter: JitterStrategy.None)
+            });
+
+        await Task.Delay(50);
+
+        Assert.True(tm.IsTerminated);
+        Assert.False(stepTask.IsCompleted);
+
+        // Fresh attempt 1 emits a fire-and-forget START (telemetry under
+        // AtLeastOncePerRetry), then a RETRY when the user code throws and
+        // the retry strategy decides to retry.
+        var checkpoints = recorder.Flushed;
+        Assert.Equal(2, checkpoints.Count);
+        Assert.Equal("START", checkpoints[0].Action);
+        Assert.Equal("RETRY", checkpoints[1].Action);
+        Assert.Equal(IdAt(1), checkpoints[1].Id);
+        Assert.Equal(5, checkpoints[1].StepOptions.NextAttemptDelaySeconds);
+    }
+
+    [Fact]
+    public async Task StepAsync_FailsNoRetryStrategy_CheckpointsFail()
+    {
+        var context = CreateContext();
+
+        var ex = await Assert.ThrowsAsync<StepException>(() =>
+            context.StepAsync<string>(
+                async (_) => { await Task.CompletedTask; throw new InvalidOperationException("permanent"); },
+                name: "fail_step"));
+
+        Assert.Equal("permanent", ex.Message);
+    }
+
+    [Fact]
+    public async Task StepAsync_RetryExhausted_CheckpointsFail()
+    {
+        var state = new ExecutionState();
+        state.LoadFromCheckpoint(new InitialExecutionState
+        {
+            Operations = new List<Operation>
+            {
+                new()
+                {
+                    Id = IdAt(1),
+                    Type = OperationTypes.Step,
+                    Status = OperationStatuses.Pending,
+                    StepDetails = new StepDetails
+                    {
+                        Attempt = 2,
+                        NextAttemptTimestamp = DateTimeOffset.UtcNow.AddSeconds(-10).ToUnixTimeMilliseconds()
+                    }
+                }
+            }
+        });
+        var tm = new TerminationManager();
+        var idGen = new OperationIdGenerator();
+        var lambdaContext = new TestLambdaContext();
+        var recorder = new RecordingBatcher();
+        var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext, recorder.Batcher);
+
+        // Attempt 3 (last one) — should fail after this
+        var ex = await Assert.ThrowsAsync<StepException>(() =>
+            context.StepAsync<string>(
+                async (_) => { await Task.CompletedTask; throw new InvalidOperationException("still failing"); },
+                name: "exhaust_step",
+                config: new StepConfig
+                {
+                    RetryStrategy = RetryStrategy.Exponential(maxAttempts: 3, jitter: JitterStrategy.None)
+                }));
+
+        Assert.Equal("still failing", ex.Message);
+
+        // Fresh attempt 3 emits a fire-and-forget START (telemetry under
+        // AtLeastOncePerRetry), then a FAIL after the retry strategy gives up.
+        var checkpoints = recorder.Flushed;
+        Assert.Equal(2, checkpoints.Count);
+        Assert.Equal("START", checkpoints[0].Action);
+        Assert.Equal("FAIL", checkpoints[1].Action);
+    }
+
+    [Fact]
+    public async Task StepAsync_PendingWithFutureTimestamp_Suspends()
+    {
+        var futureMs = DateTimeOffset.UtcNow.AddSeconds(300).ToUnixTimeMilliseconds();
+        var tm = new TerminationManager();
+        var state = new ExecutionState();
+        state.LoadFromCheckpoint(new InitialExecutionState
+        {
+            Operations = new List<Operation>
+            {
+                new()
+                {
+                    Id = IdAt(1),
+                    Type = OperationTypes.Step,
+                    Status = OperationStatuses.Pending,
+                    StepDetails = new StepDetails
+                    {
+                        Attempt = 1,
+                        NextAttemptTimestamp = futureMs
+                    }
+                }
+            }
+        });
+        var idGen = new OperationIdGenerator();
+        var lambdaContext = new TestLambdaContext();
+        var recorder = new RecordingBatcher();
+        var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext, recorder.Batcher);
+
+        var stepTask = context.StepAsync(
+            async (_) => { await Task.CompletedTask; return "should not run"; },
+            name: "pending_step",
+            config: new StepConfig { RetryStrategy = RetryStrategy.Default });
+
+        await Task.Delay(50);
+
+        Assert.True(tm.IsTerminated);
+        Assert.False(stepTask.IsCompleted);
+        Assert.Empty(recorder.Flushed);
+    }
+
+    [Fact]
+    public async Task StepAsync_PendingWithPastTimestamp_ReExecutes()
+    {
+        var pastMs = DateTimeOffset.UtcNow.AddSeconds(-10).ToUnixTimeMilliseconds();
+        var state = new ExecutionState();
+        state.LoadFromCheckpoint(new InitialExecutionState
+        {
+            Operations = new List<Operation>
+            {
+                new()
+                {
+                    Id = IdAt(1),
+                    Type = OperationTypes.Step,
+                    Status = OperationStatuses.Pending,
+                    StepDetails = new StepDetails
+                    {
+                        Attempt = 1,
+                        NextAttemptTimestamp = pastMs
+                    }
+                }
+            }
+        });
+        var tm = new TerminationManager();
+        var idGen = new OperationIdGenerator();
+        var lambdaContext = new TestLambdaContext();
+        var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext);
+
+        var result = await context.StepAsync(
+            async (ctx) =>
+            {
+                await Task.CompletedTask;
+                Assert.Equal(2, ctx.AttemptNumber);
+                return "retry success";
+            },
+            name: "retry_step",
+            config: new StepConfig { RetryStrategy = RetryStrategy.Default });
+
+        Assert.Equal("retry success", result);
+    }
+
+    [Fact]
+    public async Task StepAsync_ReadyReplay_AdvancesAttemptAndExecutes()
+    {
+        // READY = service has post-PENDING re-invoked us; the retry timer
+        // already fired so no timestamp check is needed. Just advance the
+        // attempt counter and run. Matches Java's case READY -> executeStepLogic.
+        var state = new ExecutionState();
+        state.LoadFromCheckpoint(new InitialExecutionState
+        {
+            Operations = new List<Operation>
+            {
+                new()
+                {
+                    Id = IdAt(1),
+                    Type = OperationTypes.Step,
+                    Status = OperationStatuses.Ready,
+                    StepDetails = new StepDetails { Attempt = 2 }
+                }
+            }
+        });
+        var tm = new TerminationManager();
+        var idGen = new OperationIdGenerator();
+        var lambdaContext = new TestLambdaContext();
+        var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext);
+
+        var executed = false;
+        var result = await context.StepAsync(
+            async (ctx) =>
+            {
+                executed = true;
+                Assert.Equal(3, ctx.AttemptNumber);
+                await Task.CompletedTask;
+                return "ok";
+            },
+            name: "ready_step",
+            config: new StepConfig { RetryStrategy = RetryStrategy.Default });
+
+        Assert.True(executed);
+        Assert.Equal("ok", result);
+        Assert.False(tm.IsTerminated);
+        Assert.Equal(ExecutionMode.Execution, state.Mode);
+    }
+
+    [Fact]
+    public async Task StepAsync_AtMostOnce_FlushesStartBeforeExecution()
+    {
+        var state = new ExecutionState();
+        state.LoadFromCheckpoint(null);
+        var tm = new TerminationManager();
+        var idGen = new OperationIdGenerator();
+        var lambdaContext = new TestLambdaContext();
+        var recorder = new RecordingBatcher();
+        var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext, recorder.Batcher);
+
+        IReadOnlyList<string>? flushedAtFuncEntry = null;
+
+        var result = await context.StepAsync(
+            async (_) =>
+            {
+                flushedAtFuncEntry = recorder.Flushed.Select(o => o.Action.ToString()).ToArray();
+                await Task.CompletedTask;
+                return "done";
+            },
+            name: "amo_step",
+            config: new StepConfig { Semantics = StepSemantics.AtMostOncePerRetry });
+
+        Assert.Equal("done", result);
+
+        // START must be flushed before user func runs (AtMostOnce invariant).
+        Assert.NotNull(flushedAtFuncEntry);
+        Assert.Equal(new[] { "START" }, flushedAtFuncEntry);
+
+        // After step returns, SUCCEED has also been flushed.
+        var actions = recorder.Flushed.Select(o => o.Action.ToString()).ToArray();
+        Assert.Equal(new[] { "START", "SUCCEED" }, actions);
+    }
+
+    [Fact]
+    public async Task StepAsync_AtMostOnce_StartedReplay_TriggersRetryHandler()
+    {
+        var tm = new TerminationManager();
+        var state = new ExecutionState();
+        state.LoadFromCheckpoint(new InitialExecutionState
+        {
+            Operations = new List<Operation>
+            {
+                new()
+                {
+                    Id = IdAt(1),
+                    Type = OperationTypes.Step,
+                    Status = OperationStatuses.Started
+                }
+            }
+        });
+        var idGen = new OperationIdGenerator();
+        var lambdaContext = new TestLambdaContext();
+        var recorder = new RecordingBatcher();
+        var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext, recorder.Batcher);
+
+        var executed = false;
+        var stepTask = context.StepAsync(
+            async (_) => { executed = true; await Task.CompletedTask; return "should not run"; },
+            name: "amo_replay",
+            config: new StepConfig
+            {
+                Semantics = StepSemantics.AtMostOncePerRetry,
+                RetryStrategy = RetryStrategy.Exponential(maxAttempts: 3, jitter: JitterStrategy.None)
+            });
+
+        await Task.Delay(50);
+
+        Assert.False(executed);
+        Assert.True(tm.IsTerminated);
+        Assert.False(stepTask.IsCompleted);
+
+        var checkpoints = recorder.Flushed;
+        Assert.Single(checkpoints);
+        Assert.Equal("RETRY", checkpoints[0].Action);
+    }
+
+    #endregion
 }
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableFunctionTests.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableFunctionTests.cs
index 032a25a66..b624766eb 100644
--- a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableFunctionTests.cs
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableFunctionTests.cs
@@ -196,30 +196,37 @@ public async Task WrapAsync_CheckpointsAreSentToService()
             mockClient);
 
         Assert.Equal(InvocationStatus.Pending, output.Status);
-        Assert.Equal(2, mockClient.CheckpointCalls.Count);
-
-        // First flush: step SUCCEED (the user awaits StepAsync, which awaits
-        // its SUCCEED enqueue, which blocks until the batcher flushes it).
-        var firstCall = mockClient.CheckpointCalls[0];
-        Assert.Equal("arn:aws:lambda:us-east-1:123:durable-execution:checkpoint-test", firstCall.DurableExecutionArn);
-        Assert.Equal("initial-token", firstCall.CheckpointToken);
-        Assert.Single(firstCall.Updates);
-        var stepUpdate = firstCall.Updates[0];
-        Assert.Equal("STEP", stepUpdate.Type);
-        Assert.Equal("SUCCEED", stepUpdate.Action);
-        Assert.Equal("validate", stepUpdate.Name);
-        Assert.NotNull(stepUpdate.Payload);
-
-        // Second flush: wait START (blocks until the service has the timer
-        // recorded before WaitAsync suspends).
-        var secondCall = mockClient.CheckpointCalls[1];
-        Assert.Single(secondCall.Updates);
-        var waitUpdate = secondCall.Updates[0];
-        Assert.Equal("WAIT", waitUpdate.Type);
-        Assert.Equal("START", waitUpdate.Action);
-        Assert.Equal("delay", waitUpdate.Name);
-        Assert.NotNull(waitUpdate.WaitOptions);
-        Assert.Equal(30, waitUpdate.WaitOptions.WaitSeconds);
+
+        // Each StepAsync emits a fire-and-forget START before user code runs
+        // (telemetry under AtLeastOncePerRetry). With FlushInterval = 0 the
+        // worker may flush the START on its own before SUCCEED arrives, so the
+        // exact batching of START vs SUCCEED is timing-dependent. Assert on
+        // the flat sequence of updates instead.
+        var allUpdates = mockClient.CheckpointCalls
+            .SelectMany(c => c.Updates)
+            .ToList();
+
+        // Expect: step START, step SUCCEED, wait START (in that order).
+        Assert.Equal(3, allUpdates.Count);
+
+        Assert.Equal("STEP", allUpdates[0].Type);
+        Assert.Equal("START", allUpdates[0].Action);
+        Assert.Equal("validate", allUpdates[0].Name);
+
+        Assert.Equal("STEP", allUpdates[1].Type);
+        Assert.Equal("SUCCEED", allUpdates[1].Action);
+        Assert.Equal("validate", allUpdates[1].Name);
+        Assert.NotNull(allUpdates[1].Payload);
+
+        Assert.Equal("WAIT", allUpdates[2].Type);
+        Assert.Equal("START", allUpdates[2].Action);
+        Assert.Equal("delay", allUpdates[2].Name);
+        Assert.NotNull(allUpdates[2].WaitOptions);
+        Assert.Equal(30, allUpdates[2].WaitOptions.WaitSeconds);
+
+        // The first call sends the initial checkpoint token.
+        Assert.Equal("arn:aws:lambda:us-east-1:123:durable-execution:checkpoint-test", mockClient.CheckpointCalls[0].DurableExecutionArn);
+        Assert.Equal("initial-token", mockClient.CheckpointCalls[0].CheckpointToken);
     }
 
     [Fact]
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/RetryStrategyTests.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/RetryStrategyTests.cs
new file mode 100644
index 000000000..e5a277fb6
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/RetryStrategyTests.cs
@@ -0,0 +1,202 @@
+using Amazon.Lambda.DurableExecution;
+using Xunit;
+
+namespace Amazon.Lambda.DurableExecution.Tests;
+
+public class RetryStrategyTests
+{
+    [Fact]
+    public void ExponentialDefault_RetriesUpToMaxAttempts()
+    {
+        var strategy = RetryStrategy.Default;
+
+        // Attempts 1-5 should retry (maxAttempts=6 means 6 total attempts)
+        for (int i = 1; i < 6; i++)
+        {
+            var decision = strategy.ShouldRetry(new InvalidOperationException("fail"), i);
+            Assert.True(decision.ShouldRetry);
+            Assert.True(decision.Delay >= TimeSpan.FromSeconds(1));
+        }
+
+        // Attempt 6 should not retry (exhausted)
+        var lastDecision = strategy.ShouldRetry(new InvalidOperationException("fail"), 6);
+        Assert.False(lastDecision.ShouldRetry);
+    }
+
+    [Fact]
+    public void None_NeverRetries()
+    {
+        var strategy = RetryStrategy.None;
+
+        var decision = strategy.ShouldRetry(new Exception("fail"), 1);
+        Assert.False(decision.ShouldRetry);
+    }
+
+    [Fact]
+    public void Transient_RetriesUpTo3Attempts()
+    {
+        var strategy = RetryStrategy.Transient;
+
+        Assert.True(strategy.ShouldRetry(new Exception("fail"), 1).ShouldRetry);
+        Assert.True(strategy.ShouldRetry(new Exception("fail"), 2).ShouldRetry);
+        Assert.False(strategy.ShouldRetry(new Exception("fail"), 3).ShouldRetry);
+    }
+
+    [Fact]
+    public void Exponential_DelayIncreases()
+    {
+        var strategy = RetryStrategy.Exponential(
+            maxAttempts: 5,
+            initialDelay: TimeSpan.FromSeconds(2),
+            maxDelay: TimeSpan.FromSeconds(120),
+            backoffRate: 2.0,
+            jitter: JitterStrategy.None);
+
+        var d1 = strategy.ShouldRetry(new Exception(), 1).Delay;
+        var d2 = strategy.ShouldRetry(new Exception(), 2).Delay;
+        var d3 = strategy.ShouldRetry(new Exception(), 3).Delay;
+
+        // With no jitter: 2s, 4s, 8s (ceiling to whole seconds)
+        Assert.Equal(TimeSpan.FromSeconds(2), d1);
+        Assert.Equal(TimeSpan.FromSeconds(4), d2);
+        Assert.Equal(TimeSpan.FromSeconds(8), d3);
+    }
+
+    [Fact]
+    public void Exponential_DelayCapsAtMax()
+    {
+        var strategy = RetryStrategy.Exponential(
+            maxAttempts: 10,
+            initialDelay: TimeSpan.FromSeconds(10),
+            maxDelay: TimeSpan.FromSeconds(30),
+            backoffRate: 3.0,
+            jitter: JitterStrategy.None);
+
+        // Attempt 3: 10 * 3^2 = 90, capped to 30
+        var decision = strategy.ShouldRetry(new Exception(), 3);
+        Assert.Equal(TimeSpan.FromSeconds(30), decision.Delay);
+    }
+
+    [Fact]
+    public void Exponential_FullJitter_BoundedByDelay()
+    {
+        var strategy = RetryStrategy.Exponential(
+            maxAttempts: 5,
+            initialDelay: TimeSpan.FromSeconds(10),
+            maxDelay: TimeSpan.FromSeconds(100),
+            backoffRate: 2.0,
+            jitter: JitterStrategy.Full);
+
+        // Run multiple times to check bounds
+        for (int i = 0; i < 50; i++)
+        {
+            var decision = strategy.ShouldRetry(new Exception(), 1);
+            Assert.True(decision.Delay >= TimeSpan.FromSeconds(1));
+            Assert.True(decision.Delay <= TimeSpan.FromSeconds(10));
+        }
+    }
+
+    [Fact]
+    public void Exponential_HalfJitter_BoundedBetween50And100Percent()
+    {
+        var strategy = RetryStrategy.Exponential(
+            maxAttempts: 5,
+            initialDelay: TimeSpan.FromSeconds(10),
+            maxDelay: TimeSpan.FromSeconds(100),
+            backoffRate: 2.0,
+            jitter: JitterStrategy.Half);
+
+        for (int i = 0; i < 50; i++)
+        {
+            var decision = strategy.ShouldRetry(new Exception(), 1);
+            Assert.True(decision.Delay >= TimeSpan.FromSeconds(5));
+            Assert.True(decision.Delay <= TimeSpan.FromSeconds(10));
+        }
+    }
+
+    [Fact]
+    public void Exponential_RetryableExceptions_FiltersCorrectly()
+    {
+        var strategy = RetryStrategy.Exponential(
+            maxAttempts: 3,
+            retryableExceptions: new[] { typeof(TimeoutException), typeof(HttpRequestException) });
+
+        Assert.True(strategy.ShouldRetry(new TimeoutException(), 1).ShouldRetry);
+        Assert.True(strategy.ShouldRetry(new HttpRequestException(), 1).ShouldRetry);
+        Assert.False(strategy.ShouldRetry(new InvalidOperationException(), 1).ShouldRetry);
+    }
+
+    [Fact]
+    public void Exponential_RetryableExceptions_MatchesDerivedTypes()
+    {
+        var strategy = RetryStrategy.Exponential(
+            maxAttempts: 3,
+            retryableExceptions: new[] { typeof(IOException) });
+
+        Assert.True(strategy.ShouldRetry(new FileNotFoundException(), 1).ShouldRetry);
+    }
+
+    [Fact]
+    public void Exponential_MessagePatterns_FiltersCorrectly()
+    {
+        var strategy = RetryStrategy.Exponential(
+            maxAttempts: 3,
+            retryableMessagePatterns: new[] { "timeout", "throttl", "5\\d{2}" });
+
+        Assert.True(strategy.ShouldRetry(new Exception("connection timeout"), 1).ShouldRetry);
+        Assert.True(strategy.ShouldRetry(new Exception("request throttled"), 1).ShouldRetry);
+        Assert.True(strategy.ShouldRetry(new Exception("HTTP 503"), 1).ShouldRetry);
+        Assert.False(strategy.ShouldRetry(new Exception("not found"), 1).ShouldRetry);
+    }
+
+    [Fact]
+    public void Exponential_BothFilters_EitherMatches()
+    {
+        var strategy = RetryStrategy.Exponential(
+            maxAttempts: 3,
+            retryableExceptions: new[] { typeof(TimeoutException) },
+            retryableMessagePatterns: new[] { "throttl" });
+
+        // Matches exception type
+        Assert.True(strategy.ShouldRetry(new TimeoutException("any message"), 1).ShouldRetry);
+        // Matches message pattern
+        Assert.True(strategy.ShouldRetry(new Exception("throttled"), 1).ShouldRetry);
+        // Matches neither
+        Assert.False(strategy.ShouldRetry(new InvalidOperationException("bad state"), 1).ShouldRetry);
+    }
+
+    [Fact]
+    public void Exponential_NoFilters_RetriesAllExceptions()
+    {
+        var strategy = RetryStrategy.Exponential(maxAttempts: 3);
+
+        Assert.True(strategy.ShouldRetry(new Exception("anything"), 1).ShouldRetry);
+        Assert.True(strategy.ShouldRetry(new InvalidOperationException(), 1).ShouldRetry);
+        Assert.True(strategy.ShouldRetry(new OutOfMemoryException(), 1).ShouldRetry);
+    }
+
+    [Fact]
+    public void Exponential_MinimumDelayIsOneSecond()
+    {
+        var strategy = RetryStrategy.Exponential(
+            maxAttempts: 3,
+            initialDelay: TimeSpan.FromMilliseconds(100),
+            jitter: JitterStrategy.None);
+
+        var decision = strategy.ShouldRetry(new Exception(), 1);
+        Assert.True(decision.Delay >= TimeSpan.FromSeconds(1));
+    }
+
+    [Fact]
+    public void FromDelegate_UsesProvidedFunction()
+    {
+        var strategy = RetryStrategy.FromDelegate((ex, attempt) =>
+            attempt < 2 && ex is TimeoutException
+                ? RetryDecision.RetryAfter(TimeSpan.FromSeconds(5))
+                : RetryDecision.DoNotRetry());
+
+        Assert.True(strategy.ShouldRetry(new TimeoutException(), 1).ShouldRetry);
+        Assert.False(strategy.ShouldRetry(new TimeoutException(), 2).ShouldRetry);
+        Assert.False(strategy.ShouldRetry(new Exception(), 1).ShouldRetry);
+    }
+}
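FromDelegate is the escape hatch for policies the exponential family cannot carry, for example honoring a delay hint attached to the exception itself. A sketch (ThrottledException is hypothetical, not part of this PR or the AWS SDK):

var strategy = RetryStrategy.FromDelegate((ex, attempt) =>
    ex is ThrottledException t && attempt < 5
        ? RetryDecision.RetryAfter(t.RetryAfter)   // server-driven delay
        : RetryDecision.DoNotRetry());

// Hypothetical exception type carrying its own retry hint.
public sealed class ThrottledException : Exception
{
    public TimeSpan RetryAfter { get; init; } = TimeSpan.FromSeconds(30);
}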