diff --git a/Docs/durable-execution-design.md b/Docs/durable-execution-design.md
index 402d689af..3d0517339 100644
--- a/Docs/durable-execution-design.md
+++ b/Docs/durable-execution-design.md
@@ -1108,18 +1108,40 @@ public interface IDurableContext
CancellationToken cancellationToken = default);
///
- /// Create a callback for external system integration.
+ /// Create a callback for external system integration. The result is
+ /// deserialized using reflection-based System.Text.Json; an AOT-safe
+ /// overload taking ICheckpointSerializer<T> is also provided.
///
+ [RequiresUnreferencedCode("Reflection-based JSON for T. Use the ICheckpointSerializer overload for AOT/trimmed deployments.")]
+ [RequiresDynamicCode("Reflection-based JSON for T. Use the ICheckpointSerializer overload for AOT/trimmed deployments.")]
+ Task> CreateCallbackAsync(
+ string? name = null,
+ CallbackConfig? config = null,
+ CancellationToken cancellationToken = default);
+
+ /// AOT-safe overload of CreateCallbackAsync.
Task> CreateCallbackAsync(
+ ICheckpointSerializer serializer,
string? name = null,
CallbackConfig? config = null,
CancellationToken cancellationToken = default);
///
- /// Wait for an external system to respond via callback.
+ /// Wait for an external system to respond via callback. Composes
+ /// CreateCallback + Step(submitter) + GetResult inside a child context.
///
+ [RequiresUnreferencedCode("Reflection-based JSON for T. Use the ICheckpointSerializer overload for AOT/trimmed deployments.")]
+ [RequiresDynamicCode("Reflection-based JSON for T. Use the ICheckpointSerializer overload for AOT/trimmed deployments.")]
Task WaitForCallbackAsync(
- Func submitter,
+ Func submitter,
+ string? name = null,
+ WaitForCallbackConfig? config = null,
+ CancellationToken cancellationToken = default);
+
+ /// AOT-safe overload of WaitForCallbackAsync.
+ Task WaitForCallbackAsync(
+ Func submitter,
+ ICheckpointSerializer serializer,
string? name = null,
WaitForCallbackConfig? config = null,
CancellationToken cancellationToken = default);
@@ -1208,6 +1230,20 @@ public interface IStepContext
string OperationId { get; }
}
+///
+/// Context passed to the submitter delegate of WaitForCallbackAsync.
+/// Distinct from <see cref="IStepContext"/> so the submitter API can evolve
+/// independently. Mirrors WaitForCallbackContext in the Python and
+/// JavaScript SDKs (logger-only surface).
+///
+public interface IWaitForCallbackContext
+{
+ ///
+ /// Logger scoped to the submitter step (replay-safe).
+ ///
+ ILogger Logger { get; }
+}
+
///
/// A named branch for parallel execution. Named branches appear in execution
/// traces and can be inspected by name in the test runner.
@@ -1568,7 +1604,9 @@ public interface ICallback
/// Wait for and return the callback result.
/// Suspends execution until the result is available.
///
- Task GetResultAsync(CancellationToken cancellationToken = default);
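+ /// <exception cref="CallbackFailedException">External system reported failure.</exception>
+ /// <exception cref="CallbackTimeoutException">Service marked the callback TIMED_OUT.</exception>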
+ Task GetResultAsync(CancellationToken cancellationToken = default);
}
///
@@ -1605,14 +1643,31 @@ public class StepException : DurableExecutionException
}
///
-/// Thrown when a callback fails or times out.
+/// Base exception for callback failures. Concrete subclasses distinguish
+/// failure modes — pattern-match the subclass type rather than inspecting
+/// a flag.
///
public class CallbackException : DurableExecutionException
{
- public string? CallbackId { get; }
- public bool IsTimeout { get; }
+ public string? CallbackId { get; init; }
+ public string? ErrorType { get; init; }
+ public string? ErrorData { get; init; }
+ public IReadOnlyList? OriginalStackTrace { get; init; }
}
+/// External system reported a failure result for the callback.
+public class CallbackFailedException : CallbackException { }
+
+/// Service marked the callback TIMED_OUT (overall or heartbeat).
+public class CallbackTimeoutException : CallbackException { }
+
+///
+/// Submitter step (the inner step inside WaitForCallbackAsync) failed
+/// after retries are exhausted. Wraps the underlying StepException.
+/// Only thrown from WaitForCallbackAsync.
+///
+public class CallbackSubmitterException : CallbackException { }
+
///
/// Thrown when an invoked function fails.
///
diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Config/CallbackConfig.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Config/CallbackConfig.cs
new file mode 100644
index 000000000..4f30c5ab8
--- /dev/null
+++ b/Libraries/src/Amazon.Lambda.DurableExecution/Config/CallbackConfig.cs
@@ -0,0 +1,83 @@
+namespace Amazon.Lambda.DurableExecution;
+
+/// <summary>
+/// Timeout settings for a callback operation created through
+/// <c>IDurableContext.CreateCallbackAsync</c>.
+/// </summary>
+/// <remarks>
+/// No serializer is configured here. For custom serialization pass an
+/// <c>ICheckpointSerializer</c> to the AOT-safe <c>CreateCallbackAsync</c>
+/// overload, matching the established <c>StepAsync</c> pattern.
+/// Both timeouts are validated when assigned.
+/// </remarks>
+public class CallbackConfig
+{
+    private TimeSpan _overallTimeout = TimeSpan.Zero;
+    private TimeSpan _heartbeatGap = TimeSpan.Zero;
+
+    /// <summary>
+    /// Upper bound on how long the service waits for the external system to
+    /// complete the callback. <see cref="TimeSpan.Zero"/> (the default) disables
+    /// the overall timeout, leaving only <see cref="HeartbeatTimeout"/> (if set).
+    /// </summary>
+    /// <remarks>
+    /// Service timers tick in whole seconds, so a value strictly between
+    /// <see cref="TimeSpan.Zero"/> and 1 second is rejected rather than being
+    /// silently rounded. Use <see cref="TimeSpan.Zero"/> or at least 1 second.
+    /// </remarks>
+    /// <exception cref="ArgumentOutOfRangeException">
+    /// The assigned value is negative, or positive but shorter than 1 second.
+    /// </exception>
+    public TimeSpan Timeout
+    {
+        get => _overallTimeout;
+        set => _overallTimeout = RequireZeroOrAtLeastOneSecond(value, nameof(Timeout));
+    }
+
+    /// <summary>
+    /// Longest permitted gap between heartbeat signals from the external system
+    /// before the service marks the callback as timed-out.
+    /// <see cref="TimeSpan.Zero"/> (the default) disables the heartbeat timeout.
+    /// </summary>
+    /// <remarks>
+    /// Subject to the same whole-second rule as <see cref="Timeout"/>: a value
+    /// strictly between <see cref="TimeSpan.Zero"/> and 1 second is rejected.
+    /// </remarks>
+    /// <exception cref="ArgumentOutOfRangeException">
+    /// The assigned value is negative, or positive but shorter than 1 second.
+    /// </exception>
+    public TimeSpan HeartbeatTimeout
+    {
+        get => _heartbeatGap;
+        set => _heartbeatGap = RequireZeroOrAtLeastOneSecond(value, nameof(HeartbeatTimeout));
+    }
+
+    /// <summary>
+    /// Validates a timeout and hands it back so setters can assign in one expression.
+    /// </summary>
+    /// <param name="value">Candidate timeout.</param>
+    /// <param name="paramName">Property name reported in the exception.</param>
+    /// <returns><paramref name="value"/>, unchanged, when it is acceptable.</returns>
+    private static TimeSpan RequireZeroOrAtLeastOneSecond(TimeSpan value, string paramName)
+    {
+        // Zero means "not set" and is always accepted.
+        if (value == TimeSpan.Zero)
+        {
+            return value;
+        }
+
+        if (value < TimeSpan.Zero)
+        {
+            throw new ArgumentOutOfRangeException(paramName, value, $"{paramName} must be non-negative.");
+        }
+
+        // Sub-second positive values are refused so they are never rounded up silently.
+        if (value < TimeSpan.FromSeconds(1))
+        {
+            throw new ArgumentOutOfRangeException(
+                paramName, value, $"{paramName} must be at least 1 second (or TimeSpan.Zero to disable).");
+        }
+
+        return value;
+    }
+}
diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Config/WaitForCallbackConfig.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Config/WaitForCallbackConfig.cs
new file mode 100644
index 000000000..1dceb8746
--- /dev/null
+++ b/Libraries/src/Amazon.Lambda.DurableExecution/Config/WaitForCallbackConfig.cs
@@ -0,0 +1,18 @@
+namespace Amazon.Lambda.DurableExecution;
+
+/// <summary>
+/// Configuration for the composite
+/// <c>IDurableContext.WaitForCallbackAsync</c>
+/// operation. Inherits the callback's <see cref="CallbackConfig.Timeout"/> and
+/// <see cref="CallbackConfig.HeartbeatTimeout"/>; adds a
+/// <see cref="RetryStrategy"/> for the submitter step.
+/// </summary>
+public class WaitForCallbackConfig : CallbackConfig
+{
+ /// <summary>
+ /// Retry strategy applied to the submitter step. When null (default),
+ /// submitter failures are not retried — the submitter step fails terminally
+ /// and surfaces as <see cref="CallbackSubmitterException"/>.
+ /// </summary>
+ public IRetryStrategy? RetryStrategy { get; set; }
+}
diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs b/Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs
index 69e5e580c..e814a7c1b 100644
--- a/Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs
+++ b/Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs
@@ -162,6 +162,235 @@ private Task RunChildContext(
_state, _terminationManager, _durableExecutionArn, _batcher);
return op.ExecuteAsync(cancellationToken);
}
+
+ [RequiresUnreferencedCode("Reflection-based JSON for T. Use the ICheckpointSerializer overload for AOT/trimmed deployments.")]
+ [RequiresDynamicCode("Reflection-based JSON for T. Use the ICheckpointSerializer overload for AOT/trimmed deployments.")]
+ public Task> CreateCallbackAsync( // reflection-based overload; the serializer overload is the AOT-safe one
+ string? name = null,
+ CallbackConfig? config = null,
+ CancellationToken cancellationToken = default)
+ => RunCallback(new ReflectionJsonCheckpointSerializer(), name, config, cancellationToken); // same path as the serializer overload, with a fresh reflection JSON serializer
+
+ public Task> CreateCallbackAsync( // AOT-safe overload: the caller supplies the serializer
+ ICheckpointSerializer serializer,
+ string? name = null,
+ CallbackConfig? config = null,
+ CancellationToken cancellationToken = default)
+ => RunCallback(serializer, name, config, cancellationToken); // pure delegation; arguments are forwarded unchanged
+
+ private Task> RunCallback( // shared implementation behind both CreateCallbackAsync overloads
+ ICheckpointSerializer serializer,
+ string? name,
+ CallbackConfig? config,
+ CancellationToken cancellationToken)
+ {
+ var operationId = _idGenerator.NextId(); // one operation ID is drawn per callback
+ var op = new CallbackOperation(
+ operationId, name, config, serializer,
+ _state, _terminationManager, _durableExecutionArn, _batcher);
+ return op.ExecuteAsync(cancellationToken); // NOTE(review): presumably dispatches to StartAsync or ReplayAsync in the base class — confirm
+ }
+
+ [RequiresUnreferencedCode("Reflection-based JSON for T. Use the ICheckpointSerializer overload for AOT/trimmed deployments.")]
+ [RequiresDynamicCode("Reflection-based JSON for T. Use the ICheckpointSerializer overload for AOT/trimmed deployments.")]
+ public Task WaitForCallbackAsync( // reflection-based overload; the serializer overload is the AOT-safe one
+ Func submitter,
+ string? name = null,
+ WaitForCallbackConfig? config = null,
+ CancellationToken cancellationToken = default)
+ => RunWaitForCallback(submitter, new ReflectionJsonCheckpointSerializer(), name, config, cancellationToken); // same path as the serializer overload, with a fresh reflection JSON serializer
+
+ public Task WaitForCallbackAsync( // AOT-safe overload: the caller supplies the serializer
+ Func submitter,
+ ICheckpointSerializer serializer,
+ string? name = null,
+ WaitForCallbackConfig? config = null,
+ CancellationToken cancellationToken = default)
+ => RunWaitForCallback(submitter, serializer, name, config, cancellationToken); // pure delegation; arguments are forwarded unchanged
+
+ /// <summary>
+ /// Composes WaitForCallback over RunInChildContextAsync + CreateCallbackAsync
+ /// + StepAsync(submitter) + callback.GetResultAsync. Mirrors the pattern used
+ /// by the Python, JS, and Java reference SDKs.
+ /// </summary>
+ /// <remarks>
+ /// Sub-operation naming follows kebab-style: "{name}-callback" and
+ /// "{name}-submitter". When the parent <paramref name="name"/> is null,
+ /// the inner ops are also nameless (no leading hyphen).
+ /// <para>
+ /// <see cref="MapWaitForCallbackException"/> remaps a submitter
+ /// <see cref="StepException"/> to <see cref="CallbackSubmitterException"/>.
+ /// Callback errors (<see cref="CallbackException"/>) pass through unchanged.
+ /// </para>
+ /// </remarks>
+ private Task RunWaitForCallback(
+ Func submitter,
+ ICheckpointSerializer serializer,
+ string? name,
+ WaitForCallbackConfig? config,
+ CancellationToken cancellationToken)
+ {
+ var callbackName = name == null ? null : $"{name}-callback";
+ var submitterName = name == null ? null : $"{name}-submitter";
+
+ var callbackConfig = config == null ? null : new CallbackConfig // copy only the two timeouts; RetryStrategy belongs to the step
+ {
+ Timeout = config.Timeout,
+ HeartbeatTimeout = config.HeartbeatTimeout,
+ };
+
+ var stepConfig = config?.RetryStrategy == null
+ ? null
+ : new StepConfig { RetryStrategy = config.RetryStrategy };
+
+ // Use the AOT-safe child-context overload so the wrapper itself doesn't
+ // need [RequiresUnreferencedCode] beyond the public WaitForCallbackAsync
+ // attribute. The user-supplied serializer (or the reflection serializer
+ // wired in by the reflection overload) is used to deserialize the inner
+ // callback's payload AND the child-context's own result (which is just
+ // an unwrap of T returned from the await — same shape).
+ return RunChildContext(
+ async childCtx =>
+ {
+ var callback = await childCtx.CreateCallbackAsync(
+ serializer,
+ name: callbackName,
+ config: callbackConfig,
+ cancellationToken: cancellationToken);
+
+ await childCtx.StepAsync(
+ async (stepCtx) =>
+ {
+ var submitterCtx = new WaitForCallbackContext(stepCtx.Logger); // the submitter only sees the step's logger
+ await submitter(callback.CallbackId, submitterCtx); // NOTE(review): a null submitter is not rejected up front; it would only fail here, inside the step
+ },
+ name: submitterName,
+ config: stepConfig,
+ cancellationToken: cancellationToken);
+
+ return await callback.GetResultAsync(cancellationToken);
+ },
+ serializer,
+ name,
+ new ChildContextConfig
+ {
+ SubType = OperationSubTypes.WaitForCallback,
+ ErrorMapping = MapWaitForCallbackException,
+ },
+ cancellationToken);
+ }
+
+    /// <summary>Error mapper for the WaitForCallback child context.</summary>
+    /// <param name="ex">Exception raised while running the child context.</param>
+    /// <returns>The exception the caller should observe; <paramref name="ex"/> when no rule applies.</returns>
+    private static Exception MapWaitForCallbackException(Exception ex)
+    {
+        // A CallbackException is already caller-facing, and anything that is not a
+        // ChildContextException wrapper has nothing to unwrap: both pass through.
+        if (ex is CallbackException || ex is not ChildContextException wrapper)
+        {
+            return ex;
+        }
+
+        // On a fresh execution InnerException holds the live exception object; on
+        // replay it is null and only the stored ErrorType string identifies the cause.
+        var cause = wrapper.InnerException;
+        var storedType = wrapper.ErrorType;
+
+        // Callback completed FAILED / TIMED_OUT: surface the callback exception.
+        if (cause is CallbackException liveCallbackError)
+        {
+            return liveCallbackError;
+        }
+        if (IsCallbackErrorTypeString(storedType))
+        {
+            // Rebuild from the stored type name so the original subclass survives replay.
+            return BuildCallbackExceptionForReplay(wrapper);
+        }
+
+        // Submitter step failure: live StepException first, stored type name second.
+        var liveStep = cause as StepException;
+        if (liveStep is null && storedType != typeof(StepException).FullName)
+        {
+            return ex; // Unrecognised cause: leave untouched.
+        }
+
+        if (liveStep is not null)
+        {
+            return new CallbackSubmitterException(liveStep.Message, liveStep)
+            {
+                ErrorType = liveStep.ErrorType,
+                ErrorData = liveStep.ErrorData,
+                OriginalStackTrace = liveStep.OriginalStackTrace,
+            };
+        }
+
+        return new CallbackSubmitterException(wrapper.Message, wrapper)
+        {
+            ErrorType = wrapper.ErrorType,
+            ErrorData = wrapper.ErrorData,
+            OriginalStackTrace = wrapper.OriginalStackTrace,
+        };
+    }
+
+    /// <summary>
+    /// Rebuilds a callback exception for the replay path from a <c>ChildContextException</c>,
+    /// picking the subclass from the stored ErrorType full name.
+    /// The caller has already confirmed <see cref="IsCallbackErrorTypeString"/> is true.
+    /// </summary>
+    /// <param name="childEx">Wrapper carrying the stored message, type, data and stack trace.</param>
+    /// <returns>
+    /// The matching subclass, with <paramref name="childEx"/> as its inner exception.
+    /// </returns>
+    private static CallbackException BuildCallbackExceptionForReplay(ChildContextException childEx)
+    {
+        // Every branch copies the same four values; only the concrete type differs.
+        var message = childEx.Message;
+        var storedType = childEx.ErrorType;
+        var data = childEx.ErrorData;
+        var trace = childEx.OriginalStackTrace;
+
+        switch (storedType)
+        {
+            case var t when t == typeof(CallbackTimeoutException).FullName:
+                return new CallbackTimeoutException(message, childEx)
+                    { ErrorType = storedType, ErrorData = data, OriginalStackTrace = trace };
+
+            case var t when t == typeof(CallbackSubmitterException).FullName:
+                return new CallbackSubmitterException(message, childEx)
+                    { ErrorType = storedType, ErrorData = data, OriginalStackTrace = trace };
+
+            case var t when t == typeof(CallbackException).FullName:
+                return new CallbackException(message, childEx)
+                    { ErrorType = storedType, ErrorData = data, OriginalStackTrace = trace };
+
+            default:
+                // CallbackFailedException itself, and any callback subtype not listed
+                // above, falls back to CallbackFailedException.
+                return new CallbackFailedException(message, childEx)
+                {
+                    ErrorType = storedType,
+                    ErrorData = data,
+                    OriginalStackTrace = trace,
+                };
+        }
+    }
+
+    private static bool IsCallbackErrorTypeString(string? errorType) => // true for any of the four callback exception type names
+        string.Equals(errorType, typeof(CallbackFailedException).FullName, StringComparison.Ordinal)
+        || string.Equals(errorType, typeof(CallbackTimeoutException).FullName, StringComparison.Ordinal)
+        || string.Equals(errorType, typeof(CallbackSubmitterException).FullName, StringComparison.Ordinal)
+        || string.Equals(errorType, typeof(CallbackException).FullName, StringComparison.Ordinal);
+}
+
+internal sealed class WaitForCallbackContext : IWaitForCallbackContext
+{
+    /// <summary>Logger handed to the submitter delegate.</summary>
+    public ILogger Logger { get; }
+
+    /// <summary>Wraps the supplied logger.</summary>
+    /// <param name="logger">Logger exposed through <see cref="Logger"/>.</param>
+    public WaitForCallbackContext(ILogger logger) => Logger = logger;
}
///
diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/DurableFunction.cs b/Libraries/src/Amazon.Lambda.DurableExecution/DurableFunction.cs
index d629a0b2e..0ed3e57a8 100644
--- a/Libraries/src/Amazon.Lambda.DurableExecution/DurableFunction.cs
+++ b/Libraries/src/Amazon.Lambda.DurableExecution/DurableFunction.cs
@@ -171,7 +171,12 @@ private static async Task WrapAsyncCore serviceClient.CheckpointAsync(
- invocationInput.DurableExecutionArn, token, ops, ct));
+ invocationInput.DurableExecutionArn, token, ops,
+ // The service stamps a freshly-allocated CallbackId onto a started
+ // CALLBACK op (and may emit terminal-state callbacks/timers); merge
+ // those back into ExecutionState so the next ExecuteAsync sees them.
+ onNewOperations: state.AddOperations,
+ cancellationToken: ct));
var context = new DurableContext(
state, terminationManager, idGenerator,
diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Exceptions/CallbackException.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Exceptions/CallbackException.cs
new file mode 100644
index 000000000..ff878a0a9
--- /dev/null
+++ b/Libraries/src/Amazon.Lambda.DurableExecution/Exceptions/CallbackException.cs
@@ -0,0 +1,86 @@
+namespace Amazon.Lambda.DurableExecution;
+
+/// <summary>
+/// Base exception type for callback failures surfaced from
+/// <c>ICallback.GetResultAsync</c>
+/// or
+/// <c>IDurableContext.WaitForCallbackAsync</c>.
+/// Concrete subclasses distinguish failure modes — pattern-match
+/// <see cref="CallbackFailedException"/>, <see cref="CallbackTimeoutException"/>,
+/// or <see cref="CallbackSubmitterException"/> in catch clauses.
+/// </summary>
+public class CallbackException : DurableExecutionException
+{
+ /// <summary>The callback ID associated with the failure (if known).</summary>
+ public string? CallbackId { get; init; }
+
+ /// <summary>The fully-qualified type name of the original error, if known.</summary>
+ public string? ErrorType { get; init; }
+
+ /// <summary>Optional structured error data attached by the external system.</summary>
+ public string? ErrorData { get; init; }
+
+ /// <summary>Stack trace of the original error, captured before serialization.</summary>
+ public IReadOnlyList? OriginalStackTrace { get; init; }
+
+ /// <summary>Creates an empty <see cref="CallbackException"/>.</summary>
+ public CallbackException() { }
+
+ /// <summary>Creates a <see cref="CallbackException"/> with the given message.</summary>
+ public CallbackException(string message) : base(message) { }
+
+ /// <summary>Creates a <see cref="CallbackException"/> wrapping an inner exception.</summary>
+ public CallbackException(string message, Exception innerException) : base(message, innerException) { }
+}
+
+/// <summary>
+/// Thrown when the external system reports a failure result for a callback
+/// (via <c>SendDurableExecutionCallbackFailure</c>).
+/// </summary>
+public class CallbackFailedException : CallbackException
+{
+ /// <summary>Creates an empty <see cref="CallbackFailedException"/>.</summary>
+ public CallbackFailedException() { }
+
+ /// <summary>Creates a <see cref="CallbackFailedException"/> with the given message.</summary>
+ public CallbackFailedException(string message) : base(message) { }
+
+ /// <summary>Creates a <see cref="CallbackFailedException"/> wrapping an inner exception.</summary>
+ public CallbackFailedException(string message, Exception innerException) : base(message, innerException) { }
+}
+
+/// <summary>
+/// Thrown when the durable execution service marks a callback as timed-out —
+/// either the overall <see cref="CallbackConfig.Timeout"/> or the
+/// <see cref="CallbackConfig.HeartbeatTimeout"/> elapsed.
+/// </summary>
+public class CallbackTimeoutException : CallbackException
+{
+ /// <summary>Creates an empty <see cref="CallbackTimeoutException"/>.</summary>
+ public CallbackTimeoutException() { }
+
+ /// <summary>Creates a <see cref="CallbackTimeoutException"/> with the given message.</summary>
+ public CallbackTimeoutException(string message) : base(message) { }
+
+ /// <summary>Creates a <see cref="CallbackTimeoutException"/> wrapping an inner exception.</summary>
+ public CallbackTimeoutException(string message, Exception innerException) : base(message, innerException) { }
+}
+
+/// <summary>
+/// Thrown only from
+/// <c>IDurableContext.WaitForCallbackAsync</c>
+/// when the user-supplied submitter delegate (the step that hands the callback
+/// ID to the external system) fails after retries are exhausted. Wraps the
+/// underlying <see cref="StepException"/> as <see cref="Exception.InnerException"/>.
+/// </summary>
+public class CallbackSubmitterException : CallbackException
+{
+ /// <summary>Creates an empty <see cref="CallbackSubmitterException"/>.</summary>
+ public CallbackSubmitterException() { }
+
+ /// <summary>Creates a <see cref="CallbackSubmitterException"/> with the given message.</summary>
+ public CallbackSubmitterException(string message) : base(message) { }
+
+ /// <summary>Creates a <see cref="CallbackSubmitterException"/> wrapping an inner exception.</summary>
+ public CallbackSubmitterException(string message, Exception innerException) : base(message, innerException) { }
+}
diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/ICallback.cs b/Libraries/src/Amazon.Lambda.DurableExecution/ICallback.cs
new file mode 100644
index 000000000..a00b8e3c4
--- /dev/null
+++ b/Libraries/src/Amazon.Lambda.DurableExecution/ICallback.cs
@@ -0,0 +1,39 @@
+namespace Amazon.Lambda.DurableExecution;
+
+/// <summary>
+/// A pending callback created by
+/// <c>IDurableContext.CreateCallbackAsync</c>.
+/// Hands back a <see cref="CallbackId"/> for external systems to use, plus a
+/// <see cref="GetResultAsync"/> hook that
+/// suspends the workflow until the external system completes the callback.
+/// </summary>
+/// <typeparam name="T">The callback result type.</typeparam>
+public interface ICallback
+{
+ /// <summary>
+ /// The callback ID generated by the durable execution service. External
+ /// systems pass this ID to <c>SendDurableExecutionCallbackSuccess</c> /
+ /// <c>SendDurableExecutionCallbackFailure</c> /
+ /// <c>SendDurableExecutionCallbackHeartbeat</c> to deliver a result.
+ /// </summary>
+ string CallbackId { get; }
+
+ /// <summary>
+ /// Suspends the workflow until the callback is completed, then returns the
+ /// deserialized result.
+ /// </summary>
+ /// <remarks>
+ /// On the first invocation that reaches this call, the workflow suspends
+ /// (Lambda terminates). When the external system completes the callback
+ /// the service re-invokes Lambda; this call then returns the cached result
+ /// without re-executing user code.
+ /// </remarks>
+ /// <exception cref="CallbackFailedException">
+ /// Thrown when the external system reported a failure result.
+ /// </exception>
+ /// <exception cref="CallbackTimeoutException">
+ /// Thrown when the service timed out the callback (overall timeout or
+ /// heartbeat timeout elapsed).
+ /// </exception>
+ Task GetResultAsync(CancellationToken cancellationToken = default);
+}
diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/IDurableContext.cs b/Libraries/src/Amazon.Lambda.DurableExecution/IDurableContext.cs
index eb10a0ffe..9bdc8059a 100644
--- a/Libraries/src/Amazon.Lambda.DurableExecution/IDurableContext.cs
+++ b/Libraries/src/Amazon.Lambda.DurableExecution/IDurableContext.cs
@@ -119,6 +119,78 @@ Task RunInChildContextAsync(
string? name = null,
ChildContextConfig? config = null,
CancellationToken cancellationToken = default);
+
+ /// <summary>
+ /// Create a callback for an external system to complete. Returns an
+ /// <see cref="ICallback{T}"/> handle exposing the service-allocated
+ /// <see cref="ICallback{T}.CallbackId"/> (pass to the external system) and
+ /// <see cref="ICallback{T}.GetResultAsync"/>
+ /// (await to suspend until a result arrives).
+ /// </summary>
+ /// <remarks>
+ /// The result is deserialized using reflection-based System.Text.Json.
+ /// For NativeAOT or trimmed deployments, use the overload that takes an
+ /// <see cref="ICheckpointSerializer{T}"/>.
+ /// <para>
+ /// Errors are deferred to <see cref="ICallback{T}.GetResultAsync"/>;
+ /// <c>CreateCallbackAsync</c> always returns successfully so user code
+ /// between CreateCallbackAsync and the result-await runs deterministically
+ /// across replays.
+ /// </para>
+ /// </remarks>
+ [RequiresUnreferencedCode("Reflection-based JSON for T. Use the ICheckpointSerializer overload for AOT/trimmed deployments.")]
+ [RequiresDynamicCode("Reflection-based JSON for T. Use the ICheckpointSerializer overload for AOT/trimmed deployments.")]
+ Task> CreateCallbackAsync(
+ string? name = null,
+ CallbackConfig? config = null,
+ CancellationToken cancellationToken = default);
+
+ /// <summary>
+ /// Create a callback with AOT-safe checkpoint deserialization. The supplied
+ /// <paramref name="serializer"/> is used in place of reflection-based JSON
+ /// when <see cref="ICallback{T}.GetResultAsync"/>
+ /// resolves the callback's payload.
+ /// </summary>
+ Task> CreateCallbackAsync(
+ ICheckpointSerializer serializer,
+ string? name = null,
+ CallbackConfig? config = null,
+ CancellationToken cancellationToken = default);
+
+ /// <summary>
+ /// Composite operation that creates a callback, runs the supplied submitter
+ /// (which hands the <c>callbackId</c> to an external system), and suspends
+ /// until the external system delivers a result. Equivalent to manually
+ /// composing
+ /// <c>CreateCallbackAsync</c> +
+ /// <c>StepAsync</c> +
+ /// <c>ICallback.GetResultAsync</c> inside a child context.
+ /// </summary>
+ /// <remarks>
+ /// Submitter failures (after retries are exhausted) surface as
+ /// <see cref="CallbackSubmitterException"/>. Callback failures and timeouts
+ /// surface as <see cref="CallbackFailedException"/> /
+ /// <see cref="CallbackTimeoutException"/>.
+ /// </remarks>
+ [RequiresUnreferencedCode("Reflection-based JSON for T. Use the ICheckpointSerializer overload for AOT/trimmed deployments.")]
+ [RequiresDynamicCode("Reflection-based JSON for T. Use the ICheckpointSerializer overload for AOT/trimmed deployments.")]
+ Task WaitForCallbackAsync(
+ Func submitter,
+ string? name = null,
+ WaitForCallbackConfig? config = null,
+ CancellationToken cancellationToken = default);
+
+ /// <summary>
+ /// AOT-safe overload of
+ /// <c>WaitForCallbackAsync</c>.
+ /// The supplied <paramref name="serializer"/> is used in place of reflection-based JSON.
+ /// </summary>
+ Task WaitForCallbackAsync(
+ Func submitter,
+ ICheckpointSerializer serializer,
+ string? name = null,
+ WaitForCallbackConfig? config = null,
+ CancellationToken cancellationToken = default);
}
///
diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/IWaitForCallbackContext.cs b/Libraries/src/Amazon.Lambda.DurableExecution/IWaitForCallbackContext.cs
new file mode 100644
index 000000000..4ed8cd4ba
--- /dev/null
+++ b/Libraries/src/Amazon.Lambda.DurableExecution/IWaitForCallbackContext.cs
@@ -0,0 +1,21 @@
+using Microsoft.Extensions.Logging;
+
+namespace Amazon.Lambda.DurableExecution;
+
+/// <summary>
+/// Context passed to the submitter delegate of
+/// <c>IDurableContext.WaitForCallbackAsync</c>.
+/// Provides a replay-safe logger scoped to the submitter step.
+/// </summary>
+/// <remarks>
+/// Distinct from <see cref="IStepContext"/> so the submitter API can evolve
+/// independently. Mirrors WaitForCallbackContext in the Python and
+/// JavaScript SDKs (logger-only surface).
+/// </remarks>
+public interface IWaitForCallbackContext
+{
+ /// <summary>
+ /// Logger scoped to the submitter step.
+ /// </summary>
+ ILogger Logger { get; }
+}
diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/CallbackOperation.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/CallbackOperation.cs
new file mode 100644
index 000000000..ee5c842bb
--- /dev/null
+++ b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/CallbackOperation.cs
@@ -0,0 +1,240 @@
+using SdkCallbackOptions = Amazon.Lambda.Model.CallbackOptions;
+using SdkOperationUpdate = Amazon.Lambda.Model.OperationUpdate;
+
+namespace Amazon.Lambda.DurableExecution.Internal;
+
+/// <summary>
+/// Durable callback operation. Sync-flushes a CALLBACK START checkpoint
+/// (the service stamps a freshly-allocated CallbackId onto the response,
+/// which the batcher merges back into <see cref="ExecutionState"/>), then hands
+/// the user an <see cref="ICallback{T}"/> they can later
+/// <see cref="ICallback{T}.GetResultAsync"/>
+/// to suspend on.
+/// </summary>
+///
+/// Replay branches — example:
+///
+/// var cb = await ctx.CreateCallbackAsync<ApprovalResult>(name: "approval");
+/// // ... external system told to use cb.CallbackId ...
+/// var result = await cb.GetResultAsync();
+///
+///
+/// - Fresh: no prior state → sync-flush CALLBACK START;
+/// the service responds with a CallbackId (merged into state by the
+/// batcher); construct the <see cref="ICallback{T}"/> and return it.
+/// <see cref="GetResultAsync"/> then suspends.
+/// - STARTED: a CallbackId is already on the checkpoint; reuse it.
+/// <see cref="GetResultAsync"/> suspends (the external system hasn't
+/// responded yet) — service re-invokes once it does.
+/// - SUCCEEDED / FAILED / TIMED_OUT: terminal — construct the
+/// <see cref="ICallback{T}"/> with the cached state and return.
+/// <see cref="GetResultAsync"/> immediately deserializes / throws.
+///
+/// CRITICAL: CreateCallbackAsync always succeeds — it returns the
+/// <see cref="ICallback{T}"/> handle regardless of terminal state. Errors are
+/// deferred to <see cref="GetResultAsync"/>
+/// so user code between CreateCallbackAsync and the result-await runs
+/// deterministically across replays. Mirrors Python CallbackOperationExecutor's
+/// "do NOT raise on FAILED" rule.
+///
+internal sealed class CallbackOperation : DurableOperation>, ICallback
+{
+ private readonly CallbackConfig? _config;
+ private readonly ICheckpointSerializer _serializer;
+
+ private string? _callbackId;
+
+ public CallbackOperation( // stores config and serializer; everything else goes to the base operation
+ string operationId,
+ string? name,
+ CallbackConfig? config,
+ ICheckpointSerializer serializer,
+ ExecutionState state,
+ TerminationManager termination,
+ string durableExecutionArn,
+ CheckpointBatcher? batcher = null)
+ : base(operationId, name, state, termination, durableExecutionArn, batcher)
+ {
+ _config = config; // may be null: BuildCallbackOptions then sends no callback options
+ _serializer = serializer; // used by ResolveTerminal to deserialize a successful result
+ }
+
+ protected override string OperationType => OperationTypes.Callback;
+
+ /// <summary>
+ /// Set when an existing terminal-state checkpoint was observed during
+ /// dispatch. <see cref="GetResultAsync"/> reads this directly to short-
+ /// circuit deserialization (or throw the recorded error) without suspending.
+ /// </summary>
+ private Operation? _terminalReplay;
+
+    /// <inheritdoc />
+    public string CallbackId => _callbackId is { } allocatedId
+        ? allocatedId
+        : throw new InvalidOperationException("CallbackId is unavailable. Ensure CreateCallbackAsync has completed before reading CallbackId.");
+
+ protected override async Task> StartAsync(CancellationToken cancellationToken)
+ {
+ // Fresh-execution path. Sync-flush the START so the service can allocate a
+ // CallbackId for us. The batcher's onNewOperations hook merges the service's
+ // response into ExecutionState, so reading state.GetOperation(OperationId)
+ // right after the await sees the populated CallbackDetails.
+ await EnqueueAsync(new SdkOperationUpdate
+ {
+ Id = OperationId,
+ Type = OperationTypes.Callback,
+ Action = "START",
+ SubType = OperationSubTypes.Callback,
+ Name = Name,
+ CallbackOptions = BuildCallbackOptions()
+ }, cancellationToken);
+
+ var stamped = State.GetOperation(OperationId);
+ var callbackId = stamped?.CallbackDetails?.CallbackId;
+ if (string.IsNullOrEmpty(callbackId))
+ {
+ // Service didn't return a CallbackId — this is a service-contract
+ // violation, not user error. Surface as a non-deterministic error so the
+ // workflow fails fast instead of tripping over a null CallbackId later.
+ throw new NonDeterministicExecutionException(
+ $"Callback operation '{Name ?? OperationId}' was started but the service did not return a CallbackId.");
+ }
+
+ _callbackId = callbackId;
+
+ // If the service already reported a terminal state on the START response
+ // (the external system replied synchronously, or timeout was instant),
+ // record it for GetResultAsync to short-circuit on.
+ if (IsTerminalStatus(stamped?.Status))
+ {
+ _terminalReplay = stamped;
+ }
+
+ return this; // this operation instance doubles as the ICallback handle
+ }
+
+ protected override Task> ReplayAsync(Operation existing, CancellationToken cancellationToken) // cancellationToken is not used in this method
+ {
+ var callbackId = existing.CallbackDetails?.CallbackId;
+ if (string.IsNullOrEmpty(callbackId))
+ {
+ throw new NonDeterministicExecutionException(
+ $"Callback operation '{Name ?? OperationId}' has no CallbackId on its checkpoint.");
+ }
+
+ _callbackId = callbackId;
+
+ // CRITICAL (matches Python CallbackOperationExecutor): we must NOT
+ // raise on terminal state here. CreateCallbackAsync always returns the
+ // ICallback handle so any user code between create and GetResult runs
+ // deterministically across replays. Status inspection is deferred to
+ // GetResultAsync; this switch only records a terminal checkpoint.
+ switch (existing.Status)
+ {
+ case OperationStatuses.Succeeded:
+ case OperationStatuses.Failed:
+ case OperationStatuses.TimedOut:
+ _terminalReplay = existing;
+ break;
+
+ case OperationStatuses.Started:
+ case OperationStatuses.Pending:
+ // External system hasn't responded yet — GetResultAsync will
+ // suspend so the service can re-invoke once it does.
+ break;
+
+ default:
+ throw new NonDeterministicExecutionException(
+ $"Callback operation '{Name ?? OperationId}' has unexpected status '{existing.Status}' on replay.");
+ }
+
+ return Task.FromResult>(this); // completes synchronously; the handle is this instance
+ }
+
+ /// <inheritdoc />
+ public async Task GetResultAsync(CancellationToken cancellationToken = default)
+ {
+ cancellationToken.ThrowIfCancellationRequested(); // the token is only checked here; it is not passed to SuspendAndAwait
+
+ // Terminal-state checkpoint already observed by Start/Replay — return
+ // (or throw) immediately without suspending.
+ if (_terminalReplay != null)
+ {
+ return ResolveTerminal(_terminalReplay);
+ }
+
+ // No terminal state yet. Suspend the workflow; the service re-invokes
+ // when the external system delivers a result.
+ return await Termination.SuspendAndAwait(
+ TerminationReason.CallbackPending,
+ $"callback:{Name ?? OperationId}");
+ }
+
+    // Turns a terminal-status checkpoint into the awaited outcome: the deserialized
+    // payload on success, otherwise the matching exception is thrown.
+    private T ResolveTerminal(Operation op)
+    {
+        if (op.Status == OperationStatuses.Succeeded)
+        {
+            // A missing payload resolves to default(T) without invoking the serializer.
+            return op.CallbackDetails?.Result is { } payload
+                ? _serializer.Deserialize(payload, new SerializationContext(OperationId, DurableExecutionArn))
+                : default!;
+        }
+
+        // Every remaining status ends in a throw.
+        Exception failure = op.Status switch
+        {
+            OperationStatuses.Failed => BuildFailedException(op),
+            OperationStatuses.TimedOut => BuildTimeoutException(op),
+            _ => new NonDeterministicExecutionException( // should be unreachable: only terminal statuses are recorded
+                $"Callback operation '{Name ?? OperationId}' has unexpected status '{op.Status}' on result resolution."),
+        };
+
+        throw failure;
+    }
+
+    // Shapes the error recorded on the checkpoint into a CallbackFailedException.
+    private CallbackFailedException BuildFailedException(Operation op)
+    {
+        var recorded = op.CallbackDetails?.Error;
+        return new CallbackFailedException(recorded?.ErrorMessage ?? "Callback failed")
+        {
+            CallbackId = op.CallbackDetails?.CallbackId,
+            ErrorType = recorded?.ErrorType,
+            ErrorData = recorded?.ErrorData,
+            OriginalStackTrace = recorded?.StackTrace,
+        };
+    }
+
+    // Shapes the error recorded on the checkpoint into a CallbackTimeoutException.
+    private CallbackTimeoutException BuildTimeoutException(Operation op)
+    {
+        var recorded = op.CallbackDetails?.Error;
+        return new CallbackTimeoutException(recorded?.ErrorMessage ?? "Callback timed out")
+        {
+            CallbackId = op.CallbackDetails?.CallbackId,
+            ErrorType = recorded?.ErrorType,
+            ErrorData = recorded?.ErrorData,
+            OriginalStackTrace = recorded?.StackTrace,
+        };
+    }
+
+    // Builds the START options (null when no timeout is set); seconds round up and clamp to [1, int.MaxValue].
+    private SdkCallbackOptions? BuildCallbackOptions()
+    {
+        if (_config == null) return null;
+        if (_config.Timeout == TimeSpan.Zero && _config.HeartbeatTimeout == TimeSpan.Zero) return null;
+
+        // Clamp before the cast: a huge TimeSpan (e.g. TimeSpan.MaxValue) would otherwise overflow int.
+        static int ToWholeSeconds(TimeSpan t) => (int)Math.Clamp(Math.Ceiling(t.TotalSeconds), 1d, int.MaxValue);
+        var options = new SdkCallbackOptions();
+        if (_config.Timeout > TimeSpan.Zero) options.TimeoutSeconds = ToWholeSeconds(_config.Timeout);
+        if (_config.HeartbeatTimeout > TimeSpan.Zero) options.HeartbeatTimeoutSeconds = ToWholeSeconds(_config.HeartbeatTimeout);
+        return options; }
+
+    // True for the Succeeded, Failed and TimedOut statuses; false for anything else, including null.
+    private static bool IsTerminalStatus(string? status) => status is
+        OperationStatuses.Succeeded or OperationStatuses.Failed
+        or OperationStatuses.TimedOut;
+}
diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Services/LambdaDurableServiceClient.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Services/LambdaDurableServiceClient.cs
index dba534f6b..946760413 100644
--- a/Libraries/src/Amazon.Lambda.DurableExecution/Services/LambdaDurableServiceClient.cs
+++ b/Libraries/src/Amazon.Lambda.DurableExecution/Services/LambdaDurableServiceClient.cs
@@ -19,11 +19,17 @@ public LambdaDurableServiceClient(IAmazonLambda lambdaClient)
///
/// Flushes pending checkpoint operations to the durable execution service.
+ /// When <paramref name="onNewOperations"/> is supplied, any
+ /// NewExecutionState.Operations the service returns (e.g. a freshly
+ /// allocated CallbackId after a callback START checkpoint, or a
+ /// timer-fired SUCCEEDED) are forwarded to the callback so the caller can
+ /// merge them into its in-memory <see cref="ExecutionState"/>.
///
public async Task CheckpointAsync(
string durableExecutionArn,
string? checkpointToken,
IReadOnlyList pendingOperations,
+ Action>? onNewOperations = null,
CancellationToken cancellationToken = default)
{
if (pendingOperations.Count == 0)
@@ -37,6 +43,22 @@ public LambdaDurableServiceClient(IAmazonLambda lambdaClient)
};
var response = await _lambdaClient.CheckpointDurableExecutionAsync(request, cancellationToken);
+
+ // The service returns NewExecutionState carrying any operations updated
+ // since the last checkpoint — most importantly, the callback ID stamped
+ // onto a freshly-started CALLBACK op, plus any externally-completed
+ // callbacks/timers. Hand them to the caller (DurableFunction wires this
+ // back into ExecutionState) so subsequent replay-style lookups see the
+ // updated state immediately.
+ var updated = response.NewExecutionState?.Operations;
+ if (onNewOperations != null && updated != null && updated.Count > 0)
+ {
+ var mapped = new List(updated.Count);
+ foreach (var sdkOp in updated)
+ mapped.Add(MapFromSdkOperation(sdkOp));
+ onNewOperations(mapped);
+ }
+
return response.CheckpointToken;
}
@@ -111,6 +133,16 @@ private static Internal.Operation MapFromSdkOperation(SdkOperation sdkOp)
ErrorType = sdkOp.ContextDetails.Error.ErrorType,
ErrorMessage = sdkOp.ContextDetails.Error.ErrorMessage
} : null
+ } : null,
+ CallbackDetails = sdkOp.CallbackDetails != null ? new Internal.CallbackDetails
+ {
+ CallbackId = sdkOp.CallbackDetails.CallbackId,
+ Result = sdkOp.CallbackDetails.Result,
+ Error = sdkOp.CallbackDetails.Error != null ? new ErrorObject
+ {
+ ErrorType = sdkOp.CallbackDetails.Error.ErrorType,
+ ErrorMessage = sdkOp.CallbackDetails.Error.ErrorMessage
+ } : null
} : null
};
}
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/CallbackFailedTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/CallbackFailedTest.cs
new file mode 100644
index 000000000..de643c99f
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/CallbackFailedTest.cs
@@ -0,0 +1,89 @@
+using System.Linq;
+using System.Text;
+using Amazon.Lambda.Model;
+using Xunit;
+using Xunit.Abstractions;
+
+namespace Amazon.Lambda.DurableExecution.IntegrationTests;
+
+public class CallbackFailedTest
+{
+    private readonly ITestOutputHelper _output;
+    public CallbackFailedTest(ITestOutputHelper output) => _output = output;
+
+    /// <summary>
+    /// End-to-end failure path for <c>CreateCallbackAsync</c>:
+    /// the test acts as the external system and reports a failure via
+    /// <c>SendDurableExecutionCallbackFailure</c>. The SDK should raise
+    /// <see cref="CallbackFailedException"/> from <c>GetResultAsync</c>, and the
+    /// workflow surfaces FAILED with that exception type recorded.
+    /// </summary>
+    [Fact]
+    public async Task CallbackFailed_SurfacesAsCallbackFailedException()
+    {
+        await using var deployment = await DurableFunctionDeployment.CreateAsync(
+            DurableFunctionDeployment.FindTestFunctionDir("CallbackFailedFunction"),
+            "cb-failed", _output);
+
+        var (invokeResponse, executionName) = await deployment.InvokeAsync("""{"orderId": "x"}""");
+        var responsePayload = Encoding.UTF8.GetString(invokeResponse.Payload.ToArray());
+        _output.WriteLine($"Initial response: {responsePayload}");
+
+        var arn = await deployment.FindDurableExecutionArnByNameAsync(executionName, TimeSpan.FromSeconds(60));
+        Assert.NotNull(arn);
+
+        var callbackId = await WaitForCallbackIdAsync(deployment, arn!, TimeSpan.FromSeconds(60));
+        Assert.False(string.IsNullOrEmpty(callbackId));
+        _output.WriteLine($"Service-allocated CallbackId: {callbackId}");
+
+        // Act as the external system: report a failure.
+        await deployment.LambdaClient.SendDurableExecutionCallbackFailureAsync(
+            new SendDurableExecutionCallbackFailureRequest
+            {
+                CallbackId = callbackId!,
+                Error = new Amazon.Lambda.Model.ErrorObject
+                {
+                    ErrorType = "ApprovalRejected",
+                    ErrorMessage = "external system rejected the request",
+                }
+            });
+
+        var status = await deployment.PollForCompletionAsync(arn!, TimeSpan.FromSeconds(120));
+        Assert.Equal("FAILED", status, ignoreCase: true);
+
+        // The workflow's surfaced exception is CallbackFailedException — the SDK
+        // wraps the external error message into the exception's Message. Verify
+        // the recorded error type is the SDK's CallbackFailedException and that
+        // the original failure message survives.
+        var execution = await deployment.GetExecutionAsync(arn!);
+        Assert.NotNull(execution.Error);
+        Assert.Equal(typeof(CallbackFailedException).FullName, execution.Error.ErrorType);
+        Assert.Contains("rejected", execution.Error.ErrorMessage);
+
+        // History records exactly one Started and one Failed event for the callback.
+        var history = await deployment.WaitForHistoryAsync(
+            arn!,
+            h => (h.Events?.Any(e => e.EventType == EventType.CallbackStarted) ?? false)
+                 && (h.Events?.Any(e => e.EventType == EventType.CallbackFailed) ?? false),
+            TimeSpan.FromSeconds(60));
+        Assert.NotNull(history.Events);
+        Assert.Single(history.Events, e => e.EventType == EventType.CallbackStarted);
+        Assert.Single(history.Events, e => e.EventType == EventType.CallbackFailed);
+    }
+
+    /// <summary>
+    /// Waits via <c>WaitForHistoryAsync</c> until a started-callback event carries a
+    /// non-empty CallbackId and returns it; null when the returned history has none.
+    /// </summary>
+    private static async Task<string?> WaitForCallbackIdAsync(
+        DurableFunctionDeployment deployment, string arn, TimeSpan timeout)
+    {
+        var history = await deployment.WaitForHistoryAsync(
+            arn,
+            h => h.Events?.Any(e => !string.IsNullOrEmpty(e.CallbackStartedDetails?.CallbackId)) ?? false,
+            timeout);
+        return history.Events?
+            .Select(e => e.CallbackStartedDetails?.CallbackId)
+            .FirstOrDefault(id => !string.IsNullOrEmpty(id));
+    }
+}
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/CallbackTimeoutTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/CallbackTimeoutTest.cs
new file mode 100644
index 000000000..84a504bce
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/CallbackTimeoutTest.cs
@@ -0,0 +1,81 @@
+using System.Linq;
+using System.Text;
+using Amazon.Lambda.Model;
+using Xunit;
+using Xunit.Abstractions;
+
+namespace Amazon.Lambda.DurableExecution.IntegrationTests;
+
+public class CallbackTimeoutTest
+{
+    private readonly ITestOutputHelper _output;
+    public CallbackTimeoutTest(ITestOutputHelper output) => _output = output;
+
+    /// <summary>
+    /// End-to-end timeout path for <c>CreateCallbackAsync</c>:
+    /// the workflow waits on a callback whose <see cref="CallbackConfig.Timeout"/>
+    /// elapses before any result is delivered. The service marks the callback as
+    /// TIMED_OUT, the SDK throws <see cref="CallbackTimeoutException"/>, and the
+    /// workflow surfaces FAILED with that exception type recorded.
+    /// </summary>
+    [Fact]
+    public async Task CallbackTimeout_SurfacesAsCallbackTimeoutException()
+    {
+        await using var deployment = await DurableFunctionDeployment.CreateAsync(
+            DurableFunctionDeployment.FindTestFunctionDir("CallbackTimeoutFunction"),
+            "cb-timeout", _output);
+
+        var (invokeResponse, executionName) = await deployment.InvokeAsync("""{"orderId": "x"}""");
+        var responsePayload = Encoding.UTF8.GetString(invokeResponse.Payload.ToArray());
+        _output.WriteLine($"Initial response: {responsePayload}");
+
+        var arn = await deployment.FindDurableExecutionArnByNameAsync(executionName, TimeSpan.FromSeconds(60));
+        Assert.NotNull(arn);
+
+        // Confirm the service allocated a CallbackId for the started callback.
+        // CallbackStarted has the ID; CallbackTimedOut typically does not echo
+        // it back on the event, so it is read from the Started event here.
+        var callbackId = await WaitForCallbackIdAsync(deployment, arn!, TimeSpan.FromSeconds(30));
+        Assert.False(string.IsNullOrEmpty(callbackId));
+        _output.WriteLine($"Service-allocated CallbackId: {callbackId}");
+
+        // The configured timeout is 10s; allow generous headroom for service
+        // latency (timer scheduling + re-invoke + Lambda cold start).
+        var status = await deployment.PollForCompletionAsync(arn!, TimeSpan.FromSeconds(120));
+        Assert.Equal("FAILED", status, ignoreCase: true);
+
+        // The execution surfaces the SDK's CallbackTimeoutException to the user.
+        // ErrorObject.FromException records ErrorType as the FullName; verify both
+        // the type and that the recorded message mentions "timed out".
+        var execution = await deployment.GetExecutionAsync(arn!);
+        Assert.NotNull(execution.Error);
+        Assert.Equal(typeof(CallbackTimeoutException).FullName, execution.Error.ErrorType);
+        Assert.Contains("timed out", execution.Error.ErrorMessage, StringComparison.OrdinalIgnoreCase);
+
+        // History records exactly one Started and one TimedOut event for the callback.
+        var history = await deployment.WaitForHistoryAsync(
+            arn!,
+            h => (h.Events?.Any(e => e.EventType == EventType.CallbackStarted) ?? false)
+                 && (h.Events?.Any(e => e.EventType == EventType.CallbackTimedOut) ?? false),
+            TimeSpan.FromSeconds(60));
+        Assert.NotNull(history.Events);
+        Assert.Single(history.Events, e => e.EventType == EventType.CallbackStarted);
+        Assert.Single(history.Events, e => e.EventType == EventType.CallbackTimedOut);
+    }
+
+    /// <summary>
+    /// Waits via <c>WaitForHistoryAsync</c> until a started-callback event carries a
+    /// non-empty CallbackId and returns it; null when the returned history has none.
+    /// </summary>
+    private static async Task<string?> WaitForCallbackIdAsync(
+        DurableFunctionDeployment deployment, string arn, TimeSpan timeout)
+    {
+        var history = await deployment.WaitForHistoryAsync(
+            arn,
+            h => h.Events?.Any(e => !string.IsNullOrEmpty(e.CallbackStartedDetails?.CallbackId)) ?? false,
+            timeout);
+        return history.Events?
+            .Select(e => e.CallbackStartedDetails?.CallbackId)
+            .FirstOrDefault(id => !string.IsNullOrEmpty(id));
+    }
+}
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/CreateCallbackHappyPathTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/CreateCallbackHappyPathTest.cs
new file mode 100644
index 000000000..97f098a41
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/CreateCallbackHappyPathTest.cs
@@ -0,0 +1,97 @@
+using System.IO;
+using System.Linq;
+using System.Text;
+using Amazon.Lambda.Model;
+using Xunit;
+using Xunit.Abstractions;
+
+namespace Amazon.Lambda.DurableExecution.IntegrationTests;
+
+public class CreateCallbackHappyPathTest
+{
+    private readonly ITestOutputHelper _output;
+    public CreateCallbackHappyPathTest(ITestOutputHelper output) => _output = output;
+
+    /// <summary>
+    /// End-to-end happy path for <c>CreateCallbackAsync</c>:
+    /// the workflow suspends inside <c>GetResultAsync</c>; the test acts as the
+    /// external system and delivers a result via <c>SendDurableExecutionCallbackSuccess</c>;
+    /// the workflow resumes and returns the delivered payload.
+    /// </summary>
+    [Fact]
+    public async Task CreateCallback_DeliversResultViaSendSuccess()
+    {
+        await using var deployment = await DurableFunctionDeployment.CreateAsync(
+            DurableFunctionDeployment.FindTestFunctionDir("CreateCallbackHappyPathFunction"),
+            "cb-happy", _output);
+
+        var (invokeResponse, executionName) = await deployment.InvokeAsync("""{"orderId": "approve-123"}""");
+        var responsePayload = Encoding.UTF8.GetString(invokeResponse.Payload.ToArray());
+        _output.WriteLine($"Initial response: {responsePayload}");
+
+        // The workflow suspends after CreateCallback's START checkpoint; locate the
+        // execution by name and pull the service-allocated CallbackId from history.
+        var arn = await deployment.FindDurableExecutionArnByNameAsync(executionName, TimeSpan.FromSeconds(60));
+        Assert.NotNull(arn);
+
+        var callbackId = await WaitForCallbackIdAsync(deployment, arn!, TimeSpan.FromSeconds(60));
+        Assert.False(string.IsNullOrEmpty(callbackId), "CallbackStarted event never appeared with a CallbackId");
+        _output.WriteLine($"Service-allocated CallbackId: {callbackId}");
+
+        // Act as the external system: deliver a result. The service will re-invoke the
+        // Lambda with CALLBACK SUCCEEDED, GetResultAsync deserializes it, and the
+        // workflow returns.
+        var resultJson = """{"Status":"approved","ApprovedBy":"integ-test"}""";
+        await deployment.LambdaClient.SendDurableExecutionCallbackSuccessAsync(
+            new SendDurableExecutionCallbackSuccessRequest
+            {
+                CallbackId = callbackId!,
+                Result = new MemoryStream(Encoding.UTF8.GetBytes(resultJson))
+            });
+
+        var status = await deployment.PollForCompletionAsync(arn!, TimeSpan.FromSeconds(120));
+        Assert.Equal("SUCCEEDED", status, ignoreCase: true);
+
+        // The execution result mirrors the payload we sent — proves GetResultAsync
+        // deserialized the wire-level callback Result and the workflow returned it.
+        var execution = await deployment.GetExecutionAsync(arn!);
+        Assert.NotNull(execution.Result);
+        Assert.Contains("approved", execution.Result);
+        Assert.Contains("integ-test", execution.Result);
+
+        // History shows the canonical callback lifecycle: Started then Succeeded.
+        var history = await deployment.WaitForHistoryAsync(
+            arn!,
+            h => (h.Events?.Any(e => e.EventType == EventType.CallbackStarted) ?? false)
+                 && (h.Events?.Any(e => e.EventType == EventType.CallbackSucceeded) ?? false),
+            TimeSpan.FromSeconds(60));
+        Assert.NotNull(history.Events);
+
+        Assert.Single(history.Events, e => e.EventType == EventType.CallbackStarted);
+        Assert.Single(history.Events, e => e.EventType == EventType.CallbackSucceeded);
+
+        var succeeded = history.Events.First(e => e.CallbackSucceededDetails != null);
+        Assert.Equal("approve", succeeded.Name);
+    }
+
+    /// <summary>
+    /// Polls execution history until a <c>CallbackStarted</c> event surfaces a
+    /// CallbackId. The history endpoint is eventually consistent and the
+    /// callback ID isn't allocated until the service processes the START checkpoint.
+    /// </summary>
+    /// <returns>
+    /// The first non-empty CallbackId found on a started-callback event, or
+    /// <c>null</c> when the returned history contains none.
+    /// </returns>
+    private static async Task<string?> WaitForCallbackIdAsync(
+        DurableFunctionDeployment deployment, string arn, TimeSpan timeout)
+    {
+        var history = await deployment.WaitForHistoryAsync(
+            arn,
+            h => h.Events?.Any(e => !string.IsNullOrEmpty(e.CallbackStartedDetails?.CallbackId)) ?? false,
+            timeout);
+        return history.Events?
+            .Select(e => e.CallbackStartedDetails?.CallbackId)
+            .FirstOrDefault(id => !string.IsNullOrEmpty(id));
+    }
+}
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CallbackFailedFunction/CallbackFailedFunction.csproj b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CallbackFailedFunction/CallbackFailedFunction.csproj
new file mode 100644
index 000000000..6f5f657e4
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CallbackFailedFunction/CallbackFailedFunction.csproj
@@ -0,0 +1,18 @@
+
+
+
+ net8.0
+ Exe
+ true
+ bootstrap
+ enable
+ enable
+
+
+
+
+
+
+
+
+
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CallbackFailedFunction/Dockerfile b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CallbackFailedFunction/Dockerfile
new file mode 100644
index 000000000..c1913d56a
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CallbackFailedFunction/Dockerfile
@@ -0,0 +1,7 @@
+FROM public.ecr.aws/lambda/provided:al2023
+
+RUN dnf install -y libicu
+
+COPY bin/publish/ ${LAMBDA_TASK_ROOT}
+
+ENTRYPOINT ["/var/task/bootstrap"]
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CallbackFailedFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CallbackFailedFunction/Function.cs
new file mode 100644
index 000000000..f74f28374
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CallbackFailedFunction/Function.cs
@@ -0,0 +1,35 @@
+using Amazon.Lambda.Core;
+using Amazon.Lambda.DurableExecution;
+using Amazon.Lambda.RuntimeSupport;
+using Amazon.Lambda.Serialization.SystemTextJson;
+
+namespace DurableExecutionTestFunction;
+
+public class Function
+{
+    // Entry point: runs Handler under LambdaBootstrap using the default JSON serializer.
+    public static async Task Main(string[] args)
+    {
+        var handler = new Function();
+        var serializer = new DefaultLambdaJsonSerializer();
+        using var handlerWrapper = HandlerWrapper.GetHandlerWrapper(handler.Handler, serializer);
+        using var bootstrap = new LambdaBootstrap(handlerWrapper);
+        await bootstrap.RunAsync();
+    }
+
+    // Lambda handler: hands the invocation to DurableFunction.WrapAsync with Workflow as the body.
+    public Task Handler(DurableExecutionInvocationInput input, ILambdaContext context)
+        => DurableFunction.WrapAsync(Workflow, input, context);
+
+    private async Task<MyResult> Workflow(TestEvent input, IDurableContext context)
+    {
+        // Test will deliver a failure via SendDurableExecutionCallbackFailure;
+        // GetResultAsync should raise CallbackFailedException, which the
+        // workflow does not catch — workflow surfaces FAILED.
+        var cb = await context.CreateCallbackAsync<MyResult>(name: "approve");
+        return await cb.GetResultAsync();
+    }
+}
+
+public class TestEvent { public string? OrderId { get; set; } } // Input payload type accepted by Function.Workflow.
+public class MyResult { public string? Status { get; set; } public string? ApprovedBy { get; set; } } // NOTE(review): presumably the callback result type T — confirm.
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CallbackTimeoutFunction/CallbackTimeoutFunction.csproj b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CallbackTimeoutFunction/CallbackTimeoutFunction.csproj
new file mode 100644
index 000000000..6f5f657e4
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CallbackTimeoutFunction/CallbackTimeoutFunction.csproj
@@ -0,0 +1,18 @@
+
+
+
+ net8.0
+ Exe
+ true
+ bootstrap
+ enable
+ enable
+
+
+
+
+
+
+
+
+
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CallbackTimeoutFunction/Dockerfile b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CallbackTimeoutFunction/Dockerfile
new file mode 100644
index 000000000..c1913d56a
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CallbackTimeoutFunction/Dockerfile
@@ -0,0 +1,7 @@
+FROM public.ecr.aws/lambda/provided:al2023
+
+RUN dnf install -y libicu
+
+COPY bin/publish/ ${LAMBDA_TASK_ROOT}
+
+ENTRYPOINT ["/var/task/bootstrap"]
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CallbackTimeoutFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CallbackTimeoutFunction/Function.cs
new file mode 100644
index 000000000..6e43f4d6c
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CallbackTimeoutFunction/Function.cs
@@ -0,0 +1,37 @@
+using Amazon.Lambda.Core;
+using Amazon.Lambda.DurableExecution;
+using Amazon.Lambda.RuntimeSupport;
+using Amazon.Lambda.Serialization.SystemTextJson;
+
+namespace DurableExecutionTestFunction;
+
+public class Function
+{
+    // Entry point: runs Handler under LambdaBootstrap using the default JSON serializer.
+    public static async Task Main(string[] args)
+    {
+        var handler = new Function();
+        var serializer = new DefaultLambdaJsonSerializer();
+        using var handlerWrapper = HandlerWrapper.GetHandlerWrapper(handler.Handler, serializer);
+        using var bootstrap = new LambdaBootstrap(handlerWrapper);
+        await bootstrap.RunAsync();
+    }
+
+    // Lambda handler: hands the invocation to DurableFunction.WrapAsync with Workflow as the body.
+    public Task Handler(DurableExecutionInvocationInput input, ILambdaContext context)
+        => DurableFunction.WrapAsync(Workflow, input, context);
+
+    private async Task<MyResult> Workflow(TestEvent input, IDurableContext context)
+    {
+        // The test deliberately never delivers the callback. The service should
+        // fire the timeout, mark the callback TIMED_OUT, and the SDK should
+        // surface CallbackTimeoutException to the workflow.
+        var cb = await context.CreateCallbackAsync<MyResult>(
+            name: "approve",
+            config: new CallbackConfig { Timeout = TimeSpan.FromSeconds(10) });
+        return await cb.GetResultAsync();
+    }
+}
+
+public class TestEvent { public string? OrderId { get; set; } } // Input payload type accepted by Function.Workflow.
+public class MyResult { public string? Status { get; set; } public string? ApprovedBy { get; set; } } // NOTE(review): presumably the callback result type T — confirm.
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CreateCallbackHappyPathFunction/CreateCallbackHappyPathFunction.csproj b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CreateCallbackHappyPathFunction/CreateCallbackHappyPathFunction.csproj
new file mode 100644
index 000000000..6f5f657e4
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CreateCallbackHappyPathFunction/CreateCallbackHappyPathFunction.csproj
@@ -0,0 +1,18 @@
+
+
+
+ net8.0
+ Exe
+ true
+ bootstrap
+ enable
+ enable
+
+
+
+
+
+
+
+
+
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CreateCallbackHappyPathFunction/Dockerfile b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CreateCallbackHappyPathFunction/Dockerfile
new file mode 100644
index 000000000..c1913d56a
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CreateCallbackHappyPathFunction/Dockerfile
@@ -0,0 +1,7 @@
+FROM public.ecr.aws/lambda/provided:al2023
+
+RUN dnf install -y libicu
+
+COPY bin/publish/ ${LAMBDA_TASK_ROOT}
+
+ENTRYPOINT ["/var/task/bootstrap"]
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CreateCallbackHappyPathFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CreateCallbackHappyPathFunction/Function.cs
new file mode 100644
index 000000000..a966a516e
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CreateCallbackHappyPathFunction/Function.cs
@@ -0,0 +1,34 @@
+using Amazon.Lambda.Core;
+using Amazon.Lambda.DurableExecution;
+using Amazon.Lambda.RuntimeSupport;
+using Amazon.Lambda.Serialization.SystemTextJson;
+
+namespace DurableExecutionTestFunction;
+
+public class Function
+{
+    // Entry point: runs Handler under LambdaBootstrap using the default JSON serializer.
+    public static async Task Main(string[] args)
+    {
+        var handler = new Function();
+        var serializer = new DefaultLambdaJsonSerializer();
+        using var handlerWrapper = HandlerWrapper.GetHandlerWrapper(handler.Handler, serializer);
+        using var bootstrap = new LambdaBootstrap(handlerWrapper);
+        await bootstrap.RunAsync();
+    }
+
+    // Lambda handler: hands the invocation to DurableFunction.WrapAsync with Workflow as the body.
+    public Task Handler(DurableExecutionInvocationInput input, ILambdaContext context)
+        => DurableFunction.WrapAsync(Workflow, input, context);
+
+    private async Task<MyResult> Workflow(TestEvent input, IDurableContext context)
+    {
+        // External system will call SendDurableExecutionCallbackSuccess with the
+        // approval result. Until then this Lambda suspends on GetResultAsync.
+        var cb = await context.CreateCallbackAsync<MyResult>(name: "approve");
+        return await cb.GetResultAsync();
+    }
+}
+
+public class TestEvent { public string? OrderId { get; set; } } // Input payload type accepted by Function.Workflow.
+public class MyResult { public string? Status { get; set; } public string? ApprovedBy { get; set; } } // NOTE(review): presumably the callback result type T — confirm.
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForCallbackHappyPathFunction/Dockerfile b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForCallbackHappyPathFunction/Dockerfile
new file mode 100644
index 000000000..c1913d56a
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForCallbackHappyPathFunction/Dockerfile
@@ -0,0 +1,7 @@
+FROM public.ecr.aws/lambda/provided:al2023
+
+RUN dnf install -y libicu
+
+COPY bin/publish/ ${LAMBDA_TASK_ROOT}
+
+ENTRYPOINT ["/var/task/bootstrap"]
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForCallbackHappyPathFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForCallbackHappyPathFunction/Function.cs
new file mode 100644
index 000000000..7dc148564
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForCallbackHappyPathFunction/Function.cs
@@ -0,0 +1,56 @@
+using System.IO;
+using System.Text;
+using Amazon.Lambda;
+using Amazon.Lambda.Core;
+using Amazon.Lambda.DurableExecution;
+using Amazon.Lambda.Model;
+using Amazon.Lambda.RuntimeSupport;
+using Amazon.Lambda.Serialization.SystemTextJson;
+
+namespace DurableExecutionTestFunction;
+
+public class Function
+{
+    // Reuse a single Lambda client across submitter invocations.
+    private static readonly IAmazonLambda LambdaClient = new AmazonLambdaClient();
+
+    // Entry point: runs Handler under LambdaBootstrap using the default JSON serializer.
+    public static async Task Main(string[] args)
+    {
+        var handler = new Function();
+        var serializer = new DefaultLambdaJsonSerializer();
+        using var handlerWrapper = HandlerWrapper.GetHandlerWrapper(handler.Handler, serializer);
+        using var bootstrap = new LambdaBootstrap(handlerWrapper);
+        await bootstrap.RunAsync();
+    }
+
+    // Lambda handler: hands the invocation to DurableFunction.WrapAsync with Workflow as the body.
+    public Task Handler(DurableExecutionInvocationInput input, ILambdaContext context)
+        => DurableFunction.WrapAsync(Workflow, input, context);
+
+    private async Task<MyResult> Workflow(TestEvent input, IDurableContext context)
+    {
+        // The submitter is called once with a freshly-allocated callback ID.
+        // For a deterministic happy-path integration test we self-resolve the
+        // callback by calling SendDurableExecutionCallbackSuccess directly —
+        // modelling a "fire-and-resolve" external system. After the submitter
+        // returns the SDK suspends; the service re-invokes once it has processed
+        // the resolution, and WaitForCallbackAsync returns the deserialized result.
+        return await context.WaitForCallbackAsync<MyResult>(
+            submitter: async (callbackId, cbCtx) =>
+            {
+                // Resolve the callback immediately with a payload that echoes the order id.
+                var resultJson = $$"""{"Status":"approved","ApprovedBy":"{{input.OrderId}}"}""";
+                await LambdaClient.SendDurableExecutionCallbackSuccessAsync(
+                    new SendDurableExecutionCallbackSuccessRequest
+                    {
+                        CallbackId = callbackId,
+                        Result = new MemoryStream(Encoding.UTF8.GetBytes(resultJson))
+                    });
+            },
+            name: "approve");
+    }
+}
+
+public class TestEvent { public string? OrderId { get; set; } } // Input payload type accepted by Function.Workflow.
+public class MyResult { public string? Status { get; set; } public string? ApprovedBy { get; set; } } // NOTE(review): presumably the callback result type T — confirm.
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForCallbackHappyPathFunction/WaitForCallbackHappyPathFunction.csproj b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForCallbackHappyPathFunction/WaitForCallbackHappyPathFunction.csproj
new file mode 100644
index 000000000..6f5f657e4
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForCallbackHappyPathFunction/WaitForCallbackHappyPathFunction.csproj
@@ -0,0 +1,18 @@
+
+
+
+ net8.0
+ Exe
+ true
+ bootstrap
+ enable
+ enable
+
+
+
+
+
+
+
+
+
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForCallbackSubmitterFailsFunction/Dockerfile b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForCallbackSubmitterFailsFunction/Dockerfile
new file mode 100644
index 000000000..c1913d56a
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForCallbackSubmitterFailsFunction/Dockerfile
@@ -0,0 +1,7 @@
+FROM public.ecr.aws/lambda/provided:al2023
+
+RUN dnf install -y libicu
+
+COPY bin/publish/ ${LAMBDA_TASK_ROOT}
+
+ENTRYPOINT ["/var/task/bootstrap"]
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForCallbackSubmitterFailsFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForCallbackSubmitterFailsFunction/Function.cs
new file mode 100644
index 000000000..a940d9dd8
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForCallbackSubmitterFailsFunction/Function.cs
@@ -0,0 +1,43 @@
+using Amazon.Lambda.Core;
+using Amazon.Lambda.DurableExecution;
+using Amazon.Lambda.RuntimeSupport;
+using Amazon.Lambda.Serialization.SystemTextJson;
+
+namespace DurableExecutionTestFunction;
+
+public class Function
+{
+    // Entry point: runs Handler under LambdaBootstrap using the default JSON serializer.
+    public static async Task Main(string[] args)
+    {
+        var handler = new Function();
+        var serializer = new DefaultLambdaJsonSerializer();
+        using var handlerWrapper = HandlerWrapper.GetHandlerWrapper(handler.Handler, serializer);
+        using var bootstrap = new LambdaBootstrap(handlerWrapper);
+        await bootstrap.RunAsync();
+    }
+
+    // Lambda handler: hands the invocation to DurableFunction.WrapAsync with Workflow as the body.
+    public Task Handler(DurableExecutionInvocationInput input, ILambdaContext context)
+        => DurableFunction.WrapAsync(Workflow, input, context);
+
+    private async Task<MyResult> Workflow(TestEvent input, IDurableContext context)
+    {
+        // The submitter throws on every attempt. With RetryStrategy.None the
+        // SDK should fail terminally on the first attempt and surface the
+        // failure as CallbackSubmitterException. The workflow does not catch
+        // it, so the durable execution surfaces FAILED with that exception.
+        return await context.WaitForCallbackAsync<MyResult>(
+            submitter: async (callbackId, cbCtx) =>
+            {
+                // No-op await: keeps the async lambda free of the CS1998 "lacks await" warning.
+                await Task.CompletedTask;
+                throw new InvalidOperationException("submitter intentional failure");
+            },
+            name: "approve",
+            config: new WaitForCallbackConfig { RetryStrategy = RetryStrategy.None });
+    }
+}
+
+public class TestEvent { public string? OrderId { get; set; } } // Input payload type accepted by Function.Workflow.
+public class MyResult { public string? Status { get; set; } public string? ApprovedBy { get; set; } } // NOTE(review): presumably the callback result type T — confirm.
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForCallbackSubmitterFailsFunction/WaitForCallbackSubmitterFailsFunction.csproj b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForCallbackSubmitterFailsFunction/WaitForCallbackSubmitterFailsFunction.csproj
new file mode 100644
index 000000000..6f5f657e4
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForCallbackSubmitterFailsFunction/WaitForCallbackSubmitterFailsFunction.csproj
@@ -0,0 +1,18 @@
+
+
+
+ net8.0
+ Exe
+ true
+ bootstrap
+ enable
+ enable
+
+
+
+
+
+
+
+
+
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/WaitForCallbackHappyPathTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/WaitForCallbackHappyPathTest.cs
new file mode 100644
index 000000000..ef883c043
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/WaitForCallbackHappyPathTest.cs
@@ -0,0 +1,69 @@
+using System.Linq;
+using System.Text;
+using Amazon.Lambda.Model;
+using Xunit;
+using Xunit.Abstractions;
+
+namespace Amazon.Lambda.DurableExecution.IntegrationTests;
+
+public class WaitForCallbackHappyPathTest
+{
+    private readonly ITestOutputHelper _output;
+    public WaitForCallbackHappyPathTest(ITestOutputHelper output) => _output = output;
+
+    /// <summary>
+    /// End-to-end happy path for <c>WaitForCallbackAsync</c>:
+    /// the submitter step inside the workflow self-resolves the callback by
+    /// calling <c>SendDurableExecutionCallbackSuccess</c>. The composite
+    /// operation suspends after the submitter step completes, the service
+    /// re-invokes the Lambda once the callback is resolved, and
+    /// <c>WaitForCallbackAsync</c> returns the deserialized result.
+    /// </summary>
+    [Fact]
+    public async Task WaitForCallback_SubmitterDeliversResult_WorkflowCompletes()
+    {
+        await using var deployment = await DurableFunctionDeployment.CreateAsync(
+            DurableFunctionDeployment.FindTestFunctionDir("WaitForCallbackHappyPathFunction"),
+            "wfcb-happy", _output);
+
+        var (invokeResponse, executionName) = await deployment.InvokeAsync("""{"orderId": "approver-1"}""");
+        var responsePayload = Encoding.UTF8.GetString(invokeResponse.Payload.ToArray());
+        _output.WriteLine($"Initial response: {responsePayload}");
+
+        var arn = await deployment.FindDurableExecutionArnByNameAsync(executionName, TimeSpan.FromSeconds(60));
+        Assert.NotNull(arn);
+
+        var status = await deployment.PollForCompletionAsync(arn!, TimeSpan.FromSeconds(120));
+        Assert.Equal("SUCCEEDED", status, ignoreCase: true);
+
+        // The execution returns the payload the submitter delivered.
+        var execution = await deployment.GetExecutionAsync(arn!);
+        Assert.NotNull(execution.Result);
+        Assert.Contains("approved", execution.Result);
+        Assert.Contains("approver-1", execution.Result);
+
+        // History is polled until it shows the WaitForCallback lifecycle events
+        // checked below: callback Started + Succeeded and at least one succeeded
+        // step. NOTE(review): the wrapping CONTEXT operation is not asserted.
+        var history = await deployment.WaitForHistoryAsync(
+            arn!,
+            h => (h.Events?.Any(e => e.EventType == EventType.CallbackStarted) ?? false)
+                && (h.Events?.Any(e => e.EventType == EventType.CallbackSucceeded) ?? false)
+                && (h.Events?.Any(e => e.EventType == EventType.StepSucceeded) ?? false),
+            TimeSpan.FromSeconds(60));
+        var events = history.Events ?? new List();
+
+        Assert.Single(events, e => e.EventType == EventType.CallbackStarted);
+        Assert.Single(events, e => e.EventType == EventType.CallbackSucceeded);
+
+        // At least one step Started/Succeeded event must be present for the
+        // submitter. NOTE(review): this does not prove the submitter ran exactly
+        // once (i.e. that replay skipped it), and it is not filtered by step
+        // name; tighten to Assert.Single with a name filter once the submitter
+        // step's recorded name is confirmed.
+        Assert.Contains(
+            events,
+            e => e.EventType == EventType.StepSucceeded
+                || e.EventType == EventType.StepStarted);
+    }
+}
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/WaitForCallbackSubmitterFailsTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/WaitForCallbackSubmitterFailsTest.cs
new file mode 100644
index 000000000..4c74fc8fe
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/WaitForCallbackSubmitterFailsTest.cs
@@ -0,0 +1,66 @@
+using System.Linq;
+using System.Text;
+using Amazon.Lambda.Model;
+using Xunit;
+using Xunit.Abstractions;
+
+namespace Amazon.Lambda.DurableExecution.IntegrationTests;
+
+public class WaitForCallbackSubmitterFailsTest
+{
+    private readonly ITestOutputHelper _output;
+    public WaitForCallbackSubmitterFailsTest(ITestOutputHelper output) => _output = output;
+
+    /// <summary>
+    /// End-to-end submitter-failure path for <c>WaitForCallbackAsync</c>:
+    /// the submitter throws on attempt 1 with <see cref="RetryStrategy.None"/>;
+    /// the SDK fails the composite operation terminally and surfaces
+    /// <see cref="CallbackSubmitterException"/>. The workflow surfaces FAILED.
+    /// </summary>
+    [Fact]
+    public async Task WaitForCallback_SubmitterThrows_SurfacesAsCallbackSubmitterException()
+    {
+        await using var deployment = await DurableFunctionDeployment.CreateAsync(
+            DurableFunctionDeployment.FindTestFunctionDir("WaitForCallbackSubmitterFailsFunction"),
+            "wfcb-fail", _output);
+
+        var (invokeResponse, executionName) = await deployment.InvokeAsync("""{"orderId": "x"}""");
+        var responsePayload = Encoding.UTF8.GetString(invokeResponse.Payload.ToArray());
+        _output.WriteLine($"Initial response: {responsePayload}");
+
+        var arn = await deployment.FindDurableExecutionArnByNameAsync(executionName, TimeSpan.FromSeconds(60));
+        Assert.NotNull(arn);
+
+        var status = await deployment.PollForCompletionAsync(arn!, TimeSpan.FromSeconds(120));
+        Assert.Equal("FAILED", status, ignoreCase: true);
+
+        // The workflow surfaces CallbackSubmitterException — the SDK's wrapper
+        // type around the failed submitter step. Verify the recorded ErrorType
+        // here; the original "submitter intentional failure" message is checked
+        // further down against the failed step's history event.
+        var execution = await deployment.GetExecutionAsync(arn!);
+        Assert.NotNull(execution.Error);
+        Assert.Equal(typeof(CallbackSubmitterException).FullName, execution.Error.ErrorType);
+        // ErrorObject.FromException records the outer exception's Message; that
+        // message should reference the submitter failure context. Be lenient
+        // about exact wording since the SDK may prepend / wrap the inner.
+        Assert.False(string.IsNullOrEmpty(execution.Error.ErrorMessage));
+
+        // History records the submitter step failed exactly once — RetryStrategy.None
+        // means no retries. NOTE(review): callback events are not asserted here; the
+        // composite may already have started the callback before the submitter ran.
+        var history = await deployment.WaitForHistoryAsync(
+            arn!,
+            h => h.Events?.Any(e => e.StepFailedDetails != null) ?? false,
+            TimeSpan.FromSeconds(60));
+        var events = history.Events ?? new List();
+
+        // Assert.Single returns the sole matching event, so no list or indexing is needed.
+        var stepFailure = Assert.Single(events, e => e.StepFailedDetails != null);
+        var failureMessage = stepFailure.StepFailedDetails.Error?.Payload?.ErrorMessage ?? string.Empty;
+        Assert.Contains("submitter intentional failure", failureMessage);
+
+        // No SUCCEEDED step events — the submitter never succeeded.
+        Assert.DoesNotContain(events, e => e.StepSucceededDetails != null);
+    }
+}
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/CallbackOperationTests.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/CallbackOperationTests.cs
new file mode 100644
index 000000000..b89b17fee
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/CallbackOperationTests.cs
@@ -0,0 +1,497 @@
+using Amazon.Lambda.DurableExecution;
+using Amazon.Lambda.DurableExecution.Internal;
+using Amazon.Lambda.TestUtilities;
+using Xunit;
+
+namespace Amazon.Lambda.DurableExecution.Tests;
+
+public class CallbackOperationTests
+{
+ /// <summary>Reproduces the Id that <see cref="OperationIdGenerator"/> emits for the n-th root-level operation.</summary>
+ private static string IdAt(int position) => OperationIdGenerator.HashOperationId(position.ToString());
+
+ private static (DurableContext context, RecordingBatcher recorder, TerminationManager tm, ExecutionState state)
+ CreateContext(InitialExecutionState? initialState = null)
+ {
+ var state = new ExecutionState();
+ state.LoadFromCheckpoint(initialState); // initialState is null when the caller passes nothing (no prior checkpoints)
+ var tm = new TerminationManager();
+ var idGen = new OperationIdGenerator();
+ var lambdaContext = new TestLambdaContext();
+ var recorder = new RecordingBatcher();
+ var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext, recorder.Batcher); // the context is handed the recorder's batcher
+ return (context, recorder, tm, state); // collaborators are returned so tests can inspect them
+ }
+
+ /// <summary>
+ /// Wires a recorder so that every flushed CALLBACK START update stamps the given
+ /// callback ID into <see cref="ExecutionState"/> — modeling the durable-execution
+ /// service's NewExecutionState response that allocates the ID.
+ /// </summary>
+ private static void WireServiceCallbackIdAllocation(
+ RecordingBatcher recorder, ExecutionState state, string callbackId)
+ {
+ recorder.OnFlush = ops =>
+ {
+ foreach (var op in ops)
+ {
+ if (op.Type == OperationTypes.Callback && op.Action == "START") // only CALLBACK START updates are echoed back
+ {
+ state.AddOperations(new[]
+ {
+ new Operation
+ {
+ Id = op.Id, // same operation Id as the flushed update
+ Type = OperationTypes.Callback,
+ Status = OperationStatuses.Started,
+ Name = op.Name,
+ CallbackDetails = new CallbackDetails { CallbackId = callbackId }
+ }
+ });
+ }
+ }
+ };
+ }
+
+ [Fact]
+ public async Task CreateCallbackAsync_FreshExecution_FlushesStartAndReturnsCallbackId()
+ {
+ var (context, recorder, tm, state) = CreateContext();
+ WireServiceCallbackIdAllocation(recorder, state, "cb-abc-123");
+
+ var callback = await context.CreateCallbackAsync(name: "approval");
+
+ Assert.Equal("cb-abc-123", callback.CallbackId); // the ID stamped by the OnFlush hook
+ Assert.False(tm.IsTerminated); // creating the callback alone does not signal termination
+
+ await recorder.Batcher.DrainAsync();
+
+ // Exactly one checkpoint update is recorded, and it is the CALLBACK START.
+ var single = Assert.Single(recorder.Flushed);
+ Assert.Equal(OperationTypes.Callback, single.Type);
+ Assert.Equal("START", single.Action);
+ Assert.Equal(OperationSubTypes.Callback, single.SubType);
+ Assert.Equal("approval", single.Name);
+ Assert.Equal(IdAt(1), single.Id); // Id expected for position 1
+ }
+
+ [Fact]
+ public async Task CreateCallbackAsync_FreshExecution_NoConfig_DoesNotEmitCallbackOptions()
+ {
+ var (context, recorder, _, state) = CreateContext();
+ WireServiceCallbackIdAllocation(recorder, state, "cb-1");
+
+ await context.CreateCallbackAsync(name: "no_options"); // no CallbackConfig supplied
+
+ await recorder.Batcher.DrainAsync();
+
+ var single = Assert.Single(recorder.Flushed);
+ Assert.Null(single.CallbackOptions); // without a config the START update carries no options
+ }
+
+ [Fact]
+ public async Task CreateCallbackAsync_FreshExecution_WithConfig_EmitsCallbackOptions()
+ {
+ var (context, recorder, _, state) = CreateContext();
+ WireServiceCallbackIdAllocation(recorder, state, "cb-1");
+
+ await context.CreateCallbackAsync(
+ name: "with_options",
+ config: new CallbackConfig
+ {
+ Timeout = TimeSpan.FromHours(1),
+ HeartbeatTimeout = TimeSpan.FromMinutes(5)
+ });
+
+ await recorder.Batcher.DrainAsync();
+
+ var single = Assert.Single(recorder.Flushed);
+ Assert.NotNull(single.CallbackOptions);
+ Assert.Equal(3600, single.CallbackOptions.TimeoutSeconds); // 1 hour expressed in seconds
+ Assert.Equal(300, single.CallbackOptions.HeartbeatTimeoutSeconds); // 5 minutes expressed in seconds
+ }
+
+ [Fact]
+ public async Task CreateCallbackAsync_FreshExecution_OnlyTimeout_EmitsOnlyTimeout()
+ {
+ var (context, recorder, _, state) = CreateContext();
+ WireServiceCallbackIdAllocation(recorder, state, "cb-1");
+
+ await context.CreateCallbackAsync(
+ config: new CallbackConfig { Timeout = TimeSpan.FromSeconds(45) });
+
+ await recorder.Batcher.DrainAsync();
+
+ var single = Assert.Single(recorder.Flushed);
+ Assert.NotNull(single.CallbackOptions);
+ Assert.Equal(45, single.CallbackOptions.TimeoutSeconds);
+ // HeartbeatTimeout was not set → property remains at its default; null and 0
+ // are both accepted (the AWS SDK Marshaller will not serialize the field).
+ Assert.True(
+ single.CallbackOptions.HeartbeatTimeoutSeconds == null
+ || single.CallbackOptions.HeartbeatTimeoutSeconds == 0);
+ }
+
+ [Fact]
+ public async Task CreateCallbackAsync_ServiceMissingCallbackId_ThrowsNonDeterministic()
+ {
+ // Service doesn't stamp a CallbackId — RecordingBatcher's OnFlush is left
+ // unset, so nothing adds a STARTED callback operation to the state.
+ var (context, _, _, _) = CreateContext();
+
+ var ex = await Assert.ThrowsAsync(() =>
+ context.CreateCallbackAsync(name: "broken"));
+ Assert.Contains("CallbackId", ex.Message); // the message must name the missing field
+ }
+
+ [Fact]
+ public async Task GetResultAsync_FreshExecution_SuspendsExecution()
+ {
+ var (context, recorder, tm, state) = CreateContext();
+ WireServiceCallbackIdAllocation(recorder, state, "cb-1");
+
+ var callback = await context.CreateCallbackAsync(name: "approval");
+
+ // GetResultAsync should signal termination and return a never-completing task.
+ var resultTask = callback.GetResultAsync(); // deliberately not awaited
+ await Task.Delay(10); // short real-time pause before inspecting state; NOTE(review): timing-based
+
+ Assert.True(tm.IsTerminated);
+ Assert.False(resultTask.IsCompleted);
+ }
+
+ [Fact]
+ public async Task ReplayStarted_DoesNotReFlushStart_AndSuspendsOnGetResult()
+ {
+ // STARTED on replay = service has stamped CallbackId but no terminal yet.
+ var (context, recorder, tm, _) = CreateContext(new InitialExecutionState
+ {
+ Operations = new List
+ {
+ new()
+ {
+ Id = IdAt(1),
+ Type = OperationTypes.Callback,
+ Status = OperationStatuses.Started,
+ Name = "approval",
+ CallbackDetails = new CallbackDetails { CallbackId = "cb-replay-1" }
+ }
+ }
+ });
+
+ var callback = await context.CreateCallbackAsync(name: "approval");
+ Assert.Equal("cb-replay-1", callback.CallbackId); // ID comes from the replayed checkpoint
+ Assert.False(tm.IsTerminated);
+
+ var resultTask = callback.GetResultAsync(); // deliberately not awaited
+ await Task.Delay(10); // short real-time pause before inspecting state; NOTE(review): timing-based
+
+ Assert.True(tm.IsTerminated);
+ Assert.False(resultTask.IsCompleted);
+
+ // No new checkpoints — replay path doesn't re-flush START.
+ await recorder.Batcher.DrainAsync();
+ Assert.Empty(recorder.Flushed);
+ }
+
+ [Fact]
+ public async Task ReplaySucceeded_GetResultDeserializes_NoSuspension()
+ {
+ var (context, recorder, tm, _) = CreateContext(new InitialExecutionState
+ {
+ Operations = new List
+ {
+ new()
+ {
+ Id = IdAt(1),
+ Type = OperationTypes.Callback,
+ Status = OperationStatuses.Succeeded,
+ Name = "approval",
+ CallbackDetails = new CallbackDetails
+ {
+ CallbackId = "cb-done-1",
+ Result = "\"approved\"" // JSON-encoded string payload
+ }
+ }
+ }
+ });
+
+ var callback = await context.CreateCallbackAsync(name: "approval");
+ var result = await callback.GetResultAsync();
+
+ Assert.Equal("cb-done-1", callback.CallbackId);
+ Assert.Equal("approved", result); // JSON quotes removed by deserialization
+ Assert.False(tm.IsTerminated); // a terminal callback does not suspend
+
+ await recorder.Batcher.DrainAsync();
+ Assert.Empty(recorder.Flushed); // nothing was checkpointed during this replay
+ }
+
+ [Fact]
+ public async Task ReplaySucceeded_NullResultReturnsDefault()
+ {
+ var (context, _, _, _) = CreateContext(new InitialExecutionState
+ {
+ Operations = new List
+ {
+ new()
+ {
+ Id = IdAt(1),
+ Type = OperationTypes.Callback,
+ Status = OperationStatuses.Succeeded,
+ Name = "no_payload",
+ CallbackDetails = new CallbackDetails { CallbackId = "cb-1" } // Result is intentionally left unset
+ }
+ }
+ });
+
+ var callback = await context.CreateCallbackAsync(name: "no_payload");
+ var result = await callback.GetResultAsync();
+ Assert.Null(result); // a missing payload yields null here
+ }
+
+ [Fact]
+ public async Task ReplayFailed_GetResultThrowsCallbackFailedException()
+ {
+ var (context, _, _, _) = CreateContext(new InitialExecutionState
+ {
+ Operations = new List
+ {
+ new()
+ {
+ Id = IdAt(1),
+ Type = OperationTypes.Callback,
+ Status = OperationStatuses.Failed,
+ Name = "approval",
+ CallbackDetails = new CallbackDetails
+ {
+ CallbackId = "cb-fail-1",
+ Error = new ErrorObject
+ {
+ ErrorType = "ExternalSystemError",
+ ErrorMessage = "rejected by reviewer",
+ ErrorData = "{\"reviewer\":\"jane\"}"
+ }
+ }
+ }
+ }
+ });
+
+ var callback = await context.CreateCallbackAsync(name: "approval"); // creation succeeds; the failure surfaces on GetResultAsync
+
+ var ex = await Assert.ThrowsAsync(() => callback.GetResultAsync());
+ Assert.IsAssignableFrom(ex);
+ Assert.Equal("rejected by reviewer", ex.Message); // matches ErrorObject.ErrorMessage above
+ Assert.Equal("cb-fail-1", ex.CallbackId);
+ Assert.Equal("ExternalSystemError", ex.ErrorType);
+ Assert.Equal("{\"reviewer\":\"jane\"}", ex.ErrorData); // ErrorData is compared as the raw string
+ }
+
+ [Fact]
+ public async Task ReplayTimedOut_GetResultThrowsCallbackTimeoutException()
+ {
+ var (context, _, _, _) = CreateContext(new InitialExecutionState
+ {
+ Operations = new List
+ {
+ new()
+ {
+ Id = IdAt(1),
+ Type = OperationTypes.Callback,
+ Status = OperationStatuses.TimedOut,
+ Name = "approval",
+ CallbackDetails = new CallbackDetails
+ {
+ CallbackId = "cb-to-1",
+ Error = new ErrorObject
+ {
+ ErrorMessage = "callback timed out after 24h"
+ }
+ }
+ }
+ }
+ });
+
+ var callback = await context.CreateCallbackAsync(name: "approval"); // creation succeeds; the timeout surfaces on GetResultAsync
+
+ var ex = await Assert.ThrowsAsync(() => callback.GetResultAsync());
+ Assert.IsAssignableFrom(ex);
+ Assert.Equal("callback timed out after 24h", ex.Message); // matches ErrorObject.ErrorMessage above
+ Assert.Equal("cb-to-1", ex.CallbackId);
+ }
+
+ [Fact]
+ public async Task ReplayTimedOut_NoErrorDetails_DefaultMessage()
+ {
+ var (context, _, _, _) = CreateContext(new InitialExecutionState
+ {
+ Operations = new List
+ {
+ new()
+ {
+ Id = IdAt(1),
+ Type = OperationTypes.Callback,
+ Status = OperationStatuses.TimedOut,
+ Name = "approval",
+ CallbackDetails = new CallbackDetails { CallbackId = "cb-1" } // no Error object on the checkpoint
+ }
+ }
+ });
+
+ var callback = await context.CreateCallbackAsync(name: "approval");
+ var ex = await Assert.ThrowsAsync(() => callback.GetResultAsync());
+ Assert.Equal("Callback timed out", ex.Message); // fallback text expected when no error message was recorded
+ }
+
+ [Fact]
+ public async Task ReplayUnknownStatus_ThrowsNonDeterministic()
+ {
+ // Replay must throw on unexpected statuses (CANCELLED, garbage, etc.)
+ // rather than silently degrading to a suspend. Mirrors WaitOperation
+ // and ChildContextOperation's `default:` arms.
+ var (context, _, _, _) = CreateContext(new InitialExecutionState
+ {
+ Operations = new List
+ {
+ new()
+ {
+ Id = IdAt(1),
+ Type = OperationTypes.Callback,
+ Status = "CANCELLED", // literal status string used as the unexpected value
+ Name = "approval",
+ CallbackDetails = new CallbackDetails { CallbackId = "cb-1" }
+ }
+ }
+ });
+
+ var ex = await Assert.ThrowsAsync(() =>
+ context.CreateCallbackAsync(name: "approval")); // the throw happens at creation, before any GetResultAsync
+ Assert.Contains("unexpected status", ex.Message);
+ Assert.Contains("CANCELLED", ex.Message); // the offending status must appear in the message
+ }
+
+ [Fact]
+ public async Task ReplayMissingCallbackId_ThrowsNonDeterministic()
+ {
+ // Replay path expects the CallbackId to be present. If it's absent, surface a clear
+ // non-deterministic error rather than letting users hit a NullReferenceException later.
+ var (context, _, _, _) = CreateContext(new InitialExecutionState
+ {
+ Operations = new List
+ {
+ new()
+ {
+ Id = IdAt(1),
+ Type = OperationTypes.Callback,
+ Status = OperationStatuses.Started,
+ Name = "broken",
+ CallbackDetails = new CallbackDetails { CallbackId = null } // explicitly missing ID
+ }
+ }
+ });
+
+ await Assert.ThrowsAsync(() =>
+ context.CreateCallbackAsync(name: "broken"));
+ }
+
+ [Fact]
+ public async Task ReplayDeterministic_CallbackIdStableAcrossReplays()
+ {
+ // Round-trip: STARTED checkpoint with CallbackId X must yield the same X
+ // on replay so external systems' references remain valid.
+ const string id = "stable-cb-id-12345";
+
+ var (context, _, _, _) = CreateContext(new InitialExecutionState
+ {
+ Operations = new List
+ {
+ new()
+ {
+ Id = IdAt(1),
+ Type = OperationTypes.Callback,
+ Status = OperationStatuses.Started,
+ Name = "approval",
+ CallbackDetails = new CallbackDetails { CallbackId = id }
+ }
+ }
+ });
+
+ var callback = await context.CreateCallbackAsync(name: "approval");
+ Assert.Equal(id, callback.CallbackId); // the replayed ID is returned unchanged
+ }
+
+ [Fact]
+ public async Task ReplayTypeMismatch_ThrowsNonDeterministic()
+ {
+ // The checkpoint at this position was recorded as a STEP, but the code now
+ // requests a CALLBACK — code drift detection.
+ // ExecutionState.ValidateReplayConsistency is the gate.
+ var (context, _, _, _) = CreateContext(new InitialExecutionState
+ {
+ Operations = new List
+ {
+ new()
+ {
+ Id = IdAt(1),
+ Type = OperationTypes.Step, // recorded type differs from the CALLBACK requested below
+ Status = OperationStatuses.Succeeded,
+ Name = "approval",
+ StepDetails = new StepDetails { Result = "\"ok\"" }
+ }
+ }
+ });
+
+ await Assert.ThrowsAsync(() =>
+ context.CreateCallbackAsync(name: "approval"));
+ }
+
+ [Fact]
+ public async Task CreateCallbackAsync_AOTOverload_UsesSuppliedSerializer()
+ {
+ // Custom serializer that prefixes deserialized strings — proves the AOT-safe
+ // overload is wired through and not silently using reflection JSON.
+ var (context, _, _, _) = CreateContext(new InitialExecutionState
+ {
+ Operations = new List
+ {
+ new()
+ {
+ Id = IdAt(1),
+ Type = OperationTypes.Callback,
+ Status = OperationStatuses.Succeeded,
+ Name = "marker",
+ CallbackDetails = new CallbackDetails
+ {
+ CallbackId = "cb-1",
+ Result = "raw-payload" // not a JSON document; MarkerSerializer just prefixes it
+ }
+ }
+ }
+ });
+
+ var callback = await context.CreateCallbackAsync(
+ new MarkerSerializer(),
+ name: "marker");
+ var result = await callback.GetResultAsync();
+
+ Assert.Equal("custom:raw-payload", result); // the "custom:" prefix is added by MarkerSerializer.Deserialize
+ }
+
+ [Fact]
+ public async Task CreateCallbackAsync_CallbackIdAccessBeforeStart_Throws()
+ {
+ // Direct construction of the CallbackOperation without going through
+ // ExecuteAsync — guard against bugs that try to read CallbackId early.
+ var op = new CallbackOperation(
+ "op-id", "name", null, new MarkerSerializer(),
+ new ExecutionState(), new TerminationManager(), "arn", batcher: null);
+
+ Assert.Throws(() => _ = ((ICallback)op).CallbackId); // CallbackId is read through the ICallback interface
+ await Task.CompletedTask; // no-op await; NOTE(review): the test body is otherwise synchronous
+ }
+
+ private sealed class MarkerSerializer : ICheckpointSerializer // test double: prefixes values instead of doing real (de)serialization
+ {
+ public string Serialize(string value, SerializationContext context) => $"custom:{value}";
+ public string Deserialize(string data, SerializationContext context) => $"custom:{data}"; // the prefix makes it observable that this serializer handled the payload
+ }
+}
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableFunctionTests.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableFunctionTests.cs
index b624766eb..81a2275c8 100644
--- a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableFunctionTests.cs
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableFunctionTests.cs
@@ -10,6 +10,7 @@
using StepDetails = Amazon.Lambda.DurableExecution.Internal.StepDetails;
using WaitDetails = Amazon.Lambda.DurableExecution.Internal.WaitDetails;
using ExecutionDetails = Amazon.Lambda.DurableExecution.Internal.ExecutionDetails;
+using CallbackDetails = Amazon.Lambda.DurableExecution.Internal.CallbackDetails;
namespace Amazon.Lambda.DurableExecution.Tests;
@@ -561,6 +562,183 @@ private static async Task SingleStepWorkflow(OrderEvent input, IDur
return new OrderResult { Status = "done" };
}
+    [Fact]
+    public async Task WrapAsync_CreateCallbackThenWait_AllocatesCallbackIdAndSuspends()
+    {
+        // End-to-end through the real LambdaDurableServiceClient: the mock
+        // client returns NewExecutionState carrying a CallbackId on the
+        // CALLBACK START checkpoint response, and the SDK plumbs it through.
+        var input = new DurableExecutionInvocationInput
+        {
+            DurableExecutionArn = "arn:aws:lambda:us-east-1:123:durable-execution:cb-test",
+            InitialExecutionState = new InitialExecutionState
+            {
+                Operations = new List
+                {
+                    new()
+                    {
+                        Id = "exec-0",
+                        Type = OperationTypes.Execution,
+                        Status = OperationStatuses.Started,
+                        ExecutionDetails = new ExecutionDetails { InputPayload = "{\"OrderId\":\"o-1\"}" }
+                    }
+                }
+            }
+        };
+
+        string? capturedCallbackId = null;
+        var mockClient = new MockLambdaClient
+        {
+            CheckpointHandler = req =>
+            {
+                // Echo back any CALLBACK START as a STARTED op with a service-allocated id.
+                var newOps = new List();
+                foreach (var u in req.Updates)
+                {
+                    if (u.Type == OperationTypes.Callback && u.Action == "START")
+                    {
+                        newOps.Add(new Amazon.Lambda.Model.Operation
+                        {
+                            Id = u.Id,
+                            Type = OperationTypes.Callback,
+                            Status = OperationStatuses.Started,
+                            Name = u.Name,
+                            CallbackDetails = new Amazon.Lambda.Model.CallbackDetails
+                            {
+                                CallbackId = "servicealloccbid"
+                            }
+                        });
+                    }
+                }
+                return new Amazon.Lambda.Model.CheckpointDurableExecutionResponse
+                {
+                    NewExecutionState = newOps.Count == 0
+                        ? null
+                        : new Amazon.Lambda.Model.CheckpointUpdatedExecutionState { Operations = newOps }
+                };
+            }
+        };
+
+        var output = await DurableFunction.WrapAsync(
+            async (e, ctx) =>
+            {
+                var cb = await ctx.CreateCallbackAsync(name: "approval");
+                capturedCallbackId = cb.CallbackId;
+                var status = await cb.GetResultAsync();
+                return new OrderResult { Status = status, OrderId = e.OrderId };
+            },
+            input,
+            new TestLambdaContext(),
+            mockClient);
+
+        Assert.Equal(InvocationStatus.Pending, output.Status);
+        Assert.Equal("servicealloccbid", capturedCallbackId);
+    }
+
+ [Fact]
+ public async Task WrapAsync_ReplayCallbackSucceeded_ReturnsResultAfterSuspend()
+ {
+ // Second invocation: the callback's checkpoint is now SUCCEEDED;
+ // the workflow returns the deserialized result.
+ var input = new DurableExecutionInvocationInput
+ {
+ DurableExecutionArn = "arn:aws:lambda:us-east-1:123:durable-execution:cb-test",
+ InitialExecutionState = new InitialExecutionState
+ {
+ Operations = new List
+ {
+ new()
+ {
+ Id = "exec-0",
+ Type = OperationTypes.Execution,
+ Status = OperationStatuses.Started,
+ ExecutionDetails = new ExecutionDetails { InputPayload = "{\"OrderId\":\"o-1\"}" }
+ },
+ new()
+ {
+ Id = IdAt(1), // Id for position 1, the workflow's only operation below
+ Type = OperationTypes.Callback,
+ Status = OperationStatuses.Succeeded,
+ Name = "approval",
+ CallbackDetails = new CallbackDetails
+ {
+ CallbackId = "servicealloccbid",
+ Result = "\"approved\"" // JSON-encoded string payload
+ }
+ }
+ }
+ }
+ };
+
+ var output = await DurableFunction.WrapAsync(
+ async (e, ctx) =>
+ {
+ var cb = await ctx.CreateCallbackAsync(name: "approval");
+ var status = await cb.GetResultAsync();
+ return new OrderResult { Status = status, OrderId = e.OrderId };
+ },
+ input,
+ new TestLambdaContext(),
+ new MockLambdaClient()); // default mock with no CheckpointHandler
+
+ Assert.Equal(InvocationStatus.Succeeded, output.Status);
+ Assert.NotNull(output.Result);
+ var result = JsonSerializer.Deserialize(output.Result!);
+ Assert.Equal("approved", result!.Status); // the callback payload became the workflow result's Status
+ }
+
+ [Fact]
+ public async Task WrapAsync_ReplayDeterminism_CallbackIdStableAcrossInvocations()
+ {
+ // First invocation allocates a callback ID via the mock; in a real run
+ // that ID would be persisted in the service's checkpoint state and
+ // returned to the second invocation via InitialExecutionState. Verify
+ // the same ID survives that round-trip (we model "round-trip" by
+ // replaying with a STARTED checkpoint that carries the same ID).
+ const string id = "stablecbidreplay";
+ var input = new DurableExecutionInvocationInput
+ {
+ DurableExecutionArn = "arn:test",
+ InitialExecutionState = new InitialExecutionState
+ {
+ Operations = new List
+ {
+ new()
+ {
+ Id = "exec-0",
+ Type = OperationTypes.Execution,
+ Status = OperationStatuses.Started,
+ ExecutionDetails = new ExecutionDetails { InputPayload = "{\"OrderId\":\"o-1\"}" }
+ },
+ new()
+ {
+ Id = IdAt(1),
+ Type = OperationTypes.Callback,
+ Status = OperationStatuses.Started, // still pending: no terminal status recorded
+ Name = "approval",
+ CallbackDetails = new CallbackDetails { CallbackId = id }
+ }
+ }
+ }
+ };
+
+ string? observed = null;
+ var output = await DurableFunction.WrapAsync(
+ async (e, ctx) =>
+ {
+ var cb = await ctx.CreateCallbackAsync(name: "approval");
+ observed = cb.CallbackId; // captured before GetResultAsync is awaited
+ var status = await cb.GetResultAsync();
+ return new OrderResult { Status = status, OrderId = e.OrderId };
+ },
+ input,
+ new TestLambdaContext(),
+ new MockLambdaClient());
+
+ Assert.Equal(InvocationStatus.Pending, output.Status); // the callback is still STARTED, so the invocation ends pending
+ Assert.Equal(id, observed);
+ }
+
private static async Task MyWorkflow(OrderEvent input, IDurableContext context)
{
var validation = await context.StepAsync(
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ExceptionsTests.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ExceptionsTests.cs
index 7105849bb..4bca933be 100644
--- a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ExceptionsTests.cs
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ExceptionsTests.cs
@@ -65,4 +65,87 @@ public void StepException_HasErrorProperties()
Assert.Equal("operation timed out", ex.ErrorData);
Assert.Equal(2, ex.OriginalStackTrace!.Count);
}
+
+ [Fact]
+ public void CallbackException_BaseClassCtors()
+ {
+ var empty = new CallbackException(); // parameterless ctor
+ Assert.IsAssignableFrom(empty);
+
+ var withMsg = new CallbackException("cb error"); // message-only ctor
+ Assert.Equal("cb error", withMsg.Message);
+
+ var inner = new InvalidOperationException("inner");
+ var wrapping = new CallbackException("outer", inner); // message + inner-exception ctor
+ Assert.Same(inner, wrapping.InnerException);
+ }
+
+ [Fact]
+ public void CallbackException_InitProperties()
+ {
+ var ex = new CallbackException("rejected") // properties are set through the object initializer
+ {
+ CallbackId = "cb-1",
+ ErrorType = "ExternalSystemError",
+ ErrorData = "{\"reviewer\":\"jane\"}",
+ OriginalStackTrace = new[] { "at A.B()" }
+ };
+
+ Assert.Equal("cb-1", ex.CallbackId);
+ Assert.Equal("ExternalSystemError", ex.ErrorType);
+ Assert.Equal("{\"reviewer\":\"jane\"}", ex.ErrorData);
+ Assert.Single(ex.OriginalStackTrace!); // the single frame supplied above
+ }
+
+ [Fact]
+ public void CallbackFailedException_IsCallbackException()
+ {
+ var ex = new CallbackFailedException("rejected") { CallbackId = "cb-1" };
+ Assert.IsAssignableFrom(ex);
+ Assert.IsAssignableFrom(ex);
+ Assert.Equal("rejected", ex.Message); // the ctor message is preserved
+ Assert.Equal("cb-1", ex.CallbackId); // CallbackId is settable on the derived type
+ }
+
+ [Fact]
+ public void CallbackFailedException_AllCtors()
+ {
+ Assert.NotNull(new CallbackFailedException()); // parameterless ctor
+ Assert.Equal("m", new CallbackFailedException("m").Message); // message-only ctor
+ var inner = new Exception("inner");
+ Assert.Same(inner, new CallbackFailedException("m", inner).InnerException); // message + inner-exception ctor
+ }
+
+ [Fact]
+ public void CallbackTimeoutException_IsCallbackException()
+ {
+ var ex = new CallbackTimeoutException("timed out") { CallbackId = "cb-1" };
+ Assert.IsAssignableFrom(ex);
+ Assert.Equal("timed out", ex.Message); // the ctor message is preserved
+ }
+
+ [Fact]
+ public void CallbackTimeoutException_AllCtors()
+ {
+ Assert.NotNull(new CallbackTimeoutException()); // parameterless ctor
+ Assert.Equal("m", new CallbackTimeoutException("m").Message); // message-only ctor
+ var inner = new Exception("inner");
+ Assert.Same(inner, new CallbackTimeoutException("m", inner).InnerException); // message + inner-exception ctor
+ }
+
+ [Fact]
+ public void CallbackSubmitterException_IsCallbackException()
+ {
+ var inner = new StepException("submitter failed");
+ var ex = new CallbackSubmitterException("submitter failed", inner); // wraps a StepException as the inner exception
+ Assert.IsAssignableFrom(ex);
+ Assert.Same(inner, ex.InnerException);
+ }
+
+ [Fact]
+ public void CallbackSubmitterException_AllCtors()
+ {
+ Assert.NotNull(new CallbackSubmitterException()); // parameterless ctor
+ Assert.Equal("m", new CallbackSubmitterException("m").Message); // message-only ctor; the (message, inner) ctor is exercised in CallbackSubmitterException_IsCallbackException
+ }
}
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/MockLambdaClient.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/MockLambdaClient.cs
index 8df98a67d..98ef36cb3 100644
--- a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/MockLambdaClient.cs
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/MockLambdaClient.cs
@@ -38,12 +38,29 @@ public MockLambdaClient() : base("fake-access-key", "fake-secret-key", Amazon.Re
///
public Exception? GetExecutionStateThrows { get; set; }
+ /// <summary>
+ /// Optional handler that produces a <see cref="CheckpointDurableExecutionResponse"/>
+ /// per request. Tests modeling the durable-execution service's
+ /// NewExecutionState response (e.g. stamping a CallbackId onto a
+ /// freshly-started CALLBACK op) wire this up. When null, a default
+ /// response is produced with only the auto-incremented checkpoint token.
+ /// </summary>
+ public Func? CheckpointHandler { get; set; }
+
public override Task CheckpointDurableExecutionAsync(
CheckpointDurableExecutionRequest request,
CancellationToken cancellationToken = default)
{
CheckpointCalls.Add(request);
if (CheckpointThrows != null) throw CheckpointThrows;
+ if (CheckpointHandler != null)
+ {
+ var resp = CheckpointHandler(request);
+ // Auto-fill token if the test left it blank.
+ if (string.IsNullOrEmpty(resp.CheckpointToken))
+ resp.CheckpointToken = $"token-{++_tokenCounter}";
+ return Task.FromResult(resp);
+ }
return Task.FromResult(new CheckpointDurableExecutionResponse
{
CheckpointToken = $"token-{++_tokenCounter}"
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/RecordingBatcher.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/RecordingBatcher.cs
index 8fe7b6d6d..11a34ffc6 100644
--- a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/RecordingBatcher.cs
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/RecordingBatcher.cs
@@ -16,6 +16,15 @@ internal sealed class RecordingBatcher
public CheckpointBatcher Batcher { get; }
+ /// <summary>
+ /// Optional hook invoked synchronously after each batch flush, with that
+ /// batch's updates. Tests modeling the durable-execution service's
+ /// NewExecutionState response (e.g. stamping a CallbackId onto a
+ /// freshly-started CALLBACK op) wire this up to mutate the test's
+ /// <see cref="ExecutionState"/>.
+ /// </summary>
+ public Action>? OnFlush { get; set; }
+
public RecordingBatcher(CheckpointBatcherConfig? config = null)
{
Batcher = new CheckpointBatcher("test-token", Flush, config);
@@ -46,6 +55,7 @@ public IReadOnlyList FlushBatchSizes
_flushed.AddRange(ops);
_flushBatchSizes.Add(ops.Count);
}
+ OnFlush?.Invoke(ops);
return Task.FromResult(token);
}
}
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/WaitForCallbackTests.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/WaitForCallbackTests.cs
new file mode 100644
index 000000000..d055475f4
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/WaitForCallbackTests.cs
@@ -0,0 +1,567 @@
+using Amazon.Lambda.DurableExecution;
+using Amazon.Lambda.DurableExecution.Internal;
+using Amazon.Lambda.TestUtilities;
+using Xunit;
+
+namespace Amazon.Lambda.DurableExecution.Tests;
+
+public class WaitForCallbackTests
+{
+ /// <summary>Reproduces the Id that <see cref="OperationIdGenerator"/> emits for the n-th root-level operation.</summary>
+ private static string IdAt(int position) => OperationIdGenerator.HashOperationId(position.ToString());
+
+ /// <summary>The hashed ID of the n-th child operation under <paramref name="parentOpId"/>.</summary>
+ private static string ChildIdAt(string parentOpId, int position) =>
+ OperationIdGenerator.HashOperationId($"{parentOpId}-{position}");
+
+ private static (DurableContext context, RecordingBatcher recorder, TerminationManager tm, ExecutionState state)
+ CreateContext(InitialExecutionState? initialState = null) // Builds a DurableContext wired to fresh test collaborators.
+ {
+ var state = new ExecutionState();
+ state.LoadFromCheckpoint(initialState); // initialState is null when a test calls CreateContext() with no argument.
+ var tm = new TerminationManager();
+ var idGen = new OperationIdGenerator();
+ var lambdaContext = new TestLambdaContext();
+ var recorder = new RecordingBatcher();
+ var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext, recorder.Batcher);
+ return (context, recorder, tm, state); // The collaborators are returned too so tests can inspect flushes, termination and state.
+ }
+
+ private static void WireServiceCallbackIdAllocation( // Test stand-in for the service assigning a callback id.
+ RecordingBatcher recorder, ExecutionState state, string callbackId)
+ {
+ recorder.OnFlush = ops => // Runs for every batch the recorder flushes.
+ {
+ foreach (var op in ops)
+ {
+ if (op.Type == OperationTypes.Callback && op.Action == "START") // Only callback START updates are answered.
+ {
+ state.AddOperations(new[] // Adds a Started callback operation with the same Id and Name, carrying callbackId.
+ {
+ new Operation
+ {
+ Id = op.Id,
+ Type = OperationTypes.Callback,
+ Status = OperationStatuses.Started,
+ Name = op.Name,
+ CallbackDetails = new CallbackDetails { CallbackId = callbackId }
+ }
+ });
+ }
+ }
+ };
+ }
+
+ [Fact] // Fresh run: the submitter sees the allocated callback id, termination is signalled, and the returned task stays incomplete.
+ public async Task WaitForCallbackAsync_FreshExecution_RunsSubmitterAndSuspendsForCallback()
+ {
+ var (context, recorder, tm, state) = CreateContext();
+ WireServiceCallbackIdAllocation(recorder, state, "cb-wait-1");
+
+ string? receivedCallbackId = null;
+ var resultTask = context.WaitForCallbackAsync(
+ async (callbackId, ctx) =>
+ {
+ receivedCallbackId = callbackId;
+ Assert.NotNull(ctx.Logger);
+ await Task.CompletedTask;
+ },
+ name: "approval");
+
+ // Race the suspended user task against termination — same idiom as the
+ // production handler. Once Terminate() is called inside the inner
+ // GetResultAsync, this completes immediately.
+ var winner = await Task.WhenAny(resultTask, tm.TerminationTask);
+ Assert.Same(tm.TerminationTask, winner);
+
+ Assert.True(tm.IsTerminated);
+ Assert.False(resultTask.IsCompleted);
+ Assert.Equal("cb-wait-1", receivedCallbackId);
+
+ await recorder.Batcher.DrainAsync();
+
+ var actions = recorder.Flushed.Select(o => $"{o.Type}:{o.Action}:{o.SubType}").ToArray(); // One "Type:Action:SubType" string per flushed update, in recorded order.
+ Assert.Equal(new[] // Exactly these four updates are expected; nothing completes the parent context or the callback.
+ {
+ $"{OperationTypes.Context}:START:{OperationSubTypes.WaitForCallback}",
+ $"{OperationTypes.Callback}:START:{OperationSubTypes.Callback}",
+ $"{OperationTypes.Step}:START:{OperationSubTypes.Step}",
+ $"{OperationTypes.Step}:SUCCEED:{OperationSubTypes.Step}",
+ }, actions);
+ }
+
+ [Fact] // Parent name "approval" yields "approval-callback" and "approval-submitter" on the inner operations.
+ public async Task WaitForCallbackAsync_FreshExecution_KebabSuffixedSubOpNames()
+ {
+ var (context, recorder, tm, state) = CreateContext();
+ WireServiceCallbackIdAllocation(recorder, state, "cb-1");
+
+ var resultTask = context.WaitForCallbackAsync(
+ async (_, _) => await Task.CompletedTask,
+ name: "approval");
+
+ await Task.WhenAny(resultTask, tm.TerminationTask); // Continue once the call finishes or termination is signalled.
+ await recorder.Batcher.DrainAsync();
+
+ var callbackStart = recorder.Flushed.Single(o => o.Type == OperationTypes.Callback);
+ var stepSucceed = recorder.Flushed.Single(o => o.Type == OperationTypes.Step && o.Action == "SUCCEED");
+
+ Assert.Equal("approval-callback", callbackStart.Name);
+ Assert.Equal("approval-submitter", stepSucceed.Name);
+
+ // Avoid unobserved-task warning.
+ _ = resultTask;
+ }
+
+ [Fact] // With no name argument, the inner callback and submitter step updates carry a null Name.
+ public async Task WaitForCallbackAsync_FreshExecution_NullParentName_LeavesSubOpsNameless()
+ {
+ var (context, recorder, tm, state) = CreateContext();
+ WireServiceCallbackIdAllocation(recorder, state, "cb-1");
+
+ var resultTask = context.WaitForCallbackAsync(
+ async (_, _) => await Task.CompletedTask);
+
+ await Task.WhenAny(resultTask, tm.TerminationTask); // Continue once the call finishes or termination is signalled.
+ await recorder.Batcher.DrainAsync();
+
+ var callbackStart = recorder.Flushed.Single(o => o.Type == OperationTypes.Callback);
+ var stepSucceed = recorder.Flushed.Single(o => o.Type == OperationTypes.Step && o.Action == "SUCCEED");
+
+ Assert.Null(callbackStart.Name);
+ Assert.Null(stepSucceed.Name);
+
+ _ = resultTask; // Explicit discard: the task is intentionally never awaited directly.
+ }
+
+ [Fact] // Inner operation ids must equal the hash of "{parentId}-{n}" as computed by ChildIdAt.
+ public async Task WaitForCallbackAsync_ChildOperationIdsDeterministic()
+ {
+ var (context, recorder, tm, state) = CreateContext();
+ WireServiceCallbackIdAllocation(recorder, state, "cb-1");
+
+ var resultTask = context.WaitForCallbackAsync(
+ async (_, _) => await Task.CompletedTask,
+ name: "approval");
+
+ await Task.WhenAny(resultTask, tm.TerminationTask); // Continue once the call finishes or termination is signalled.
+ await recorder.Batcher.DrainAsync();
+
+ // Parent CONTEXT has IdAt(1); the inner callback is child #1, the inner
+ // submitter step is child #2 (under the same parent context op id).
+ var parentOpId = IdAt(1);
+ var callbackChildId = ChildIdAt(parentOpId, 1);
+ var submitterChildId = ChildIdAt(parentOpId, 2);
+
+ Assert.Equal(callbackChildId,
+ recorder.Flushed.Single(o => o.Type == OperationTypes.Callback).Id);
+ Assert.Equal(submitterChildId,
+ recorder.Flushed.Single(o => o.Type == OperationTypes.Step && o.Action == "SUCCEED").Id);
+
+ _ = resultTask; // Explicit discard: the task is intentionally never awaited directly.
+ }
+
+ [Fact] // Timeout and HeartbeatTimeout from WaitForCallbackConfig must appear on the callback START update, expressed in seconds.
+ public async Task WaitForCallbackAsync_CallbackTimeoutInheritsFromConfig()
+ {
+ var (context, recorder, tm, state) = CreateContext();
+ WireServiceCallbackIdAllocation(recorder, state, "cb-1");
+
+ var resultTask = context.WaitForCallbackAsync(
+ async (_, _) => await Task.CompletedTask,
+ name: "approval",
+ config: new WaitForCallbackConfig
+ {
+ Timeout = TimeSpan.FromHours(2),
+ HeartbeatTimeout = TimeSpan.FromMinutes(15),
+ });
+
+ await Task.WhenAny(resultTask, tm.TerminationTask); // Continue once the call finishes or termination is signalled.
+ await recorder.Batcher.DrainAsync();
+
+ var callbackStart = recorder.Flushed.Single(o => o.Type == OperationTypes.Callback);
+ Assert.NotNull(callbackStart.CallbackOptions);
+ Assert.Equal(7200, callbackStart.CallbackOptions.TimeoutSeconds); // 2 hours.
+ Assert.Equal(900, callbackStart.CallbackOptions.HeartbeatTimeoutSeconds); // 15 minutes.
+
+ _ = resultTask; // Explicit discard: the task is intentionally never awaited directly.
+ }
+
+ [Fact]
+ public async Task WaitForCallbackAsync_ReplayWithCallbackSucceeded_ReturnsResult()
+ {
+ // Full replay: parent CONTEXT SUCCEEDED with the callback's deserialized
+ // payload as its checkpointed result.
+ var (context, recorder, tm, _) = CreateContext(new InitialExecutionState
+ {
+ Operations = new List
+ {
+ new()
+ {
+ Id = IdAt(1),
+ Type = OperationTypes.Context,
+ Status = OperationStatuses.Succeeded,
+ Name = "approval",
+ SubType = OperationSubTypes.WaitForCallback,
+ ContextDetails = new ContextDetails { Result = "\"approved\"" } // Stored with JSON quotes; the assertion below expects the unquoted value.
+ }
+ }
+ });
+
+ var executed = false;
+ var result = await context.WaitForCallbackAsync(
+ async (_, _) => { executed = true; await Task.CompletedTask; },
+ name: "approval");
+
+ Assert.False(executed); // Replay returns cached without re-running submitter.
+ Assert.Equal("approved", result);
+ Assert.False(tm.IsTerminated); // No termination may be signalled on a completed replay.
+
+ await recorder.Batcher.DrainAsync();
+ Assert.Empty(recorder.Flushed); // No checkpoint updates may be recorded on this path.
+ }
+
+ [Fact]
+ public async Task WaitForCallbackAsync_ReplayCallbackTimedOut_ThrowsCallbackTimeoutException()
+ {
+ // Inside-out replay: parent CONTEXT is STARTED (still in flight),
+ // inner callback is TIMED_OUT, inner submitter step has SUCCEEDED.
+ var parentId = IdAt(1);
+ var callbackChildId = ChildIdAt(parentId, 1);
+ var submitterChildId = ChildIdAt(parentId, 2);
+
+ var (context, _, _, _) = CreateContext(new InitialExecutionState
+ {
+ Operations = new List
+ {
+ new()
+ {
+ Id = parentId,
+ Type = OperationTypes.Context,
+ Status = OperationStatuses.Started,
+ Name = "approval",
+ SubType = OperationSubTypes.WaitForCallback,
+ },
+ new()
+ {
+ Id = callbackChildId,
+ Type = OperationTypes.Callback,
+ Status = OperationStatuses.TimedOut,
+ Name = "approval-callback",
+ ParentId = parentId,
+ CallbackDetails = new CallbackDetails
+ {
+ CallbackId = "cb-to-1",
+ Error = new ErrorObject { ErrorMessage = "callback timed out" }
+ }
+ },
+ new()
+ {
+ Id = submitterChildId,
+ Type = OperationTypes.Step,
+ Status = OperationStatuses.Succeeded,
+ Name = "approval-submitter",
+ ParentId = parentId,
+ StepDetails = new StepDetails { Result = "null" }
+ },
+ }
+ });
+
+ var ex = await Assert.ThrowsAsync(() =>
+ context.WaitForCallbackAsync(
+ async (_, _) => await Task.CompletedTask,
+ name: "approval"));
+
+ Assert.Equal("callback timed out", ex.Message); // Matches the checkpointed callback ErrorMessage.
+ Assert.Equal("cb-to-1", ex.CallbackId); // Matches the checkpointed CallbackDetails.CallbackId.
+ }
+
+ [Fact] // Inside-out replay: parent CONTEXT is Started, inner callback is Failed, inner submitter step has Succeeded.
+ public async Task WaitForCallbackAsync_ReplayCallbackFailed_ThrowsCallbackFailedException()
+ {
+ var parentId = IdAt(1);
+ var callbackChildId = ChildIdAt(parentId, 1);
+ var submitterChildId = ChildIdAt(parentId, 2);
+
+ var (context, _, _, _) = CreateContext(new InitialExecutionState
+ {
+ Operations = new List
+ {
+ new()
+ {
+ Id = parentId,
+ Type = OperationTypes.Context,
+ Status = OperationStatuses.Started,
+ Name = "approval",
+ SubType = OperationSubTypes.WaitForCallback,
+ },
+ new()
+ {
+ Id = callbackChildId,
+ Type = OperationTypes.Callback,
+ Status = OperationStatuses.Failed,
+ Name = "approval-callback",
+ ParentId = parentId,
+ CallbackDetails = new CallbackDetails
+ {
+ CallbackId = "cb-fail-1",
+ Error = new ErrorObject
+ {
+ ErrorType = "ExternalSystemError",
+ ErrorMessage = "external rejected"
+ }
+ }
+ },
+ new()
+ {
+ Id = submitterChildId,
+ Type = OperationTypes.Step,
+ Status = OperationStatuses.Succeeded,
+ Name = "approval-submitter",
+ ParentId = parentId,
+ StepDetails = new StepDetails { Result = "null" }
+ },
+ }
+ });
+
+ var ex = await Assert.ThrowsAsync(() =>
+ context.WaitForCallbackAsync(
+ async (_, _) => await Task.CompletedTask,
+ name: "approval"));
+
+ Assert.Equal("external rejected", ex.Message); // Matches the checkpointed callback ErrorMessage.
+ Assert.Equal("cb-fail-1", ex.CallbackId); // Matches the checkpointed CallbackDetails.CallbackId.
+ Assert.Equal("ExternalSystemError", ex.ErrorType); // Matches the checkpointed callback ErrorType.
+ }
+
+ [Fact]
+ public async Task WaitForCallbackAsync_SubmitterFails_ThrowsCallbackSubmitterException()
+ {
+ // Replay: parent CONTEXT is FAILED with a Step-error inside (ErrorType is StepException's full name).
+ var parentId = IdAt(1);
+ var (context, _, _, _) = CreateContext(new InitialExecutionState
+ {
+ Operations = new List
+ {
+ new()
+ {
+ Id = parentId,
+ Type = OperationTypes.Context,
+ Status = OperationStatuses.Failed,
+ Name = "approval",
+ SubType = OperationSubTypes.WaitForCallback,
+ ContextDetails = new ContextDetails
+ {
+ Error = new ErrorObject
+ {
+ ErrorType = typeof(StepException).FullName,
+ ErrorMessage = "submitter API failed",
+ ErrorData = "{\"code\":\"500\"}",
+ }
+ }
+ }
+ }
+ });
+
+ var ex = await Assert.ThrowsAsync(() =>
+ context.WaitForCallbackAsync(
+ async (_, _) => await Task.CompletedTask,
+ name: "approval"));
+
+ Assert.IsAssignableFrom(ex);
+ Assert.Equal("submitter API failed", ex.Message);
+ // On the replay path the live StepException was lost across invocations;
+ // we preserve the StepException type-name string and carry the
+ // ChildContextException as the InnerException for traceability.
+ Assert.NotNull(ex.InnerException);
+ Assert.Equal(typeof(StepException).FullName, ex.ErrorType);
+ Assert.Equal("{\"code\":\"500\"}", ex.ErrorData); // ErrorData must round-trip unchanged from the checkpoint.
+ }
+
+ [Fact]
+ public async Task WaitForCallbackAsync_ReplayParentContextFailedWithCallbackTimeoutErrorType_PreservesSubclass()
+ {
+ // Subclass-fidelity guarantee: when the parent CONTEXT was checkpointed
+ // FAILED on a previous invocation with a CallbackTimeoutException
+ // ErrorType, replay must surface CallbackTimeoutException — not the
+ // more generic CallbackFailedException — so user catch blocks behave
+ // identically across live and replay paths.
+ var parentId = IdAt(1);
+ var (context, _, _, _) = CreateContext(new InitialExecutionState
+ {
+ Operations = new List
+ {
+ new()
+ {
+ Id = parentId,
+ Type = OperationTypes.Context,
+ Status = OperationStatuses.Failed,
+ Name = "approval",
+ SubType = OperationSubTypes.WaitForCallback,
+ ContextDetails = new ContextDetails
+ {
+ Error = new ErrorObject
+ {
+ ErrorType = typeof(CallbackTimeoutException).FullName,
+ ErrorMessage = "callback timed out after 24h",
+ }
+ }
+ }
+ }
+ });
+
+ var ex = await Assert.ThrowsAsync(() =>
+ context.WaitForCallbackAsync(
+ async (_, _) => await Task.CompletedTask,
+ name: "approval"));
+
+ // Concrete-type check: not just `is CallbackException` — must be the
+ // CallbackTimeoutException subclass exactly.
+ Assert.Equal(typeof(CallbackTimeoutException), ex.GetType());
+ Assert.Equal("callback timed out after 24h", ex.Message);
+ Assert.Equal(typeof(CallbackTimeoutException).FullName, ex.ErrorType); // The stored type-name string is surfaced as ErrorType.
+ }
+
+ [Fact]
+ public async Task WaitForCallbackAsync_ReplayParentContextFailedWithCallbackFailedErrorType_RemapsToCallbackFailed()
+ {
+ // Companion case: a stored CallbackFailedException ErrorType remaps to
+ // CallbackFailedException (not the base or CallbackTimeoutException).
+ var parentId = IdAt(1);
+ var (context, _, _, _) = CreateContext(new InitialExecutionState
+ {
+ Operations = new List
+ {
+ new()
+ {
+ Id = parentId,
+ Type = OperationTypes.Context,
+ Status = OperationStatuses.Failed,
+ Name = "approval",
+ SubType = OperationSubTypes.WaitForCallback,
+ ContextDetails = new ContextDetails
+ {
+ Error = new ErrorObject
+ {
+ ErrorType = typeof(CallbackFailedException).FullName,
+ ErrorMessage = "external rejected",
+ }
+ }
+ }
+ }
+ });
+
+ var ex = await Assert.ThrowsAsync(() =>
+ context.WaitForCallbackAsync(
+ async (_, _) => await Task.CompletedTask,
+ name: "approval"));
+
+ Assert.Equal(typeof(CallbackFailedException), ex.GetType()); // Exact runtime type, not merely an assignable one.
+ Assert.Equal("external rejected", ex.Message);
+ }
+
+ [Fact]
+ public async Task WaitForCallbackAsync_RetryStrategyForwardedToSubmitterStep()
+ {
+ // Verifies the WaitForCallbackConfig.RetryStrategy gets passed into the
+ // submitter step's StepConfig (via the kebab "-submitter" inner step).
+ var (context, recorder, tm, state) = CreateContext();
+ WireServiceCallbackIdAllocation(recorder, state, "cb-1");
+
+ var seenAttempts = new List();
+ var resultTask = context.WaitForCallbackAsync(
+ async (_, ctx) =>
+ {
+ // The submitter receives an IWaitForCallbackContext (no AttemptNumber)
+ // — but this test doesn't need to verify retry mechanics, only
+ // that the StepConfig with a retry strategy is wired through.
+ seenAttempts.Add(seenAttempts.Count + 1);
+ await Task.CompletedTask;
+ },
+ name: "approval",
+ config: new WaitForCallbackConfig
+ {
+ RetryStrategy = new CountingRetryStrategy()
+ });
+
+ await Task.WhenAny(resultTask, tm.TerminationTask); // Continue once the call finishes or termination is signalled.
+ await recorder.Batcher.DrainAsync();
+
+ // Submitter ran exactly once (no failures to retry); a single STEP SUCCEED
+ // is sufficient evidence that the strategy was wired without throwing.
+ Assert.Single(recorder.Flushed.Where(o => o.Type == OperationTypes.Step && o.Action == "SUCCEED")); // NOTE(review): seenAttempts and CountingRetryStrategy.Attempts are never asserted, so this does not directly prove the strategy was forwarded.
+
+ _ = resultTask; // Explicit discard: the task is intentionally never awaited directly.
+ }
+
+ [Fact] // Replay through the serializer-taking overload: the "custom:" prefix on the result shows MarkerSerializer handled the stored Result.
+ public async Task WaitForCallbackAsync_AOTOverload_UsesSuppliedSerializer()
+ {
+ var (context, _, _, _) = CreateContext(new InitialExecutionState
+ {
+ Operations = new List
+ {
+ new()
+ {
+ Id = IdAt(1),
+ Type = OperationTypes.Context,
+ Status = OperationStatuses.Succeeded,
+ Name = "approval",
+ SubType = OperationSubTypes.WaitForCallback,
+ ContextDetails = new ContextDetails { Result = "raw-payload" } // Stored without JSON quotes.
+ }
+ }
+ });
+
+ var result = await context.WaitForCallbackAsync(
+ async (_, _) => await Task.CompletedTask,
+ new MarkerSerializer(),
+ name: "approval");
+
+ Assert.Equal("custom:raw-payload", result);
+ }
+
+ [Fact]
+ public async Task WaitForCallbackAsync_SubmitterContext_IsIWaitForCallbackContext_NotIStepContext()
+ {
+ // Verifies the submitter delegate receives our distinct
+ // IWaitForCallbackContext type (not IStepContext) — protects the
+ // architectural decision against accidental conflation.
+ var (context, recorder, tm, state) = CreateContext();
+ WireServiceCallbackIdAllocation(recorder, state, "cb-1");
+
+ Type? observedContextType = null;
+ var resultTask = context.WaitForCallbackAsync(
+ async (_, ctx) =>
+ {
+ observedContextType = ctx.GetType(); // Capture the runtime type, not the declared parameter type.
+ await Task.CompletedTask;
+ },
+ name: "approval");
+
+ await Task.WhenAny(resultTask, tm.TerminationTask); // Continue once the call finishes or termination is signalled.
+ await recorder.Batcher.DrainAsync();
+
+ Assert.NotNull(observedContextType); // Only set inside the submitter, so this also shows the submitter ran.
+ Assert.True(typeof(IWaitForCallbackContext).IsAssignableFrom(observedContextType));
+ Assert.False(typeof(IStepContext).IsAssignableFrom(observedContextType));
+
+ _ = resultTask; // Explicit discard: the task is intentionally never awaited directly.
+ }
+
+ private sealed class CountingRetryStrategy : IRetryStrategy // Test strategy that never retries and records the latest attempt number.
+ {
+ public int Attempts; // Last attemptNumber passed to ShouldRetry; 0 if it was never called.
+ public RetryDecision ShouldRetry(Exception exception, int attemptNumber)
+ {
+ Attempts = attemptNumber;
+ return RetryDecision.DoNotRetry();
+ }
+ }
+
+ private sealed class MarkerSerializer : ICheckpointSerializer // Test serializer that prefixes "custom:" in both directions.
+ {
+ public string Serialize(string value, SerializationContext context) => $"custom:{value}"; // context is unused.
+ public string Deserialize(string data, SerializationContext context) => $"custom:{data}"; // Not the inverse of Serialize: it adds the prefix rather than removing it.
+ }
+}