diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Config/IRetryStrategy.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Config/IRetryStrategy.cs
new file mode 100644
index 000000000..f291bed1e
--- /dev/null
+++ b/Libraries/src/Amazon.Lambda.DurableExecution/Config/IRetryStrategy.cs
@@ -0,0 +1,39 @@
+namespace Amazon.Lambda.DurableExecution;
+
+/// <summary>
+/// Determines whether a failed step should be retried and with what delay.
+/// </summary>
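+/// <example>
+/// A custom strategy is a single method. As an illustrative sketch (not part
+/// of this PR's API surface), retry only timeouts with a flat 10s delay:
+/// <code>
+/// sealed class TimeoutOnlyStrategy : IRetryStrategy
+/// {
+///     public RetryDecision ShouldRetry(Exception exception, int attemptNumber)
+///         => exception is TimeoutException &amp;&amp; attemptNumber &lt; 3
+///             ? RetryDecision.RetryAfter(TimeSpan.FromSeconds(10))
+///             : RetryDecision.DoNotRetry();
+/// }
+/// </code>
+/// </example>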
+public interface IRetryStrategy
+{
+ /// <summary>
+ /// Evaluates whether the given exception warrants a retry.
+ /// </summary>
+ /// <param name="exception">The exception that caused the step to fail.</param>
+ /// <param name="attemptNumber">The 1-based attempt number that just failed.</param>
+ /// <returns>A decision indicating whether to retry and the delay before the next attempt.</returns>
+ RetryDecision ShouldRetry(Exception exception, int attemptNumber);
+}
+
+/// <summary>
+/// The outcome of a retry evaluation.
+/// </summary>
+public readonly struct RetryDecision
+{
+ /// <summary>Whether the step should be retried.</summary>
+ public bool ShouldRetry { get; }
+
+ /// <summary>The delay before the next retry attempt.</summary>
+ public TimeSpan Delay { get; }
+
+ private RetryDecision(bool shouldRetry, TimeSpan delay)
+ {
+ ShouldRetry = shouldRetry;
+ Delay = delay;
+ }
+
+ /// <summary>Indicates the step should not be retried.</summary>
+ public static RetryDecision DoNotRetry() => new(false, TimeSpan.Zero);
+
+ /// <summary>Indicates the step should be retried after the specified delay.</summary>
+ public static RetryDecision RetryAfter(TimeSpan delay) => new(true, delay);
+}
diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Config/RetryStrategy.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Config/RetryStrategy.cs
new file mode 100644
index 000000000..b8688ca0c
--- /dev/null
+++ b/Libraries/src/Amazon.Lambda.DurableExecution/Config/RetryStrategy.cs
@@ -0,0 +1,185 @@
+using System.Text.RegularExpressions;
+
+namespace Amazon.Lambda.DurableExecution;
+
+/// <summary>
+/// Jitter strategy for exponential backoff to prevent thundering-herd scenarios.
+/// </summary>
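+/// <remarks>
+/// For a computed 10s backoff, the pre-rounding delay is: None gives exactly
+/// 10s; Full gives a uniform value in [0, 10]s; Half gives a uniform value in
+/// [5, 10]s. The strategy then rounds up to whole seconds with a 1s floor.
+/// </remarks>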
+public enum JitterStrategy
+{
+ /// <summary>No randomization — delay is exactly the calculated backoff value.</summary>
+ None,
+ /// <summary>Random delay between 0 and the calculated backoff value (recommended).</summary>
+ Full,
+ /// <summary>Random delay between 50% and 100% of the calculated backoff value.</summary>
+ Half
+}
+
+/// <summary>
+/// Controls whether a step re-executes if the Lambda is re-invoked mid-attempt.
+/// </summary>
+public enum StepSemantics
+{
+ /// <summary>
+ /// Default. The step may re-execute if the Lambda is re-invoked during execution.
+ /// Use for idempotent operations.
+ /// </summary>
+ AtLeastOncePerRetry,
+
+ /// <summary>
+ /// The step executes at most once per retry attempt. A START checkpoint is written
+ /// before execution; on replay with an existing START, the SDK skips re-execution
+ /// and proceeds to the retry handler.
+ /// </summary>
+}
+
+/// <summary>
+/// Factory methods for common retry strategies.
+/// </summary>
+public static class RetryStrategy
+{
+ /// <summary>6 attempts, 2x backoff, 5s initial delay, 60s max, Full jitter.</summary>
+ public static IRetryStrategy Default { get; } = Exponential(
+ maxAttempts: 6,
+ initialDelay: TimeSpan.FromSeconds(5),
+ maxDelay: TimeSpan.FromSeconds(60),
+ backoffRate: 2.0,
+ jitter: JitterStrategy.Full);
+
+ /// <summary>3 attempts, 2x backoff, 1s initial delay, 5s max, Half jitter.</summary>
+ public static IRetryStrategy Transient { get; } = Exponential(
+ maxAttempts: 3,
+ initialDelay: TimeSpan.FromSeconds(1),
+ maxDelay: TimeSpan.FromSeconds(5),
+ backoffRate: 2.0,
+ jitter: JitterStrategy.Half);
+
+ /// <summary>No retry — 1 attempt only.</summary>
+ public static IRetryStrategy None { get; } = Exponential(maxAttempts: 1);
+
+ /// <summary>
+ /// Creates an exponential backoff retry strategy.
+ /// </summary>
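+ /// <example>
+ /// For instance, up to 4 attempts starting at 2s, doubling, capped at 30s:
+ /// <code>
+ /// var strategy = RetryStrategy.Exponential(
+ ///     maxAttempts: 4,
+ ///     initialDelay: TimeSpan.FromSeconds(2),
+ ///     maxDelay: TimeSpan.FromSeconds(30),
+ ///     backoffRate: 2.0);
+ /// </code>
+ /// </example>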
+ public static IRetryStrategy Exponential(
+ int maxAttempts = 3,
+ TimeSpan? initialDelay = null,
+ TimeSpan? maxDelay = null,
+ double backoffRate = 2.0,
+ JitterStrategy jitter = JitterStrategy.Full,
+ Type[]? retryableExceptions = null,
+ string[]? retryableMessagePatterns = null)
+ {
+ return new ExponentialRetryStrategy(
+ maxAttempts,
+ initialDelay ?? TimeSpan.FromSeconds(5),
+ maxDelay ?? TimeSpan.FromSeconds(300),
+ backoffRate,
+ jitter,
+ retryableExceptions,
+ retryableMessagePatterns);
+ }
+
+ /// <summary>
+ /// Creates a retry strategy from a delegate.
+ /// </summary>
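+ /// <example>
+ /// For instance, retry timeouts once with a flat 5s delay (the same delegate
+ /// shape exercised in RetryStrategyTests):
+ /// <code>
+ /// var strategy = RetryStrategy.FromDelegate((ex, attempt) =>
+ ///     attempt &lt; 2 &amp;&amp; ex is TimeoutException
+ ///         ? RetryDecision.RetryAfter(TimeSpan.FromSeconds(5))
+ ///         : RetryDecision.DoNotRetry());
+ /// </code>
+ /// </example>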
+ public static IRetryStrategy FromDelegate(Func<Exception, int, RetryDecision> strategy)
+ => new DelegateRetryStrategy(strategy);
+}
+
+internal sealed class ExponentialRetryStrategy : IRetryStrategy
+{
+ private readonly int _maxAttempts;
+ private readonly TimeSpan _initialDelay;
+ private readonly TimeSpan _maxDelay;
+ private readonly double _backoffRate;
+ private readonly JitterStrategy _jitter;
+ private readonly Type[]? _retryableExceptions;
+ private readonly Regex[]? _retryableMessagePatterns;
+
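+ // Random instances are not thread-safe; keep one per thread for jitter.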
+ [ThreadStatic]
+ private static Random? t_random;
+ private static Random Random => t_random ??= new Random();
+
+ public ExponentialRetryStrategy(
+ int maxAttempts,
+ TimeSpan initialDelay,
+ TimeSpan maxDelay,
+ double backoffRate,
+ JitterStrategy jitter,
+ Type[]? retryableExceptions,
+ string[]? retryableMessagePatterns)
+ {
+ _maxAttempts = maxAttempts;
+ _initialDelay = initialDelay;
+ _maxDelay = maxDelay;
+ _backoffRate = backoffRate;
+ _jitter = jitter;
+ _retryableExceptions = retryableExceptions;
+ _retryableMessagePatterns = retryableMessagePatterns?
+ .Select(p => new Regex(p, RegexOptions.Compiled))
+ .ToArray();
+ }
+
+ public RetryDecision ShouldRetry(Exception exception, int attemptNumber)
+ {
+ if (attemptNumber >= _maxAttempts)
+ return RetryDecision.DoNotRetry();
+
+ if (!IsRetryable(exception))
+ return RetryDecision.DoNotRetry();
+
+ var delay = CalculateDelay(attemptNumber);
+ return RetryDecision.RetryAfter(delay);
+ }
+
+ private bool IsRetryable(Exception exception)
+ {
+ if (_retryableExceptions == null && _retryableMessagePatterns == null)
+ return true;
+
+ if (_retryableExceptions != null)
+ {
+ var exType = exception.GetType();
+ if (_retryableExceptions.Any(t => t.IsAssignableFrom(exType)))
+ return true;
+ }
+
+ if (_retryableMessagePatterns != null)
+ {
+ var message = exception.Message;
+ if (_retryableMessagePatterns.Any(p => p.IsMatch(message)))
+ return true;
+ }
+
+ return false;
+ }
+
+ internal TimeSpan CalculateDelay(int attemptNumber)
+ {
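+ // Attempt n backs off initialDelay * backoffRate^(n-1) before jitter and
+ // the cap, e.g. 5s, 10s, 20s, ... for the Default strategy's settings.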
+ var baseDelay = _initialDelay.TotalSeconds * Math.Pow(_backoffRate, attemptNumber - 1);
+ var cappedDelay = Math.Min(baseDelay, _maxDelay.TotalSeconds);
+
+ var finalDelay = _jitter switch
+ {
+ JitterStrategy.Full => Random.NextDouble() * cappedDelay,
+ JitterStrategy.Half => cappedDelay * (0.5 + 0.5 * Random.NextDouble()),
+ _ => cappedDelay
+ };
+
+ return TimeSpan.FromSeconds(Math.Max(1, Math.Ceiling(finalDelay)));
+ }
+}
+
+internal sealed class DelegateRetryStrategy : IRetryStrategy
+{
+ private readonly Func<Exception, int, RetryDecision> _strategy;
+
+ public DelegateRetryStrategy(Func<Exception, int, RetryDecision> strategy)
+ {
+ _strategy = strategy;
+ }
+
+ public RetryDecision ShouldRetry(Exception exception, int attemptNumber)
+ => _strategy(exception, attemptNumber);
+}
diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Config/StepConfig.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Config/StepConfig.cs
index 2380967de..362867c09 100644
--- a/Libraries/src/Amazon.Lambda.DurableExecution/Config/StepConfig.cs
+++ b/Libraries/src/Amazon.Lambda.DurableExecution/Config/StepConfig.cs
@@ -5,9 +5,14 @@ namespace Amazon.Lambda.DurableExecution;
/// </summary>
public sealed class StepConfig
{
- // TODO: Retry support is deferred to a follow-up PR. When added, this is
- // where RetryStrategy and Semantics (AtLeastOncePerRetry / AtMostOncePerRetry)
- // will live. The follow-up needs to use service-mediated retries (checkpoint
- // a RETRY operation + suspend the Lambda) rather than an in-process Task.Delay
- // loop, to avoid billing Lambda compute time during retry backoff.
+ /// <summary>
+ /// Retry strategy for failed steps. When null (default), failures are not retried.
+ /// </summary>
+ public IRetryStrategy? RetryStrategy { get; set; }
+
+ /// <summary>
+ /// Controls whether a step may re-execute if the Lambda is re-invoked mid-attempt.
+ /// Default is <see cref="StepSemantics.AtLeastOncePerRetry"/>.
+ /// </summary>
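+ /// <example>
+ /// For instance, a non-idempotent charge step that retries with the default
+ /// strategy but must never execute twice within a single attempt:
+ /// <code>
+ /// var config = new StepConfig
+ /// {
+ ///     RetryStrategy = RetryStrategy.Default,
+ ///     Semantics = StepSemantics.AtMostOncePerRetry
+ /// };
+ /// </code>
+ /// </example>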
+ public StepSemantics Semantics { get; set; } = StepSemantics.AtLeastOncePerRetry;
}
diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/CheckpointBatcher.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/CheckpointBatcher.cs
index 8039e7c56..b800ef55d 100644
--- a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/CheckpointBatcher.cs
+++ b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/CheckpointBatcher.cs
@@ -11,13 +11,13 @@ namespace Amazon.Lambda.DurableExecution.Internal;
/// call awaits the flush of its containing batch (sync semantics).
/// </summary>
/// <remarks>
-/// TODO: when Map / Parallel / ChildContext / WaitForCondition land — or when
-/// AtLeastOncePerRetry step START gets a non-blocking variant — they will need
-/// a fire-and-forget overload like
-/// Task EnqueueAsync(SdkOperationUpdate update, bool sync) where
-/// sync=false returns as soon as the item is queued. Java's
-/// sendOperationUpdate vs sendOperationUpdateAsync is the model.
-/// Today every call site is sync, so the API stays minimal.
+/// Fire-and-forget semantics are achieved by simply not awaiting the returned
+/// Task — matching Java/Python/JS SDKs which use the same one-method pattern.
+/// Errors still surface deterministically via <c>_terminalError</c>: the next
+/// sync <see cref="EnqueueAsync"/> or <see cref="DrainAsync"/> rethrows.
+/// Callers using fire-and-forget should observe the discarded Task's exception
+/// (see StepOperation.FireAndForget) so it doesn't trip the runtime's
+/// UnobservedTaskException event.
/// </remarks>
internal sealed class CheckpointBatcher : IAsyncDisposable
{
diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/StepOperation.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/StepOperation.cs
index 2decdb309..54e52005d 100644
--- a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/StepOperation.cs
+++ b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/StepOperation.cs
@@ -1,20 +1,27 @@
using Microsoft.Extensions.Logging;
using SdkErrorObject = Amazon.Lambda.Model.ErrorObject;
using SdkOperationUpdate = Amazon.Lambda.Model.OperationUpdate;
+using SdkStepOptions = Amazon.Lambda.Model.StepOptions;
namespace Amazon.Lambda.DurableExecution.Internal;
/// <summary>
-/// Durable step operation. Runs the user's function once across the lifetime
-/// of a durable execution, persisting its result so subsequent invocations
-/// replay the cached value without re-executing.
+/// Durable step operation. Runs the user's function (with retry support),
+/// persisting its result so subsequent invocations replay the cached value
+/// without re-executing.
/// </summary>
/// <remarks>
-/// Replay semantics — example: await ctx.StepAsync(ChargeCard, "charge")
+/// Replay branches — example: await ctx.StepAsync(ChargeCard, "charge")
///
-/// - Fresh: no prior state → run func → emit SUCCEED → return result.
-/// - Replay (SUCCEEDED): return cached result; func is NOT re-executed.
-/// - Replay (FAILED): re-throw the recorded exception.
+/// - Fresh: no prior state → run func → emit SUCCEED → return.
+/// - SUCCEEDED: return cached result; func is NOT re-executed.
+/// - FAILED: re-throw the recorded exception.
+/// - PENDING (retry timer not yet fired): re-suspend without
+/// running func; service re-invokes once NextAttemptTimestamp elapses.
+/// - STARTED + AtMostOncePerRetry: crash recovery — treat as a
+/// failed attempt, route through retry strategy.
+/// - READY: service has post-PENDING re-invoked us; the retry
+/// timer fired and the next attempt is up. Run it.
///
/// Serialization is delegated to the supplied serializer;
/// the AOT-safe overloads of IDurableContext.StepAsync wire in a
@@ -50,7 +57,7 @@ public StepOperation(
protected override string OperationType => OperationTypes.Step;
protected override Task<T> StartAsync(CancellationToken cancellationToken)
- => ExecuteFunc(cancellationToken);
+ => ExecuteFunc(attemptNumber: 1, cancellationToken);
protected override Task<T> ReplayAsync(Operation existing, CancellationToken cancellationToken)
{
@@ -66,31 +73,122 @@ protected override Task ReplayAsync(Operation existing, CancellationToken can
// user's catch-block flow matches the original execution.
throw CreateStepException(existing);
+ case OperationStatuses.Pending:
+ return ReplayPending(existing, cancellationToken);
+
+ case OperationStatuses.Started:
+ return ReplayStarted(existing, cancellationToken);
+
+ case OperationStatuses.Ready:
+ return ReplayReady(existing, cancellationToken);
+
default:
- // STARTED/READY/PENDING from a prior invocation — no retry logic
- // in this commit, so fall through and execute fresh. (Future work
- // on retries will replace this default with explicit arms.)
- return ExecuteFunc(cancellationToken);
+ // Unknown status — treat as fresh.
+ return ExecuteFunc(attemptNumber: 1, cancellationToken);
+ }
+ }
+
+ /// <summary>
+ /// READY means the service has re-invoked us after PENDING — the retry
+ /// timer fired and the step is eligible to run its next attempt. No
+ /// timer check is needed (the service has already decided we're up);
+ /// just advance the attempt counter and execute. Matches Java's
+ /// case READY -> executeStepLogic(attempt).
+ /// </summary>
+ private Task<T> ReplayReady(Operation ready, CancellationToken cancellationToken)
+ {
+ var attemptNumber = (ready.StepDetails?.Attempt ?? 0) + 1;
+ return ExecuteFunc(attemptNumber, cancellationToken);
+ }
+
+ /// <summary>
+ /// PENDING means a retry was scheduled (RETRY checkpoint). If
+ /// NextAttemptTimestamp is in the future, re-suspend; otherwise the timer
+ /// has fired and we run the next attempt.
+ /// </summary>
+ private Task<T> ReplayPending(Operation pending, CancellationToken cancellationToken)
+ {
+ var nextAttemptTs = pending.StepDetails?.NextAttemptTimestamp;
+ var attemptNumber = (pending.StepDetails?.Attempt ?? 0) + 1;
+
+ if (nextAttemptTs is { } scheduledMs &&
+ DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() < scheduledMs)
+ {
+ // Retry timer hasn't fired yet — re-suspend so we don't bill compute
+ // while the timer ticks. Service re-invokes once the timer elapses.
+ return Termination.SuspendAndAwait(
+ TerminationReason.RetryScheduled, $"retry:{Name ?? OperationId}");
+ }
+
+ return ExecuteFunc(attemptNumber, cancellationToken);
+ }
+
+ /// <summary>
+ /// STARTED means a START checkpoint was written but no SUCCEED/FAIL exists.
+ /// For AtMostOncePerRetry this signals a crash mid-step — treat as failure
+ /// and route through retry. For AtLeastOncePerRetry just re-execute.
+ /// </summary>
+ private Task<T> ReplayStarted(Operation started, CancellationToken cancellationToken)
+ {
+ var attemptNumber = (started.StepDetails?.Attempt ?? 0) + 1;
+
+ if (_config?.Semantics == StepSemantics.AtMostOncePerRetry)
+ {
+ // Re-running func would risk a duplicate side effect (e.g. double
+ // charge). Treat the lost result as a failure; let the retry
+ // strategy decide whether to try again or give up.
+ var error = started.StepDetails?.Error;
+ var ex = error != null
+ ? new StepException(error.ErrorMessage ?? "Step failed on previous attempt") { ErrorType = error.ErrorType }
+ : new StepException("Step result lost during AtMostOncePerRetry replay");
+ return HandleStepFailureAsync(ex, attemptNumber, cancellationToken);
}
+
+ return ExecuteFunc(attemptNumber, cancellationToken);
}
- private async Task<T> ExecuteFunc(CancellationToken cancellationToken)
+ private async Task<T> ExecuteFunc(int attemptNumber, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
- // TODO: emit a STEP_STARTED checkpoint (action = "START") here when retries
- // and/or AtMostOncePerRetry semantics land. AtMostOncePerRetry needs the
- // START to be sync-flushed before user code runs (so replay can detect
- // "we already attempted this and must not re-run"). AtLeastOncePerRetry
- // wants it fire-and-forget for telemetry (attempt timing, retry count in
- // history). Both require the async-flush overload in CheckpointBatcher
- // (see TODO in CheckpointBatcher.cs). Today neither feature is wired up,
- // so the START is intentionally omitted — SUCCEED alone is sufficient
- // for replay correctness in the AtLeastOncePerRetry-only world this PR
- // ships. Java SDK precedent: StepOperation.checkpointStarted().
+ // Emit a START checkpoint before running user code, unless we're already
+ // resuming a STARTED record (which means an earlier attempt wrote it).
+ //
+ // AtMostOncePerRetry: SYNC flush. If Lambda crashes before SUCCEED is
+ // flushed, ReplayStarted routes through retry instead of re-executing.
+ // A queued-but-unflushed START is indistinguishable from "never ran" if
+ // we die, so the sync flush is correctness-load-bearing here.
+ //
+ // AtLeastOncePerRetry (default): FIRE-AND-FORGET. Replay correctness
+ // doesn't depend on the START — SUCCEED alone is sufficient — so this
+ // is purely telemetry (attempt timing, retry count visible in history).
+ // Java/Python/JS SDKs all use the same pattern: one enqueue API, sync
+ // for AtMostOnce, async for AtLeastOnce.
+ if (State.GetOperation(OperationId)?.Status != OperationStatuses.Started)
+ {
+ var startUpdate = new SdkOperationUpdate
+ {
+ Id = OperationId,
+ Type = OperationTypes.Step,
+ Action = "START",
+ SubType = "Step",
+ Name = Name
+ };
+
+ if (_config?.Semantics == StepSemantics.AtMostOncePerRetry)
+ {
+ await EnqueueAsync(startUpdate, cancellationToken);
+ }
+ else
+ {
+ FireAndForget(EnqueueAsync(startUpdate, cancellationToken));
+ }
+ }
+
try
{
- var stepContext = new StepContext(OperationId, attemptNumber: 1, _logger);
+ var stepContext = new StepContext(OperationId, attemptNumber, _logger);
var result = await _func(stepContext);
await EnqueueAsync(new SdkOperationUpdate
@@ -111,24 +209,54 @@ await EnqueueAsync(new SdkOperationUpdate
}
catch (Exception ex)
{
- // No retry logic in this commit: any thrown exception becomes a
- // FAIL checkpoint and is re-thrown as a StepException. On replay,
- // the FAILED branch above will re-throw without re-executing.
- await EnqueueAsync(new SdkOperationUpdate
- {
- Id = OperationId,
- Type = OperationTypes.Step,
- Action = "FAIL",
- SubType = "Step",
- Name = Name,
- Error = ToSdkError(ex)
- }, cancellationToken);
+ // Funnel into the retry/fail decision tree. May checkpoint RETRY and
+ // suspend (Pending), or checkpoint FAIL and rethrow to user.
+ return await HandleStepFailureAsync(ex, attemptNumber, cancellationToken);
+ }
+ }
- throw new StepException(ex.Message, ex)
+ /// <summary>
+ /// Funnels a step failure into the retry/fail decision. May checkpoint
+ /// RETRY and suspend (Pending), or checkpoint FAIL and rethrow.
+ /// </summary>
+ private async Task<T> HandleStepFailureAsync(Exception ex, int attemptNumber, CancellationToken cancellationToken)
+ {
+ var retryStrategy = _config?.RetryStrategy;
+ if (retryStrategy != null)
+ {
+ var decision = retryStrategy.ShouldRetry(ex, attemptNumber);
+ if (decision.ShouldRetry)
{
- ErrorType = ex.GetType().FullName
- };
+ var delaySeconds = (int)Math.Max(1, Math.Ceiling(decision.Delay.TotalSeconds));
+ await EnqueueAsync(new SdkOperationUpdate
+ {
+ Id = OperationId,
+ Type = OperationTypes.Step,
+ Action = "RETRY",
+ SubType = "Step",
+ Name = Name,
+ Error = ToSdkError(ex),
+ StepOptions = new SdkStepOptions { NextAttemptDelaySeconds = delaySeconds }
+ }, cancellationToken);
+ return await Termination.SuspendAndAwait(
+ TerminationReason.RetryScheduled, $"retry:{Name ?? OperationId}");
+ }
}
+
+ await EnqueueAsync(new SdkOperationUpdate
+ {
+ Id = OperationId,
+ Type = OperationTypes.Step,
+ Action = "FAIL",
+ SubType = "Step",
+ Name = Name,
+ Error = ToSdkError(ex)
+ }, cancellationToken);
+
+ throw new StepException(ex.Message, ex)
+ {
+ ErrorType = ex.GetType().FullName
+ };
}
private T DeserializeResult(string? serialized)
@@ -157,4 +285,20 @@ private static StepException CreateStepException(Operation failedOp)
ErrorMessage = ex.Message,
StackTrace = ex.StackTrace?.Split(new[] { '\r', '\n' }, StringSplitOptions.RemoveEmptyEntries).ToList()
};
+
+ /// <summary>
+ /// Discards a Task but observes any exception so it doesn't surface as an
+ /// UnobservedTaskException. Used for fire-and-forget START checkpoints
+ /// under AtLeastOncePerRetry semantics. The actual error still propagates
+ /// via CheckpointBatcher._terminalError: the next sync EnqueueAsync
+ /// or DrainAsync will rethrow with the original cause.
+ /// </summary>
+ private static void FireAndForget(Task task)
+ {
+ _ = task.ContinueWith(
+ static t => _ = t.Exception,
+ CancellationToken.None,
+ TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously,
+ TaskScheduler.Default);
+ }
}
diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/TerminationManager.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/TerminationManager.cs
index 1350c3d70..5d61e611b 100644
--- a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/TerminationManager.cs
+++ b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/TerminationManager.cs
@@ -6,6 +6,7 @@ namespace Amazon.Lambda.DurableExecution.Internal;
internal enum TerminationReason
{
WaitScheduled,
+ RetryScheduled,
CallbackPending,
InvokePending,
CheckpointFailed
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/DurableFunctionDeployment.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/DurableFunctionDeployment.cs
index 8b5bb2e1b..b2ba4bb1a 100644
--- a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/DurableFunctionDeployment.cs
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/DurableFunctionDeployment.cs
@@ -263,8 +263,16 @@ public async Task<GetDurableExecutionHistoryResponse> WaitForHistoryAsync(
{
last = await GetHistoryAsync(durableExecutionArn, includeExecutionData);
var eventCount = last.Events?.Count ?? 0;
- _output.WriteLine($"[WaitForHistory] attempt {attempt}: {eventCount} events");
- if (predicate(last)) return last;
+ var typeCounts = last.Events?
+ .GroupBy(e => e.EventType?.Value ?? "")
+ .Select(g => $"{g.Key}:{g.Count()}")
+ .OrderBy(s => s);
+ _output.WriteLine($"[WaitForHistory] attempt {attempt}: {eventCount} events [{string.Join(",", typeCounts ?? Enumerable.Empty<string>())}]");
+ if (predicate(last))
+ {
+ DumpEvents(last);
+ return last;
+ }
}
catch (Exception ex)
{
@@ -274,9 +282,21 @@ public async Task<GetDurableExecutionHistoryResponse> WaitForHistoryAsync(
}
_output.WriteLine($"[WaitForHistory] gave up after {attempt} attempts; returning last response with {last?.Events?.Count ?? 0} events");
+ if (last != null) DumpEvents(last);
return last ?? throw new TimeoutException($"GetDurableExecutionHistory never succeeded within {timeout.TotalSeconds}s");
}
+ private void DumpEvents(GetDurableExecutionHistoryResponse history)
+ {
+ var events = history.Events ?? new();
+ _output.WriteLine($"[WaitForHistory] event dump ({events.Count} total):");
+ for (int i = 0; i < events.Count; i++)
+ {
+ var e = events[i];
+ _output.WriteLine($" [{i}] type={e.EventType?.Value ?? ""} name={e.Name ?? ""} ts={e.EventTimestamp:O}");
+ }
+ }
+
public string? ExtractDurableExecutionArn(string responsePayload)
{
try
@@ -375,14 +395,18 @@ await Task.WhenAny(
var stdout = await stdoutTask;
var stderr = await stderrTask;
- if (!string.IsNullOrWhiteSpace(stdout))
- _output.WriteLine($"stdout: {stdout[..Math.Min(stdout.Length, 1000)]}");
-
if (process.ExitCode != 0)
{
+ // Dump the FULL streams on failure — diagnosing build errors with
+ // truncated output is painful, and these only fire on test failure.
+ _output.WriteLine($"stdout: {stdout}");
_output.WriteLine($"stderr: {stderr}");
- throw new Exception($"{fileName} failed (exit {process.ExitCode}): {stderr}");
+ var detail = !string.IsNullOrWhiteSpace(stderr) ? stderr : stdout;
+ throw new Exception($"{fileName} failed (exit {process.ExitCode}): {detail}");
}
+
+ if (!string.IsNullOrWhiteSpace(stdout))
+ _output.WriteLine($"stdout: {stdout[..Math.Min(stdout.Length, 1000)]}");
}
public async ValueTask DisposeAsync()
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/LongerWaitTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/LongerWaitTest.cs
index 0592d0d44..bfc2913ed 100644
--- a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/LongerWaitTest.cs
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/LongerWaitTest.cs
@@ -30,11 +30,14 @@ public async Task LongerWait_ExpiresAndCompletes()
var history = await deployment.WaitForHistoryAsync(
arn!,
- h => (h.Events?.Count(e => e.StepSucceededDetails != null) ?? 0) >= 2
+ h => (h.Events?.Count(e => e.EventType == EventType.StepStarted) ?? 0) >= 2
+ && (h.Events?.Count(e => e.StepSucceededDetails != null) ?? 0) >= 2
&& (h.Events?.Any(e => e.WaitSucceededDetails != null) ?? false),
TimeSpan.FromSeconds(60));
var events = history.Events ?? new();
+ Assert.Equal(2, events.Count(e => e.EventType == EventType.StepStarted));
+
// Steps before and after the wait both ran, with the post-wait step seeing
// the pre-wait step's value via replay.
var stepResults = events
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/MultipleStepsTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/MultipleStepsTest.cs
index 573ecc082..6b0ae0bc7 100644
--- a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/MultipleStepsTest.cs
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/MultipleStepsTest.cs
@@ -32,10 +32,13 @@ public async Task MultipleSteps_AllCheckpointed()
// all events are indexed. Wait until we see all 5 step-succeeded events.
var history = await deployment.WaitForHistoryAsync(
arn!,
- h => (h.Events?.Count(e => e.StepSucceededDetails != null) ?? 0) >= 5,
+ h => (h.Events?.Count(e => e.EventType == EventType.StepStarted) ?? 0) >= 5
+ && (h.Events?.Count(e => e.StepSucceededDetails != null) ?? 0) >= 5,
TimeSpan.FromSeconds(60));
var events = history.Events ?? new();
+ Assert.Equal(5, events.Count(e => e.EventType == EventType.StepStarted));
+
// Each step ran exactly once (no replay-induced duplicates) in declaration order,
// and each step's output chained from the previous one.
var stepResults = events
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/ReplayDeterminismTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/ReplayDeterminismTest.cs
index 0fd7aa569..137bb28b8 100644
--- a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/ReplayDeterminismTest.cs
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/ReplayDeterminismTest.cs
@@ -31,10 +31,13 @@ public async Task ReplayDeterminism_SameGuidAcrossInvocations()
// History is eventually consistent — wait until both step-succeeded events are visible.
var history = await deployment.WaitForHistoryAsync(
arn!,
- h => (h.Events?.Count(e => e.StepSucceededDetails != null) ?? 0) >= 2,
+ h => (h.Events?.Count(e => e.EventType == EventType.StepStarted) ?? 0) >= 2
+ && (h.Events?.Count(e => e.StepSucceededDetails != null) ?? 0) >= 2,
TimeSpan.FromSeconds(60));
var events = history.Events ?? new();
+ Assert.Equal(2, events.Count(e => e.EventType == EventType.StepStarted));
+
// Each step succeeded exactly once — generate_id was NOT re-executed on replay
// (a duplicate would show up as two succeeded events for the same name).
var stepSucceededEvents = events.Where(e => e.StepSucceededDetails != null).ToList();
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/RetryTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/RetryTest.cs
new file mode 100644
index 000000000..82be3d105
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/RetryTest.cs
@@ -0,0 +1,78 @@
+using System.Linq;
+using System.Text;
+using Amazon.Lambda.Model;
+using Xunit;
+using Xunit.Abstractions;
+
+namespace Amazon.Lambda.DurableExecution.IntegrationTests;
+
+public class RetryTest
+{
+ private readonly ITestOutputHelper _output;
+ public RetryTest(ITestOutputHelper output) => _output = output;
+
+ /// <summary>
+ /// End-to-end retry: step throws on attempts 1 and 2, succeeds on attempt 3.
+ /// Validates that the service honors the RETRY checkpoint, schedules the
+ /// requested delay, and re-invokes the Lambda — none of which the unit
+ /// tests can prove (they fake state transitions in-memory).
+ /// </summary>
+ [Fact]
+ public async Task FlakyStep_RetriesAndSucceedsOnThirdAttempt()
+ {
+ await using var deployment = await DurableFunctionDeployment.CreateAsync(
+ DurableFunctionDeployment.FindTestFunctionDir("RetryFunction"),
+ "retry", _output);
+
+ var (invokeResponse, executionName) = await deployment.InvokeAsync("""{"orderId": "x"}""");
+ var responsePayload = Encoding.UTF8.GetString(invokeResponse.Payload.ToArray());
+ _output.WriteLine($"Response: {responsePayload}");
+
+ // Initial invoke returns when the SDK suspends after the first failure.
+ // The execution continues asynchronously via service-driven re-invokes.
+ var arn = await deployment.FindDurableExecutionArnByNameAsync(executionName, TimeSpan.FromSeconds(60));
+ Assert.NotNull(arn);
+
+ // Total expected wall time: 2s + 4s of retry delay + execution overhead.
+ // Allow generous headroom for service scheduling latency.
+ var status = await deployment.PollForCompletionAsync(arn!, TimeSpan.FromSeconds(120));
+ Assert.Equal("SUCCEEDED", status, ignoreCase: true);
+
+ var history = await deployment.WaitForHistoryAsync(
+ arn!,
+ h => (h.Events?.Count(e => e.EventType == EventType.StepStarted) ?? 0) >= 3
+ && (h.Events?.Any(e => e.StepSucceededDetails != null) ?? false),
+ TimeSpan.FromSeconds(60));
+ var events = history.Events ?? new();
+
+ // Three attempts ran (attempts 1, 2, 3).
+ Assert.Equal(3, events.Count(e => e.EventType == EventType.StepStarted));
+
+ // Two failed attempts recorded retry metadata; the final attempt succeeded.
+ Assert.Equal(2, events.Count(e => e.StepFailedDetails != null && e.Name == "flaky_step"));
+ var succeeded = events.SingleOrDefault(e => e.StepSucceededDetails != null && e.Name == "flaky_step");
+ Assert.NotNull(succeeded);
+ Assert.Equal("\"ok on attempt 3\"", succeeded!.StepSucceededDetails.Result?.Payload);
+
+ // The two recorded failure messages reflect the per-attempt exception.
+ var failures = events
+ .Where(e => e.StepFailedDetails != null && e.Name == "flaky_step")
+ .Select(e => e.StepFailedDetails.Error?.Payload?.ErrorMessage ?? string.Empty)
+ .ToList();
+ Assert.Contains(failures, m => m.Contains("attempt 1"));
+ Assert.Contains(failures, m => m.Contains("attempt 2"));
+
+ // Timing check: the service must have actually waited between attempts.
+ // With initialDelay=2s, backoffRate=2.0, no jitter: delays are 2s and 4s.
+ // The gap between the first and last StepStarted should be >= 6s.
+ var startedTimestamps = events
+ .Where(e => e.EventType == EventType.StepStarted && e.EventTimestamp.HasValue)
+ .OrderBy(e => e.EventTimestamp!.Value)
+ .Select(e => e.EventTimestamp!.Value)
+ .ToList();
+ var totalGap = startedTimestamps[^1] - startedTimestamps[0];
+ _output.WriteLine($"Time between first and last attempt: {totalGap.TotalSeconds:F1}s");
+ Assert.True(totalGap >= TimeSpan.FromSeconds(6),
+ $"Service did not honor retry delays: {totalGap.TotalSeconds:F1}s gap (expected >= 6s)");
+ }
+}
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/StepFailsTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/StepFailsTest.cs
index 7b2afd427..b51e26b2d 100644
--- a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/StepFailsTest.cs
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/StepFailsTest.cs
@@ -36,10 +36,13 @@ public async Task StepFails_PropagatesAsFailedStatus()
var history = await deployment.WaitForHistoryAsync(
arn!,
- h => h.Events?.Any(e => e.StepFailedDetails != null) ?? false,
+ h => (h.Events?.Any(e => e.EventType == EventType.StepStarted) ?? false)
+ && (h.Events?.Any(e => e.StepFailedDetails != null) ?? false),
TimeSpan.FromSeconds(60));
var events = history.Events ?? new();
+ Assert.Equal(1, events.Count(e => e.EventType == EventType.StepStarted));
+
// The failing step recorded a StepFailed event with the exception message.
var stepFailed = events.FirstOrDefault(e => e.StepFailedDetails != null && e.Name == "fail_step");
Assert.NotNull(stepFailed);
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/StepWaitStepTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/StepWaitStepTest.cs
index 684486dd9..05e2bfc72 100644
--- a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/StepWaitStepTest.cs
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/StepWaitStepTest.cs
@@ -32,11 +32,14 @@ public async Task StepWaitStep_CompletesViaService()
var history = await deployment.WaitForHistoryAsync(
arn!,
- h => (h.Events?.Count(e => e.StepSucceededDetails != null) ?? 0) >= 2
+ h => (h.Events?.Count(e => e.EventType == EventType.StepStarted) ?? 0) >= 2
+ && (h.Events?.Count(e => e.StepSucceededDetails != null) ?? 0) >= 2
&& (h.Events?.Any(e => e.WaitSucceededDetails != null) ?? false),
TimeSpan.FromSeconds(60));
var events = history.Events ?? new();
+ Assert.Equal(2, events.Count(e => e.EventType == EventType.StepStarted));
+
// Both steps ran in order and produced the expected chained outputs.
var stepResults = events
.Where(e => e.StepSucceededDetails != null)
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryFunction/Dockerfile b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryFunction/Dockerfile
new file mode 100644
index 000000000..c1913d56a
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryFunction/Dockerfile
@@ -0,0 +1,7 @@
+FROM public.ecr.aws/lambda/provided:al2023
+
+RUN dnf install -y libicu
+
+COPY bin/publish/ ${LAMBDA_TASK_ROOT}
+
+ENTRYPOINT ["/var/task/bootstrap"]
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryFunction/Function.cs
new file mode 100644
index 000000000..9ebffdf11
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryFunction/Function.cs
@@ -0,0 +1,49 @@
+using Amazon.Lambda.Core;
+using Amazon.Lambda.DurableExecution;
+using Amazon.Lambda.RuntimeSupport;
+using Amazon.Lambda.Serialization.SystemTextJson;
+
+namespace DurableExecutionTestFunction;
+
+public class Function
+{
+ public static async Task Main(string[] args)
+ {
+ var handler = new Function();
+ var serializer = new DefaultLambdaJsonSerializer();
+ using var handlerWrapper = HandlerWrapper.GetHandlerWrapper(handler.Handler, serializer);
+ using var bootstrap = new LambdaBootstrap(handlerWrapper);
+ await bootstrap.RunAsync();
+ }
+
+ public Task Handler(
+ DurableExecutionInvocationInput input, ILambdaContext context)
+ => DurableFunction.WrapAsync(Workflow, input, context);
+
+ private async Task<TestResult> Workflow(TestEvent input, IDurableContext context)
+ {
+ var result = await context.StepAsync(
+ async (ctx) =>
+ {
+ await Task.CompletedTask;
+ if (ctx.AttemptNumber < 3)
+ throw new InvalidOperationException($"flake on attempt {ctx.AttemptNumber}");
+ return $"ok on attempt {ctx.AttemptNumber}";
+ },
+ name: "flaky_step",
+ config: new StepConfig
+ {
+ RetryStrategy = RetryStrategy.Exponential(
+ maxAttempts: 3,
+ initialDelay: TimeSpan.FromSeconds(2),
+ maxDelay: TimeSpan.FromSeconds(10),
+ backoffRate: 2.0,
+ jitter: JitterStrategy.None)
+ });
+
+ return new TestResult { Status = "completed", Data = result };
+ }
+}
+
+public class TestEvent { public string? OrderId { get; set; } }
+public class TestResult { public string? Status { get; set; } public string? Data { get; set; } }
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryFunction/RetryFunction.csproj b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryFunction/RetryFunction.csproj
new file mode 100644
index 000000000..6f5f657e4
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryFunction/RetryFunction.csproj
@@ -0,0 +1,18 @@
+<Project Sdk="Microsoft.NET.Sdk">
+
+  <PropertyGroup>
+    <TargetFramework>net8.0</TargetFramework>
+    <OutputType>Exe</OutputType>
+    <SelfContained>true</SelfContained>
+    <AssemblyName>bootstrap</AssemblyName>
+    <ImplicitUsings>enable</ImplicitUsings>
+    <Nullable>enable</Nullable>
+  </PropertyGroup>
+
+  <ItemGroup>
+    <ProjectReference Include="../../../../src/Amazon.Lambda.DurableExecution/Amazon.Lambda.DurableExecution.csproj" />
+    <ProjectReference Include="../../../../src/Amazon.Lambda.RuntimeSupport/Amazon.Lambda.RuntimeSupport.csproj" />
+    <ProjectReference Include="../../../../src/Amazon.Lambda.Serialization.SystemTextJson/Amazon.Lambda.Serialization.SystemTextJson.csproj" />
+  </ItemGroup>
+
+</Project>
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableContextTests.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableContextTests.cs
index 806ebd844..cc3b9a460 100644
--- a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableContextTests.cs
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableContextTests.cs
@@ -666,4 +666,306 @@ public TestPerson Deserialize(string data, SerializationContext context)
return new TestPerson { Name = parts[0], Age = int.Parse(parts[1]) };
}
}
+
+ #region StepAsync Retry Tests
+
+ [Fact]
+ public async Task StepAsync_FailsWithRetryStrategy_CheckpointsRetryAndSuspends()
+ {
+ var tm = new TerminationManager();
+ var state = new ExecutionState();
+ state.LoadFromCheckpoint(null);
+ var idGen = new OperationIdGenerator();
+ var lambdaContext = new TestLambdaContext();
+ var recorder = new RecordingBatcher();
+ var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext, recorder.Batcher);
+
+ var stepTask = context.StepAsync(
+ async (_) => { await Task.CompletedTask; throw new InvalidOperationException("transient"); },
+ name: "flaky_step",
+ config: new StepConfig
+ {
+ RetryStrategy = RetryStrategy.Exponential(
+ maxAttempts: 3,
+ initialDelay: TimeSpan.FromSeconds(5),
+ jitter: JitterStrategy.None)
+ });
+
+ await Task.Delay(50);
+
+ Assert.True(tm.IsTerminated);
+ Assert.False(stepTask.IsCompleted);
+
+ // Fresh attempt 1 emits a fire-and-forget START (telemetry under
+ // AtLeastOncePerRetry), then a RETRY when the user code throws and
+ // the retry strategy decides to retry.
+ var checkpoints = recorder.Flushed;
+ Assert.Equal(2, checkpoints.Count);
+ Assert.Equal("START", checkpoints[0].Action);
+ Assert.Equal("RETRY", checkpoints[1].Action);
+ Assert.Equal(IdAt(1), checkpoints[1].Id);
+ Assert.Equal(5, checkpoints[1].StepOptions.NextAttemptDelaySeconds);
+ }
+
+ [Fact]
+ public async Task StepAsync_FailsNoRetryStrategy_CheckpointsFail()
+ {
+ var context = CreateContext();
+
+ var ex = await Assert.ThrowsAsync<StepException>(() =>
+ context.StepAsync(
+ async (_) => { await Task.CompletedTask; throw new InvalidOperationException("permanent"); },
+ name: "fail_step"));
+
+ Assert.Equal("permanent", ex.Message);
+ }
+
+ [Fact]
+ public async Task StepAsync_RetryExhausted_CheckpointsFail()
+ {
+ var state = new ExecutionState();
+ state.LoadFromCheckpoint(new InitialExecutionState
+ {
+ Operations = new List<Operation>
+ {
+ new()
+ {
+ Id = IdAt(1),
+ Type = OperationTypes.Step,
+ Status = OperationStatuses.Pending,
+ StepDetails = new StepDetails
+ {
+ Attempt = 2,
+ NextAttemptTimestamp = DateTimeOffset.UtcNow.AddSeconds(-10).ToUnixTimeMilliseconds()
+ }
+ }
+ }
+ });
+ var tm = new TerminationManager();
+ var idGen = new OperationIdGenerator();
+ var lambdaContext = new TestLambdaContext();
+ var recorder = new RecordingBatcher();
+ var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext, recorder.Batcher);
+
+ // Attempt 3 (last one) — should fail after this
+ var ex = await Assert.ThrowsAsync<StepException>(() =>
+ context.StepAsync(
+ async (_) => { await Task.CompletedTask; throw new InvalidOperationException("still failing"); },
+ name: "exhaust_step",
+ config: new StepConfig
+ {
+ RetryStrategy = RetryStrategy.Exponential(maxAttempts: 3, jitter: JitterStrategy.None)
+ }));
+
+ Assert.Equal("still failing", ex.Message);
+
+ // Fresh attempt 3 emits a fire-and-forget START (telemetry under
+ // AtLeastOncePerRetry), then a FAIL after the retry strategy gives up.
+ var checkpoints = recorder.Flushed;
+ Assert.Equal(2, checkpoints.Count);
+ Assert.Equal("START", checkpoints[0].Action);
+ Assert.Equal("FAIL", checkpoints[1].Action);
+ }
+
+ [Fact]
+ public async Task StepAsync_PendingWithFutureTimestamp_Suspends()
+ {
+ var futureMs = DateTimeOffset.UtcNow.AddSeconds(300).ToUnixTimeMilliseconds();
+ var tm = new TerminationManager();
+ var state = new ExecutionState();
+ state.LoadFromCheckpoint(new InitialExecutionState
+ {
+ Operations = new List<Operation>
+ {
+ new()
+ {
+ Id = IdAt(1),
+ Type = OperationTypes.Step,
+ Status = OperationStatuses.Pending,
+ StepDetails = new StepDetails
+ {
+ Attempt = 1,
+ NextAttemptTimestamp = futureMs
+ }
+ }
+ }
+ });
+ var idGen = new OperationIdGenerator();
+ var lambdaContext = new TestLambdaContext();
+ var recorder = new RecordingBatcher();
+ var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext, recorder.Batcher);
+
+ var stepTask = context.StepAsync(
+ async (_) => { await Task.CompletedTask; return "should not run"; },
+ name: "pending_step",
+ config: new StepConfig { RetryStrategy = RetryStrategy.Default });
+
+ await Task.Delay(50);
+
+ Assert.True(tm.IsTerminated);
+ Assert.False(stepTask.IsCompleted);
+ Assert.Empty(recorder.Flushed);
+ }
+
+ [Fact]
+ public async Task StepAsync_PendingWithPastTimestamp_ReExecutes()
+ {
+ var pastMs = DateTimeOffset.UtcNow.AddSeconds(-10).ToUnixTimeMilliseconds();
+ var state = new ExecutionState();
+ state.LoadFromCheckpoint(new InitialExecutionState
+ {
+ Operations = new List<Operation>
+ {
+ new()
+ {
+ Id = IdAt(1),
+ Type = OperationTypes.Step,
+ Status = OperationStatuses.Pending,
+ StepDetails = new StepDetails
+ {
+ Attempt = 1,
+ NextAttemptTimestamp = pastMs
+ }
+ }
+ }
+ });
+ var tm = new TerminationManager();
+ var idGen = new OperationIdGenerator();
+ var lambdaContext = new TestLambdaContext();
+ var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext);
+
+ var result = await context.StepAsync(
+ async (ctx) =>
+ {
+ await Task.CompletedTask;
+ Assert.Equal(2, ctx.AttemptNumber);
+ return "retry success";
+ },
+ name: "retry_step",
+ config: new StepConfig { RetryStrategy = RetryStrategy.Default });
+
+ Assert.Equal("retry success", result);
+ }
+
+ [Fact]
+ public async Task StepAsync_ReadyReplay_AdvancesAttemptAndExecutes()
+ {
+ // READY = service has post-PENDING re-invoked us; the retry timer
+ // already fired so no timestamp check is needed. Just advance the
+ // attempt counter and run. Matches Java's case READY -> executeStepLogic.
+ var state = new ExecutionState();
+ state.LoadFromCheckpoint(new InitialExecutionState
+ {
+ Operations = new List<Operation>
+ {
+ new()
+ {
+ Id = IdAt(1),
+ Type = OperationTypes.Step,
+ Status = OperationStatuses.Ready,
+ StepDetails = new StepDetails { Attempt = 2 }
+ }
+ }
+ });
+ var tm = new TerminationManager();
+ var idGen = new OperationIdGenerator();
+ var lambdaContext = new TestLambdaContext();
+ var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext);
+
+ var executed = false;
+ var result = await context.StepAsync(
+ async (ctx) =>
+ {
+ executed = true;
+ Assert.Equal(3, ctx.AttemptNumber);
+ await Task.CompletedTask;
+ return "ok";
+ },
+ name: "ready_step",
+ config: new StepConfig { RetryStrategy = RetryStrategy.Default });
+
+ Assert.True(executed);
+ Assert.Equal("ok", result);
+ Assert.False(tm.IsTerminated);
+ Assert.Equal(ExecutionMode.Execution, state.Mode);
+ }
+
+ [Fact]
+ public async Task StepAsync_AtMostOnce_FlushesStartBeforeExecution()
+ {
+ var state = new ExecutionState();
+ state.LoadFromCheckpoint(null);
+ var tm = new TerminationManager();
+ var idGen = new OperationIdGenerator();
+ var lambdaContext = new TestLambdaContext();
+ var recorder = new RecordingBatcher();
+ var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext, recorder.Batcher);
+
+ IReadOnlyList<string>? flushedAtFuncEntry = null;
+
+ var result = await context.StepAsync(
+ async (_) =>
+ {
+ flushedAtFuncEntry = recorder.Flushed.Select(o => o.Action.ToString()).ToArray();
+ await Task.CompletedTask;
+ return "done";
+ },
+ name: "amo_step",
+ config: new StepConfig { Semantics = StepSemantics.AtMostOncePerRetry });
+
+ Assert.Equal("done", result);
+
+ // START must be flushed before user func runs (AtMostOnce invariant).
+ Assert.NotNull(flushedAtFuncEntry);
+ Assert.Equal(new[] { "START" }, flushedAtFuncEntry);
+
+ // After step returns, SUCCEED has also been flushed.
+ var actions = recorder.Flushed.Select(o => o.Action.ToString()).ToArray();
+ Assert.Equal(new[] { "START", "SUCCEED" }, actions);
+ }
+
+ [Fact]
+ public async Task StepAsync_AtMostOnce_StartedReplay_TriggersRetryHandler()
+ {
+ var tm = new TerminationManager();
+ var state = new ExecutionState();
+ state.LoadFromCheckpoint(new InitialExecutionState
+ {
+ Operations = new List<Operation>
+ {
+ new()
+ {
+ Id = IdAt(1),
+ Type = OperationTypes.Step,
+ Status = OperationStatuses.Started
+ }
+ }
+ });
+ var idGen = new OperationIdGenerator();
+ var lambdaContext = new TestLambdaContext();
+ var recorder = new RecordingBatcher();
+ var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext, recorder.Batcher);
+
+ var executed = false;
+ var stepTask = context.StepAsync(
+ async (_) => { executed = true; await Task.CompletedTask; return "should not run"; },
+ name: "amo_replay",
+ config: new StepConfig
+ {
+ Semantics = StepSemantics.AtMostOncePerRetry,
+ RetryStrategy = RetryStrategy.Exponential(maxAttempts: 3, jitter: JitterStrategy.None)
+ });
+
+ await Task.Delay(50);
+
+ Assert.False(executed);
+ Assert.True(tm.IsTerminated);
+ Assert.False(stepTask.IsCompleted);
+
+ var checkpoints = recorder.Flushed;
+ Assert.Single(checkpoints);
+ Assert.Equal("RETRY", checkpoints[0].Action);
+ }
+
+ #endregion
}
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableFunctionTests.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableFunctionTests.cs
index 032a25a66..b624766eb 100644
--- a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableFunctionTests.cs
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableFunctionTests.cs
@@ -196,30 +196,37 @@ public async Task WrapAsync_CheckpointsAreSentToService()
mockClient);
Assert.Equal(InvocationStatus.Pending, output.Status);
- Assert.Equal(2, mockClient.CheckpointCalls.Count);
-
- // First flush: step SUCCEED (the user awaits StepAsync, which awaits
- // its SUCCEED enqueue, which blocks until the batcher flushes it).
- var firstCall = mockClient.CheckpointCalls[0];
- Assert.Equal("arn:aws:lambda:us-east-1:123:durable-execution:checkpoint-test", firstCall.DurableExecutionArn);
- Assert.Equal("initial-token", firstCall.CheckpointToken);
- Assert.Single(firstCall.Updates);
- var stepUpdate = firstCall.Updates[0];
- Assert.Equal("STEP", stepUpdate.Type);
- Assert.Equal("SUCCEED", stepUpdate.Action);
- Assert.Equal("validate", stepUpdate.Name);
- Assert.NotNull(stepUpdate.Payload);
-
- // Second flush: wait START (blocks until the service has the timer
- // recorded before WaitAsync suspends).
- var secondCall = mockClient.CheckpointCalls[1];
- Assert.Single(secondCall.Updates);
- var waitUpdate = secondCall.Updates[0];
- Assert.Equal("WAIT", waitUpdate.Type);
- Assert.Equal("START", waitUpdate.Action);
- Assert.Equal("delay", waitUpdate.Name);
- Assert.NotNull(waitUpdate.WaitOptions);
- Assert.Equal(30, waitUpdate.WaitOptions.WaitSeconds);
+
+ // Each StepAsync emits a fire-and-forget START before user code runs
+ // (telemetry under AtLeastOncePerRetry). With FlushInterval = 0 the
+ // worker may flush the START on its own before SUCCEED arrives, so the
+ // exact batching of START vs SUCCEED is timing-dependent. Assert on
+ // the flat sequence of updates instead.
+ var allUpdates = mockClient.CheckpointCalls
+ .SelectMany(c => c.Updates)
+ .ToList();
+
+ // Expect: step START, step SUCCEED, wait START (in that order).
+ Assert.Equal(3, allUpdates.Count);
+
+ Assert.Equal("STEP", allUpdates[0].Type);
+ Assert.Equal("START", allUpdates[0].Action);
+ Assert.Equal("validate", allUpdates[0].Name);
+
+ Assert.Equal("STEP", allUpdates[1].Type);
+ Assert.Equal("SUCCEED", allUpdates[1].Action);
+ Assert.Equal("validate", allUpdates[1].Name);
+ Assert.NotNull(allUpdates[1].Payload);
+
+ Assert.Equal("WAIT", allUpdates[2].Type);
+ Assert.Equal("START", allUpdates[2].Action);
+ Assert.Equal("delay", allUpdates[2].Name);
+ Assert.NotNull(allUpdates[2].WaitOptions);
+ Assert.Equal(30, allUpdates[2].WaitOptions.WaitSeconds);
+
+ // The first call sends the initial checkpoint token.
+ Assert.Equal("arn:aws:lambda:us-east-1:123:durable-execution:checkpoint-test", mockClient.CheckpointCalls[0].DurableExecutionArn);
+ Assert.Equal("initial-token", mockClient.CheckpointCalls[0].CheckpointToken);
}
[Fact]
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/RetryStrategyTests.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/RetryStrategyTests.cs
new file mode 100644
index 000000000..e5a277fb6
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/RetryStrategyTests.cs
@@ -0,0 +1,202 @@
+using Amazon.Lambda.DurableExecution;
+using Xunit;
+
+namespace Amazon.Lambda.DurableExecution.Tests;
+
+public class RetryStrategyTests
+{
+ [Fact]
+ public void ExponentialDefault_RetriesUpToMaxAttempts()
+ {
+ var strategy = RetryStrategy.Default;
+
+ // Attempts 1-5 should retry (maxAttempts=6 means 6 total attempts)
+ for (int i = 1; i < 6; i++)
+ {
+ var decision = strategy.ShouldRetry(new InvalidOperationException("fail"), i);
+ Assert.True(decision.ShouldRetry);
+ Assert.True(decision.Delay >= TimeSpan.FromSeconds(1));
+ }
+
+ // Attempt 6 should not retry (exhausted)
+ var lastDecision = strategy.ShouldRetry(new InvalidOperationException("fail"), 6);
+ Assert.False(lastDecision.ShouldRetry);
+ }
+
+ [Fact]
+ public void None_NeverRetries()
+ {
+ var strategy = RetryStrategy.None;
+
+ var decision = strategy.ShouldRetry(new Exception("fail"), 1);
+ Assert.False(decision.ShouldRetry);
+ }
+
+ [Fact]
+ public void Transient_RetriesUpTo3Attempts()
+ {
+ var strategy = RetryStrategy.Transient;
+
+ Assert.True(strategy.ShouldRetry(new Exception("fail"), 1).ShouldRetry);
+ Assert.True(strategy.ShouldRetry(new Exception("fail"), 2).ShouldRetry);
+ Assert.False(strategy.ShouldRetry(new Exception("fail"), 3).ShouldRetry);
+ }
+
+ [Fact]
+ public void Exponential_DelayIncreases()
+ {
+ var strategy = RetryStrategy.Exponential(
+ maxAttempts: 5,
+ initialDelay: TimeSpan.FromSeconds(2),
+ maxDelay: TimeSpan.FromSeconds(120),
+ backoffRate: 2.0,
+ jitter: JitterStrategy.None);
+
+ var d1 = strategy.ShouldRetry(new Exception(), 1).Delay;
+ var d2 = strategy.ShouldRetry(new Exception(), 2).Delay;
+ var d3 = strategy.ShouldRetry(new Exception(), 3).Delay;
+
+ // With no jitter: 2s, 4s, 8s (ceiling to whole seconds)
+ Assert.Equal(TimeSpan.FromSeconds(2), d1);
+ Assert.Equal(TimeSpan.FromSeconds(4), d2);
+ Assert.Equal(TimeSpan.FromSeconds(8), d3);
+ }
+
+ [Fact]
+ public void Exponential_DelayCapsAtMax()
+ {
+ var strategy = RetryStrategy.Exponential(
+ maxAttempts: 10,
+ initialDelay: TimeSpan.FromSeconds(10),
+ maxDelay: TimeSpan.FromSeconds(30),
+ backoffRate: 3.0,
+ jitter: JitterStrategy.None);
+
+ // Attempt 3: 10 * 3^2 = 90, capped to 30
+ var decision = strategy.ShouldRetry(new Exception(), 3);
+ Assert.Equal(TimeSpan.FromSeconds(30), decision.Delay);
+ }
+
+ [Fact]
+ public void Exponential_FullJitter_BoundedByDelay()
+ {
+ var strategy = RetryStrategy.Exponential(
+ maxAttempts: 5,
+ initialDelay: TimeSpan.FromSeconds(10),
+ maxDelay: TimeSpan.FromSeconds(100),
+ backoffRate: 2.0,
+ jitter: JitterStrategy.Full);
+
+ // Run multiple times to check bounds
+ for (int i = 0; i < 50; i++)
+ {
+ var decision = strategy.ShouldRetry(new Exception(), 1);
+ Assert.True(decision.Delay >= TimeSpan.FromSeconds(1));
+ Assert.True(decision.Delay <= TimeSpan.FromSeconds(10));
+ }
+ }
+
+ [Fact]
+ public void Exponential_HalfJitter_BoundedBetween50And100Percent()
+ {
+ var strategy = RetryStrategy.Exponential(
+ maxAttempts: 5,
+ initialDelay: TimeSpan.FromSeconds(10),
+ maxDelay: TimeSpan.FromSeconds(100),
+ backoffRate: 2.0,
+ jitter: JitterStrategy.Half);
+
+ for (int i = 0; i < 50; i++)
+ {
+ var decision = strategy.ShouldRetry(new Exception(), 1);
+ Assert.True(decision.Delay >= TimeSpan.FromSeconds(5));
+ Assert.True(decision.Delay <= TimeSpan.FromSeconds(10));
+ }
+ }
+
+ [Fact]
+ public void Exponential_RetryableExceptions_FiltersCorrectly()
+ {
+ var strategy = RetryStrategy.Exponential(
+ maxAttempts: 3,
+ retryableExceptions: new[] { typeof(TimeoutException), typeof(HttpRequestException) });
+
+ Assert.True(strategy.ShouldRetry(new TimeoutException(), 1).ShouldRetry);
+ Assert.True(strategy.ShouldRetry(new HttpRequestException(), 1).ShouldRetry);
+ Assert.False(strategy.ShouldRetry(new InvalidOperationException(), 1).ShouldRetry);
+ }
+
+ [Fact]
+ public void Exponential_RetryableExceptions_MatchesDerivedTypes()
+ {
+ var strategy = RetryStrategy.Exponential(
+ maxAttempts: 3,
+ retryableExceptions: new[] { typeof(IOException) });
+
+ Assert.True(strategy.ShouldRetry(new FileNotFoundException(), 1).ShouldRetry);
+ }
+
+ [Fact]
+ public void Exponential_MessagePatterns_FiltersCorrectly()
+ {
+ var strategy = RetryStrategy.Exponential(
+ maxAttempts: 3,
+ retryableMessagePatterns: new[] { "timeout", "throttl", "5\\d{2}" });
+
+ Assert.True(strategy.ShouldRetry(new Exception("connection timeout"), 1).ShouldRetry);
+ Assert.True(strategy.ShouldRetry(new Exception("request throttled"), 1).ShouldRetry);
+ Assert.True(strategy.ShouldRetry(new Exception("HTTP 503"), 1).ShouldRetry);
+ Assert.False(strategy.ShouldRetry(new Exception("not found"), 1).ShouldRetry);
+ }
+
+ [Fact]
+ public void Exponential_BothFilters_EitherMatches()
+ {
+ var strategy = RetryStrategy.Exponential(
+ maxAttempts: 3,
+ retryableExceptions: new[] { typeof(TimeoutException) },
+ retryableMessagePatterns: new[] { "throttl" });
+
+ // Matches exception type
+ Assert.True(strategy.ShouldRetry(new TimeoutException("any message"), 1).ShouldRetry);
+ // Matches message pattern
+ Assert.True(strategy.ShouldRetry(new Exception("throttled"), 1).ShouldRetry);
+ // Matches neither
+ Assert.False(strategy.ShouldRetry(new InvalidOperationException("bad state"), 1).ShouldRetry);
+ }
+
+ [Fact]
+ public void Exponential_NoFilters_RetriesAllExceptions()
+ {
+ var strategy = RetryStrategy.Exponential(maxAttempts: 3);
+
+ Assert.True(strategy.ShouldRetry(new Exception("anything"), 1).ShouldRetry);
+ Assert.True(strategy.ShouldRetry(new InvalidOperationException(), 1).ShouldRetry);
+ Assert.True(strategy.ShouldRetry(new OutOfMemoryException(), 1).ShouldRetry);
+ }
+
+ [Fact]
+ public void Exponential_MinimumDelayIsOneSecond()
+ {
+ var strategy = RetryStrategy.Exponential(
+ maxAttempts: 3,
+ initialDelay: TimeSpan.FromMilliseconds(100),
+ jitter: JitterStrategy.None);
+
+ var decision = strategy.ShouldRetry(new Exception(), 1);
+ Assert.True(decision.Delay >= TimeSpan.FromSeconds(1));
+ }
+
+ [Fact]
+ public void FromDelegate_UsesProvidedFunction()
+ {
+ var strategy = RetryStrategy.FromDelegate((ex, attempt) =>
+ attempt < 2 && ex is TimeoutException
+ ? RetryDecision.RetryAfter(TimeSpan.FromSeconds(5))
+ : RetryDecision.DoNotRetry());
+
+ Assert.True(strategy.ShouldRetry(new TimeoutException(), 1).ShouldRetry);
+ Assert.False(strategy.ShouldRetry(new TimeoutException(), 2).ShouldRetry);
+ Assert.False(strategy.ShouldRetry(new Exception(), 1).ShouldRetry);
+ }
+}