diff --git a/Docs/durable-execution-design.md b/Docs/durable-execution-design.md index 402d689af..3d0517339 100644 --- a/Docs/durable-execution-design.md +++ b/Docs/durable-execution-design.md @@ -1108,18 +1108,40 @@ public interface IDurableContext CancellationToken cancellationToken = default); /// - /// Create a callback for external system integration. + /// Create a callback for external system integration. The result is + /// deserialized using reflection-based System.Text.Json; an AOT-safe + /// overload taking ICheckpointSerializer<T> is also provided. /// + [RequiresUnreferencedCode("Reflection-based JSON for T. Use the ICheckpointSerializer overload for AOT/trimmed deployments.")] + [RequiresDynamicCode("Reflection-based JSON for T. Use the ICheckpointSerializer overload for AOT/trimmed deployments.")] + Task> CreateCallbackAsync( + string? name = null, + CallbackConfig? config = null, + CancellationToken cancellationToken = default); + + /// AOT-safe overload of CreateCallbackAsync. Task> CreateCallbackAsync( + ICheckpointSerializer serializer, string? name = null, CallbackConfig? config = null, CancellationToken cancellationToken = default); /// - /// Wait for an external system to respond via callback. + /// Wait for an external system to respond via callback. Composes + /// CreateCallback + Step(submitter) + GetResult inside a child context. /// + [RequiresUnreferencedCode("Reflection-based JSON for T. Use the ICheckpointSerializer overload for AOT/trimmed deployments.")] + [RequiresDynamicCode("Reflection-based JSON for T. Use the ICheckpointSerializer overload for AOT/trimmed deployments.")] Task WaitForCallbackAsync( - Func submitter, + Func submitter, + string? name = null, + WaitForCallbackConfig? config = null, + CancellationToken cancellationToken = default); + + /// AOT-safe overload of WaitForCallbackAsync. + Task WaitForCallbackAsync( + Func submitter, + ICheckpointSerializer serializer, string? name = null, WaitForCallbackConfig? 
config = null, CancellationToken cancellationToken = default); @@ -1208,6 +1230,20 @@ public interface IStepContext string OperationId { get; } } +/// +/// Context passed to the submitter delegate of WaitForCallbackAsync. +/// Distinct from so the submitter API can evolve +/// independently. Mirrors WaitForCallbackContext in the Python and +/// JavaScript SDKs (logger-only surface). +/// +public interface IWaitForCallbackContext +{ + /// + /// Logger scoped to the submitter step (replay-safe). + /// + ILogger Logger { get; } +} + /// /// A named branch for parallel execution. Named branches appear in execution /// traces and can be inspected by name in the test runner. @@ -1568,7 +1604,9 @@ public interface ICallback /// Wait for and return the callback result. /// Suspends execution until the result is available. /// - Task GetResultAsync(CancellationToken cancellationToken = default); + /// External system reported failure. + /// Service marked the callback TIMED_OUT. + Task GetResultAsync(CancellationToken cancellationToken = default); } /// @@ -1605,14 +1643,31 @@ public class StepException : DurableExecutionException } /// -/// Thrown when a callback fails or times out. +/// Base exception for callback failures. Concrete subclasses distinguish +/// failure modes — pattern-match the subclass type rather than inspecting +/// a flag. /// public class CallbackException : DurableExecutionException { - public string? CallbackId { get; } - public bool IsTimeout { get; } + public string? CallbackId { get; init; } + public string? ErrorType { get; init; } + public string? ErrorData { get; init; } + public IReadOnlyList? OriginalStackTrace { get; init; } } +/// External system reported a failure result for the callback. +public class CallbackFailedException : CallbackException { } + +/// Service marked the callback TIMED_OUT (overall or heartbeat). 
+public class CallbackTimeoutException : CallbackException { } + +/// +/// Submitter step (the inner step inside WaitForCallbackAsync) failed +/// after retries are exhausted. Wraps the underlying StepException. +/// Only thrown from WaitForCallbackAsync. +/// +public class CallbackSubmitterException : CallbackException { } + /// /// Thrown when an invoked function fails. /// diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Config/CallbackConfig.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Config/CallbackConfig.cs new file mode 100644 index 000000000..4f30c5ab8 --- /dev/null +++ b/Libraries/src/Amazon.Lambda.DurableExecution/Config/CallbackConfig.cs @@ -0,0 +1,83 @@ +namespace Amazon.Lambda.DurableExecution; + +/// +/// Configuration for callback operations created via +/// . +/// +/// +/// Custom serializers are not supplied here — pass an +/// directly to the AOT-safe +/// CreateCallbackAsync overload instead, matching the established +/// StepAsync pattern. +/// +public class CallbackConfig +{ + private TimeSpan _timeout = TimeSpan.Zero; + private TimeSpan _heartbeatTimeout = TimeSpan.Zero; + + /// + /// Maximum total time the service will wait for the external system to + /// complete the callback. (default) means no + /// overall timeout — only applies (if set). + /// + /// + /// The service's timer granularity is 1 second, so values strictly between + /// and 1 second are rejected to avoid silent + /// rounding. Use to disable the timeout, or a + /// value of at least 1 second. + /// + /// + /// Thrown when set to a positive value less than 1 second. + /// + public TimeSpan Timeout + { + get => _timeout; + set + { + ValidateTimeout(value, nameof(Timeout)); + _timeout = value; + } + } + + /// + /// Maximum gap between heartbeat signals from the external system before + /// the service marks the callback as timed-out. + /// (default) means no heartbeat timeout. 
+ /// + /// + /// The service's timer granularity is 1 second, so values strictly between + /// and 1 second are rejected to avoid silent + /// rounding. Use to disable the heartbeat + /// timeout, or a value of at least 1 second. + /// + /// + /// Thrown when set to a positive value less than 1 second. + /// + public TimeSpan HeartbeatTimeout + { + get => _heartbeatTimeout; + set + { + ValidateTimeout(value, nameof(HeartbeatTimeout)); + _heartbeatTimeout = value; + } + } + + private static void ValidateTimeout(TimeSpan value, string paramName) + { + // Allow Zero (means "not set"); reject negative; reject sub-second + // positive values to mirror WaitAsync's behavior and prevent silent + // rounding-up inside BuildCallbackOptions. + if (value < TimeSpan.Zero) + { + throw new ArgumentOutOfRangeException( + paramName, value, $"{paramName} must be non-negative."); + } + if (value > TimeSpan.Zero && value < TimeSpan.FromSeconds(1)) + { + throw new ArgumentOutOfRangeException( + paramName, value, + $"{paramName} must be at least 1 second (or TimeSpan.Zero to disable)."); + } + } +} diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Config/WaitForCallbackConfig.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Config/WaitForCallbackConfig.cs new file mode 100644 index 000000000..1dceb8746 --- /dev/null +++ b/Libraries/src/Amazon.Lambda.DurableExecution/Config/WaitForCallbackConfig.cs @@ -0,0 +1,18 @@ +namespace Amazon.Lambda.DurableExecution; + +/// +/// Configuration for the composite +/// +/// operation. Inherits the callback's and +/// ; adds a +/// for the submitter step. +/// +public class WaitForCallbackConfig : CallbackConfig +{ + /// + /// Retry strategy applied to the submitter step. When null (default), + /// submitter failures are not retried — the submitter step fails terminally + /// and surfaces as . + /// + public IRetryStrategy? 
RetryStrategy { get; set; } +} diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs b/Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs index 69e5e580c..e814a7c1b 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs +++ b/Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs @@ -162,6 +162,235 @@ private Task RunChildContext( _state, _terminationManager, _durableExecutionArn, _batcher); return op.ExecuteAsync(cancellationToken); } + + [RequiresUnreferencedCode("Reflection-based JSON for T. Use the ICheckpointSerializer overload for AOT/trimmed deployments.")] + [RequiresDynamicCode("Reflection-based JSON for T. Use the ICheckpointSerializer overload for AOT/trimmed deployments.")] + public Task> CreateCallbackAsync( + string? name = null, + CallbackConfig? config = null, + CancellationToken cancellationToken = default) + => RunCallback(new ReflectionJsonCheckpointSerializer(), name, config, cancellationToken); + + public Task> CreateCallbackAsync( + ICheckpointSerializer serializer, + string? name = null, + CallbackConfig? config = null, + CancellationToken cancellationToken = default) + => RunCallback(serializer, name, config, cancellationToken); + + private Task> RunCallback( + ICheckpointSerializer serializer, + string? name, + CallbackConfig? config, + CancellationToken cancellationToken) + { + var operationId = _idGenerator.NextId(); + var op = new CallbackOperation( + operationId, name, config, serializer, + _state, _terminationManager, _durableExecutionArn, _batcher); + return op.ExecuteAsync(cancellationToken); + } + + [RequiresUnreferencedCode("Reflection-based JSON for T. Use the ICheckpointSerializer overload for AOT/trimmed deployments.")] + [RequiresDynamicCode("Reflection-based JSON for T. Use the ICheckpointSerializer overload for AOT/trimmed deployments.")] + public Task WaitForCallbackAsync( + Func submitter, + string? name = null, + WaitForCallbackConfig? 
config = null, + CancellationToken cancellationToken = default) + => RunWaitForCallback(submitter, new ReflectionJsonCheckpointSerializer(), name, config, cancellationToken); + + public Task WaitForCallbackAsync( + Func submitter, + ICheckpointSerializer serializer, + string? name = null, + WaitForCallbackConfig? config = null, + CancellationToken cancellationToken = default) + => RunWaitForCallback(submitter, serializer, name, config, cancellationToken); + + /// + /// Composes WaitForCallback over RunInChildContextAsync + CreateCallbackAsync + /// + StepAsync(submitter) + callback.GetResultAsync. Mirrors the pattern used + /// by the Python, JS, and Java reference SDKs. + /// + /// + /// Sub-operation naming follows kebab-style: "{name}-callback" and + /// "{name}-submitter". When the parent is null, + /// the inner ops are also nameless (no leading hyphen). + /// + /// remaps a submitter + /// to . + /// Callback errors () pass through unchanged. + /// + /// + private Task RunWaitForCallback( + Func submitter, + ICheckpointSerializer serializer, + string? name, + WaitForCallbackConfig? config, + CancellationToken cancellationToken) + { + var callbackName = name == null ? null : $"{name}-callback"; + var submitterName = name == null ? null : $"{name}-submitter"; + + var callbackConfig = config == null ? null : new CallbackConfig + { + Timeout = config.Timeout, + HeartbeatTimeout = config.HeartbeatTimeout, + }; + + var stepConfig = config?.RetryStrategy == null + ? null + : new StepConfig { RetryStrategy = config.RetryStrategy }; + + // Use the AOT-safe child-context overload so the wrapper itself doesn't + // need [RequiresUnreferencedCode] beyond the public WaitForCallbackAsync + // attribute. The user-supplied serializer (or the reflection serializer + // wired in by the reflection overload) is used to deserialize the inner + // callback's payload AND the child-context's own result (which is just + // an unwrap of T returned from the await — same shape). 
+ return RunChildContext( + async childCtx => + { + var callback = await childCtx.CreateCallbackAsync( + serializer, + name: callbackName, + config: callbackConfig, + cancellationToken: cancellationToken); + + await childCtx.StepAsync( + async (stepCtx) => + { + var submitterCtx = new WaitForCallbackContext(stepCtx.Logger); + await submitter(callback.CallbackId, submitterCtx); + }, + name: submitterName, + config: stepConfig, + cancellationToken: cancellationToken); + + return await callback.GetResultAsync(cancellationToken); + }, + serializer, + name, + new ChildContextConfig + { + SubType = OperationSubTypes.WaitForCallback, + ErrorMapping = MapWaitForCallbackException, + }, + cancellationToken); + } + + private static Exception MapWaitForCallbackException(Exception ex) + { + // Callback errors are already user-meaningful (CallbackFailed/Timeout + // from inside the callback await). Pass through. + if (ex is CallbackException) return ex; + + // The ChildContextOperation wraps thrown exceptions in + // ChildContextException; unwrap to surface the underlying cause. + if (ex is ChildContextException childEx) + { + // CallbackException thrown from GetResultAsync (callback completed + // with FAILED/TIMED_OUT) — surface directly. + // + // Fresh-execution path: InnerException is the live exception object. + // Replay path: InnerException is null but ErrorType carries the string. + if (childEx.InnerException is CallbackException nestedLive) + return nestedLive; + if (IsCallbackErrorTypeString(childEx.ErrorType)) + { + // Replay-side reconstruction: preserve subclass fidelity by + // dispatching on the stored ErrorType FullName so a stored + // CallbackTimeoutException remaps to CallbackTimeoutException + // (not the more generic CallbackFailedException). + return BuildCallbackExceptionForReplay(childEx); + } + + // Submitter step exhausted retries → wrap as CallbackSubmitterException. + // Fresh path: InnerException is the live StepException. 
+ if (childEx.InnerException is StepException stepLive) + { + return new CallbackSubmitterException(stepLive.Message, stepLive) + { + ErrorType = stepLive.ErrorType, + ErrorData = stepLive.ErrorData, + OriginalStackTrace = stepLive.OriginalStackTrace, + }; + } + // Replay path: InnerException is null; ErrorType is the type string. + if (childEx.ErrorType == typeof(StepException).FullName) + { + return new CallbackSubmitterException(childEx.Message, childEx) + { + ErrorType = childEx.ErrorType, + ErrorData = childEx.ErrorData, + OriginalStackTrace = childEx.OriginalStackTrace, + }; + } + } + + // Anything else — surface unchanged so the user sees the original cause. + return ex; + } + + private static CallbackException BuildCallbackExceptionForReplay(ChildContextException childEx) + { + // Dispatch on the stored ErrorType FullName to preserve the original + // subclass across replays. Caller has already verified + // IsCallbackErrorTypeString(childEx.ErrorType) is true. + if (childEx.ErrorType == typeof(CallbackTimeoutException).FullName) + { + return new CallbackTimeoutException(childEx.Message, childEx) + { + ErrorType = childEx.ErrorType, + ErrorData = childEx.ErrorData, + OriginalStackTrace = childEx.OriginalStackTrace, + }; + } + if (childEx.ErrorType == typeof(CallbackSubmitterException).FullName) + { + return new CallbackSubmitterException(childEx.Message, childEx) + { + ErrorType = childEx.ErrorType, + ErrorData = childEx.ErrorData, + OriginalStackTrace = childEx.OriginalStackTrace, + }; + } + if (childEx.ErrorType == typeof(CallbackException).FullName) + { + return new CallbackException(childEx.Message, childEx) + { + ErrorType = childEx.ErrorType, + ErrorData = childEx.ErrorData, + OriginalStackTrace = childEx.OriginalStackTrace, + }; + } + // CallbackFailedException.FullName (or any future callback subtype not + // listed above) defaults to CallbackFailedException — the most general + // "callback failed" surface that preserves user-catchable behavior. 
+ return new CallbackFailedException(childEx.Message, childEx) + { + ErrorType = childEx.ErrorType, + ErrorData = childEx.ErrorData, + OriginalStackTrace = childEx.OriginalStackTrace, + }; + } + + private static bool IsCallbackErrorTypeString(string? errorType) => + errorType == typeof(CallbackFailedException).FullName + || errorType == typeof(CallbackTimeoutException).FullName + || errorType == typeof(CallbackSubmitterException).FullName + || errorType == typeof(CallbackException).FullName; +} + +internal sealed class WaitForCallbackContext : IWaitForCallbackContext +{ + public WaitForCallbackContext(ILogger logger) + { + Logger = logger; + } + + public ILogger Logger { get; } } /// diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/DurableFunction.cs b/Libraries/src/Amazon.Lambda.DurableExecution/DurableFunction.cs index d629a0b2e..0ed3e57a8 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/DurableFunction.cs +++ b/Libraries/src/Amazon.Lambda.DurableExecution/DurableFunction.cs @@ -171,7 +171,12 @@ private static async Task WrapAsyncCore serviceClient.CheckpointAsync( - invocationInput.DurableExecutionArn, token, ops, ct)); + invocationInput.DurableExecutionArn, token, ops, + // The service stamps a freshly-allocated CallbackId onto a started + // CALLBACK op (and may emit terminal-state callbacks/timers); merge + // those back into ExecutionState so the next ExecuteAsync sees them. 
+ onNewOperations: state.AddOperations, + cancellationToken: ct)); var context = new DurableContext( state, terminationManager, idGenerator, diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Exceptions/CallbackException.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Exceptions/CallbackException.cs new file mode 100644 index 000000000..ff878a0a9 --- /dev/null +++ b/Libraries/src/Amazon.Lambda.DurableExecution/Exceptions/CallbackException.cs @@ -0,0 +1,86 @@ +namespace Amazon.Lambda.DurableExecution; + +/// +/// Base exception type for callback failures surfaced from +/// +/// or +/// . +/// Concrete subclasses distinguish failure modes — pattern-match +/// , , +/// or in catch clauses. +/// +public class CallbackException : DurableExecutionException +{ + /// The callback ID associated with the failure (if known). + public string? CallbackId { get; init; } + + /// The fully-qualified type name of the original error, if known. + public string? ErrorType { get; init; } + + /// Optional structured error data attached by the external system. + public string? ErrorData { get; init; } + + /// Stack trace of the original error, captured before serialization. + public IReadOnlyList? OriginalStackTrace { get; init; } + + /// Creates an empty . + public CallbackException() { } + + /// Creates a with the given message. + public CallbackException(string message) : base(message) { } + + /// Creates a wrapping an inner exception. + public CallbackException(string message, Exception innerException) : base(message, innerException) { } +} + +/// +/// Thrown when the external system reports a failure result for a callback +/// (via SendDurableExecutionCallbackFailure). +/// +public class CallbackFailedException : CallbackException +{ + /// Creates an empty . + public CallbackFailedException() { } + + /// Creates a with the given message. + public CallbackFailedException(string message) : base(message) { } + + /// Creates a wrapping an inner exception. 
+ public CallbackFailedException(string message, Exception innerException) : base(message, innerException) { } +} + +/// +/// Thrown when the durable execution service marks a callback as timed-out — +/// either the overall or the +/// elapsed. +/// +public class CallbackTimeoutException : CallbackException +{ + /// Creates an empty . + public CallbackTimeoutException() { } + + /// Creates a with the given message. + public CallbackTimeoutException(string message) : base(message) { } + + /// Creates a wrapping an inner exception. + public CallbackTimeoutException(string message, Exception innerException) : base(message, innerException) { } +} + +/// +/// Thrown only from +/// +/// when the user-supplied submitter delegate (the step that hands the callback +/// ID to the external system) fails after retries are exhausted. Wraps the +/// underlying as . +/// +public class CallbackSubmitterException : CallbackException +{ + /// Creates an empty . + public CallbackSubmitterException() { } + + /// Creates a with the given message. + public CallbackSubmitterException(string message) : base(message) { } + + /// Creates a wrapping an inner exception. + public CallbackSubmitterException(string message, Exception innerException) : base(message, innerException) { } +} diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/ICallback.cs b/Libraries/src/Amazon.Lambda.DurableExecution/ICallback.cs new file mode 100644 index 000000000..a00b8e3c4 --- /dev/null +++ b/Libraries/src/Amazon.Lambda.DurableExecution/ICallback.cs @@ -0,0 +1,39 @@ +namespace Amazon.Lambda.DurableExecution; + +/// +/// A pending callback created by +/// . +/// Hands back a for external systems to use, plus a +/// hook that +/// suspends the workflow until the external system completes the callback. +/// +/// The callback result type. +public interface ICallback +{ + /// + /// The callback ID generated by the durable execution service. 
External + /// systems pass this ID to SendDurableExecutionCallbackSuccess / + /// SendDurableExecutionCallbackFailure / + /// SendDurableExecutionCallbackHeartbeat to deliver a result. + /// + string CallbackId { get; } + + /// + /// Suspends the workflow until the callback is completed, then returns the + /// deserialized result. + /// + /// + /// On the first invocation that reaches this call, the workflow suspends + /// (Lambda terminates). When the external system completes the callback + /// the service re-invokes Lambda; this call then returns the cached result + /// without re-executing user code. + /// + /// + /// Thrown when the external system reported a failure result. + /// + /// + /// Thrown when the service timed out the callback (overall timeout or + /// heartbeat timeout elapsed). + /// + Task GetResultAsync(CancellationToken cancellationToken = default); +} diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/IDurableContext.cs b/Libraries/src/Amazon.Lambda.DurableExecution/IDurableContext.cs index eb10a0ffe..9bdc8059a 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/IDurableContext.cs +++ b/Libraries/src/Amazon.Lambda.DurableExecution/IDurableContext.cs @@ -119,6 +119,78 @@ Task RunInChildContextAsync( string? name = null, ChildContextConfig? config = null, CancellationToken cancellationToken = default); + + /// + /// Create a callback for an external system to complete. Returns an + /// handle exposing the service-allocated + /// (pass to the external system) and + /// + /// (await to suspend until a result arrives). + /// + /// + /// The result is deserialized using reflection-based System.Text.Json. + /// For NativeAOT or trimmed deployments, use the overload that takes an + /// . + /// + /// Errors are deferred to ; + /// CreateCallbackAsync always returns successfully so user code + /// between CreateCallbackAsync and the result-await runs deterministically + /// across replays. 
+ /// + /// + [RequiresUnreferencedCode("Reflection-based JSON for T. Use the ICheckpointSerializer overload for AOT/trimmed deployments.")] + [RequiresDynamicCode("Reflection-based JSON for T. Use the ICheckpointSerializer overload for AOT/trimmed deployments.")] + Task> CreateCallbackAsync( + string? name = null, + CallbackConfig? config = null, + CancellationToken cancellationToken = default); + + /// + /// Create a callback with AOT-safe checkpoint deserialization. The supplied + /// is used in place of reflection-based JSON + /// when + /// resolves the callback's payload. + /// + Task> CreateCallbackAsync( + ICheckpointSerializer serializer, + string? name = null, + CallbackConfig? config = null, + CancellationToken cancellationToken = default); + + /// + /// Composite operation that creates a callback, runs the supplied submitter + /// (which hands the callbackId to an external system), and suspends + /// until the external system delivers a result. Equivalent to manually + /// composing + /// + + /// + + /// inside a child context. + /// + /// + /// Submitter failures (after retries are exhausted) surface as + /// . Callback failures and timeouts + /// surface as / + /// . + /// + [RequiresUnreferencedCode("Reflection-based JSON for T. Use the ICheckpointSerializer overload for AOT/trimmed deployments.")] + [RequiresDynamicCode("Reflection-based JSON for T. Use the ICheckpointSerializer overload for AOT/trimmed deployments.")] + Task WaitForCallbackAsync( + Func submitter, + string? name = null, + WaitForCallbackConfig? config = null, + CancellationToken cancellationToken = default); + + /// + /// AOT-safe overload of + /// . + /// The supplied is used in place of reflection-based JSON. + /// + Task WaitForCallbackAsync( + Func submitter, + ICheckpointSerializer serializer, + string? name = null, + WaitForCallbackConfig? 
config = null, + CancellationToken cancellationToken = default); } /// diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/IWaitForCallbackContext.cs b/Libraries/src/Amazon.Lambda.DurableExecution/IWaitForCallbackContext.cs new file mode 100644 index 000000000..4ed8cd4ba --- /dev/null +++ b/Libraries/src/Amazon.Lambda.DurableExecution/IWaitForCallbackContext.cs @@ -0,0 +1,21 @@ +using Microsoft.Extensions.Logging; + +namespace Amazon.Lambda.DurableExecution; + +/// +/// Context passed to the submitter delegate of +/// . +/// Provides a replay-safe logger scoped to the submitter step. +/// +/// +/// Distinct from so the submitter API can evolve +/// independently. Mirrors WaitForCallbackContext in the Python and +/// JavaScript SDKs (logger-only surface). +/// +public interface IWaitForCallbackContext +{ + /// + /// Logger scoped to the submitter step. + /// + ILogger Logger { get; } +} diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/CallbackOperation.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/CallbackOperation.cs new file mode 100644 index 000000000..ee5c842bb --- /dev/null +++ b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/CallbackOperation.cs @@ -0,0 +1,240 @@ +using SdkCallbackOptions = Amazon.Lambda.Model.CallbackOptions; +using SdkOperationUpdate = Amazon.Lambda.Model.OperationUpdate; + +namespace Amazon.Lambda.DurableExecution.Internal; + +/// +/// Durable callback operation. Sync-flushes a CALLBACK START checkpoint +/// (the service stamps a freshly-allocated CallbackId onto the response, +/// which the batcher merges back into ), then hands +/// the user an they can later +/// +/// to suspend on. +/// +/// +/// Replay branches — example: +/// +/// var cb = await ctx.CreateCallbackAsync<ApprovalResult>(name: "approval"); +/// // ... external system told to use cb.CallbackId ... 
+/// var result = await cb.GetResultAsync(); +/// +/// +/// Fresh: no prior state → sync-flush CALLBACK START; +/// the service responds with a CallbackId (merged into state by the +/// batcher); construct the and return it. +/// then suspends. +/// STARTED: a CallbackId is already on the checkpoint; reuse it. +/// suspends (the external system hasn't +/// responded yet) — service re-invokes once it does. +/// SUCCEEDED / FAILED / TIMED_OUT: terminal — construct the +/// with the cached state and return. +/// immediately deserializes / throws. +/// +/// CRITICAL: CreateCallbackAsync always succeeds — it returns the +/// handle regardless of terminal state. Errors are +/// deferred to +/// so user code between CreateCallbackAsync and the result-await runs +/// deterministically across replays. Mirrors Python CallbackOperationExecutor's +/// "do NOT raise on FAILED" rule. +/// +internal sealed class CallbackOperation : DurableOperation>, ICallback +{ + private readonly CallbackConfig? _config; + private readonly ICheckpointSerializer _serializer; + + private string? _callbackId; + + public CallbackOperation( + string operationId, + string? name, + CallbackConfig? config, + ICheckpointSerializer serializer, + ExecutionState state, + TerminationManager termination, + string durableExecutionArn, + CheckpointBatcher? batcher = null) + : base(operationId, name, state, termination, durableExecutionArn, batcher) + { + _config = config; + _serializer = serializer; + } + + protected override string OperationType => OperationTypes.Callback; + + /// + /// Set when an existing terminal-state checkpoint was observed during + /// dispatch. reads this directly to short- + /// circuit deserialization (or throw the recorded error) without suspending. + /// + private Operation? _terminalReplay; + + /// + public string CallbackId => _callbackId + ?? throw new InvalidOperationException( + "CallbackId is unavailable. 
Ensure CreateCallbackAsync has completed before reading CallbackId."); + + protected override async Task> StartAsync(CancellationToken cancellationToken) + { + // Sync-flush the START so the service can allocate a CallbackId for us. + // The batcher's onNewOperations hook merges the service's response into + // ExecutionState, so reading state.GetOperation(OperationId) right after + // the await sees the populated CallbackDetails. + await EnqueueAsync(new SdkOperationUpdate + { + Id = OperationId, + Type = OperationTypes.Callback, + Action = "START", + SubType = OperationSubTypes.Callback, + Name = Name, + CallbackOptions = BuildCallbackOptions() + }, cancellationToken); + + var stamped = State.GetOperation(OperationId); + var callbackId = stamped?.CallbackDetails?.CallbackId; + if (string.IsNullOrEmpty(callbackId)) + { + // Service didn't return a CallbackId — this is a service-contract + // violation, not user error. Surface as a non-deterministic error + // so the workflow fails fast rather than silently NRE-ing later. + throw new NonDeterministicExecutionException( + $"Callback operation '{Name ?? OperationId}' was started but the service did not return a CallbackId."); + } + + _callbackId = callbackId; + + // If the service already reported a terminal state on the START response + // (the external system replied synchronously, or timeout was instant), + // record it for GetResultAsync to short-circuit on. + if (IsTerminalStatus(stamped?.Status)) + { + _terminalReplay = stamped; + } + + return this; + } + + protected override Task> ReplayAsync(Operation existing, CancellationToken cancellationToken) + { + var callbackId = existing.CallbackDetails?.CallbackId; + if (string.IsNullOrEmpty(callbackId)) + { + throw new NonDeterministicExecutionException( + $"Callback operation '{Name ?? 
OperationId}' has no CallbackId on its checkpoint."); + } + + _callbackId = callbackId; + + // CRITICAL (matches Python CallbackOperationExecutor): we must NOT + // raise on terminal state here. CreateCallbackAsync always returns the + // ICallback handle so any user code between create and GetResult runs + // deterministically across replays. Defer status inspection to + // GetResultAsync below. + switch (existing.Status) + { + case OperationStatuses.Succeeded: + case OperationStatuses.Failed: + case OperationStatuses.TimedOut: + _terminalReplay = existing; + break; + + case OperationStatuses.Started: + case OperationStatuses.Pending: + // External system hasn't responded yet — GetResultAsync will + // suspend so the service can re-invoke once it does. + break; + + default: + throw new NonDeterministicExecutionException( + $"Callback operation '{Name ?? OperationId}' has unexpected status '{existing.Status}' on replay."); + } + + return Task.FromResult>(this); + } + + /// + public async Task GetResultAsync(CancellationToken cancellationToken = default) + { + cancellationToken.ThrowIfCancellationRequested(); + + // Terminal-state checkpoint already observed by Start/Replay — return + // (or throw) immediately without suspending. + if (_terminalReplay != null) + { + return ResolveTerminal(_terminalReplay); + } + + // No terminal state yet. Suspend the workflow; the service re-invokes + // when the external system delivers a result. + return await Termination.SuspendAndAwait( + TerminationReason.CallbackPending, + $"callback:{Name ?? 
OperationId}"); + } + + private T ResolveTerminal(Operation op) + { + switch (op.Status) + { + case OperationStatuses.Succeeded: + var serialized = op.CallbackDetails?.Result; + if (serialized == null) return default!; + return _serializer.Deserialize( + serialized, + new SerializationContext(OperationId, DurableExecutionArn)); + + case OperationStatuses.Failed: + throw BuildFailedException(op); + + case OperationStatuses.TimedOut: + throw BuildTimeoutException(op); + + default: + // Should be unreachable — _terminalReplay is only set for terminal statuses. + throw new NonDeterministicExecutionException( + $"Callback operation '{Name ?? OperationId}' has unexpected status '{op.Status}' on result resolution."); + } + } + + private CallbackFailedException BuildFailedException(Operation op) + { + var err = op.CallbackDetails?.Error; + var message = err?.ErrorMessage ?? "Callback failed"; + return new CallbackFailedException(message) + { + CallbackId = op.CallbackDetails?.CallbackId, + ErrorType = err?.ErrorType, + ErrorData = err?.ErrorData, + OriginalStackTrace = err?.StackTrace, + }; + } + + private CallbackTimeoutException BuildTimeoutException(Operation op) + { + var err = op.CallbackDetails?.Error; + var message = err?.ErrorMessage ?? "Callback timed out"; + return new CallbackTimeoutException(message) + { + CallbackId = op.CallbackDetails?.CallbackId, + ErrorType = err?.ErrorType, + ErrorData = err?.ErrorData, + OriginalStackTrace = err?.StackTrace, + }; + } + + private SdkCallbackOptions? 
BuildCallbackOptions() + { + if (_config == null) return null; + if (_config.Timeout == TimeSpan.Zero && _config.HeartbeatTimeout == TimeSpan.Zero) return null; + + var options = new SdkCallbackOptions(); + if (_config.Timeout > TimeSpan.Zero) + options.TimeoutSeconds = (int)Math.Max(1, Math.Ceiling(_config.Timeout.TotalSeconds)); + if (_config.HeartbeatTimeout > TimeSpan.Zero) + options.HeartbeatTimeoutSeconds = (int)Math.Max(1, Math.Ceiling(_config.HeartbeatTimeout.TotalSeconds)); + return options; + } + + private static bool IsTerminalStatus(string? status) => + status == OperationStatuses.Succeeded + || status == OperationStatuses.Failed + || status == OperationStatuses.TimedOut; +} diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Services/LambdaDurableServiceClient.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Services/LambdaDurableServiceClient.cs index dba534f6b..946760413 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/Services/LambdaDurableServiceClient.cs +++ b/Libraries/src/Amazon.Lambda.DurableExecution/Services/LambdaDurableServiceClient.cs @@ -19,11 +19,17 @@ public LambdaDurableServiceClient(IAmazonLambda lambdaClient) /// /// Flushes pending checkpoint operations to the durable execution service. + /// When onNewOperations is supplied, any + /// NewExecutionState.Operations the service returns (e.g. a freshly + /// allocated CallbackId after a callback START checkpoint, or a + /// timer-fired SUCCEEDED) are forwarded to the callback so the caller can + /// merge them into its in-memory ExecutionState. /// public async Task CheckpointAsync( string durableExecutionArn, string? checkpointToken, IReadOnlyList pendingOperations, + Action>? 
onNewOperations = null, CancellationToken cancellationToken = default) { if (pendingOperations.Count == 0) @@ -37,6 +43,22 @@ public LambdaDurableServiceClient(IAmazonLambda lambdaClient) }; var response = await _lambdaClient.CheckpointDurableExecutionAsync(request, cancellationToken); + + // The service returns NewExecutionState carrying any operations updated + // since the last checkpoint — most importantly, the callback ID stamped + // onto a freshly-started CALLBACK op, plus any externally-completed + // callbacks/timers. Hand them to the caller (DurableFunction wires this + // back into ExecutionState) so subsequent replay-style lookups see the + // updated state immediately. + var updated = response.NewExecutionState?.Operations; + if (onNewOperations != null && updated != null && updated.Count > 0) + { + var mapped = new List(updated.Count); + foreach (var sdkOp in updated) + mapped.Add(MapFromSdkOperation(sdkOp)); + onNewOperations(mapped); + } + return response.CheckpointToken; } @@ -111,6 +133,16 @@ private static Internal.Operation MapFromSdkOperation(SdkOperation sdkOp) ErrorType = sdkOp.ContextDetails.Error.ErrorType, ErrorMessage = sdkOp.ContextDetails.Error.ErrorMessage } : null + } : null, + CallbackDetails = sdkOp.CallbackDetails != null ? new Internal.CallbackDetails + { + CallbackId = sdkOp.CallbackDetails.CallbackId, + Result = sdkOp.CallbackDetails.Result, + Error = sdkOp.CallbackDetails.Error != null ? 
new ErrorObject + { + ErrorType = sdkOp.CallbackDetails.Error.ErrorType, + ErrorMessage = sdkOp.CallbackDetails.Error.ErrorMessage + } : null } : null }; } diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/CallbackFailedTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/CallbackFailedTest.cs new file mode 100644 index 000000000..de643c99f --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/CallbackFailedTest.cs @@ -0,0 +1,89 @@ +using System.Linq; +using System.Text; +using Amazon.Lambda.Model; +using Xunit; +using Xunit.Abstractions; + +namespace Amazon.Lambda.DurableExecution.IntegrationTests; + +public class CallbackFailedTest +{ + private readonly ITestOutputHelper _output; + public CallbackFailedTest(ITestOutputHelper output) => _output = output; + + /// + /// End-to-end failure path for CreateCallbackAsync: + /// the test acts as the external system and reports a failure via + /// SendDurableExecutionCallbackFailure. The SDK should raise + /// CallbackFailedException from GetResultAsync, and the + /// workflow surfaces FAILED with that exception type recorded. 
+ /// + [Fact] + public async Task CallbackFailed_SurfacesAsCallbackFailedException() + { + await using var deployment = await DurableFunctionDeployment.CreateAsync( + DurableFunctionDeployment.FindTestFunctionDir("CallbackFailedFunction"), + "cb-failed", _output); + + var (invokeResponse, executionName) = await deployment.InvokeAsync("""{"orderId": "x"}"""); + var responsePayload = Encoding.UTF8.GetString(invokeResponse.Payload.ToArray()); + _output.WriteLine($"Initial response: {responsePayload}"); + + var arn = await deployment.FindDurableExecutionArnByNameAsync(executionName, TimeSpan.FromSeconds(60)); + Assert.NotNull(arn); + + var callbackId = await WaitForCallbackIdAsync(deployment, arn!, TimeSpan.FromSeconds(60)); + Assert.False(string.IsNullOrEmpty(callbackId)); + _output.WriteLine($"Service-allocated CallbackId: {callbackId}"); + + // Act as the external system: report a failure. + await deployment.LambdaClient.SendDurableExecutionCallbackFailureAsync( + new SendDurableExecutionCallbackFailureRequest + { + CallbackId = callbackId!, + Error = new Amazon.Lambda.Model.ErrorObject + { + ErrorType = "ApprovalRejected", + ErrorMessage = "external system rejected the request", + } + }); + + var status = await deployment.PollForCompletionAsync(arn!, TimeSpan.FromSeconds(120)); + Assert.Equal("FAILED", status, ignoreCase: true); + + // The workflow's surfaced exception is CallbackFailedException — the SDK + // wraps the external error message into the exception's Message. Verify + // the recorded error type is the SDK's CallbackFailedException and that + // the original failure message survives. + var execution = await deployment.GetExecutionAsync(arn!); + Assert.NotNull(execution.Error); + Assert.Equal(typeof(CallbackFailedException).FullName, execution.Error.ErrorType); + Assert.Contains("rejected", execution.Error.ErrorMessage); + + // History records both Started and Failed for the same callback. 
+ var history = await deployment.WaitForHistoryAsync( + arn!, + h => (h.Events?.Any(e => e.EventType == EventType.CallbackStarted) ?? false) + && (h.Events?.Any(e => e.EventType == EventType.CallbackFailed) ?? false), + TimeSpan.FromSeconds(60)); + var events = history.Events ?? new List(); + Assert.Single(events.Where(e => e.EventType == EventType.CallbackStarted)); + Assert.Single(events.Where(e => e.EventType == EventType.CallbackFailed)); + } + + private static async Task WaitForCallbackIdAsync( + DurableFunctionDeployment deployment, string arn, TimeSpan timeout) + { + var history = await deployment.WaitForHistoryAsync( + arn, + h => h.Events?.Any(e => + e.CallbackStartedDetails != null + && !string.IsNullOrEmpty(e.CallbackStartedDetails.CallbackId)) ?? false, + timeout); + return history.Events? + .Where(e => e.CallbackStartedDetails != null + && !string.IsNullOrEmpty(e.CallbackStartedDetails.CallbackId)) + .Select(e => e.CallbackStartedDetails.CallbackId) + .FirstOrDefault(); + } +} diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/CallbackTimeoutTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/CallbackTimeoutTest.cs new file mode 100644 index 000000000..84a504bce --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/CallbackTimeoutTest.cs @@ -0,0 +1,81 @@ +using System.Linq; +using System.Text; +using Amazon.Lambda.Model; +using Xunit; +using Xunit.Abstractions; + +namespace Amazon.Lambda.DurableExecution.IntegrationTests; + +public class CallbackTimeoutTest +{ + private readonly ITestOutputHelper _output; + public CallbackTimeoutTest(ITestOutputHelper output) => _output = output; + + /// + /// End-to-end timeout path for CreateCallbackAsync: + /// the workflow waits on a callback whose + /// CallbackConfig.Timeout elapses before any result is delivered. 
The service marks the callback as + /// TIMED_OUT, the SDK throws CallbackTimeoutException, and the + /// workflow surfaces FAILED with that exception type recorded. 
+ /// + [Fact] + public async Task CallbackTimeout_SurfacesAsCallbackTimeoutException() + { + await using var deployment = await DurableFunctionDeployment.CreateAsync( + DurableFunctionDeployment.FindTestFunctionDir("CallbackTimeoutFunction"), + "cb-timeout", _output); + + var (invokeResponse, executionName) = await deployment.InvokeAsync("""{"orderId": "x"}"""); + var responsePayload = Encoding.UTF8.GetString(invokeResponse.Payload.ToArray()); + _output.WriteLine($"Initial response: {responsePayload}"); + + var arn = await deployment.FindDurableExecutionArnByNameAsync(executionName, TimeSpan.FromSeconds(60)); + Assert.NotNull(arn); + + // Capture the CallbackId before the timeout fires to confirm the service + // allocated one and to log it. CallbackStarted has the ID; CallbackTimedOut + // typically does not echo it back on the event. + var callbackId = await WaitForCallbackIdAsync(deployment, arn!, TimeSpan.FromSeconds(30)); + Assert.False(string.IsNullOrEmpty(callbackId)); + _output.WriteLine($"Service-allocated CallbackId: {callbackId}"); + + // The configured timeout is 10s; allow generous headroom for service + // latency (timer scheduling + re-invoke + Lambda cold start). + var status = await deployment.PollForCompletionAsync(arn!, TimeSpan.FromSeconds(120)); + Assert.Equal("FAILED", status, ignoreCase: true); + + // The execution surfaces the SDK's CallbackTimeoutException to the user. + // ErrorObject.FromException records ErrorType as the FullName; verify both + // the type and that the recorded message mentions "timed out". 
+ var execution = await deployment.GetExecutionAsync(arn!); + Assert.NotNull(execution.Error); + Assert.Equal(typeof(CallbackTimeoutException).FullName, execution.Error.ErrorType); + Assert.Contains("timed out", execution.Error.ErrorMessage, StringComparison.OrdinalIgnoreCase); + + // History records both Started and TimedOut for the same callback. 
+ var history = await deployment.WaitForHistoryAsync( + arn!, + h => (h.Events?.Any(e => e.EventType == EventType.CallbackStarted) ?? false) + && (h.Events?.Any(e => e.EventType == EventType.CallbackTimedOut) ?? false), + TimeSpan.FromSeconds(60)); + var events = history.Events ?? new List(); + Assert.Single(events.Where(e => e.EventType == EventType.CallbackStarted)); + Assert.Single(events.Where(e => e.EventType == EventType.CallbackTimedOut)); + } + + private static async Task WaitForCallbackIdAsync( + DurableFunctionDeployment deployment, string arn, TimeSpan timeout) + { + var history = await deployment.WaitForHistoryAsync( + arn, + h => h.Events?.Any(e => + e.CallbackStartedDetails != null + && !string.IsNullOrEmpty(e.CallbackStartedDetails.CallbackId)) ?? false, + timeout); + return history.Events? + .Where(e => e.CallbackStartedDetails != null + && !string.IsNullOrEmpty(e.CallbackStartedDetails.CallbackId)) + .Select(e => e.CallbackStartedDetails.CallbackId) + .FirstOrDefault(); + } +} diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/CreateCallbackHappyPathTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/CreateCallbackHappyPathTest.cs new file mode 100644 index 000000000..97f098a41 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/CreateCallbackHappyPathTest.cs @@ -0,0 +1,97 @@ +using System.IO; +using System.Linq; +using System.Text; +using Amazon.Lambda.Model; +using Xunit; +using Xunit.Abstractions; + +namespace Amazon.Lambda.DurableExecution.IntegrationTests; + +public class CreateCallbackHappyPathTest +{ + private readonly ITestOutputHelper _output; + public CreateCallbackHappyPathTest(ITestOutputHelper output) => _output = output; + + /// + /// End-to-end happy path for CreateCallbackAsync: + /// the workflow suspends inside GetResultAsync; the test acts as the + /// external system and delivers a result via SendDurableExecutionCallbackSuccess; + /// the workflow 
resumes and returns the delivered payload. + /// + [Fact] + public async Task CreateCallback_DeliversResultViaSendSuccess() + { + await using var deployment = await DurableFunctionDeployment.CreateAsync( + DurableFunctionDeployment.FindTestFunctionDir("CreateCallbackHappyPathFunction"), + "cb-happy", _output); + + var (invokeResponse, executionName) = await deployment.InvokeAsync("""{"orderId": "approve-123"}"""); + var responsePayload = Encoding.UTF8.GetString(invokeResponse.Payload.ToArray()); + _output.WriteLine($"Initial response: {responsePayload}"); + + // The workflow suspends after CreateCallback's START checkpoint; locate the + // execution by name and pull the service-allocated CallbackId from history. + var arn = await deployment.FindDurableExecutionArnByNameAsync(executionName, TimeSpan.FromSeconds(60)); + Assert.NotNull(arn); + + var callbackId = await WaitForCallbackIdAsync(deployment, arn!, TimeSpan.FromSeconds(60)); + Assert.False(string.IsNullOrEmpty(callbackId), "CallbackStarted event never appeared with a CallbackId"); + _output.WriteLine($"Service-allocated CallbackId: {callbackId}"); + + // Act as the external system: deliver a result. The service will re-invoke the + // Lambda with CALLBACK SUCCEEDED, GetResultAsync deserializes it, and the + // workflow returns. + var resultJson = """{"Status":"approved","ApprovedBy":"integ-test"}"""; + await deployment.LambdaClient.SendDurableExecutionCallbackSuccessAsync( + new SendDurableExecutionCallbackSuccessRequest + { + CallbackId = callbackId!, + Result = new MemoryStream(Encoding.UTF8.GetBytes(resultJson)) + }); + + var status = await deployment.PollForCompletionAsync(arn!, TimeSpan.FromSeconds(120)); + Assert.Equal("SUCCEEDED", status, ignoreCase: true); + + // The execution result mirrors the payload we sent — proves GetResultAsync + // deserialized the wire-level callback Result and the workflow returned it. 
+ var execution = await deployment.GetExecutionAsync(arn!); + Assert.NotNull(execution.Result); + Assert.Contains("approved", execution.Result); + Assert.Contains("integ-test", execution.Result); + + // History shows the canonical callback lifecycle: Started then Succeeded. + var history = await deployment.WaitForHistoryAsync( + arn!, + h => (h.Events?.Any(e => e.EventType == EventType.CallbackStarted) ?? false) + && (h.Events?.Any(e => e.EventType == EventType.CallbackSucceeded) ?? false), + TimeSpan.FromSeconds(60)); + var events = history.Events ?? new List(); + + Assert.Single(events.Where(e => e.EventType == EventType.CallbackStarted)); + Assert.Single(events.Where(e => e.EventType == EventType.CallbackSucceeded)); + + var succeeded = events.First(e => e.CallbackSucceededDetails != null); + Assert.Equal("approve", succeeded.Name); + } + + /// + /// Polls execution history until a CallbackStarted event surfaces a + /// CallbackId. The history endpoint is eventually consistent and the + /// callback ID isn't allocated until the service processes the START checkpoint. + /// + private static async Task WaitForCallbackIdAsync( + DurableFunctionDeployment deployment, string arn, TimeSpan timeout) + { + var history = await deployment.WaitForHistoryAsync( + arn, + h => h.Events?.Any(e => + e.CallbackStartedDetails != null + && !string.IsNullOrEmpty(e.CallbackStartedDetails.CallbackId)) ?? false, + timeout); + return history.Events? 
+ .Where(e => e.CallbackStartedDetails != null + && !string.IsNullOrEmpty(e.CallbackStartedDetails.CallbackId)) + .Select(e => e.CallbackStartedDetails.CallbackId) + .FirstOrDefault(); + } +} diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CallbackFailedFunction/CallbackFailedFunction.csproj b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CallbackFailedFunction/CallbackFailedFunction.csproj new file mode 100644 index 000000000..6f5f657e4 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CallbackFailedFunction/CallbackFailedFunction.csproj @@ -0,0 +1,18 @@ + + + + net8.0 + Exe + true + bootstrap + enable + enable + + + + + + + + + diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CallbackFailedFunction/Dockerfile b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CallbackFailedFunction/Dockerfile new file mode 100644 index 000000000..c1913d56a --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CallbackFailedFunction/Dockerfile @@ -0,0 +1,7 @@ +FROM public.ecr.aws/lambda/provided:al2023 + +RUN dnf install -y libicu + +COPY bin/publish/ ${LAMBDA_TASK_ROOT} + +ENTRYPOINT ["/var/task/bootstrap"] diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CallbackFailedFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CallbackFailedFunction/Function.cs new file mode 100644 index 000000000..f74f28374 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CallbackFailedFunction/Function.cs @@ -0,0 +1,35 @@ +using Amazon.Lambda.Core; +using Amazon.Lambda.DurableExecution; +using Amazon.Lambda.RuntimeSupport; +using Amazon.Lambda.Serialization.SystemTextJson; + +namespace DurableExecutionTestFunction; + +public class Function 
+{ + public static async Task Main(string[] args) + { + var handler = new Function(); + var serializer = new DefaultLambdaJsonSerializer(); + using var handlerWrapper = HandlerWrapper.GetHandlerWrapper(handler.Handler, serializer); + using var bootstrap = new LambdaBootstrap(handlerWrapper); + await bootstrap.RunAsync(); + } + + public Task Handler( + DurableExecutionInvocationInput input, ILambdaContext context) + => DurableFunction.WrapAsync(Workflow, input, context); + + private async Task Workflow(TestEvent input, IDurableContext context) + { + // Test will deliver a failure via SendDurableExecutionCallbackFailure; + // GetResultAsync should raise CallbackFailedException, which the + // workflow does not catch — workflow surfaces FAILED. + var cb = await context.CreateCallbackAsync(name: "approve"); + var result = await cb.GetResultAsync(); + return result; + } +} + +public class TestEvent { public string? OrderId { get; set; } } +public class MyResult { public string? Status { get; set; } public string? 
ApprovedBy { get; set; } } diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CallbackTimeoutFunction/CallbackTimeoutFunction.csproj b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CallbackTimeoutFunction/CallbackTimeoutFunction.csproj new file mode 100644 index 000000000..6f5f657e4 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CallbackTimeoutFunction/CallbackTimeoutFunction.csproj @@ -0,0 +1,18 @@ + + + + net8.0 + Exe + true + bootstrap + enable + enable + + + + + + + + + diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CallbackTimeoutFunction/Dockerfile b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CallbackTimeoutFunction/Dockerfile new file mode 100644 index 000000000..c1913d56a --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CallbackTimeoutFunction/Dockerfile @@ -0,0 +1,7 @@ +FROM public.ecr.aws/lambda/provided:al2023 + +RUN dnf install -y libicu + +COPY bin/publish/ ${LAMBDA_TASK_ROOT} + +ENTRYPOINT ["/var/task/bootstrap"] diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CallbackTimeoutFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CallbackTimeoutFunction/Function.cs new file mode 100644 index 000000000..6e43f4d6c --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CallbackTimeoutFunction/Function.cs @@ -0,0 +1,37 @@ +using Amazon.Lambda.Core; +using Amazon.Lambda.DurableExecution; +using Amazon.Lambda.RuntimeSupport; +using Amazon.Lambda.Serialization.SystemTextJson; + +namespace DurableExecutionTestFunction; + +public class Function +{ + public static async Task Main(string[] args) + { + var handler = new Function(); + var serializer = new DefaultLambdaJsonSerializer(); + using var 
handlerWrapper = HandlerWrapper.GetHandlerWrapper(handler.Handler, serializer); + using var bootstrap = new LambdaBootstrap(handlerWrapper); + await bootstrap.RunAsync(); + } + + public Task Handler( + DurableExecutionInvocationInput input, ILambdaContext context) + => DurableFunction.WrapAsync(Workflow, input, context); + + private async Task Workflow(TestEvent input, IDurableContext context) + { + // The test deliberately never delivers the callback. The service should + // fire the timeout, mark the callback TIMED_OUT, and the SDK should + // surface CallbackTimeoutException to the workflow. + var cb = await context.CreateCallbackAsync( + name: "approve", + config: new CallbackConfig { Timeout = TimeSpan.FromSeconds(10) }); + var result = await cb.GetResultAsync(); + return result; + } +} + +public class TestEvent { public string? OrderId { get; set; } } +public class MyResult { public string? Status { get; set; } public string? ApprovedBy { get; set; } } diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CreateCallbackHappyPathFunction/CreateCallbackHappyPathFunction.csproj b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CreateCallbackHappyPathFunction/CreateCallbackHappyPathFunction.csproj new file mode 100644 index 000000000..6f5f657e4 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CreateCallbackHappyPathFunction/CreateCallbackHappyPathFunction.csproj @@ -0,0 +1,18 @@ + + + + net8.0 + Exe + true + bootstrap + enable + enable + + + + + + + + + diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CreateCallbackHappyPathFunction/Dockerfile b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CreateCallbackHappyPathFunction/Dockerfile new file mode 100644 index 000000000..c1913d56a --- /dev/null +++ 
b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CreateCallbackHappyPathFunction/Dockerfile @@ -0,0 +1,7 @@ +FROM public.ecr.aws/lambda/provided:al2023 + +RUN dnf install -y libicu + +COPY bin/publish/ ${LAMBDA_TASK_ROOT} + +ENTRYPOINT ["/var/task/bootstrap"] diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CreateCallbackHappyPathFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CreateCallbackHappyPathFunction/Function.cs new file mode 100644 index 000000000..a966a516e --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CreateCallbackHappyPathFunction/Function.cs @@ -0,0 +1,34 @@ +using Amazon.Lambda.Core; +using Amazon.Lambda.DurableExecution; +using Amazon.Lambda.RuntimeSupport; +using Amazon.Lambda.Serialization.SystemTextJson; + +namespace DurableExecutionTestFunction; + +public class Function +{ + public static async Task Main(string[] args) + { + var handler = new Function(); + var serializer = new DefaultLambdaJsonSerializer(); + using var handlerWrapper = HandlerWrapper.GetHandlerWrapper(handler.Handler, serializer); + using var bootstrap = new LambdaBootstrap(handlerWrapper); + await bootstrap.RunAsync(); + } + + public Task Handler( + DurableExecutionInvocationInput input, ILambdaContext context) + => DurableFunction.WrapAsync(Workflow, input, context); + + private async Task Workflow(TestEvent input, IDurableContext context) + { + // External system will call SendDurableExecutionCallbackSuccess with the + // approval result. Until then this Lambda suspends on GetResultAsync. + var cb = await context.CreateCallbackAsync(name: "approve"); + var result = await cb.GetResultAsync(); + return result; + } +} + +public class TestEvent { public string? OrderId { get; set; } } +public class MyResult { public string? Status { get; set; } public string? 
ApprovedBy { get; set; } } diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForCallbackHappyPathFunction/Dockerfile b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForCallbackHappyPathFunction/Dockerfile new file mode 100644 index 000000000..c1913d56a --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForCallbackHappyPathFunction/Dockerfile @@ -0,0 +1,7 @@ +FROM public.ecr.aws/lambda/provided:al2023 + +RUN dnf install -y libicu + +COPY bin/publish/ ${LAMBDA_TASK_ROOT} + +ENTRYPOINT ["/var/task/bootstrap"] diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForCallbackHappyPathFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForCallbackHappyPathFunction/Function.cs new file mode 100644 index 000000000..7dc148564 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForCallbackHappyPathFunction/Function.cs @@ -0,0 +1,56 @@ +using System.IO; +using System.Text; +using Amazon.Lambda; +using Amazon.Lambda.Core; +using Amazon.Lambda.DurableExecution; +using Amazon.Lambda.Model; +using Amazon.Lambda.RuntimeSupport; +using Amazon.Lambda.Serialization.SystemTextJson; + +namespace DurableExecutionTestFunction; + +public class Function +{ + // Reuse a single Lambda client across submitter invocations. 
+ private static readonly IAmazonLambda LambdaClient = new AmazonLambdaClient(); + + public static async Task Main(string[] args) + { + var handler = new Function(); + var serializer = new DefaultLambdaJsonSerializer(); + using var handlerWrapper = HandlerWrapper.GetHandlerWrapper(handler.Handler, serializer); + using var bootstrap = new LambdaBootstrap(handlerWrapper); + await bootstrap.RunAsync(); + } + + public Task Handler( + DurableExecutionInvocationInput input, ILambdaContext context) + => DurableFunction.WrapAsync(Workflow, input, context); + + private async Task Workflow(TestEvent input, IDurableContext context) + { + // The submitter is called once with a freshly-allocated callback ID. + // For a deterministic happy-path integration test we self-resolve the + // callback by calling SendDurableExecutionCallbackSuccess directly — + // modelling a "fire-and-resolve" external system. After the submitter + // returns the SDK suspends; the service re-invokes once it has processed + // the resolution, and WaitForCallbackAsync returns the deserialized result. + var result = await context.WaitForCallbackAsync( + submitter: async (callbackId, cbCtx) => + { + var resultJson = $$"""{"Status":"approved","ApprovedBy":"{{input.OrderId}}"}"""; + await LambdaClient.SendDurableExecutionCallbackSuccessAsync( + new SendDurableExecutionCallbackSuccessRequest + { + CallbackId = callbackId, + Result = new MemoryStream(Encoding.UTF8.GetBytes(resultJson)) + }); + }, + name: "approve"); + + return result; + } +} + +public class TestEvent { public string? OrderId { get; set; } } +public class MyResult { public string? Status { get; set; } public string? 
ApprovedBy { get; set; } } diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForCallbackHappyPathFunction/WaitForCallbackHappyPathFunction.csproj b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForCallbackHappyPathFunction/WaitForCallbackHappyPathFunction.csproj new file mode 100644 index 000000000..6f5f657e4 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForCallbackHappyPathFunction/WaitForCallbackHappyPathFunction.csproj @@ -0,0 +1,18 @@ + + + + net8.0 + Exe + true + bootstrap + enable + enable + + + + + + + + + diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForCallbackSubmitterFailsFunction/Dockerfile b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForCallbackSubmitterFailsFunction/Dockerfile new file mode 100644 index 000000000..c1913d56a --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForCallbackSubmitterFailsFunction/Dockerfile @@ -0,0 +1,7 @@ +FROM public.ecr.aws/lambda/provided:al2023 + +RUN dnf install -y libicu + +COPY bin/publish/ ${LAMBDA_TASK_ROOT} + +ENTRYPOINT ["/var/task/bootstrap"] diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForCallbackSubmitterFailsFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForCallbackSubmitterFailsFunction/Function.cs new file mode 100644 index 000000000..a940d9dd8 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForCallbackSubmitterFailsFunction/Function.cs @@ -0,0 +1,43 @@ +using Amazon.Lambda.Core; +using Amazon.Lambda.DurableExecution; +using Amazon.Lambda.RuntimeSupport; +using Amazon.Lambda.Serialization.SystemTextJson; + +namespace DurableExecutionTestFunction; + +public class Function +{ + public 
static async Task Main(string[] args) + { + var handler = new Function(); + var serializer = new DefaultLambdaJsonSerializer(); + using var handlerWrapper = HandlerWrapper.GetHandlerWrapper(handler.Handler, serializer); + using var bootstrap = new LambdaBootstrap(handlerWrapper); + await bootstrap.RunAsync(); + } + + public Task Handler( + DurableExecutionInvocationInput input, ILambdaContext context) + => DurableFunction.WrapAsync(Workflow, input, context); + + private async Task Workflow(TestEvent input, IDurableContext context) + { + // The submitter throws on every attempt. With RetryStrategy.None the + // SDK should fail terminally on the first attempt and surface the + // failure as CallbackSubmitterException. The workflow does not catch + // it, so the durable execution surfaces FAILED with that exception. + var result = await context.WaitForCallbackAsync( + submitter: async (callbackId, cbCtx) => + { + await Task.CompletedTask; + throw new InvalidOperationException("submitter intentional failure"); + }, + name: "approve", + config: new WaitForCallbackConfig { RetryStrategy = RetryStrategy.None }); + + return result; + } +} + +public class TestEvent { public string? OrderId { get; set; } } +public class MyResult { public string? Status { get; set; } public string? 
ApprovedBy { get; set; } } diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForCallbackSubmitterFailsFunction/WaitForCallbackSubmitterFailsFunction.csproj b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForCallbackSubmitterFailsFunction/WaitForCallbackSubmitterFailsFunction.csproj new file mode 100644 index 000000000..6f5f657e4 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForCallbackSubmitterFailsFunction/WaitForCallbackSubmitterFailsFunction.csproj @@ -0,0 +1,18 @@ + + + + net8.0 + Exe + true + bootstrap + enable + enable + + + + + + + + + diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/WaitForCallbackHappyPathTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/WaitForCallbackHappyPathTest.cs new file mode 100644 index 000000000..ef883c043 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/WaitForCallbackHappyPathTest.cs @@ -0,0 +1,69 @@ +using System.Linq; +using System.Text; +using Amazon.Lambda.Model; +using Xunit; +using Xunit.Abstractions; + +namespace Amazon.Lambda.DurableExecution.IntegrationTests; + +public class WaitForCallbackHappyPathTest +{ + private readonly ITestOutputHelper _output; + public WaitForCallbackHappyPathTest(ITestOutputHelper output) => _output = output; + + /// + /// End-to-end happy path for WaitForCallbackAsync: + /// the submitter step inside the workflow self-resolves the callback by + /// calling SendDurableExecutionCallbackSuccess. The composite + /// operation suspends after the submitter step completes, the service + /// re-invokes the Lambda once the callback is resolved, and + /// WaitForCallbackAsync returns the deserialized result. 
+ /// + [Fact] + public async Task WaitForCallback_SubmitterDeliversResult_WorkflowCompletes() + { + await using var deployment = await DurableFunctionDeployment.CreateAsync( + DurableFunctionDeployment.FindTestFunctionDir("WaitForCallbackHappyPathFunction"), + "wfcb-happy", _output); + + var (invokeResponse, executionName) = await deployment.InvokeAsync("""{"orderId": "approver-1"}"""); + var responsePayload = Encoding.UTF8.GetString(invokeResponse.Payload.ToArray()); + _output.WriteLine($"Initial response: {responsePayload}"); + + var arn = await deployment.FindDurableExecutionArnByNameAsync(executionName, TimeSpan.FromSeconds(60)); + Assert.NotNull(arn); + + var status = await deployment.PollForCompletionAsync(arn!, TimeSpan.FromSeconds(120)); + Assert.Equal("SUCCEEDED", status, ignoreCase: true); + + // The execution returns the payload the submitter delivered. + var execution = await deployment.GetExecutionAsync(arn!); + Assert.NotNull(execution.Result); + Assert.Contains("approved", execution.Result); + Assert.Contains("approver-1", execution.Result); + + // History records the canonical WaitForCallback lifecycle: + // submitter step Started + Succeeded, callback Started + Succeeded, + // and a containing context (CONTEXT operation) wrapping the pair. + var history = await deployment.WaitForHistoryAsync( + arn!, + h => (h.Events?.Any(e => e.EventType == EventType.CallbackStarted) ?? false) + && (h.Events?.Any(e => e.EventType == EventType.CallbackSucceeded) ?? false) + && (h.Events?.Any(e => e.EventType == EventType.StepSucceeded) ?? false), + TimeSpan.FromSeconds(60)); + var events = history.Events ?? new List(); + + Assert.Single(events.Where(e => e.EventType == EventType.CallbackStarted)); + Assert.Single(events.Where(e => e.EventType == EventType.CallbackSucceeded)); + + // The submitter ran exactly once and succeeded — the SDK's "callback + // already resolved" branch must NOT have re-run it on replay. 
Filter + // on a name that the SDK uses for the submitter step (typically + // matches the WaitForCallback name). + var submitterSteps = events + .Where(e => e.EventType == EventType.StepSucceeded + || e.EventType == EventType.StepStarted) + .ToList(); + Assert.NotEmpty(submitterSteps); + } +} diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/WaitForCallbackSubmitterFailsTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/WaitForCallbackSubmitterFailsTest.cs new file mode 100644 index 000000000..4c74fc8fe --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/WaitForCallbackSubmitterFailsTest.cs @@ -0,0 +1,66 @@ +using System.Linq; +using System.Text; +using Amazon.Lambda.Model; +using Xunit; +using Xunit.Abstractions; + +namespace Amazon.Lambda.DurableExecution.IntegrationTests; + +public class WaitForCallbackSubmitterFailsTest +{ + private readonly ITestOutputHelper _output; + public WaitForCallbackSubmitterFailsTest(ITestOutputHelper output) => _output = output; + + /// + /// End-to-end submitter-failure path for WaitForCallbackAsync: + /// the submitter throws on attempt 1 with ; + /// the SDK fails the composite operation terminally and surfaces + /// . The workflow surfaces FAILED. 
+ /// + [Fact] + public async Task WaitForCallback_SubmitterThrows_SurfacesAsCallbackSubmitterException() + { + await using var deployment = await DurableFunctionDeployment.CreateAsync( + DurableFunctionDeployment.FindTestFunctionDir("WaitForCallbackSubmitterFailsFunction"), + "wfcb-fail", _output); + + var (invokeResponse, executionName) = await deployment.InvokeAsync("""{"orderId": "x"}"""); + var responsePayload = Encoding.UTF8.GetString(invokeResponse.Payload.ToArray()); + _output.WriteLine($"Initial response: {responsePayload}"); + + var arn = await deployment.FindDurableExecutionArnByNameAsync(executionName, TimeSpan.FromSeconds(60)); + Assert.NotNull(arn); + + var status = await deployment.PollForCompletionAsync(arn!, TimeSpan.FromSeconds(120)); + Assert.Equal("FAILED", status, ignoreCase: true); + + // The workflow surfaces CallbackSubmitterException — the SDK's wrapper + // type around the failed submitter step. Verify both the recorded + // ErrorType and that the original "submitter intentional failure" + // message survives in the error chain. + var execution = await deployment.GetExecutionAsync(arn!); + Assert.NotNull(execution.Error); + Assert.Equal(typeof(CallbackSubmitterException).FullName, execution.Error.ErrorType); + // ErrorObject.FromException records the outer exception's Message; that + // message should reference the submitter failure context. Be lenient + // about exact wording since the SDK may prepend / wrap the inner. + Assert.False(string.IsNullOrEmpty(execution.Error.ErrorMessage)); + + // History records the submitter step failed exactly once — RetryStrategy.None + // means no retries — and no callback was ever started since the submitter + // never delivered the ID. + var history = await deployment.WaitForHistoryAsync( + arn!, + h => h.Events?.Any(e => e.StepFailedDetails != null) ?? false, + TimeSpan.FromSeconds(60)); + var events = history.Events ?? 
new List(); + + var stepFailures = events.Where(e => e.StepFailedDetails != null).ToList(); + Assert.Single(stepFailures); + var failureMessage = stepFailures[0].StepFailedDetails.Error?.Payload?.ErrorMessage ?? string.Empty; + Assert.Contains("submitter intentional failure", failureMessage); + + // No SUCCEEDED step events — the submitter never succeeded. + Assert.Empty(events.Where(e => e.StepSucceededDetails != null)); + } +} diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/CallbackOperationTests.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/CallbackOperationTests.cs new file mode 100644 index 000000000..b89b17fee --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/CallbackOperationTests.cs @@ -0,0 +1,497 @@ +using Amazon.Lambda.DurableExecution; +using Amazon.Lambda.DurableExecution.Internal; +using Amazon.Lambda.TestUtilities; +using Xunit; + +namespace Amazon.Lambda.DurableExecution.Tests; + +public class CallbackOperationTests +{ + /// Reproduces the Id that emits for the n-th root-level operation. + private static string IdAt(int position) => OperationIdGenerator.HashOperationId(position.ToString()); + + private static (DurableContext context, RecordingBatcher recorder, TerminationManager tm, ExecutionState state) + CreateContext(InitialExecutionState? initialState = null) + { + var state = new ExecutionState(); + state.LoadFromCheckpoint(initialState); + var tm = new TerminationManager(); + var idGen = new OperationIdGenerator(); + var lambdaContext = new TestLambdaContext(); + var recorder = new RecordingBatcher(); + var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext, recorder.Batcher); + return (context, recorder, tm, state); + } + + /// + /// Wires a recorder so that the next CALLBACK START flush stamps the given + /// callback ID into — modeling the durable-execution + /// service's NewExecutionState response that allocates the ID. 
+ /// + private static void WireServiceCallbackIdAllocation( + RecordingBatcher recorder, ExecutionState state, string callbackId) + { + recorder.OnFlush = ops => + { + foreach (var op in ops) + { + if (op.Type == OperationTypes.Callback && op.Action == "START") + { + state.AddOperations(new[] + { + new Operation + { + Id = op.Id, + Type = OperationTypes.Callback, + Status = OperationStatuses.Started, + Name = op.Name, + CallbackDetails = new CallbackDetails { CallbackId = callbackId } + } + }); + } + } + }; + } + + [Fact] + public async Task CreateCallbackAsync_FreshExecution_FlushesStartAndReturnsCallbackId() + { + var (context, recorder, tm, state) = CreateContext(); + WireServiceCallbackIdAllocation(recorder, state, "cb-abc-123"); + + var callback = await context.CreateCallbackAsync(name: "approval"); + + Assert.Equal("cb-abc-123", callback.CallbackId); + Assert.False(tm.IsTerminated); + + await recorder.Batcher.DrainAsync(); + + // CreateCallbackAsync sync-flushes a single START checkpoint. 
+ var single = Assert.Single(recorder.Flushed); + Assert.Equal(OperationTypes.Callback, single.Type); + Assert.Equal("START", single.Action); + Assert.Equal(OperationSubTypes.Callback, single.SubType); + Assert.Equal("approval", single.Name); + Assert.Equal(IdAt(1), single.Id); + } + + [Fact] + public async Task CreateCallbackAsync_FreshExecution_NoConfig_DoesNotEmitCallbackOptions() + { + var (context, recorder, _, state) = CreateContext(); + WireServiceCallbackIdAllocation(recorder, state, "cb-1"); + + await context.CreateCallbackAsync(name: "no_options"); + + await recorder.Batcher.DrainAsync(); + + var single = Assert.Single(recorder.Flushed); + Assert.Null(single.CallbackOptions); + } + + [Fact] + public async Task CreateCallbackAsync_FreshExecution_WithConfig_EmitsCallbackOptions() + { + var (context, recorder, _, state) = CreateContext(); + WireServiceCallbackIdAllocation(recorder, state, "cb-1"); + + await context.CreateCallbackAsync( + name: "with_options", + config: new CallbackConfig + { + Timeout = TimeSpan.FromHours(1), + HeartbeatTimeout = TimeSpan.FromMinutes(5) + }); + + await recorder.Batcher.DrainAsync(); + + var single = Assert.Single(recorder.Flushed); + Assert.NotNull(single.CallbackOptions); + Assert.Equal(3600, single.CallbackOptions.TimeoutSeconds); + Assert.Equal(300, single.CallbackOptions.HeartbeatTimeoutSeconds); + } + + [Fact] + public async Task CreateCallbackAsync_FreshExecution_OnlyTimeout_EmitsOnlyTimeout() + { + var (context, recorder, _, state) = CreateContext(); + WireServiceCallbackIdAllocation(recorder, state, "cb-1"); + + await context.CreateCallbackAsync( + config: new CallbackConfig { Timeout = TimeSpan.FromSeconds(45) }); + + await recorder.Batcher.DrainAsync(); + + var single = Assert.Single(recorder.Flushed); + Assert.NotNull(single.CallbackOptions); + Assert.Equal(45, single.CallbackOptions.TimeoutSeconds); + // HeartbeatTimeout was not set → property remains at its default + // (the AWS SDK Marshaller will not serialize 
the field). + Assert.True( + single.CallbackOptions.HeartbeatTimeoutSeconds == null + || single.CallbackOptions.HeartbeatTimeoutSeconds == 0); + } + + [Fact] + public async Task CreateCallbackAsync_ServiceMissingCallbackId_ThrowsNonDeterministic() + { + // Service doesn't stamp a CallbackId — RecordingBatcher's OnFlush left unset. + var (context, _, _, _) = CreateContext(); + + var ex = await Assert.ThrowsAsync(() => + context.CreateCallbackAsync(name: "broken")); + Assert.Contains("CallbackId", ex.Message); + } + + [Fact] + public async Task GetResultAsync_FreshExecution_SuspendsExecution() + { + var (context, recorder, tm, state) = CreateContext(); + WireServiceCallbackIdAllocation(recorder, state, "cb-1"); + + var callback = await context.CreateCallbackAsync(name: "approval"); + + // GetResultAsync should signal termination and return a never-completing task. + var resultTask = callback.GetResultAsync(); + await Task.Delay(10); + + Assert.True(tm.IsTerminated); + Assert.False(resultTask.IsCompleted); + } + + [Fact] + public async Task ReplayStarted_DoesNotReFlushStart_AndSuspendsOnGetResult() + { + // STARTED on replay = service has stamped CallbackId but no terminal yet. + var (context, recorder, tm, _) = CreateContext(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = IdAt(1), + Type = OperationTypes.Callback, + Status = OperationStatuses.Started, + Name = "approval", + CallbackDetails = new CallbackDetails { CallbackId = "cb-replay-1" } + } + } + }); + + var callback = await context.CreateCallbackAsync(name: "approval"); + Assert.Equal("cb-replay-1", callback.CallbackId); + Assert.False(tm.IsTerminated); + + var resultTask = callback.GetResultAsync(); + await Task.Delay(10); + + Assert.True(tm.IsTerminated); + Assert.False(resultTask.IsCompleted); + + // No new checkpoints — replay path doesn't re-flush START. 
+ await recorder.Batcher.DrainAsync(); + Assert.Empty(recorder.Flushed); + } + + [Fact] + public async Task ReplaySucceeded_GetResultDeserializes_NoSuspension() + { + var (context, recorder, tm, _) = CreateContext(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = IdAt(1), + Type = OperationTypes.Callback, + Status = OperationStatuses.Succeeded, + Name = "approval", + CallbackDetails = new CallbackDetails + { + CallbackId = "cb-done-1", + Result = "\"approved\"" + } + } + } + }); + + var callback = await context.CreateCallbackAsync(name: "approval"); + var result = await callback.GetResultAsync(); + + Assert.Equal("cb-done-1", callback.CallbackId); + Assert.Equal("approved", result); + Assert.False(tm.IsTerminated); + + await recorder.Batcher.DrainAsync(); + Assert.Empty(recorder.Flushed); + } + + [Fact] + public async Task ReplaySucceeded_NullResultReturnsDefault() + { + var (context, _, _, _) = CreateContext(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = IdAt(1), + Type = OperationTypes.Callback, + Status = OperationStatuses.Succeeded, + Name = "no_payload", + CallbackDetails = new CallbackDetails { CallbackId = "cb-1" } + } + } + }); + + var callback = await context.CreateCallbackAsync(name: "no_payload"); + var result = await callback.GetResultAsync(); + Assert.Null(result); + } + + [Fact] + public async Task ReplayFailed_GetResultThrowsCallbackFailedException() + { + var (context, _, _, _) = CreateContext(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = IdAt(1), + Type = OperationTypes.Callback, + Status = OperationStatuses.Failed, + Name = "approval", + CallbackDetails = new CallbackDetails + { + CallbackId = "cb-fail-1", + Error = new ErrorObject + { + ErrorType = "ExternalSystemError", + ErrorMessage = "rejected by reviewer", + ErrorData = "{\"reviewer\":\"jane\"}" + } + } + } + } + }); + + var callback = await context.CreateCallbackAsync(name: "approval"); + + var ex = 
await Assert.ThrowsAsync(() => callback.GetResultAsync()); + Assert.IsAssignableFrom(ex); + Assert.Equal("rejected by reviewer", ex.Message); + Assert.Equal("cb-fail-1", ex.CallbackId); + Assert.Equal("ExternalSystemError", ex.ErrorType); + Assert.Equal("{\"reviewer\":\"jane\"}", ex.ErrorData); + } + + [Fact] + public async Task ReplayTimedOut_GetResultThrowsCallbackTimeoutException() + { + var (context, _, _, _) = CreateContext(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = IdAt(1), + Type = OperationTypes.Callback, + Status = OperationStatuses.TimedOut, + Name = "approval", + CallbackDetails = new CallbackDetails + { + CallbackId = "cb-to-1", + Error = new ErrorObject + { + ErrorMessage = "callback timed out after 24h" + } + } + } + } + }); + + var callback = await context.CreateCallbackAsync(name: "approval"); + + var ex = await Assert.ThrowsAsync(() => callback.GetResultAsync()); + Assert.IsAssignableFrom(ex); + Assert.Equal("callback timed out after 24h", ex.Message); + Assert.Equal("cb-to-1", ex.CallbackId); + } + + [Fact] + public async Task ReplayTimedOut_NoErrorDetails_DefaultMessage() + { + var (context, _, _, _) = CreateContext(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = IdAt(1), + Type = OperationTypes.Callback, + Status = OperationStatuses.TimedOut, + Name = "approval", + CallbackDetails = new CallbackDetails { CallbackId = "cb-1" } + } + } + }); + + var callback = await context.CreateCallbackAsync(name: "approval"); + var ex = await Assert.ThrowsAsync(() => callback.GetResultAsync()); + Assert.Equal("Callback timed out", ex.Message); + } + + [Fact] + public async Task ReplayUnknownStatus_ThrowsNonDeterministic() + { + // Replay must throw on unexpected statuses (CANCELLED, garbage, etc.) + // rather than silently degrading to a suspend. Mirrors WaitOperation + // and ChildContextOperation's `default:` arms. 
+ var (context, _, _, _) = CreateContext(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = IdAt(1), + Type = OperationTypes.Callback, + Status = "CANCELLED", + Name = "approval", + CallbackDetails = new CallbackDetails { CallbackId = "cb-1" } + } + } + }); + + var ex = await Assert.ThrowsAsync(() => + context.CreateCallbackAsync(name: "approval")); + Assert.Contains("unexpected status", ex.Message); + Assert.Contains("CANCELLED", ex.Message); + } + + [Fact] + public async Task ReplayMissingCallbackId_ThrowsNonDeterministic() + { + // Replay path expects the CallbackId to be present. If it's absent, surface + // a clear non-deterministic error rather than letting users see a NRE later. + var (context, _, _, _) = CreateContext(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = IdAt(1), + Type = OperationTypes.Callback, + Status = OperationStatuses.Started, + Name = "broken", + CallbackDetails = new CallbackDetails { CallbackId = null } + } + } + }); + + await Assert.ThrowsAsync(() => + context.CreateCallbackAsync(name: "broken")); + } + + [Fact] + public async Task ReplayDeterministic_CallbackIdStableAcrossReplays() + { + // Round-trip: STARTED checkpoint with CallbackId X must yield the same X + // on replay so external systems' references remain valid. + const string id = "stable-cb-id-12345"; + + var (context, _, _, _) = CreateContext(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = IdAt(1), + Type = OperationTypes.Callback, + Status = OperationStatuses.Started, + Name = "approval", + CallbackDetails = new CallbackDetails { CallbackId = id } + } + } + }); + + var callback = await context.CreateCallbackAsync(name: "approval"); + Assert.Equal(id, callback.CallbackId); + } + + [Fact] + public async Task ReplayTypeMismatch_ThrowsNonDeterministic() + { + // What was a CALLBACK on a previous invocation is now arriving as something + // else — code drift detection. 
ExecutionState.ValidateReplayConsistency + // is the gate. + var (context, _, _, _) = CreateContext(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = IdAt(1), + Type = OperationTypes.Step, + Status = OperationStatuses.Succeeded, + Name = "approval", + StepDetails = new StepDetails { Result = "\"ok\"" } + } + } + }); + + await Assert.ThrowsAsync(() => + context.CreateCallbackAsync(name: "approval")); + } + + [Fact] + public async Task CreateCallbackAsync_AOTOverload_UsesSuppliedSerializer() + { + // Custom serializer that prefixes deserialized strings — proves the AOT-safe + // overload is wired through and not silently using reflection JSON. + var (context, _, _, _) = CreateContext(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = IdAt(1), + Type = OperationTypes.Callback, + Status = OperationStatuses.Succeeded, + Name = "marker", + CallbackDetails = new CallbackDetails + { + CallbackId = "cb-1", + Result = "raw-payload" + } + } + } + }); + + var callback = await context.CreateCallbackAsync( + new MarkerSerializer(), + name: "marker"); + var result = await callback.GetResultAsync(); + + Assert.Equal("custom:raw-payload", result); + } + + [Fact] + public async Task CreateCallbackAsync_CallbackIdAccessBeforeStart_Throws() + { + // Direct construction of the CallbackOperation without going through + // ExecuteAsync — guard against bugs that try to read CallbackId early. 
+ var op = new CallbackOperation( + "op-id", "name", null, new MarkerSerializer(), + new ExecutionState(), new TerminationManager(), "arn", batcher: null); + + Assert.Throws(() => _ = ((ICallback)op).CallbackId); + await Task.CompletedTask; + } + + private sealed class MarkerSerializer : ICheckpointSerializer + { + public string Serialize(string value, SerializationContext context) => $"custom:{value}"; + public string Deserialize(string data, SerializationContext context) => $"custom:{data}"; + } +} diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableFunctionTests.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableFunctionTests.cs index b624766eb..81a2275c8 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableFunctionTests.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableFunctionTests.cs @@ -10,6 +10,7 @@ using StepDetails = Amazon.Lambda.DurableExecution.Internal.StepDetails; using WaitDetails = Amazon.Lambda.DurableExecution.Internal.WaitDetails; using ExecutionDetails = Amazon.Lambda.DurableExecution.Internal.ExecutionDetails; +using CallbackDetails = Amazon.Lambda.DurableExecution.Internal.CallbackDetails; namespace Amazon.Lambda.DurableExecution.Tests; @@ -561,6 +562,183 @@ private static async Task SingleStepWorkflow(OrderEvent input, IDur return new OrderResult { Status = "done" }; } + [Fact] + public async Task WrapAsync_CreateCallbackThenWait_AllocatesCallbackIdAndSuspends() + { + // End-to-end through the real LambdaDurableServiceClient: the mock + // client returns NewExecutionState carrying a CallbackId on the + // CALLBACK START checkpoint response, and the SDK plumbs it through. 
+ var input = new DurableExecutionInvocationInput + { + DurableExecutionArn = "arn:aws:lambda:us-east-1:123:durable-execution:cb-test", + InitialExecutionState = new InitialExecutionState + { + Operations = new List + { + new() + { + Id = "exec-0", + Type = OperationTypes.Execution, + Status = OperationStatuses.Started, + ExecutionDetails = new ExecutionDetails { InputPayload = "{\"OrderId\":\"o-1\"}" } + } + } + } + }; + + var capturedCallbackId = (string?)null; + var mockClient = new MockLambdaClient + { + CheckpointHandler = req => + { + // Echo back any CALLBACK START as a STARTED op with a service-allocated id. + var newOps = new List(); + foreach (var u in req.Updates) + { + if (u.Type == OperationTypes.Callback && u.Action == "START") + { + newOps.Add(new Amazon.Lambda.Model.Operation + { + Id = u.Id, + Type = OperationTypes.Callback, + Status = OperationStatuses.Started, + Name = u.Name, + CallbackDetails = new Amazon.Lambda.Model.CallbackDetails + { + CallbackId = "servicealloccbid" + } + }); + } + } + return new Amazon.Lambda.Model.CheckpointDurableExecutionResponse + { + NewExecutionState = newOps.Count == 0 + ? null + : new Amazon.Lambda.Model.CheckpointUpdatedExecutionState { Operations = newOps } + }; + } + }; + + var output = await DurableFunction.WrapAsync( + async (e, ctx) => + { + var cb = await ctx.CreateCallbackAsync(name: "approval"); + capturedCallbackId = cb.CallbackId; + var status = await cb.GetResultAsync(); + return new OrderResult { Status = status, OrderId = e.OrderId }; + }, + input, + new TestLambdaContext(), + mockClient); + + Assert.Equal(InvocationStatus.Pending, output.Status); + Assert.Equal("servicealloccbid", capturedCallbackId); + } + + [Fact] + public async Task WrapAsync_ReplayCallbackSucceeded_ReturnsResultAfterSuspend() + { + // Second invocation: the callback's checkpoint is now SUCCEEDED; + // the workflow returns the deserialized result. 
+ var input = new DurableExecutionInvocationInput + { + DurableExecutionArn = "arn:aws:lambda:us-east-1:123:durable-execution:cb-test", + InitialExecutionState = new InitialExecutionState + { + Operations = new List + { + new() + { + Id = "exec-0", + Type = OperationTypes.Execution, + Status = OperationStatuses.Started, + ExecutionDetails = new ExecutionDetails { InputPayload = "{\"OrderId\":\"o-1\"}" } + }, + new() + { + Id = IdAt(1), + Type = OperationTypes.Callback, + Status = OperationStatuses.Succeeded, + Name = "approval", + CallbackDetails = new CallbackDetails + { + CallbackId = "servicealloccbid", + Result = "\"approved\"" + } + } + } + } + }; + + var output = await DurableFunction.WrapAsync( + async (e, ctx) => + { + var cb = await ctx.CreateCallbackAsync(name: "approval"); + var status = await cb.GetResultAsync(); + return new OrderResult { Status = status, OrderId = e.OrderId }; + }, + input, + new TestLambdaContext(), + new MockLambdaClient()); + + Assert.Equal(InvocationStatus.Succeeded, output.Status); + Assert.NotNull(output.Result); + var result = JsonSerializer.Deserialize(output.Result!); + Assert.Equal("approved", result!.Status); + } + + [Fact] + public async Task WrapAsync_ReplayDeterminism_CallbackIdStableAcrossInvocations() + { + // First invocation allocates a callback ID via the mock; in a real run + // that ID would be persisted in the service's checkpoint state and + // returned to the second invocation via InitialExecutionState. Verify + // the same ID survives that round-trip (we model "round-trip" by + // replaying with a STARTED checkpoint that carries the same ID). 
+ const string id = "stablecbidreplay"; + var input = new DurableExecutionInvocationInput + { + DurableExecutionArn = "arn:test", + InitialExecutionState = new InitialExecutionState + { + Operations = new List + { + new() + { + Id = "exec-0", + Type = OperationTypes.Execution, + Status = OperationStatuses.Started, + ExecutionDetails = new ExecutionDetails { InputPayload = "{\"OrderId\":\"o-1\"}" } + }, + new() + { + Id = IdAt(1), + Type = OperationTypes.Callback, + Status = OperationStatuses.Started, + Name = "approval", + CallbackDetails = new CallbackDetails { CallbackId = id } + } + } + } + }; + + string? observed = null; + var output = await DurableFunction.WrapAsync( + async (e, ctx) => + { + var cb = await ctx.CreateCallbackAsync(name: "approval"); + observed = cb.CallbackId; + var status = await cb.GetResultAsync(); + return new OrderResult { Status = status, OrderId = e.OrderId }; + }, + input, + new TestLambdaContext(), + new MockLambdaClient()); + + Assert.Equal(InvocationStatus.Pending, output.Status); + Assert.Equal(id, observed); + } + private static async Task MyWorkflow(OrderEvent input, IDurableContext context) { var validation = await context.StepAsync( diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ExceptionsTests.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ExceptionsTests.cs index 7105849bb..4bca933be 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ExceptionsTests.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ExceptionsTests.cs @@ -65,4 +65,87 @@ public void StepException_HasErrorProperties() Assert.Equal("operation timed out", ex.ErrorData); Assert.Equal(2, ex.OriginalStackTrace!.Count); } + + [Fact] + public void CallbackException_BaseClassCtors() + { + var empty = new CallbackException(); + Assert.IsAssignableFrom(empty); + + var withMsg = new CallbackException("cb error"); + Assert.Equal("cb error", withMsg.Message); + + var inner = new InvalidOperationException("inner"); + var 
wrapping = new CallbackException("outer", inner); + Assert.Same(inner, wrapping.InnerException); + } + + [Fact] + public void CallbackException_InitProperties() + { + var ex = new CallbackException("rejected") + { + CallbackId = "cb-1", + ErrorType = "ExternalSystemError", + ErrorData = "{\"reviewer\":\"jane\"}", + OriginalStackTrace = new[] { "at A.B()" } + }; + + Assert.Equal("cb-1", ex.CallbackId); + Assert.Equal("ExternalSystemError", ex.ErrorType); + Assert.Equal("{\"reviewer\":\"jane\"}", ex.ErrorData); + Assert.Single(ex.OriginalStackTrace!); + } + + [Fact] + public void CallbackFailedException_IsCallbackException() + { + var ex = new CallbackFailedException("rejected") { CallbackId = "cb-1" }; + Assert.IsAssignableFrom(ex); + Assert.IsAssignableFrom(ex); + Assert.Equal("rejected", ex.Message); + Assert.Equal("cb-1", ex.CallbackId); + } + + [Fact] + public void CallbackFailedException_AllCtors() + { + Assert.NotNull(new CallbackFailedException()); + Assert.Equal("m", new CallbackFailedException("m").Message); + var inner = new Exception("inner"); + Assert.Same(inner, new CallbackFailedException("m", inner).InnerException); + } + + [Fact] + public void CallbackTimeoutException_IsCallbackException() + { + var ex = new CallbackTimeoutException("timed out") { CallbackId = "cb-1" }; + Assert.IsAssignableFrom(ex); + Assert.Equal("timed out", ex.Message); + } + + [Fact] + public void CallbackTimeoutException_AllCtors() + { + Assert.NotNull(new CallbackTimeoutException()); + Assert.Equal("m", new CallbackTimeoutException("m").Message); + var inner = new Exception("inner"); + Assert.Same(inner, new CallbackTimeoutException("m", inner).InnerException); + } + + [Fact] + public void CallbackSubmitterException_IsCallbackException() + { + var inner = new StepException("submitter failed"); + var ex = new CallbackSubmitterException("submitter failed", inner); + Assert.IsAssignableFrom(ex); + Assert.Same(inner, ex.InnerException); + } + + [Fact] + public void 
CallbackSubmitterException_AllCtors() + { + Assert.NotNull(new CallbackSubmitterException()); + Assert.Equal("m", new CallbackSubmitterException("m").Message); + } } diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/MockLambdaClient.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/MockLambdaClient.cs index 8df98a67d..98ef36cb3 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/MockLambdaClient.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/MockLambdaClient.cs @@ -38,12 +38,29 @@ public MockLambdaClient() : base("fake-access-key", "fake-secret-key", Amazon.Re /// public Exception? GetExecutionStateThrows { get; set; } + /// + /// Optional handler that produces a + /// per request. Tests modeling the durable-execution service's + /// NewExecutionState response (e.g. stamping a CallbackId onto a + /// freshly-started CALLBACK op) wire this up. When null, a default + /// response is produced with only the auto-incremented checkpoint token. + /// + public Func? CheckpointHandler { get; set; } + public override Task CheckpointDurableExecutionAsync( CheckpointDurableExecutionRequest request, CancellationToken cancellationToken = default) { CheckpointCalls.Add(request); if (CheckpointThrows != null) throw CheckpointThrows; + if (CheckpointHandler != null) + { + var resp = CheckpointHandler(request); + // Auto-fill token if the test left it blank. 
+ if (string.IsNullOrEmpty(resp.CheckpointToken)) + resp.CheckpointToken = $"token-{++_tokenCounter}"; + return Task.FromResult(resp); + } return Task.FromResult(new CheckpointDurableExecutionResponse { CheckpointToken = $"token-{++_tokenCounter}" diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/RecordingBatcher.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/RecordingBatcher.cs index 8fe7b6d6d..11a34ffc6 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/RecordingBatcher.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/RecordingBatcher.cs @@ -16,6 +16,15 @@ internal sealed class RecordingBatcher public CheckpointBatcher Batcher { get; } + /// + /// Optional hook invoked synchronously after each batch flush, with that + /// batch's updates. Tests modeling the durable-execution service's + /// NewExecutionState response (e.g. stamping a CallbackId onto a + /// freshly-started CALLBACK op) wire this up to mutate the test's + /// . + /// + public Action>? OnFlush { get; set; } + public RecordingBatcher(CheckpointBatcherConfig? config = null) { Batcher = new CheckpointBatcher("test-token", Flush, config); @@ -46,6 +55,7 @@ public IReadOnlyList FlushBatchSizes _flushed.AddRange(ops); _flushBatchSizes.Add(ops.Count); } + OnFlush?.Invoke(ops); return Task.FromResult(token); } } diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/WaitForCallbackTests.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/WaitForCallbackTests.cs new file mode 100644 index 000000000..d055475f4 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/WaitForCallbackTests.cs @@ -0,0 +1,567 @@ +using Amazon.Lambda.DurableExecution; +using Amazon.Lambda.DurableExecution.Internal; +using Amazon.Lambda.TestUtilities; +using Xunit; + +namespace Amazon.Lambda.DurableExecution.Tests; + +public class WaitForCallbackTests +{ + /// Reproduces the Id that emits for the n-th root-level operation. 
// Root-level operation IDs are the hash of the operation's position rendered
// as a string; the tests below use IdAt(1) for the first root operation.
private static string IdAt(int position) => OperationIdGenerator.HashOperationId(position.ToString());

/// <summary>
/// The hashed ID of the n-th child operation under <paramref name="parentOpId"/>.
/// </summary>
private static string ChildIdAt(string parentOpId, int position) =>
    OperationIdGenerator.HashOperationId($"{parentOpId}-{position}");

// Checkpoint action names as they appear on flushed operation updates.
private const string StartAction = "START";
private const string SucceedAction = "SUCCEED";

/// <summary>
/// Builds a <see cref="DurableContext"/> over a fresh <see cref="ExecutionState"/>
/// (optionally pre-loaded from <paramref name="initialState"/>) and returns it
/// together with the collaborators the tests inspect.
/// </summary>
private static (DurableContext context, RecordingBatcher recorder, TerminationManager tm, ExecutionState state)
    CreateContext(InitialExecutionState? initialState = null)
{
    var state = new ExecutionState();
    state.LoadFromCheckpoint(initialState);
    var tm = new TerminationManager();
    var idGen = new OperationIdGenerator();
    var lambdaContext = new TestLambdaContext();
    var recorder = new RecordingBatcher();
    var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext, recorder.Batcher);
    return (context, recorder, tm, state);
}

/// <summary>
/// Hooks <see cref="RecordingBatcher.OnFlush"/> so that every flushed CALLBACK
/// START update is echoed back into <paramref name="state"/> as a STARTED
/// operation carrying <paramref name="callbackId"/> — the test's stand-in for
/// the service allocating a callback ID.
/// </summary>
private static void WireServiceCallbackIdAllocation(
    RecordingBatcher recorder, ExecutionState state, string callbackId)
{
    recorder.OnFlush = ops =>
    {
        foreach (var op in ops)
        {
            if (op.Type != OperationTypes.Callback || op.Action != StartAction)
            {
                continue;
            }

            state.AddOperations(new[]
            {
                new Operation
                {
                    Id = op.Id,
                    Type = OperationTypes.Callback,
                    Status = OperationStatuses.Started,
                    Name = op.Name,
                    CallbackDetails = new CallbackDetails { CallbackId = callbackId }
                }
            });
        }
    };
}

/// <summary>
/// Waits until either <paramref name="resultTask"/> completes or the
/// termination manager's task does, then drains the batcher before the test
/// inspects <c>recorder.Flushed</c>.
/// </summary>
private static async Task RunUntilSuspendedAsync(
    Task resultTask, TerminationManager tm, RecordingBatcher recorder)
{
    await Task.WhenAny(resultTask, tm.TerminationTask);
    await recorder.Batcher.DrainAsync();
}

[Fact]
public async Task WaitForCallbackAsync_FreshExecution_RunsSubmitterAndSuspendsForCallback()
{
    var (context, recorder, tm, state) = CreateContext();
    WireServiceCallbackIdAllocation(recorder, state, "cb-wait-1");

    string? receivedCallbackId = null;
    var resultTask = context.WaitForCallbackAsync<string>(
        async (callbackId, ctx) =>
        {
            receivedCallbackId = callbackId;
            Assert.NotNull(ctx.Logger);
            await Task.CompletedTask;
        },
        name: "approval");

    // Race the suspended user task against termination — same idiom as the
    // production handler. Once Terminate() is called inside the inner
    // GetResultAsync, this completes immediately.
    var winner = await Task.WhenAny(resultTask, tm.TerminationTask);
    Assert.Same(tm.TerminationTask, winner);

    Assert.True(tm.IsTerminated);
    Assert.False(resultTask.IsCompleted);
    Assert.Equal("cb-wait-1", receivedCallbackId);

    await recorder.Batcher.DrainAsync();

    var actions = recorder.Flushed.Select(o => $"{o.Type}:{o.Action}:{o.SubType}").ToArray();
    Assert.Equal(new[]
    {
        $"{OperationTypes.Context}:{StartAction}:{OperationSubTypes.WaitForCallback}",
        $"{OperationTypes.Callback}:{StartAction}:{OperationSubTypes.Callback}",
        $"{OperationTypes.Step}:{StartAction}:{OperationSubTypes.Step}",
        $"{OperationTypes.Step}:{SucceedAction}:{OperationSubTypes.Step}",
    }, actions);
}

[Fact]
public async Task WaitForCallbackAsync_FreshExecution_KebabSuffixedSubOpNames()
{
    var (context, recorder, tm, state) = CreateContext();
    WireServiceCallbackIdAllocation(recorder, state, "cb-1");

    var resultTask = context.WaitForCallbackAsync<string>(
        async (_, _) => await Task.CompletedTask,
        name: "approval");

    await RunUntilSuspendedAsync(resultTask, tm, recorder);

    var callbackStart = recorder.Flushed.Single(o => o.Type == OperationTypes.Callback);
    var stepSucceed = recorder.Flushed.Single(o => o.Type == OperationTypes.Step && o.Action == SucceedAction);

    Assert.Equal("approval-callback", callbackStart.Name);
    Assert.Equal("approval-submitter", stepSucceed.Name);
}

[Fact]
public async Task WaitForCallbackAsync_FreshExecution_NullParentName_LeavesSubOpsNameless()
{
    var (context, recorder, tm, state) = CreateContext();
    WireServiceCallbackIdAllocation(recorder, state, "cb-1");

    var resultTask = context.WaitForCallbackAsync<string>(
        async (_, _) => await Task.CompletedTask);

    await RunUntilSuspendedAsync(resultTask, tm, recorder);

    var callbackStart = recorder.Flushed.Single(o => o.Type == OperationTypes.Callback);
    var stepSucceed = recorder.Flushed.Single(o => o.Type == OperationTypes.Step && o.Action == SucceedAction);

    Assert.Null(callbackStart.Name);
    Assert.Null(stepSucceed.Name);
}

[Fact]
public async Task WaitForCallbackAsync_ChildOperationIdsDeterministic()
{
    var (context, recorder, tm, state) = CreateContext();
    WireServiceCallbackIdAllocation(recorder, state, "cb-1");

    var resultTask = context.WaitForCallbackAsync<string>(
        async (_, _) => await Task.CompletedTask,
        name: "approval");

    await RunUntilSuspendedAsync(resultTask, tm, recorder);

    // Parent CONTEXT has IdAt(1); the inner callback is child #1, the inner
    // submitter step is child #2 (under the same parent context op id).
    var parentOpId = IdAt(1);
    var callbackChildId = ChildIdAt(parentOpId, 1);
    var submitterChildId = ChildIdAt(parentOpId, 2);

    Assert.Equal(callbackChildId,
        recorder.Flushed.Single(o => o.Type == OperationTypes.Callback).Id);
    Assert.Equal(submitterChildId,
        recorder.Flushed.Single(o => o.Type == OperationTypes.Step && o.Action == SucceedAction).Id);
}

[Fact]
public async Task WaitForCallbackAsync_CallbackTimeoutInheritsFromConfig()
{
    var (context, recorder, tm, state) = CreateContext();
    WireServiceCallbackIdAllocation(recorder, state, "cb-1");

    var resultTask = context.WaitForCallbackAsync<string>(
        async (_, _) => await Task.CompletedTask,
        name: "approval",
        config: new WaitForCallbackConfig
        {
            Timeout = TimeSpan.FromHours(2),
            HeartbeatTimeout = TimeSpan.FromMinutes(15),
        });

    await RunUntilSuspendedAsync(resultTask, tm, recorder);

    // 2 h = 7200 s and 15 min = 900 s on the flushed CALLBACK START update.
    var callbackStart = recorder.Flushed.Single(o => o.Type == OperationTypes.Callback);
    Assert.NotNull(callbackStart.CallbackOptions);
    Assert.Equal(7200, callbackStart.CallbackOptions.TimeoutSeconds);
    Assert.Equal(900, callbackStart.CallbackOptions.HeartbeatTimeoutSeconds);
}

[Fact]
public async Task WaitForCallbackAsync_ReplayWithCallbackSucceeded_ReturnsResult()
{
    // Full replay: parent CONTEXT SUCCEEDED with the callback's deserialized
    // payload as its checkpointed result.
    var (context, recorder, tm, _) = CreateContext(new InitialExecutionState
    {
        Operations = new List<Operation>
        {
            new()
            {
                Id = IdAt(1),
                Type = OperationTypes.Context,
                Status = OperationStatuses.Succeeded,
                Name = "approval",
                SubType = OperationSubTypes.WaitForCallback,
                ContextDetails = new ContextDetails { Result = "\"approved\"" }
            }
        }
    });

    var executed = false;
    var result = await context.WaitForCallbackAsync<string>(
        async (_, _) => { executed = true; await Task.CompletedTask; },
        name: "approval");

    Assert.False(executed); // Replay returns cached without re-running submitter.
    Assert.Equal("approved", result);
    Assert.False(tm.IsTerminated);

    await recorder.Batcher.DrainAsync();
    Assert.Empty(recorder.Flushed);
}

[Fact]
public async Task WaitForCallbackAsync_ReplayCallbackTimedOut_ThrowsCallbackTimeoutException()
{
    // Inside-out replay: parent CONTEXT is STARTED (still in flight),
    // inner callback is TIMED_OUT, inner submitter step has SUCCEEDED.
    var parentId = IdAt(1);
    var callbackChildId = ChildIdAt(parentId, 1);
    var submitterChildId = ChildIdAt(parentId, 2);

    var (context, _, _, _) = CreateContext(new InitialExecutionState
    {
        Operations = new List<Operation>
        {
            new()
            {
                Id = parentId,
                Type = OperationTypes.Context,
                Status = OperationStatuses.Started,
                Name = "approval",
                SubType = OperationSubTypes.WaitForCallback,
            },
            new()
            {
                Id = callbackChildId,
                Type = OperationTypes.Callback,
                Status = OperationStatuses.TimedOut,
                Name = "approval-callback",
                ParentId = parentId,
                CallbackDetails = new CallbackDetails
                {
                    CallbackId = "cb-to-1",
                    Error = new ErrorObject { ErrorMessage = "callback timed out" }
                }
            },
            new()
            {
                Id = submitterChildId,
                Type = OperationTypes.Step,
                Status = OperationStatuses.Succeeded,
                Name = "approval-submitter",
                ParentId = parentId,
                StepDetails = new StepDetails { Result = "null" }
            },
        }
    });

    var ex = await Assert.ThrowsAsync<CallbackTimeoutException>(() =>
        context.WaitForCallbackAsync<string>(
            async (_, _) => await Task.CompletedTask,
            name: "approval"));

    Assert.Equal("callback timed out", ex.Message);
    Assert.Equal("cb-to-1", ex.CallbackId);
}

[Fact]
public async Task WaitForCallbackAsync_ReplayCallbackFailed_ThrowsCallbackFailedException()
{
    // Same inside-out shape as the timeout case, but the inner callback is FAILED.
    var parentId = IdAt(1);
    var callbackChildId = ChildIdAt(parentId, 1);
    var submitterChildId = ChildIdAt(parentId, 2);

    var (context, _, _, _) = CreateContext(new InitialExecutionState
    {
        Operations = new List<Operation>
        {
            new()
            {
                Id = parentId,
                Type = OperationTypes.Context,
                Status = OperationStatuses.Started,
                Name = "approval",
                SubType = OperationSubTypes.WaitForCallback,
            },
            new()
            {
                Id = callbackChildId,
                Type = OperationTypes.Callback,
                Status = OperationStatuses.Failed,
                Name = "approval-callback",
                ParentId = parentId,
                CallbackDetails = new CallbackDetails
                {
                    CallbackId = "cb-fail-1",
                    Error = new ErrorObject
                    {
                        ErrorType = "ExternalSystemError",
                        ErrorMessage = "external rejected"
                    }
                }
            },
            new()
            {
                Id = submitterChildId,
                Type = OperationTypes.Step,
                Status = OperationStatuses.Succeeded,
                Name = "approval-submitter",
                ParentId = parentId,
                StepDetails = new StepDetails { Result = "null" }
            },
        }
    });

    var ex = await Assert.ThrowsAsync<CallbackFailedException>(() =>
        context.WaitForCallbackAsync<string>(
            async (_, _) => await Task.CompletedTask,
            name: "approval"));

    Assert.Equal("external rejected", ex.Message);
    Assert.Equal("cb-fail-1", ex.CallbackId);
    Assert.Equal("ExternalSystemError", ex.ErrorType);
}

[Fact]
public async Task WaitForCallbackAsync_SubmitterFails_ThrowsCallbackSubmitterException()
{
    // Replay: parent CONTEXT is FAILED with a Step-error inside.
    var parentId = IdAt(1);
    var (context, _, _, _) = CreateContext(new InitialExecutionState
    {
        Operations = new List<Operation>
        {
            new()
            {
                Id = parentId,
                Type = OperationTypes.Context,
                Status = OperationStatuses.Failed,
                Name = "approval",
                SubType = OperationSubTypes.WaitForCallback,
                ContextDetails = new ContextDetails
                {
                    Error = new ErrorObject
                    {
                        ErrorType = typeof(StepException).FullName,
                        ErrorMessage = "submitter API failed",
                        ErrorData = "{\"code\":\"500\"}",
                    }
                }
            }
        }
    });

    var ex = await Assert.ThrowsAsync<CallbackSubmitterException>(() =>
        context.WaitForCallbackAsync<string>(
            async (_, _) => await Task.CompletedTask,
            name: "approval"));

    Assert.IsAssignableFrom<CallbackException>(ex);
    Assert.Equal("submitter API failed", ex.Message);
    // On the replay path the live StepException was lost across invocations;
    // we preserve the StepException type-name string and carry the
    // ChildContextException as the InnerException for traceability.
    Assert.NotNull(ex.InnerException);
    Assert.Equal(typeof(StepException).FullName, ex.ErrorType);
    Assert.Equal("{\"code\":\"500\"}", ex.ErrorData);
}

[Fact]
public async Task WaitForCallbackAsync_ReplayParentContextFailedWithCallbackTimeoutErrorType_PreservesSubclass()
{
    // Subclass-fidelity guarantee: when the parent CONTEXT was checkpointed
    // FAILED on a previous invocation with a CallbackTimeoutException
    // ErrorType, replay must surface CallbackTimeoutException — not the
    // more generic CallbackFailedException — so user catch blocks behave
    // identically across live and replay paths.
    var parentId = IdAt(1);
    var (context, _, _, _) = CreateContext(new InitialExecutionState
    {
        Operations = new List<Operation>
        {
            new()
            {
                Id = parentId,
                Type = OperationTypes.Context,
                Status = OperationStatuses.Failed,
                Name = "approval",
                SubType = OperationSubTypes.WaitForCallback,
                ContextDetails = new ContextDetails
                {
                    Error = new ErrorObject
                    {
                        ErrorType = typeof(CallbackTimeoutException).FullName,
                        ErrorMessage = "callback timed out after 24h",
                    }
                }
            }
        }
    });

    var ex = await Assert.ThrowsAsync<CallbackTimeoutException>(() =>
        context.WaitForCallbackAsync<string>(
            async (_, _) => await Task.CompletedTask,
            name: "approval"));

    // Concrete-type check: not just `is CallbackException` — must be the
    // CallbackTimeoutException subclass exactly.
    Assert.Equal(typeof(CallbackTimeoutException), ex.GetType());
    Assert.Equal("callback timed out after 24h", ex.Message);
    Assert.Equal(typeof(CallbackTimeoutException).FullName, ex.ErrorType);
}

[Fact]
public async Task WaitForCallbackAsync_ReplayParentContextFailedWithCallbackFailedErrorType_RemapsToCallbackFailed()
{
    // Companion case: a stored CallbackFailedException ErrorType remaps to
    // CallbackFailedException (not the base or CallbackTimeoutException).
    var parentId = IdAt(1);
    var (context, _, _, _) = CreateContext(new InitialExecutionState
    {
        Operations = new List<Operation>
        {
            new()
            {
                Id = parentId,
                Type = OperationTypes.Context,
                Status = OperationStatuses.Failed,
                Name = "approval",
                SubType = OperationSubTypes.WaitForCallback,
                ContextDetails = new ContextDetails
                {
                    Error = new ErrorObject
                    {
                        ErrorType = typeof(CallbackFailedException).FullName,
                        ErrorMessage = "external rejected",
                    }
                }
            }
        }
    });

    var ex = await Assert.ThrowsAsync<CallbackFailedException>(() =>
        context.WaitForCallbackAsync<string>(
            async (_, _) => await Task.CompletedTask,
            name: "approval"));

    Assert.Equal(typeof(CallbackFailedException), ex.GetType());
    Assert.Equal("external rejected", ex.Message);
}

[Fact]
public async Task WaitForCallbackAsync_RetryStrategyForwardedToSubmitterStep()
{
    // Smoke test that a WaitForCallbackConfig.RetryStrategy can be supplied
    // for the "-submitter" inner step. Retry mechanics are not exercised here:
    // the submitter never fails, so we only check it ran once and succeeded.
    var (context, recorder, tm, state) = CreateContext();
    WireServiceCallbackIdAllocation(recorder, state, "cb-1");

    var submitterRuns = 0;
    var resultTask = context.WaitForCallbackAsync<string>(
        async (_, _) =>
        {
            submitterRuns++;
            await Task.CompletedTask;
        },
        name: "approval",
        config: new WaitForCallbackConfig
        {
            RetryStrategy = new CountingRetryStrategy()
        });

    await RunUntilSuspendedAsync(resultTask, tm, recorder);

    // Submitter ran exactly once (no failures to retry) and produced a single
    // STEP SUCCEED checkpoint.
    Assert.Equal(1, submitterRuns);
    Assert.Single(recorder.Flushed, o => o.Type == OperationTypes.Step && o.Action == SucceedAction);
}

[Fact]
public async Task WaitForCallbackAsync_AOTOverload_UsesSuppliedSerializer()
{
    var (context, _, _, _) = CreateContext(new InitialExecutionState
    {
        Operations = new List<Operation>
        {
            new()
            {
                Id = IdAt(1),
                Type = OperationTypes.Context,
                Status = OperationStatuses.Succeeded,
                Name = "approval",
                SubType = OperationSubTypes.WaitForCallback,
                ContextDetails = new ContextDetails { Result = "raw-payload" }
            }
        }
    });

    var result = await context.WaitForCallbackAsync<string>(
        async (_, _) => await Task.CompletedTask,
        new MarkerSerializer(),
        name: "approval");

    // The "custom:" prefix proves MarkerSerializer.Deserialize produced the value.
    Assert.Equal("custom:raw-payload", result);
}

[Fact]
public async Task WaitForCallbackAsync_SubmitterContext_IsIWaitForCallbackContext_NotIStepContext()
{
    // Verifies the submitter delegate receives our distinct
    // IWaitForCallbackContext type (not IStepContext) — protects the
    // architectural decision against accidental conflation.
    var (context, recorder, tm, state) = CreateContext();
    WireServiceCallbackIdAllocation(recorder, state, "cb-1");

    Type? observedContextType = null;
    var resultTask = context.WaitForCallbackAsync<string>(
        async (_, ctx) =>
        {
            observedContextType = ctx.GetType();
            await Task.CompletedTask;
        },
        name: "approval");

    await RunUntilSuspendedAsync(resultTask, tm, recorder);

    Assert.NotNull(observedContextType);
    Assert.True(typeof(IWaitForCallbackContext).IsAssignableFrom(observedContextType));
    Assert.False(typeof(IStepContext).IsAssignableFrom(observedContextType));
}

/// <summary>
/// Retry strategy that records the last attempt number it was asked about
/// and always declines to retry.
/// </summary>
private sealed class CountingRetryStrategy : IRetryStrategy
{
    public int Attempts;

    public RetryDecision ShouldRetry(Exception exception, int attemptNumber)
    {
        Attempts = attemptNumber;
        return RetryDecision.DoNotRetry();
    }
}

/// <summary>
/// Serializer that prefixes its input with <c>custom:</c> in both directions,
/// so a test can tell the supplied serializer was used.
/// </summary>
private sealed class MarkerSerializer : ICheckpointSerializer<string>
{
    public string Serialize(string value, SerializationContext context) => $"custom:{value}";

    public string Deserialize(string data, SerializationContext context) => $"custom:{data}";
}
} // end of WaitForCallbackTests