From 797d9201742aa612b2050d0f110d3823a1e3dc61 Mon Sep 17 00:00:00 2001 From: Garrett Beatty Date: Thu, 14 May 2026 17:39:15 -0400 Subject: [PATCH] Add InvokeAsync for chained durable function calls (DOTNET-8661) Adds chained-function invocation to the .NET Durable Execution SDK. InvokeAsync calls another durable function as a separate execution; the caller suspends until the chained function completes, with the result checkpointed for replay. Public surface: - IDurableContext.InvokeAsync (reflection + AOT-safe overloads with positional ICheckpointSerializer and ICheckpointSerializer) - InvokeConfig with Timeout (currently [Obsolete] - reserved for a future ChainedInvokeOptions wire field) and TenantId for tenant propagation - Exception subclass tree: InvokeException base + InvokeFailedException, InvokeTimedOutException, InvokeStoppedException Internal: - InvokeOperation mirrors WaitOperation's sync-flush START + SuspendAndAwait pattern. Replay maps STARTED/PENDING to re-suspend; SUCCEEDED to cached deserialize; FAILED/TIMED_OUT/STOPPED to typed exception subclasses. - ExecutionState.IsTerminalStatus now includes TimedOut (was missing). - LambdaDurableServiceClient.MapFromSdkOperation now preserves ErrorData and StackTrace fields across all operation types (Step, ChildContext, ChainedInvoke). Pre-existing data-loss bug fix. - Reuses TerminationReason.InvokePending and OperationSubTypes.Invoke from Wave 0. Adds 38 unit tests + 4 integration tests covering happy path, failure propagation, tenant-ID propagation, and replay determinism. Extends DurableFunctionDeployment to support a downstream callee function with cross-Lambda IAM. Co-Authored-By: Claude Opus 4.7 (1M context) --- Docs/durable-execution-design.md | 22 +- .../Config/InvokeConfig.cs | 38 + .../DurableContext.cs | 59 ++ .../Exceptions/InvokeException.cs | 85 +++ .../IDurableContext.cs | 71 ++ .../Internal/ExecutionState.cs | 3 +- .../Internal/InvokeOperation.cs | 171 +++++ .../Services/LambdaDurableServiceClient.cs | 41 +- .../DurableFunctionDeployment.cs | 151 +++- .../InvokeFailureTest.cs | 70 ++ .../InvokeHappyPathTest.cs | 67 ++ .../InvokeReplayDeterminismTest.cs | 119 +++ .../InvokeWithTenantIdTest.cs | 64 ++ .../InvokeChildTenantFunction/Dockerfile | 7 + .../InvokeChildTenantFunction/Function.cs | 30 + .../InvokeChildTenantFunction.csproj | 18 + .../InvokeFailureChildFunction/Dockerfile | 7 + .../InvokeFailureChildFunction/Function.cs | 39 + .../InvokeFailureChildFunction.csproj | 18 + .../InvokeFailureParentFunction/Dockerfile | 7 + .../InvokeFailureParentFunction/Function.cs | 53 ++ .../InvokeFailureParentFunction.csproj | 18 + .../InvokeHappyPathChildFunction/Dockerfile | 7 + .../InvokeHappyPathChildFunction/Function.cs | 30 + .../InvokeHappyPathChildFunction.csproj | 18 + .../InvokeHappyPathParentFunction/Dockerfile | 7 + .../InvokeHappyPathParentFunction/Function.cs | 41 ++ .../InvokeHappyPathParentFunction.csproj | 18 + .../Dockerfile | 7 + .../Function.cs | 30 + ...nvokeReplayDeterminismChildFunction.csproj | 18 + .../Dockerfile | 7 + .../Function.cs | 52 ++ ...vokeReplayDeterminismParentFunction.csproj | 18 + .../InvokeWithTenantIdFunction/Dockerfile | 7 + .../InvokeWithTenantIdFunction/Function.cs | 43 ++ .../InvokeWithTenantIdFunction.csproj | 18 + .../ConfigTests.cs | 25 + .../ExceptionsTests.cs | 113 +++ .../ExecutionStateTests.cs | 28 + .../InvokeOperationTests.cs | 685 ++++++++++++++++++ .../LambdaDurableServiceClientTests.cs | 99 +++ 42 files changed, 2408 insertions(+), 21 deletions(-) create mode 100644 Libraries/src/Amazon.Lambda.DurableExecution/Config/InvokeConfig.cs create mode 100644 Libraries/src/Amazon.Lambda.DurableExecution/Exceptions/InvokeException.cs create mode 100644 Libraries/src/Amazon.Lambda.DurableExecution/Internal/InvokeOperation.cs create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/InvokeFailureTest.cs create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/InvokeHappyPathTest.cs create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/InvokeReplayDeterminismTest.cs create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/InvokeWithTenantIdTest.cs create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeChildTenantFunction/Dockerfile create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeChildTenantFunction/Function.cs create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeChildTenantFunction/InvokeChildTenantFunction.csproj create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeFailureChildFunction/Dockerfile create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeFailureChildFunction/Function.cs create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeFailureChildFunction/InvokeFailureChildFunction.csproj create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeFailureParentFunction/Dockerfile create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeFailureParentFunction/Function.cs create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeFailureParentFunction/InvokeFailureParentFunction.csproj create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeHappyPathChildFunction/Dockerfile create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeHappyPathChildFunction/Function.cs create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeHappyPathChildFunction/InvokeHappyPathChildFunction.csproj create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeHappyPathParentFunction/Dockerfile create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeHappyPathParentFunction/Function.cs create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeHappyPathParentFunction/InvokeHappyPathParentFunction.csproj create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeReplayDeterminismChildFunction/Dockerfile create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeReplayDeterminismChildFunction/Function.cs create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeReplayDeterminismChildFunction/InvokeReplayDeterminismChildFunction.csproj create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeReplayDeterminismParentFunction/Dockerfile create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeReplayDeterminismParentFunction/Function.cs create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeReplayDeterminismParentFunction/InvokeReplayDeterminismParentFunction.csproj create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeWithTenantIdFunction/Dockerfile create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeWithTenantIdFunction/Function.cs create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeWithTenantIdFunction/InvokeWithTenantIdFunction.csproj create mode 100644 Libraries/test/Amazon.Lambda.DurableExecution.Tests/InvokeOperationTests.cs diff --git a/Docs/durable-execution-design.md b/Docs/durable-execution-design.md index 402d689af..1dacd7814 100644 --- a/Docs/durable-execution-design.md +++ b/Docs/durable-execution-design.md @@ -1614,15 +1614,29 @@ public class CallbackException : DurableExecutionException } /// -/// Thrown when an invoked function fails. +/// Base exception for chained-invoke failures. Catch InvokeException +/// to handle every non-success terminal state uniformly, or pattern-match the +/// concrete subclasses (InvokeFailedException, InvokeTimedOutException, +/// InvokeStoppedException) to react differently to specific outcomes. +/// Mirrors the Java SDK's invoke exception tree. /// public class InvokeException : DurableExecutionException { - public string? FunctionName { get; } - public string? ErrorType { get; } - public string? ErrorData { get; } + public string? FunctionName { get; init; } + public string? ErrorType { get; init; } + public string? ErrorData { get; init; } + public IReadOnlyList? OriginalStackTrace { get; init; } } +/// The chained function ran and threw. +public class InvokeFailedException : InvokeException { } + +/// The chained function did not complete within the configured (or service) timeout. +public class InvokeTimedOutException : InvokeException { } + +/// The chained execution was stopped by the service before reaching a normal terminal state. +public class InvokeStoppedException : InvokeException { } + /// /// Thrown when a child context operation fails. /// diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Config/InvokeConfig.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Config/InvokeConfig.cs new file mode 100644 index 000000000..9da7a2340 --- /dev/null +++ b/Libraries/src/Amazon.Lambda.DurableExecution/Config/InvokeConfig.cs @@ -0,0 +1,38 @@ +namespace Amazon.Lambda.DurableExecution; + +/// +/// Configuration for chained invoke operations. +/// +/// +/// Use with +/// (and the AOT-safe overload) to configure a single chained invocation. Custom +/// payload/result serializers are supplied via the AOT-safe InvokeAsync +/// overload — there are intentionally no serializer fields here, matching the +/// pattern established by . +/// +public sealed class InvokeConfig +{ + /// + /// Reserved for a future SDK/service contract; not currently honored. + /// + /// + /// The AWS Lambda model ChainedInvokeOptions in + /// AWSSDK.Lambda 4.0.13.1 does not expose a per-invocation timeout + /// field, so this property is not propagated to the durable execution + /// service today. It is kept on the configuration surface so that callers + /// who set it now will pick up the intended behavior automatically once a + /// wire field is added (or via a follow-up SDK update); the service's + /// own service-level timeout still applies in the meantime. Track at + /// . + /// + [Obsolete("Timeout is reserved for a future SDK/service contract; not currently honored. Tracked at https://issues.amazon.com/issues/DOTNET-8661.")] + public TimeSpan Timeout { get; set; } = TimeSpan.Zero; + + /// + /// Optional tenant identifier propagated to the chained invocation via + /// ChainedInvokeOptions.TenantId. Used to route the invocation to a + /// tenant-isolated function. Matches the tenantId field on the + /// Python, JavaScript, and Java SDKs. + /// + public string? TenantId { get; set; } +} diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs b/Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs index 69e5e580c..d145d0d43 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs +++ b/Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs @@ -162,6 +162,65 @@ private Task RunChildContext( _state, _terminationManager, _durableExecutionArn, _batcher); return op.ExecuteAsync(cancellationToken); } + + [RequiresUnreferencedCode("Reflection-based JSON for TPayload/TResult. Use the ICheckpointSerializer overload for AOT/trimmed deployments.")] + [RequiresDynamicCode("Reflection-based JSON for TPayload/TResult. Use the ICheckpointSerializer overload for AOT/trimmed deployments.")] + public Task InvokeAsync( + string functionName, + TPayload payload, + string? name = null, + InvokeConfig? config = null, + CancellationToken cancellationToken = default) + => RunInvoke( + functionName, payload, + new ReflectionJsonCheckpointSerializer(), + new ReflectionJsonCheckpointSerializer(), + name, config, cancellationToken); + + public Task InvokeAsync( + string functionName, + TPayload payload, + ICheckpointSerializer payloadSerializer, + ICheckpointSerializer resultSerializer, + string? name = null, + InvokeConfig? config = null, + CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(payloadSerializer); + ArgumentNullException.ThrowIfNull(resultSerializer); + return RunInvoke( + functionName, payload, + payloadSerializer, resultSerializer, + name, config, cancellationToken); + } + + private Task RunInvoke( + string functionName, + TPayload payload, + ICheckpointSerializer payloadSerializer, + ICheckpointSerializer resultSerializer, + string? name, + InvokeConfig? config, + CancellationToken cancellationToken) + { + // Argument validation runs synchronously at the call site (matches the + // .NET convention of failing fast for misuse). Match Python/JS/Java + // parity: only check for null/empty here; the durable execution service + // enforces the qualified-ARN rule and surfaces a precise error when an + // unqualified identifier is used. + ArgumentNullException.ThrowIfNull(functionName); + if (string.IsNullOrWhiteSpace(functionName)) + throw new ArgumentException("Function name must not be empty or whitespace.", nameof(functionName)); + + cancellationToken.ThrowIfCancellationRequested(); + + var operationId = _idGenerator.NextId(); + var op = new InvokeOperation( + operationId, name, functionName, payload, config, + payloadSerializer, resultSerializer, + _state, _terminationManager, _durableExecutionArn, _batcher); + return op.ExecuteAsync(cancellationToken); + } } /// diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Exceptions/InvokeException.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Exceptions/InvokeException.cs new file mode 100644 index 000000000..fb92dd3c1 --- /dev/null +++ b/Libraries/src/Amazon.Lambda.DurableExecution/Exceptions/InvokeException.cs @@ -0,0 +1,85 @@ +namespace Amazon.Lambda.DurableExecution; + +/// +/// Thrown when a chained invoke operation reaches a non-success terminal state. +/// +/// +/// Base class for the invoke exception tree. Catch +/// to handle every chained-invoke failure mode uniformly, or pattern-match the +/// concrete subclasses to react differently to specific outcomes: +/// +/// — the chained function threw. +/// — the configured (or service) +/// timeout elapsed before completion. +/// — the chained execution was +/// stopped by the service or an operator. +/// +/// Mirrors the Java SDK's InvokeException / InvokeFailedException +/// / InvokeTimedOutException / InvokeStoppedException tree; the +/// .NET SDK keeps non-abstract so callers can also +/// rethrow it directly when wrapping fallback logic. +/// +public class InvokeException : DurableExecutionException +{ + /// The fully-qualified name of the invoked function (ARN, alias, or version). + public string? FunctionName { get; init; } + + /// The fully-qualified type name of the original exception, when known. + public string? ErrorType { get; init; } + + /// Optional structured error data attached by the invoked function. + public string? ErrorData { get; init; } + + /// Stack trace of the original exception, captured before serialization. + public IReadOnlyList? OriginalStackTrace { get; init; } + + /// Creates an empty . + public InvokeException() { } + /// Creates an with the given message. + public InvokeException(string message) : base(message) { } + /// Creates an wrapping an inner exception. + public InvokeException(string message, Exception innerException) : base(message, innerException) { } +} + +/// +/// Thrown when a chained invoke operation completes with status FAILED — +/// the invoked function ran and threw. +/// +public class InvokeFailedException : InvokeException +{ + /// Creates an empty . + public InvokeFailedException() { } + /// Creates an with the given message. + public InvokeFailedException(string message) : base(message) { } + /// Creates an wrapping an inner exception. + public InvokeFailedException(string message, Exception innerException) : base(message, innerException) { } +} + +/// +/// Thrown when a chained invoke operation completes with status TIMED_OUT +/// — the invocation did not complete within the service-level timeout. +/// +public class InvokeTimedOutException : InvokeException +{ + /// Creates an empty . + public InvokeTimedOutException() { } + /// Creates an with the given message. + public InvokeTimedOutException(string message) : base(message) { } + /// Creates an wrapping an inner exception. + public InvokeTimedOutException(string message, Exception innerException) : base(message, innerException) { } +} + +/// +/// Thrown when a chained invoke operation completes with status STOPPED +/// — the invocation was stopped administratively by the durable execution +/// service before reaching a normal terminal state. +/// +public class InvokeStoppedException : InvokeException +{ + /// Creates an empty . + public InvokeStoppedException() { } + /// Creates an with the given message. + public InvokeStoppedException(string message) : base(message) { } + /// Creates an wrapping an inner exception. + public InvokeStoppedException(string message, Exception innerException) : base(message, innerException) { } +} diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/IDurableContext.cs b/Libraries/src/Amazon.Lambda.DurableExecution/IDurableContext.cs index eb10a0ffe..d89dc1fe4 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/IDurableContext.cs +++ b/Libraries/src/Amazon.Lambda.DurableExecution/IDurableContext.cs @@ -119,6 +119,77 @@ Task RunInChildContextAsync( string? name = null, ChildContextConfig? config = null, CancellationToken cancellationToken = default); + + /// + /// Invoke another durable Lambda function and await its result. The + /// invocation is checkpointed so it survives parent failures and is not + /// double-fired on replay. + /// + /// + /// + /// The chained function runs out-of-process: the SDK checkpoints a + /// CHAINED_INVOKE START with the supplied + /// and suspends; the durable execution service runs the target function + /// asynchronously and re-invokes the parent workflow when it completes. + /// On resume, the cached result is deserialized and returned, or the + /// recorded failure surfaces as an subclass. + /// + /// + /// must be a qualified identifier (version, + /// alias, or $LATEST). Unqualified ARNs are rejected by the durable + /// execution service. The SDK only validates that the value is non-null + /// and non-empty; ARN shape validation is left to the service to keep + /// behavior consistent with the Python, JavaScript, and Java SDKs. + /// + /// + /// The payload is serialized to a checkpoint using reflection-based + /// System.Text.Json; the result is deserialized the same way. For + /// NativeAOT or trimmed deployments, use the overload that takes + /// parameters for both payload and + /// result. + /// + /// + /// The payload type sent to the chained function. + /// The result type returned by the chained function. + /// is null. + /// is empty or whitespace. + /// The chained function threw. + /// The chained function did not complete within the configured timeout. + /// The chained execution was stopped by the service. + [RequiresUnreferencedCode("Reflection-based JSON for TPayload/TResult. Use the ICheckpointSerializer overload for AOT/trimmed deployments.")] + [RequiresDynamicCode("Reflection-based JSON for TPayload/TResult. Use the ICheckpointSerializer overload for AOT/trimmed deployments.")] + Task InvokeAsync( + string functionName, + TPayload payload, + string? name = null, + InvokeConfig? config = null, + CancellationToken cancellationToken = default); + + /// + /// Invoke another durable Lambda function with AOT-safe checkpoint + /// serialization. The supplied payload and result serializers are used in + /// place of reflection-based JSON. + /// + /// + /// Two positional serializers are required (one for the payload, one for + /// the result) — mirroring the StepAsync AOT-safe overload's pattern of + /// taking as a positional parameter. + /// + /// The payload type sent to the chained function. + /// The result type returned by the chained function. + /// , , or is null. + /// is empty or whitespace. + /// The chained function threw. + /// The chained function did not complete within the configured timeout. + /// The chained execution was stopped by the service. + Task InvokeAsync( + string functionName, + TPayload payload, + ICheckpointSerializer payloadSerializer, + ICheckpointSerializer resultSerializer, + string? name = null, + InvokeConfig? config = null, + CancellationToken cancellationToken = default); } /// diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ExecutionState.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ExecutionState.cs index 606614621..8bbd08535 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ExecutionState.cs +++ b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ExecutionState.cs @@ -130,5 +130,6 @@ private static bool IsTerminalStatus(string? status) => status == OperationStatuses.Succeeded || status == OperationStatuses.Failed || status == OperationStatuses.Cancelled - || status == OperationStatuses.Stopped; + || status == OperationStatuses.Stopped + || status == OperationStatuses.TimedOut; } diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/InvokeOperation.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/InvokeOperation.cs new file mode 100644 index 000000000..06a7097bf --- /dev/null +++ b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/InvokeOperation.cs @@ -0,0 +1,171 @@ +using SdkChainedInvokeOptions = Amazon.Lambda.Model.ChainedInvokeOptions; +using SdkOperationUpdate = Amazon.Lambda.Model.OperationUpdate; + +namespace Amazon.Lambda.DurableExecution.Internal; + +/// +/// Durable chained-invoke operation. Schedules an asynchronous invocation of +/// another durable Lambda function via the durable execution service and +/// suspends the parent workflow until the chained execution reaches a terminal +/// state. The service drives the chained function and re-invokes the parent +/// with an updated operation status. +/// +/// +/// Replay branches — example: +/// await ctx.InvokeAsync<Req, Resp>("arn:...:fn:prod", req, "process_payment") +/// +/// Fresh: serialize payload → sync-flush CHAINED_INVOKE START +/// (carrying ) → suspend with +/// . +/// SUCCEEDED: deserialize and return cached result from +/// ChainedInvokeDetails.Result; the chained function is NOT +/// re-invoked. +/// FAILED: throw populated +/// from the recorded error. +/// TIMED_OUT: throw . +/// STOPPED: throw . +/// STARTED / PENDING: chained execution is still in +/// flight; re-suspend without re-checkpointing — the original +/// START remains authoritative. +/// +/// Mirrors 's "sync-flush START → suspend" idiom; +/// the chained function executes out-of-process so there is nothing to run +/// locally on either fresh or replay paths besides the suspend wiring. +/// +internal sealed class InvokeOperation : DurableOperation +{ + private readonly string _functionName; + private readonly TPayload _payload; + private readonly InvokeConfig? _config; + private readonly ICheckpointSerializer _payloadSerializer; + private readonly ICheckpointSerializer _resultSerializer; + + public InvokeOperation( + string operationId, + string? name, + string functionName, + TPayload payload, + InvokeConfig? config, + ICheckpointSerializer payloadSerializer, + ICheckpointSerializer resultSerializer, + ExecutionState state, + TerminationManager termination, + string durableExecutionArn, + CheckpointBatcher? batcher = null) + : base(operationId, name, state, termination, durableExecutionArn, batcher) + { + _functionName = functionName; + _payload = payload; + _config = config; + _payloadSerializer = payloadSerializer; + _resultSerializer = resultSerializer; + } + + protected override string OperationType => OperationTypes.ChainedInvoke; + + protected override async Task StartAsync(CancellationToken cancellationToken) + { + cancellationToken.ThrowIfCancellationRequested(); + + var serializedPayload = _payloadSerializer.Serialize( + _payload, new SerializationContext(OperationId, DurableExecutionArn)); + + // Sync-flush CHAINED_INVOKE START before suspending — the service + // cannot drive a chained execution it hasn't received. A queued-but- + // unflushed START is indistinguishable from "never enqueued" if the + // parent is recycled, so the parent would resume without the service + // ever invoking the target function. + await EnqueueAsync(new SdkOperationUpdate + { + Id = OperationId, + Type = OperationTypes.ChainedInvoke, + Action = "START", + SubType = OperationSubTypes.Invoke, + Name = Name, + Payload = serializedPayload, + ChainedInvokeOptions = new SdkChainedInvokeOptions + { + FunctionName = _functionName, + TenantId = _config?.TenantId + } + }, cancellationToken); + + return await Termination.SuspendAndAwait( + TerminationReason.InvokePending, $"invoke:{Name ?? _functionName}"); + } + + protected override Task ReplayAsync(Operation existing, CancellationToken cancellationToken) + { + switch (existing.Status) + { + case OperationStatuses.Succeeded: + return Task.FromResult(DeserializeResult(existing.ChainedInvokeDetails?.Result)); + + case OperationStatuses.Failed: + throw BuildFailed(existing); + + case OperationStatuses.TimedOut: + throw BuildTimedOut(existing); + + case OperationStatuses.Stopped: + throw BuildStopped(existing); + + case OperationStatuses.Started: + case OperationStatuses.Pending: + // Service hasn't reached a terminal state yet; re-suspend + // without re-checkpointing. The original START is still + // authoritative. .NET's checkpoint flow does not need Python's + // pre-suspend status re-check — a synchronous-completion race + // resolves naturally on the next replay because the service + // includes the updated operation status when it re-invokes us. + return Termination.SuspendAndAwait( + TerminationReason.InvokePending, $"invoke:{Name ?? _functionName}"); + + default: + throw new NonDeterministicExecutionException( + $"Chained invoke operation '{Name ?? OperationId}' has unexpected status '{existing.Status}' on replay."); + } + } + + private TResult DeserializeResult(string? serialized) + { + if (serialized == null) return default!; + return _resultSerializer.Deserialize(serialized, new SerializationContext(OperationId, DurableExecutionArn)); + } + + private InvokeFailedException BuildFailed(Operation failedOp) + { + var err = failedOp.ChainedInvokeDetails?.Error; + return new InvokeFailedException(err?.ErrorMessage ?? "Chained invoke failed.") + { + FunctionName = _functionName, + ErrorType = err?.ErrorType, + ErrorData = err?.ErrorData, + OriginalStackTrace = err?.StackTrace + }; + } + + private InvokeTimedOutException BuildTimedOut(Operation failedOp) + { + var err = failedOp.ChainedInvokeDetails?.Error; + return new InvokeTimedOutException(err?.ErrorMessage ?? "Chained invoke timed out.") + { + FunctionName = _functionName, + ErrorType = err?.ErrorType, + ErrorData = err?.ErrorData, + OriginalStackTrace = err?.StackTrace + }; + } + + private InvokeStoppedException BuildStopped(Operation failedOp) + { + var err = failedOp.ChainedInvokeDetails?.Error; + return new InvokeStoppedException(err?.ErrorMessage ?? "Chained invoke was stopped.") + { + FunctionName = _functionName, + ErrorType = err?.ErrorType, + ErrorData = err?.ErrorData, + OriginalStackTrace = err?.StackTrace + }; + } +} diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Services/LambdaDurableServiceClient.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Services/LambdaDurableServiceClient.cs index dba534f6b..2e42d7478 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/Services/LambdaDurableServiceClient.cs +++ b/Libraries/src/Amazon.Lambda.DurableExecution/Services/LambdaDurableServiceClient.cs @@ -83,11 +83,7 @@ private static Internal.Operation MapFromSdkOperation(SdkOperation sdkOp) StepDetails = sdkOp.StepDetails != null ? new Internal.StepDetails { Result = sdkOp.StepDetails.Result, - Error = sdkOp.StepDetails.Error != null ? new ErrorObject - { - ErrorType = sdkOp.StepDetails.Error.ErrorType, - ErrorMessage = sdkOp.StepDetails.Error.ErrorMessage - } : null, + Error = MapError(sdkOp.StepDetails.Error), Attempt = sdkOp.StepDetails.Attempt, NextAttemptTimestamp = sdkOp.StepDetails.NextAttemptTimestamp.HasValue ? new DateTimeOffset(sdkOp.StepDetails.NextAttemptTimestamp.Value, TimeSpan.Zero).ToUnixTimeMilliseconds() @@ -106,12 +102,37 @@ private static Internal.Operation MapFromSdkOperation(SdkOperation sdkOp) ContextDetails = sdkOp.ContextDetails != null ? new Internal.ContextDetails { Result = sdkOp.ContextDetails.Result, - Error = sdkOp.ContextDetails.Error != null ? new ErrorObject - { - ErrorType = sdkOp.ContextDetails.Error.ErrorType, - ErrorMessage = sdkOp.ContextDetails.Error.ErrorMessage - } : null + Error = MapError(sdkOp.ContextDetails.Error) + } : null, + ChainedInvokeDetails = sdkOp.ChainedInvokeDetails != null ? new Internal.ChainedInvokeDetails + { + Result = sdkOp.ChainedInvokeDetails.Result, + Error = MapError(sdkOp.ChainedInvokeDetails.Error) } : null }; } + + /// + /// Maps an SDK into the + /// internal . Carries every field the wire object + /// exposes — ErrorType, ErrorMessage, ErrorData, and + /// StackTrace — so the durable execution exception builders + /// (, , and + /// the tree) can rehydrate the original + /// failure faithfully on real-service replay. + /// + private static ErrorObject? MapError(Amazon.Lambda.Model.ErrorObject? sdkError) + { + if (sdkError == null) return null; + return new ErrorObject + { + ErrorType = sdkError.ErrorType, + ErrorMessage = sdkError.ErrorMessage, + ErrorData = sdkError.ErrorData, + // SDK exposes List; assigning into IReadOnlyList? + // is reference-identical. A null list (SDK 4.x default when the + // field isn't set on the wire) propagates as null on our side. + StackTrace = sdkError.StackTrace + }; + } } diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/DurableFunctionDeployment.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/DurableFunctionDeployment.cs index b2ba4bb1a..3bd75a0c9 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/DurableFunctionDeployment.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/DurableFunctionDeployment.cs @@ -28,10 +28,21 @@ internal sealed class DurableFunctionDeployment : IAsyncDisposable private readonly string _roleName; private string? _roleArn; private string? _imageUri; + private string? _functionArn; private bool _functionCreated; private bool _ecrRepoCreated; + private readonly List _inlinePolicyNames = new(); public string FunctionName => _functionName; + + /// + /// The fully-qualified function ARN (unqualified). Available after + /// or completes. Use $"{FunctionArn}:$LATEST" + /// when constructing a qualified identifier for chained invocation. + /// + public string FunctionArn => _functionArn + ?? throw new InvalidOperationException("Function ARN is not available until the function has been created."); + public IAmazonLambda LambdaClient => _lambdaClient; private DurableFunctionDeployment(ITestOutputHelper output, string suffix) @@ -52,12 +63,14 @@ private DurableFunctionDeployment(ITestOutputHelper output, string suffix) public static async Task CreateAsync( string testFunctionDir, string scenarioSuffix, - ITestOutputHelper output) + ITestOutputHelper output, + IDictionary? environment = null, + IReadOnlyList? invokeAllowedFunctionArns = null) { var deployment = new DurableFunctionDeployment(output, scenarioSuffix); try { - await deployment.InitializeAsync(testFunctionDir); + await deployment.InitializeAsync(testFunctionDir, environment, invokeAllowedFunctionArns); } catch { @@ -69,7 +82,73 @@ public static async Task CreateAsync( return deployment; } - private async Task InitializeAsync(string testFunctionDir) + /// + /// Two-step deployment for chained-invoke scenarios: deploys the downstream (callee) + /// function first, captures its ARN, then deploys the parent (caller) with + /// DOWNSTREAM_FUNCTION_ARN set in the parent's environment and the parent's + /// role granted lambda:InvokeFunction on the downstream's ARN. + /// + /// + /// The parent and downstream are independent + /// instances; both are returned so the caller can dispose them in the right order + /// (parent first, then downstream — the caller is the one in flight when the test ends). + /// The DOWNSTREAM_FUNCTION_ARN env var carries a qualified identifier + /// (arn:...:function:name:$LATEST) so the parent can pass it directly to + /// ctx.InvokeAsync(...) without further manipulation. + /// + public static async Task<(DurableFunctionDeployment Parent, DurableFunctionDeployment Downstream)> + CreateWithDownstreamAsync( + string parentTestFunctionDir, + string downstreamTestFunctionDir, + string scenarioSuffix, + ITestOutputHelper output, + IDictionary? extraParentEnvironment = null) + { + // Deploy downstream first so we can pass its ARN to the parent's environment. + var downstream = await CreateAsync( + downstreamTestFunctionDir, + scenarioSuffix + "-d", + output); + + DurableFunctionDeployment? parent = null; + try + { + // Use a qualified identifier — the durable execution service rejects + // unqualified ARNs. $LATEST is fine for integration tests; production + // should use a version or alias. + var qualifiedDownstreamArn = downstream.FunctionArn + ":$LATEST"; + var parentEnv = new Dictionary(StringComparer.Ordinal) + { + ["DOWNSTREAM_FUNCTION_ARN"] = qualifiedDownstreamArn, + }; + if (extraParentEnvironment != null) + { + foreach (var kv in extraParentEnvironment) + parentEnv[kv.Key] = kv.Value; + } + + parent = await CreateAsync( + parentTestFunctionDir, + scenarioSuffix + "-p", + output, + environment: parentEnv, + invokeAllowedFunctionArns: new[] { downstream.FunctionArn }); + } + catch + { + // Parent failed to deploy — tear down the downstream we already created + // so we don't leak resources. + await downstream.DisposeAsync(); + throw; + } + + return (parent!, downstream); + } + + private async Task InitializeAsync( + string testFunctionDir, + IDictionary? environment = null, + IReadOnlyList? invokeAllowedFunctionArns = null) { // 1. Create IAM role _output.WriteLine($"Creating IAM role: {_roleName}"); @@ -103,6 +182,42 @@ await _iamClient.AttachRolePolicyAsync(new AttachRolePolicyRequest PolicyArn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicDurableExecutionRolePolicy" }); + // Grant cross-Lambda invoke when the parent of a chained-invoke scenario + // needs to call out to a downstream function. The durable execution service + // is the one that actually drives the chained invocation in production — + // attaching this directly to the parent's role keeps the parent role + // capable of being used in non-durable contexts (e.g. for diagnostic + // direct invokes from the test harness). + if (invokeAllowedFunctionArns != null && invokeAllowedFunctionArns.Count > 0) + { + // Allow both the unqualified ARN and any qualifier (alias/version/$LATEST). + var resources = new List(invokeAllowedFunctionArns.Count * 2); + foreach (var arn in invokeAllowedFunctionArns) + { + resources.Add(arn); + resources.Add(arn + ":*"); + } + var resourceJson = "[" + string.Join(",", resources.Select(r => $"\"{r}\"")) + "]"; + var policyDoc = $$""" + { + "Version": "2012-10-17", + "Statement": [{ + "Effect": "Allow", + "Action": ["lambda:InvokeFunction"], + "Resource": {{resourceJson}} + }] + } + """; + const string PolicyName = "AllowChainedInvoke"; + await _iamClient.PutRolePolicyAsync(new PutRolePolicyRequest + { + RoleName = _roleName, + PolicyName = PolicyName, + PolicyDocument = policyDoc + }); + _inlinePolicyNames.Add(PolicyName); + } + // Wait for IAM propagation await Task.Delay(TimeSpan.FromSeconds(10)); @@ -122,7 +237,7 @@ await _iamClient.AttachRolePolicyAsync(new AttachRolePolicyRequest // 4. Create Lambda function _output.WriteLine($"Creating Lambda function: {_functionName}"); - await _lambdaClient.CreateFunctionAsync(new CreateFunctionRequest + var createFunctionRequest = new CreateFunctionRequest { FunctionName = _functionName, PackageType = PackageType.Image, @@ -131,10 +246,19 @@ await _lambdaClient.CreateFunctionAsync(new CreateFunctionRequest Timeout = 30, MemorySize = 256, DurableConfig = new DurableConfig { ExecutionTimeout = 60 } - }); + }; + if (environment != null && environment.Count > 0) + { + createFunctionRequest.Environment = new Amazon.Lambda.Model.Environment + { + Variables = new Dictionary(environment, StringComparer.Ordinal) + }; + } + var createFunctionResponse = await _lambdaClient.CreateFunctionAsync(createFunctionRequest); _functionCreated = true; + _functionArn = createFunctionResponse.FunctionArn; - _output.WriteLine("Waiting for function to become Active..."); + _output.WriteLine($"Waiting for function to become Active... (ARN: {_functionArn})"); await WaitForFunctionActive(); } @@ -442,6 +566,21 @@ await _ecrClient.DeleteRepositoryAsync(new DeleteRepositoryRequest // want to attempt the others and the final DeleteRole. await TryDetachPolicy("arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"); await TryDetachPolicy("arn:aws:iam::aws:policy/service-role/AWSLambdaBasicDurableExecutionRolePolicy"); + + // Inline policies must be deleted (not detached) before DeleteRole succeeds. + foreach (var inline in _inlinePolicyNames) + { + try + { + await _iamClient.DeleteRolePolicyAsync(new DeleteRolePolicyRequest + { + RoleName = _roleName, + PolicyName = inline + }); + } + catch (Exception ex) { _output.WriteLine($"Cleanup error (IAM DeleteRolePolicy {inline}): {ex.Message}"); } + } + try { await _iamClient.DeleteRoleAsync(new DeleteRoleRequest { RoleName = _roleName }); diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/InvokeFailureTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/InvokeFailureTest.cs new file mode 100644 index 000000000..e82080f33 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/InvokeFailureTest.cs @@ -0,0 +1,70 @@ +using System.Linq; +using System.Text; +using Amazon.Lambda.Model; +using Xunit; +using Xunit.Abstractions; + +namespace Amazon.Lambda.DurableExecution.IntegrationTests; + +public class InvokeFailureTest +{ + private readonly ITestOutputHelper _output; + public InvokeFailureTest(ITestOutputHelper output) => _output = output; + + [Fact] + public async Task InvokeAsync_ChildThrows_ParentSurfacesInvokeFailedException() + { + var (parent, downstream) = await DurableFunctionDeployment.CreateWithDownstreamAsync( + parentTestFunctionDir: DurableFunctionDeployment.FindTestFunctionDir("InvokeFailureParentFunction"), + downstreamTestFunctionDir: DurableFunctionDeployment.FindTestFunctionDir("InvokeFailureChildFunction"), + scenarioSuffix: "invokefail", + output: _output); + + await using (downstream) + await using (parent) + { + var (invokeResponse, executionName) = await parent.InvokeAsync("""{"orderId": "invoke-fail"}"""); + var responsePayload = Encoding.UTF8.GetString(invokeResponse.Payload.ToArray()); + _output.WriteLine($"Parent response: {responsePayload}"); + + var arn = await parent.FindDurableExecutionArnByNameAsync(executionName, TimeSpan.FromSeconds(60)); + Assert.NotNull(arn); + + // The parent catches InvokeFailedException and returns normally — + // the parent execution itself SUCCEEDS even though the chained + // invocation FAILED. This is the value of the SDK's exception + // surface: failure is observable but not necessarily fatal. + var status = await parent.PollForCompletionAsync(arn!, TimeSpan.FromSeconds(120)); + Assert.Equal("SUCCEEDED", status, ignoreCase: true); + + var history = await parent.WaitForHistoryAsync( + arn!, + h => (h.Events?.Any(e => e.EventType == EventType.ChainedInvokeStarted) ?? false) + && (h.Events?.Any(e => e.ChainedInvokeFailedDetails != null) ?? false), + TimeSpan.FromSeconds(60)); + var events = history.Events ?? new List(); + + // Exactly one chained invoke was issued and it FAILED — the parent + // did not retry the invoke (no retry semantics for InvokeAsync yet). + Assert.Equal(1, events.Count(e => e.EventType == EventType.ChainedInvokeStarted)); + var failed = events.FirstOrDefault(e => e.ChainedInvokeFailedDetails != null); + Assert.NotNull(failed); + Assert.Equal("call_failing_child", failed!.Name); + + var error = failed.ChainedInvokeFailedDetails.Error?.Payload; + Assert.NotNull(error); + // The child's exception type and message propagate through the + // service into the parent's history. Some service implementations + // record only the simple type name and others the fully-qualified + // one — match either by checking for the substring. + Assert.Contains("InvalidOperationException", error!.ErrorType ?? string.Empty); + Assert.Contains("intentional child failure", error.ErrorMessage ?? string.Empty); + + // The parent's terminal result encodes "parent-saw-" — confirms + // the parent's catch block ran and the exception's ErrorType field + // was populated by the SDK on resume from the FAILED chained invoke. + var execution = await parent.GetExecutionAsync(arn!); + Assert.Null(execution.Error); + } + } +} diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/InvokeHappyPathTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/InvokeHappyPathTest.cs new file mode 100644 index 000000000..66a5c1d7f --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/InvokeHappyPathTest.cs @@ -0,0 +1,67 @@ +using System.Linq; +using System.Text; +using Amazon.Lambda.Model; +using Xunit; +using Xunit.Abstractions; + +namespace Amazon.Lambda.DurableExecution.IntegrationTests; + +public class InvokeHappyPathTest +{ + private readonly ITestOutputHelper _output; + public InvokeHappyPathTest(ITestOutputHelper output) => _output = output; + + [Fact] + public async Task InvokeAsync_HappyPath_ChildResultPropagatesToParent() + { + var (parent, downstream) = await DurableFunctionDeployment.CreateWithDownstreamAsync( + parentTestFunctionDir: DurableFunctionDeployment.FindTestFunctionDir("InvokeHappyPathParentFunction"), + downstreamTestFunctionDir: DurableFunctionDeployment.FindTestFunctionDir("InvokeHappyPathChildFunction"), + scenarioSuffix: "invokehappy", + output: _output); + + await using (downstream) + await using (parent) + { + var (invokeResponse, executionName) = await parent.InvokeAsync("""{"orderId": "invoke-happy"}"""); + Assert.Equal(200, invokeResponse.StatusCode); + + var responsePayload = Encoding.UTF8.GetString(invokeResponse.Payload.ToArray()); + _output.WriteLine($"Parent response: {responsePayload}"); + + // Locate the parent execution and wait for terminal status. Chained + // invoke suspends the parent — the synchronous Invoke response + // carries no data — so we drive completion via the listing API. + var arn = await parent.FindDurableExecutionArnByNameAsync(executionName, TimeSpan.FromSeconds(60)); + Assert.NotNull(arn); + + var status = await parent.PollForCompletionAsync(arn!, TimeSpan.FromSeconds(120)); + Assert.Equal("SUCCEEDED", status, ignoreCase: true); + + // The chained invoke's result surfaces in the parent's history as a + // ChainedInvokeSucceeded event. The parent then returns that result + // verbatim from its workflow. + var history = await parent.WaitForHistoryAsync( + arn!, + h => (h.Events?.Any(e => e.EventType == EventType.ChainedInvokeStarted) ?? false) + && (h.Events?.Any(e => e.ChainedInvokeSucceededDetails != null) ?? false), + TimeSpan.FromSeconds(60)); + var events = history.Events ?? new List(); + + var started = events.FirstOrDefault(e => e.EventType == EventType.ChainedInvokeStarted); + Assert.NotNull(started); + Assert.Equal(downstream.FunctionArn + ":$LATEST", started!.ChainedInvokeStartedDetails.FunctionName); + + var succeeded = events.FirstOrDefault(e => e.ChainedInvokeSucceededDetails != null); + Assert.NotNull(succeeded); + // The child returned the JSON-encoded string "got-42". + var childPayload = succeeded!.ChainedInvokeSucceededDetails.Result?.Payload?.Trim('"'); + Assert.Equal("got-42", childPayload); + + // The chained invoke event names what was invoked; cross-check against + // the deployed downstream's name so we know the parent really called + // the function we wired in. + Assert.Equal("call_child", succeeded.Name); + } + } +} diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/InvokeReplayDeterminismTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/InvokeReplayDeterminismTest.cs new file mode 100644 index 000000000..c99259297 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/InvokeReplayDeterminismTest.cs @@ -0,0 +1,119 @@ +using System.Linq; +using System.Text; +using Amazon.Lambda.Model; +using Xunit; +using Xunit.Abstractions; + +namespace Amazon.Lambda.DurableExecution.IntegrationTests; + +public class InvokeReplayDeterminismTest +{ + private readonly ITestOutputHelper _output; + public InvokeReplayDeterminismTest(ITestOutputHelper output) => _output = output; + + [Fact] + public async Task InvokeAsync_ReplayDeterminism_OperationIdsStableAcrossInvocations() + { + var (parent, downstream) = await DurableFunctionDeployment.CreateWithDownstreamAsync( + parentTestFunctionDir: DurableFunctionDeployment.FindTestFunctionDir("InvokeReplayDeterminismParentFunction"), + downstreamTestFunctionDir: DurableFunctionDeployment.FindTestFunctionDir("InvokeReplayDeterminismChildFunction"), + scenarioSuffix: "invokerply", + output: _output); + + await using (downstream) + await using (parent) + { + var (invokeResponse, executionName) = await parent.InvokeAsync("""{"orderId": "invoke-replay"}"""); + var responsePayload = Encoding.UTF8.GetString(invokeResponse.Payload.ToArray()); + _output.WriteLine($"Parent response: {responsePayload}"); + + var arn = await parent.FindDurableExecutionArnByNameAsync(executionName, TimeSpan.FromSeconds(60)); + Assert.NotNull(arn); + + var status = await parent.PollForCompletionAsync(arn!, TimeSpan.FromSeconds(180)); + Assert.Equal("SUCCEEDED", status, ignoreCase: true); + + // History is eventually consistent — wait until both step-succeeded + // events AND the chained-invoke-succeeded event are visible. + var history = await parent.WaitForHistoryAsync( + arn!, + h => (h.Events?.Count(e => e.StepSucceededDetails != null) ?? 0) >= 2 + && (h.Events?.Any(e => e.ChainedInvokeSucceededDetails != null) ?? false), + TimeSpan.FromSeconds(60)); + var events = history.Events ?? new List(); + + // Each step ran exactly once across the entire workflow — proves + // the chained invoke's suspend/resume cycle did NOT cause the + // pre-invoke step to re-execute. (Replay returned the cached + // checkpoint instead.) + var stepSucceededByName = events + .Where(e => e.StepSucceededDetails != null) + .GroupBy(e => e.Name) + .ToDictionary(g => g.Key!, g => g.Count()); + Assert.Equal(1, stepSucceededByName["before_invoke"]); + Assert.Equal(1, stepSucceededByName["after_invoke"]); + + // Exactly ONE chained invoke fired — replay didn't double-fire + // the InvokeAsync. Same invariant we check for steps. + Assert.Equal(1, events.Count(e => e.EventType == EventType.ChainedInvokeStarted)); + Assert.Equal(1, events.Count(e => e.ChainedInvokeSucceededDetails != null)); + + var beforeInvokeEvent = events.First(e => e.StepSucceededDetails != null && e.Name == "before_invoke"); + var generatedGuid = beforeInvokeEvent.StepSucceededDetails.Result?.Payload?.Trim('"'); + Assert.NotNull(generatedGuid); + Assert.True(Guid.TryParse(generatedGuid, out _), + $"before_invoke should produce a valid GUID, got: {generatedGuid}"); + + // The downstream's echo carries through to after_invoke verbatim, + // proving the cached chained-invoke result was used on resume. + var chainedSucceeded = events.First(e => e.ChainedInvokeSucceededDetails != null); + var chainedPayload = chainedSucceeded.ChainedInvokeSucceededDetails.Result?.Payload?.Trim('"'); + Assert.Equal($"echoed:{generatedGuid}", chainedPayload); + + var afterInvokeEvent = events.First(e => e.StepSucceededDetails != null && e.Name == "after_invoke"); + var afterPayload = afterInvokeEvent.StepSucceededDetails.Result?.Payload?.Trim('"'); + Assert.Equal($"final:echoed:{generatedGuid}", afterPayload); + + // The chained invoke's suspend/resume forced at least 2 invocations + // of the parent — proves replay actually happened (not just a + // single straight-through execution that skipped suspension). + var invocations = events.Where(e => e.InvocationCompletedDetails != null).ToList(); + Assert.True( + invocations.Count >= 2, + $"Expected at least 2 InvocationCompleted events (proves replay happened), got {invocations.Count}"); + + // Operation IDs are stable across all replays of the same logical + // position. The Started event and the corresponding Succeeded event + // for each operation share the same ID — that's the clearest + // observable proof the SDK's deterministic ID generator is working. + // The SDK hashes "<counter>" at the root, so each ID is a + // 64-char lowercase hex SHA-256 digest. + var startedIds = events + .Where(e => e.EventType == EventType.StepStarted || e.EventType == EventType.ChainedInvokeStarted) + .Select(e => (e.Name, Id: e.Id)) + .ToList(); + var succeededIds = events + .Where(e => e.StepSucceededDetails != null || e.ChainedInvokeSucceededDetails != null) + .Select(e => (e.Name, Id: e.Id)) + .ToList(); + + // All operation IDs are populated and look like SHA-256 hex digests. + foreach (var (name, id) in startedIds) + { + Assert.False(string.IsNullOrEmpty(id), $"Operation '{name}' has no Id on its Started event"); + Assert.Equal(64, id!.Length); + Assert.Matches("^[0-9a-f]{64}$", id); + } + + // Every started operation ID must appear in a succeeded event — + // proves the deterministic IDs from the Start path matched the IDs + // the service used to record the terminal event. + foreach (var (name, id) in startedIds) + { + Assert.True( + succeededIds.Any(s => s.Name == name && s.Id == id), + $"Operation '{name}' (id={id}) started but did not produce a matching SUCCEEDED event with the same ID"); + } + } + } +} diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/InvokeWithTenantIdTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/InvokeWithTenantIdTest.cs new file mode 100644 index 000000000..e6acb3d8f --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/InvokeWithTenantIdTest.cs @@ -0,0 +1,64 @@ +using System.Linq; +using System.Text; +using Amazon.Lambda.Model; +using Xunit; +using Xunit.Abstractions; + +namespace Amazon.Lambda.DurableExecution.IntegrationTests; + +public class InvokeWithTenantIdTest +{ + private readonly ITestOutputHelper _output; + public InvokeWithTenantIdTest(ITestOutputHelper output) => _output = output; + + [Fact] + public async Task InvokeAsync_WithTenantId_PropagatesToChainedInvokeOptions() + { + var (parent, downstream) = await DurableFunctionDeployment.CreateWithDownstreamAsync( + parentTestFunctionDir: DurableFunctionDeployment.FindTestFunctionDir("InvokeWithTenantIdFunction"), + downstreamTestFunctionDir: DurableFunctionDeployment.FindTestFunctionDir("InvokeChildTenantFunction"), + scenarioSuffix: "invoketenant", + output: _output); + + await using (downstream) + await using (parent) + { + var (invokeResponse, executionName) = await parent.InvokeAsync("""{"orderId": "tenant-test"}"""); + Assert.Equal(200, invokeResponse.StatusCode); + + var responsePayload = Encoding.UTF8.GetString(invokeResponse.Payload.ToArray()); + _output.WriteLine($"Parent response: {responsePayload}"); + + var arn = await parent.FindDurableExecutionArnByNameAsync(executionName, TimeSpan.FromSeconds(60)); + Assert.NotNull(arn); + + // The parent itself succeeds — proves the tenant-tagged chained + // invoke completed end-to-end. (.NET's IDurableContext doesn't + // expose tenant ID on the receiving side as of this commit, so + // wire-format propagation IS the substantive assertion.) + var status = await parent.PollForCompletionAsync(arn!, TimeSpan.FromSeconds(120)); + Assert.Equal("SUCCEEDED", status, ignoreCase: true); + + var history = await parent.WaitForHistoryAsync( + arn!, + h => h.Events?.Any(e => e.EventType == EventType.ChainedInvokeStarted) ?? false, + TimeSpan.FromSeconds(60)); + var events = history.Events ?? new List(); + + var started = events.FirstOrDefault(e => e.EventType == EventType.ChainedInvokeStarted); + Assert.NotNull(started); + + // The tenant ID flows through ChainedInvokeOptions -> service -> + // ChainedInvokeStartedDetails. This is the load-bearing assertion: + // it proves the SDK's InvokeConfig.TenantId reaches the wire. + Assert.Equal("test-tenant", started!.ChainedInvokeStartedDetails.TenantId); + + // The chained call still produced a result — proves nothing in the + // tenant-routing path silently dropped the invocation. + var succeeded = events.FirstOrDefault(e => e.ChainedInvokeSucceededDetails != null); + Assert.NotNull(succeeded); + var childPayload = succeeded!.ChainedInvokeSucceededDetails.Result?.Payload?.Trim('"'); + Assert.Equal("tenant-aware-7", childPayload); + } + } +} diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeChildTenantFunction/Dockerfile b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeChildTenantFunction/Dockerfile new file mode 100644 index 000000000..c1913d56a --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeChildTenantFunction/Dockerfile @@ -0,0 +1,7 @@ +FROM public.ecr.aws/lambda/provided:al2023 + +RUN dnf install -y libicu + +COPY bin/publish/ ${LAMBDA_TASK_ROOT} + +ENTRYPOINT ["/var/task/bootstrap"] diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeChildTenantFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeChildTenantFunction/Function.cs new file mode 100644 index 000000000..445d91a18 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeChildTenantFunction/Function.cs @@ -0,0 +1,30 @@ +using Amazon.Lambda.Core; +using Amazon.Lambda.DurableExecution; +using Amazon.Lambda.RuntimeSupport; +using Amazon.Lambda.Serialization.SystemTextJson; + +namespace DurableExecutionTestFunction; + +public class Function +{ + public static async Task Main(string[] args) + { + var handler = new Function(); + var serializer = new DefaultLambdaJsonSerializer(); + using var handlerWrapper = HandlerWrapper.GetHandlerWrapper(handler.Handler, serializer); + using var bootstrap = new LambdaBootstrap(handlerWrapper); + await bootstrap.RunAsync(); + } + + public Task Handler( + DurableExecutionInvocationInput input, ILambdaContext context) + => DurableFunction.WrapAsync(Workflow, input, context); + + private async Task Workflow(int input, IDurableContext context) + { + var formatted = await context.StepAsync( + async (_) => { await Task.CompletedTask; return $"tenant-aware-{input}"; }, + name: "tenant_step"); + return formatted; + } +} diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeChildTenantFunction/InvokeChildTenantFunction.csproj b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeChildTenantFunction/InvokeChildTenantFunction.csproj new file mode 100644 index 000000000..6f5f657e4 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeChildTenantFunction/InvokeChildTenantFunction.csproj @@ -0,0 +1,18 @@ + + + + net8.0 + Exe + true + bootstrap + enable + enable + + + + + + + + + diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeFailureChildFunction/Dockerfile b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeFailureChildFunction/Dockerfile new file mode 100644 index 000000000..c1913d56a --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeFailureChildFunction/Dockerfile @@ -0,0 +1,7 @@ +FROM public.ecr.aws/lambda/provided:al2023 + +RUN dnf install -y libicu + +COPY bin/publish/ ${LAMBDA_TASK_ROOT} + +ENTRYPOINT ["/var/task/bootstrap"] diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeFailureChildFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeFailureChildFunction/Function.cs new file mode 100644 index 000000000..99caec59c --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeFailureChildFunction/Function.cs @@ -0,0 +1,39 @@ +using Amazon.Lambda.Core; +using Amazon.Lambda.DurableExecution; +using Amazon.Lambda.RuntimeSupport; +using Amazon.Lambda.Serialization.SystemTextJson; + +namespace DurableExecutionTestFunction; + +public class Function +{ + public static async Task Main(string[] args) + { + var handler = new Function(); + var serializer = new DefaultLambdaJsonSerializer(); + using var handlerWrapper = HandlerWrapper.GetHandlerWrapper(handler.Handler, serializer); + using var bootstrap = new LambdaBootstrap(handlerWrapper); + await bootstrap.RunAsync(); + } + + public Task Handler( + DurableExecutionInvocationInput input, ILambdaContext context) + => DurableFunction.WrapAsync(Workflow, input, context); + + private async Task Workflow(int input, IDurableContext context) + { + // Throw inside a step so the workflow records a step-failed event AND + // surfaces a FAILED execution status. The parent's InvokeAsync sees a + // FAILED chained invocation and raises InvokeFailedException with the + // step's error type (System.InvalidOperationException) attached. + await context.StepAsync( + async (_) => + { + await Task.CompletedTask; + throw new InvalidOperationException("intentional child failure"); + }, + name: "fail_step"); + + return "unreachable"; + } +} diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeFailureChildFunction/InvokeFailureChildFunction.csproj b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeFailureChildFunction/InvokeFailureChildFunction.csproj new file mode 100644 index 000000000..6f5f657e4 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeFailureChildFunction/InvokeFailureChildFunction.csproj @@ -0,0 +1,18 @@ + + + + net8.0 + Exe + true + bootstrap + enable + enable + + + + + + + + + diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeFailureParentFunction/Dockerfile b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeFailureParentFunction/Dockerfile new file mode 100644 index 000000000..c1913d56a --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeFailureParentFunction/Dockerfile @@ -0,0 +1,7 @@ +FROM public.ecr.aws/lambda/provided:al2023 + +RUN dnf install -y libicu + +COPY bin/publish/ ${LAMBDA_TASK_ROOT} + +ENTRYPOINT ["/var/task/bootstrap"] diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeFailureParentFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeFailureParentFunction/Function.cs new file mode 100644 index 000000000..53284d7b9 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeFailureParentFunction/Function.cs @@ -0,0 +1,53 @@ +using Amazon.Lambda.Core; +using Amazon.Lambda.DurableExecution; +using Amazon.Lambda.RuntimeSupport; +using Amazon.Lambda.Serialization.SystemTextJson; + +namespace DurableExecutionTestFunction; + +public class Function +{ + public static async Task Main(string[] args) + { + var handler = new Function(); + var serializer = new DefaultLambdaJsonSerializer(); + using var handlerWrapper = HandlerWrapper.GetHandlerWrapper(handler.Handler, serializer); + using var bootstrap = new LambdaBootstrap(handlerWrapper); + await bootstrap.RunAsync(); + } + + public Task Handler( + DurableExecutionInvocationInput input, ILambdaContext context) + => DurableFunction.WrapAsync(Workflow, input, context); + + private async Task Workflow(TestEvent input, IDurableContext context) + { + var downstreamArn = System.Environment.GetEnvironmentVariable("DOWNSTREAM_FUNCTION_ARN") + ?? throw new InvalidOperationException("DOWNSTREAM_FUNCTION_ARN env var is not set."); + + try + { + await context.InvokeAsync( + downstreamArn, + payload: 1, + name: "call_failing_child"); + + // Should not reach — the child throws and the parent surfaces + // InvokeFailedException on the resume. + return new TestResult { Status = "unexpected_success", Data = null }; + } + catch (InvokeFailedException ex) + { + // The parent catches and converts the exception into a normal result — + // the workflow itself succeeds, even though the chained invoke failed. + return new TestResult + { + Status = "completed", + Data = $"parent-saw-{ex.ErrorType ?? "unknown"}" + }; + } + } +} + +public class TestEvent { public string? OrderId { get; set; } } +public class TestResult { public string? Status { get; set; } public string? Data { get; set; } } diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeFailureParentFunction/InvokeFailureParentFunction.csproj b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeFailureParentFunction/InvokeFailureParentFunction.csproj new file mode 100644 index 000000000..6f5f657e4 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeFailureParentFunction/InvokeFailureParentFunction.csproj @@ -0,0 +1,18 @@ + + + + net8.0 + Exe + true + bootstrap + enable + enable + + + + + + + + + diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeHappyPathChildFunction/Dockerfile b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeHappyPathChildFunction/Dockerfile new file mode 100644 index 000000000..c1913d56a --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeHappyPathChildFunction/Dockerfile @@ -0,0 +1,7 @@ +FROM public.ecr.aws/lambda/provided:al2023 + +RUN dnf install -y libicu + +COPY bin/publish/ ${LAMBDA_TASK_ROOT} + +ENTRYPOINT ["/var/task/bootstrap"] diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeHappyPathChildFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeHappyPathChildFunction/Function.cs new file mode 100644 index 000000000..e064982df --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeHappyPathChildFunction/Function.cs @@ -0,0 +1,30 @@ +using Amazon.Lambda.Core; +using Amazon.Lambda.DurableExecution; +using Amazon.Lambda.RuntimeSupport; +using Amazon.Lambda.Serialization.SystemTextJson; + +namespace DurableExecutionTestFunction; + +public class Function +{ + public static async Task Main(string[] args) + { + var handler = new Function(); + var serializer = new DefaultLambdaJsonSerializer(); + using var handlerWrapper = HandlerWrapper.GetHandlerWrapper(handler.Handler, serializer); + using var bootstrap = new LambdaBootstrap(handlerWrapper); + await bootstrap.RunAsync(); + } + + public Task Handler( + DurableExecutionInvocationInput input, ILambdaContext context) + => DurableFunction.WrapAsync(Workflow, input, context); + + private async Task Workflow(int input, IDurableContext context) + { + var prefixed = await context.StepAsync( + async (_) => { await Task.CompletedTask; return $"got-{input}"; }, + name: "format"); + return prefixed; + } +} diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeHappyPathChildFunction/InvokeHappyPathChildFunction.csproj b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeHappyPathChildFunction/InvokeHappyPathChildFunction.csproj new file mode 100644 index 000000000..6f5f657e4 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeHappyPathChildFunction/InvokeHappyPathChildFunction.csproj @@ -0,0 +1,18 @@ + + + + net8.0 + Exe + true + bootstrap + enable + enable + + + + + + + + + diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeHappyPathParentFunction/Dockerfile b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeHappyPathParentFunction/Dockerfile new file mode 100644 index 000000000..c1913d56a --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeHappyPathParentFunction/Dockerfile @@ -0,0 +1,7 @@ +FROM public.ecr.aws/lambda/provided:al2023 + +RUN dnf install -y libicu + +COPY bin/publish/ ${LAMBDA_TASK_ROOT} + +ENTRYPOINT ["/var/task/bootstrap"] diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeHappyPathParentFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeHappyPathParentFunction/Function.cs new file mode 100644 index 000000000..a9c6270fa --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeHappyPathParentFunction/Function.cs @@ -0,0 +1,41 @@ +using Amazon.Lambda.Core; +using Amazon.Lambda.DurableExecution; +using Amazon.Lambda.RuntimeSupport; +using Amazon.Lambda.Serialization.SystemTextJson; + +namespace DurableExecutionTestFunction; + +public class Function +{ + public static async Task Main(string[] args) + { + var handler = new Function(); + var serializer = new DefaultLambdaJsonSerializer(); + using var handlerWrapper = HandlerWrapper.GetHandlerWrapper(handler.Handler, serializer); + using var bootstrap = new LambdaBootstrap(handlerWrapper); + await bootstrap.RunAsync(); + } + + public Task Handler( + DurableExecutionInvocationInput input, ILambdaContext context) + => DurableFunction.WrapAsync(Workflow, input, context); + + private async Task Workflow(TestEvent input, IDurableContext context) + { + // Parent receives the downstream function ARN via env var so the test + // harness can wire arbitrary downstream functions without rebuilding + // the parent image. + var downstreamArn = System.Environment.GetEnvironmentVariable("DOWNSTREAM_FUNCTION_ARN") + ?? throw new InvalidOperationException("DOWNSTREAM_FUNCTION_ARN env var is not set."); + + var result = await context.InvokeAsync( + downstreamArn, + payload: 42, + name: "call_child"); + + return new TestResult { Status = "completed", Data = result }; + } +} + +public class TestEvent { public string? OrderId { get; set; } } +public class TestResult { public string? Status { get; set; } public string? Data { get; set; } } diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeHappyPathParentFunction/InvokeHappyPathParentFunction.csproj b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeHappyPathParentFunction/InvokeHappyPathParentFunction.csproj new file mode 100644 index 000000000..6f5f657e4 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeHappyPathParentFunction/InvokeHappyPathParentFunction.csproj @@ -0,0 +1,18 @@ + + + + net8.0 + Exe + true + bootstrap + enable + enable + + + + + + + + + diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeReplayDeterminismChildFunction/Dockerfile b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeReplayDeterminismChildFunction/Dockerfile new file mode 100644 index 000000000..c1913d56a --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeReplayDeterminismChildFunction/Dockerfile @@ -0,0 +1,7 @@ +FROM public.ecr.aws/lambda/provided:al2023 + +RUN dnf install -y libicu + +COPY bin/publish/ ${LAMBDA_TASK_ROOT} + +ENTRYPOINT ["/var/task/bootstrap"] diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeReplayDeterminismChildFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeReplayDeterminismChildFunction/Function.cs new file mode 100644 index 000000000..576fad805 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeReplayDeterminismChildFunction/Function.cs @@ -0,0 +1,30 @@ +using Amazon.Lambda.Core; +using Amazon.Lambda.DurableExecution; +using Amazon.Lambda.RuntimeSupport; +using Amazon.Lambda.Serialization.SystemTextJson; + +namespace DurableExecutionTestFunction; + +public class Function +{ + public static async Task Main(string[] args) + { + var handler = new Function(); + var serializer = new DefaultLambdaJsonSerializer(); + using var handlerWrapper = HandlerWrapper.GetHandlerWrapper(handler.Handler, serializer); + using var bootstrap = new LambdaBootstrap(handlerWrapper); + await bootstrap.RunAsync(); + } + + public Task Handler( + DurableExecutionInvocationInput input, ILambdaContext context) + => DurableFunction.WrapAsync(Workflow, input, context); + + private async Task Workflow(string input, IDurableContext context) + { + var echoed = await context.StepAsync( + async (_) => { await Task.CompletedTask; return $"echoed:{input}"; }, + name: "child_echo"); + return echoed; + } +} diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeReplayDeterminismChildFunction/InvokeReplayDeterminismChildFunction.csproj b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeReplayDeterminismChildFunction/InvokeReplayDeterminismChildFunction.csproj new file mode 100644 index 000000000..6f5f657e4 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeReplayDeterminismChildFunction/InvokeReplayDeterminismChildFunction.csproj @@ -0,0 +1,18 @@ + + + + net8.0 + Exe + true + bootstrap + enable + enable + + + + + + + + + diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeReplayDeterminismParentFunction/Dockerfile b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeReplayDeterminismParentFunction/Dockerfile new file mode 100644 index 000000000..c1913d56a --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeReplayDeterminismParentFunction/Dockerfile @@ -0,0 +1,7 @@ +FROM public.ecr.aws/lambda/provided:al2023 + +RUN dnf install -y libicu + +COPY bin/publish/ ${LAMBDA_TASK_ROOT} + +ENTRYPOINT ["/var/task/bootstrap"] diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeReplayDeterminismParentFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeReplayDeterminismParentFunction/Function.cs new file mode 100644 index 000000000..fd3e5ebd5 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeReplayDeterminismParentFunction/Function.cs @@ -0,0 +1,52 @@ +using Amazon.Lambda.Core; +using Amazon.Lambda.DurableExecution; +using Amazon.Lambda.RuntimeSupport; +using Amazon.Lambda.Serialization.SystemTextJson; + +namespace DurableExecutionTestFunction; + +public class Function +{ + public static async Task Main(string[] args) + { + var handler = new Function(); + var serializer = new DefaultLambdaJsonSerializer(); + using var handlerWrapper = HandlerWrapper.GetHandlerWrapper(handler.Handler, serializer); + using var bootstrap = new LambdaBootstrap(handlerWrapper); + await bootstrap.RunAsync(); + } + + public Task Handler( + DurableExecutionInvocationInput input, ILambdaContext context) + => DurableFunction.WrapAsync(Workflow, input, context); + + private async Task Workflow(TestEvent input, IDurableContext context) + { + var downstreamArn = System.Environment.GetEnvironmentVariable("DOWNSTREAM_FUNCTION_ARN") + ?? throw new InvalidOperationException("DOWNSTREAM_FUNCTION_ARN env var is not set."); + + // Step 1 generates a fresh GUID. On replay this MUST return the + // checkpointed value — proves the SDK's deterministic operation IDs + // line up with the service's view of the state. + var generatedId = await context.StepAsync( + async (_) => { await Task.CompletedTask; return Guid.NewGuid().ToString(); }, + name: "before_invoke"); + + // The chained invoke forces a suspend/resume cycle. After the resume, + // step 1 must replay (returning the cached GUID) and the invoke must + // not be re-fired (cached result is returned immediately). + var invokeResult = await context.InvokeAsync( + downstreamArn, + payload: generatedId, + name: "echo_invoke"); + + var afterInvoke = await context.StepAsync( + async (_) => { await Task.CompletedTask; return $"final:{invokeResult}"; }, + name: "after_invoke"); + + return new TestResult { Status = "completed", Data = afterInvoke }; + } +} + +public class TestEvent { public string? OrderId { get; set; } } +public class TestResult { public string? Status { get; set; } public string? Data { get; set; } } diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeReplayDeterminismParentFunction/InvokeReplayDeterminismParentFunction.csproj b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeReplayDeterminismParentFunction/InvokeReplayDeterminismParentFunction.csproj new file mode 100644 index 000000000..6f5f657e4 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeReplayDeterminismParentFunction/InvokeReplayDeterminismParentFunction.csproj @@ -0,0 +1,18 @@ + + + + net8.0 + Exe + true + bootstrap + enable + enable + + + + + + + + + diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeWithTenantIdFunction/Dockerfile b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeWithTenantIdFunction/Dockerfile new file mode 100644 index 000000000..c1913d56a --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeWithTenantIdFunction/Dockerfile @@ -0,0 +1,7 @@ +FROM public.ecr.aws/lambda/provided:al2023 + +RUN dnf install -y libicu + +COPY bin/publish/ ${LAMBDA_TASK_ROOT} + +ENTRYPOINT ["/var/task/bootstrap"] diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeWithTenantIdFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeWithTenantIdFunction/Function.cs new file mode 100644 index 000000000..b8f38b3d7 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeWithTenantIdFunction/Function.cs @@ -0,0 +1,43 @@ +using Amazon.Lambda.Core; +using Amazon.Lambda.DurableExecution; +using Amazon.Lambda.RuntimeSupport; +using Amazon.Lambda.Serialization.SystemTextJson; + +namespace DurableExecutionTestFunction; + +public class Function +{ + public static async Task Main(string[] args) + { + var handler = new Function(); + var serializer = new DefaultLambdaJsonSerializer(); + using var handlerWrapper = HandlerWrapper.GetHandlerWrapper(handler.Handler, serializer); + using var bootstrap = new LambdaBootstrap(handlerWrapper); + await bootstrap.RunAsync(); + } + + public Task Handler( + DurableExecutionInvocationInput input, ILambdaContext context) + => DurableFunction.WrapAsync(Workflow, input, context); + + private async Task Workflow(TestEvent input, IDurableContext context) + { + var downstreamArn = System.Environment.GetEnvironmentVariable("DOWNSTREAM_FUNCTION_ARN") + ?? throw new InvalidOperationException("DOWNSTREAM_FUNCTION_ARN env var is not set."); + + // Pass an explicit TenantId via InvokeConfig — the wire-format + // propagation is the test. .NET's IDurableContext does not currently + // expose tenant ID to the child workflow, so the assertion is on the + // parent's history (ChainedInvokeStartedDetails.TenantId). + var result = await context.InvokeAsync( + downstreamArn, + payload: 7, + name: "call_with_tenant", + config: new InvokeConfig { TenantId = "test-tenant" }); + + return new TestResult { Status = "completed", Data = result }; + } +} + +public class TestEvent { public string? OrderId { get; set; } } +public class TestResult { public string? Status { get; set; } public string? Data { get; set; } } diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeWithTenantIdFunction/InvokeWithTenantIdFunction.csproj b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeWithTenantIdFunction/InvokeWithTenantIdFunction.csproj new file mode 100644 index 000000000..6f5f657e4 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeWithTenantIdFunction/InvokeWithTenantIdFunction.csproj @@ -0,0 +1,18 @@ + + + + net8.0 + Exe + true + bootstrap + enable + enable + + + + + + + + + diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ConfigTests.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ConfigTests.cs index f31586ea0..14c87feb9 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ConfigTests.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ConfigTests.cs @@ -12,4 +12,29 @@ public void SerializationContext_RecordEquality() var ctx2 = new SerializationContext("op-1", "arn:aws:lambda:us-east-1:123:function:my-func"); Assert.Equal(ctx1, ctx2); } + + [Fact] + public void InvokeConfig_Defaults() + { + var config = new InvokeConfig(); +#pragma warning disable CS0618 // Timeout is reserved for a future SDK/service contract. + Assert.Equal(TimeSpan.Zero, config.Timeout); +#pragma warning restore CS0618 + Assert.Null(config.TenantId); + } + + [Fact] + public void InvokeConfig_RoundTripsProperties() + { +#pragma warning disable CS0618 // Timeout is reserved for a future SDK/service contract. + var config = new InvokeConfig + { + Timeout = TimeSpan.FromMinutes(5), + TenantId = "tenant-42" + }; + + Assert.Equal(TimeSpan.FromMinutes(5), config.Timeout); +#pragma warning restore CS0618 + Assert.Equal("tenant-42", config.TenantId); + } } diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ExceptionsTests.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ExceptionsTests.cs index 7105849bb..2e934703f 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ExceptionsTests.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ExceptionsTests.cs @@ -65,4 +65,117 @@ public void StepException_HasErrorProperties() Assert.Equal("operation timed out", ex.ErrorData); Assert.Equal(2, ex.OriginalStackTrace!.Count); } + + #region InvokeException tree + + [Fact] + public void InvokeException_IsDurableExecutionException() + { + var ex = new InvokeException("invoke failed"); + Assert.IsAssignableFrom(ex); + Assert.Equal("invoke failed", ex.Message); + } + + [Fact] + public void InvokeException_ParameterlessCtor() + { + var ex = new InvokeException(); + Assert.IsAssignableFrom(ex); + } + + [Fact] + public void InvokeException_WrapsInnerException() + { + var inner = new InvalidOperationException("inner"); + var ex = new InvokeException("outer", inner); + Assert.Same(inner, ex.InnerException); + } + + [Fact] + public void InvokeException_HasInvokeProperties() + { + var ex = new InvokeException("boom") + { + FunctionName = "arn:aws:lambda:us-east-1:123:function:fn:prod", + ErrorType = "System.TimeoutException", + ErrorData = "{\"detail\":\"x\"}", + OriginalStackTrace = new[] { "at A.B()" } + }; + + Assert.Equal("arn:aws:lambda:us-east-1:123:function:fn:prod", ex.FunctionName); + Assert.Equal("System.TimeoutException", ex.ErrorType); + Assert.Equal("{\"detail\":\"x\"}", ex.ErrorData); + Assert.Single(ex.OriginalStackTrace!); + } + + [Fact] + public void InvokeFailedException_IsInvokeException() + { + var ex = new InvokeFailedException("boom") { FunctionName = "fn:prod" }; + Assert.IsAssignableFrom(ex); + Assert.IsAssignableFrom(ex); + Assert.Equal("boom", ex.Message); + Assert.Equal("fn:prod", ex.FunctionName); + } + + [Fact] + public void InvokeFailedException_AllCtorOverloads() + { + var inner = new InvalidOperationException("inner"); + Assert.IsAssignableFrom(new InvokeFailedException()); + Assert.Equal("m", new InvokeFailedException("m").Message); + Assert.Same(inner, new InvokeFailedException("m", inner).InnerException); + } + + [Fact] + public void InvokeTimedOutException_IsInvokeException() + { + var ex = new InvokeTimedOutException("timed out"); + Assert.IsAssignableFrom(ex); + Assert.IsAssignableFrom(ex); + Assert.Equal("timed out", ex.Message); + } + + [Fact] + public void InvokeTimedOutException_AllCtorOverloads() + { + var inner = new TimeoutException("inner"); + Assert.IsAssignableFrom(new InvokeTimedOutException()); + Assert.Equal("m", new InvokeTimedOutException("m").Message); + Assert.Same(inner, new InvokeTimedOutException("m", inner).InnerException); + } + + [Fact] + public void InvokeStoppedException_IsInvokeException() + { + var ex = new InvokeStoppedException("stopped"); + Assert.IsAssignableFrom(ex); + Assert.IsAssignableFrom(ex); + Assert.Equal("stopped", ex.Message); + } + + [Fact] + public void InvokeStoppedException_AllCtorOverloads() + { + var inner = new InvalidOperationException("inner"); + Assert.IsAssignableFrom(new InvokeStoppedException()); + Assert.Equal("m", new InvokeStoppedException("m").Message); + Assert.Same(inner, new InvokeStoppedException("m", inner).InnerException); + } + + [Fact] + public void InvokeException_SubclassesCaughtByBase() + { + // Verifies the documented pattern-matching contract: catch + // (InvokeException) catches all three subclasses. + Exception failed = new InvokeFailedException("fail"); + Exception timedOut = new InvokeTimedOutException("timeout"); + Exception stopped = new InvokeStoppedException("stop"); + + Assert.True(failed is InvokeException); + Assert.True(timedOut is InvokeException); + Assert.True(stopped is InvokeException); + } + + #endregion } diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ExecutionStateTests.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ExecutionStateTests.cs index 6500879c1..8b3081aa3 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ExecutionStateTests.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ExecutionStateTests.cs @@ -196,6 +196,34 @@ public void HasOperation_ReturnsTrueForExisting() Assert.False(state.HasOperation("1-step_b")); } + [Fact] + public void TrackReplay_TerminalSet_IncludesTimedOut() + { + // TIMED_OUT is a terminal state (matches Python/JS/Java reference SDKs). + // A timed-out chained-invoke that has been visited must allow the + // replay-mode flag to flip; otherwise IsReplaying would stay stuck on + // for the rest of the invocation and downstream replay-aware features + // (e.g., the future replay-aware logger) would mis-fire. + var state = new ExecutionState(); + state.LoadFromCheckpoint(new InitialExecutionState + { + Operations = new List + { + ExecutionInputOp(), + new() + { + Id = "0-invoke", + Type = OperationTypes.ChainedInvoke, + Status = OperationStatuses.TimedOut + } + } + }); + Assert.True(state.IsReplaying); + + state.TrackReplay("0-invoke"); + Assert.False(state.IsReplaying); + } + [Fact] public void GetOperation_ReturnsLatestRecord_WhenIdAppearsMultipleTimes() { diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/InvokeOperationTests.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/InvokeOperationTests.cs new file mode 100644 index 000000000..807e34e3a --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/InvokeOperationTests.cs @@ -0,0 +1,685 @@ +using Amazon.Lambda.DurableExecution; +using Amazon.Lambda.DurableExecution.Internal; +using Amazon.Lambda.TestUtilities; +using Xunit; + +namespace Amazon.Lambda.DurableExecution.Tests; + +public class InvokeOperationTests +{ + /// Reproduces the Id that emits for the n-th root-level operation. + private static string IdAt(int position) => OperationIdGenerator.HashOperationId(position.ToString()); + + private const string FunctionArn = "arn:aws:lambda:us-east-1:123456789012:function:downstream:prod"; + + private static (DurableContext context, RecordingBatcher recorder, TerminationManager tm, ExecutionState state) + CreateContext(InitialExecutionState? initialState = null) + { + var state = new ExecutionState(); + state.LoadFromCheckpoint(initialState); + var tm = new TerminationManager(); + var idGen = new OperationIdGenerator(); + var lambdaContext = new TestLambdaContext(); + var recorder = new RecordingBatcher(); + var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext, recorder.Batcher); + return (context, recorder, tm, state); + } + + #region Argument validation + + [Fact] + public async Task InvokeAsync_NullFunctionName_ThrowsArgumentNullException() + { + var (context, _, _, _) = CreateContext(); + + await Assert.ThrowsAsync(() => + context.InvokeAsync(functionName: null!, payload: "x")); + } + + [Fact] + public async Task InvokeAsync_EmptyFunctionName_ThrowsArgumentException() + { + var (context, _, _, _) = CreateContext(); + + await Assert.ThrowsAsync(() => + context.InvokeAsync(functionName: "", payload: "x")); + } + + [Fact] + public async Task InvokeAsync_WhitespaceFunctionName_ThrowsArgumentException() + { + var (context, _, _, _) = CreateContext(); + + await Assert.ThrowsAsync(() => + context.InvokeAsync(functionName: " ", payload: "x")); + } + + [Fact] + public async Task InvokeAsync_AotOverload_NullPayloadSerializer_ThrowsArgumentNullException() + { + var (context, _, _, _) = CreateContext(); + + await Assert.ThrowsAsync(() => + context.InvokeAsync( + FunctionArn, + payload: "x", + payloadSerializer: null!, + resultSerializer: new StringPassthroughSerializer())); + } + + [Fact] + public async Task InvokeAsync_AotOverload_NullResultSerializer_ThrowsArgumentNullException() + { + var (context, _, _, _) = CreateContext(); + + await Assert.ThrowsAsync(() => + context.InvokeAsync( + FunctionArn, + payload: "x", + payloadSerializer: new StringPassthroughSerializer(), + resultSerializer: null!)); + } + + [Fact] + public async Task InvokeAsync_PreservesUnqualifiedArn_AndPassesItThrough() + { + // The SDK does NOT regex-validate qualified ARNs. The service enforces + // that rule. We verify the value is propagated unmodified to the + // ChainedInvokeOptions.FunctionName so that service-side rejection + // surfaces with the user's exact input. + var (context, recorder, tm, _) = CreateContext(); + + var task = context.InvokeAsync( + "arn:aws:lambda:us-east-1:123456789012:function:no-version", + payload: "x", + name: "noversion"); + + await Task.Delay(20); + Assert.True(tm.IsTerminated); + Assert.False(task.IsCompleted); + + var start = recorder.Flushed.Single(o => o.Action == "START"); + Assert.Equal("arn:aws:lambda:us-east-1:123456789012:function:no-version", + start.ChainedInvokeOptions.FunctionName); + } + + #endregion + + #region Fresh execution + + [Fact] + public async Task InvokeAsync_FreshExecution_CheckpointsStartAndSuspends() + { + var (context, recorder, tm, _) = CreateContext(); + + var task = context.InvokeAsync( + FunctionArn, + new RequestPayload { Amount = 100, Currency = "USD" }, + name: "process_payment", + config: new InvokeConfig { TenantId = "tenant-A" }); + + // Service-side suspend mechanics: TerminationManager fires before the + // user task completes; the task itself never resolves on the fresh path. + await Task.Delay(20); + Assert.True(tm.IsTerminated); + Assert.False(task.IsCompleted); + + await recorder.Batcher.DrainAsync(); + + var start = recorder.Flushed.Single(); + Assert.Equal("CHAINED_INVOKE", start.Type); + Assert.Equal("START", start.Action); + Assert.Equal("Invoke", start.SubType); + Assert.Equal("process_payment", start.Name); + Assert.Equal(IdAt(1), start.Id); + + // Payload is JSON-serialized via reflection (default). + Assert.Contains("\"Amount\":100", start.Payload); + Assert.Contains("\"Currency\":\"USD\"", start.Payload); + + // ChainedInvokeOptions carries function name + tenant id. + Assert.NotNull(start.ChainedInvokeOptions); + Assert.Equal(FunctionArn, start.ChainedInvokeOptions.FunctionName); + Assert.Equal("tenant-A", start.ChainedInvokeOptions.TenantId); + } + + [Fact] + public async Task InvokeAsync_FreshExecution_NoTenantId_OmitsTenantId() + { + var (context, recorder, tm, _) = CreateContext(); + + var task = context.InvokeAsync(FunctionArn, "payload", name: "no_tenant"); + + await Task.Delay(20); + Assert.True(tm.IsTerminated); + Assert.False(task.IsCompleted); + + await recorder.Batcher.DrainAsync(); + + var start = recorder.Flushed.Single(); + Assert.NotNull(start.ChainedInvokeOptions); + Assert.Equal(FunctionArn, start.ChainedInvokeOptions.FunctionName); + // null tenant means the SDK didn't set the field; the AWS SDK model's + // IsSet property is what callers actually inspect, but the easy + // deterministic assertion is that the property is null. + Assert.Null(start.ChainedInvokeOptions.TenantId); + } + + [Fact] + public async Task InvokeAsync_FreshExecution_StartIsSyncFlushed() + { + // Critical correctness invariant: START must be flushed BEFORE we + // suspend. A queued-but-unflushed START is "the service doesn't know + // about the chained invocation," so the parent suspends forever. + var (context, recorder, tm, _) = CreateContext(); + + var task = context.InvokeAsync(FunctionArn, "x", name: "sync_flush"); + await Task.Delay(20); + + Assert.True(tm.IsTerminated); + Assert.False(task.IsCompleted); + + // No DrainAsync — the START must already be flushed at the moment + // suspension is signaled. This mirrors WaitOperation_NewExecution_SignalsTermination's + // contract: TerminationManager firing implies the matching START is durable. + Assert.Single(recorder.Flushed); + Assert.Equal("START", recorder.Flushed[0].Action); + } + + [Fact] + public async Task InvokeAsync_TerminationReason_IsInvokePending() + { + var (context, _, tm, _) = CreateContext(); + + _ = context.InvokeAsync(FunctionArn, "x", name: "reason_check"); + var termination = await tm.TerminationTask; + + Assert.Equal(TerminationReason.InvokePending, termination.Reason); + } + + #endregion + + #region Replay — terminal status mapping + + [Fact] + public async Task InvokeAsync_ReplaySucceeded_ReturnsCachedResultWithoutRescheduling() + { + var (context, recorder, tm, _) = CreateContext(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = IdAt(1), + Type = OperationTypes.ChainedInvoke, + Status = OperationStatuses.Succeeded, + Name = "cached", + ChainedInvokeDetails = new ChainedInvokeDetails + { + Result = "{\"OrderId\":\"abc\",\"Total\":42}" + } + } + } + }); + + var result = await context.InvokeAsync( + FunctionArn, "x", name: "cached"); + + Assert.False(tm.IsTerminated); + Assert.Equal("abc", result.OrderId); + Assert.Equal(42, result.Total); + + await recorder.Batcher.DrainAsync(); + Assert.Empty(recorder.Flushed); + } + + [Fact] + public async Task InvokeAsync_ReplayFailed_ThrowsInvokeFailedException() + { + var (context, _, _, _) = CreateContext(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = IdAt(1), + Type = OperationTypes.ChainedInvoke, + Status = OperationStatuses.Failed, + Name = "boom", + ChainedInvokeDetails = new ChainedInvokeDetails + { + Error = new ErrorObject + { + ErrorType = "System.InvalidOperationException", + ErrorMessage = "downstream exploded", + ErrorData = "{\"detail\":\"x\"}", + StackTrace = new[] { "at A.B()", "at C.D()" } + } + } + } + } + }); + + var ex = await Assert.ThrowsAsync(() => + context.InvokeAsync(FunctionArn, "x", name: "boom")); + + Assert.Equal("downstream exploded", ex.Message); + Assert.Equal(FunctionArn, ex.FunctionName); + Assert.Equal("System.InvalidOperationException", ex.ErrorType); + Assert.Equal("{\"detail\":\"x\"}", ex.ErrorData); + Assert.NotNull(ex.OriginalStackTrace); + Assert.Equal(2, ex.OriginalStackTrace!.Count); + + // Subclass relationship — `catch (InvokeException)` catches all three. + Assert.IsAssignableFrom(ex); + } + + [Fact] + public async Task InvokeAsync_ReplayTimedOut_ThrowsInvokeTimedOutException() + { + var (context, _, _, _) = CreateContext(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = IdAt(1), + Type = OperationTypes.ChainedInvoke, + Status = OperationStatuses.TimedOut, + Name = "slow", + ChainedInvokeDetails = new ChainedInvokeDetails + { + Error = new ErrorObject + { + ErrorMessage = "execution timed out after 60s" + } + } + } + } + }); + + var ex = await Assert.ThrowsAsync(() => + context.InvokeAsync(FunctionArn, "x", name: "slow")); + + Assert.Equal("execution timed out after 60s", ex.Message); + Assert.Equal(FunctionArn, ex.FunctionName); + Assert.IsAssignableFrom(ex); + } + + [Fact] + public async Task InvokeAsync_ReplayStopped_ThrowsInvokeStoppedException() + { + var (context, _, _, _) = CreateContext(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = IdAt(1), + Type = OperationTypes.ChainedInvoke, + Status = OperationStatuses.Stopped, + Name = "stopped" + } + } + }); + + var ex = await Assert.ThrowsAsync(() => + context.InvokeAsync(FunctionArn, "x", name: "stopped")); + + // No recorded ErrorMessage → fallback default. + Assert.Equal("Chained invoke was stopped.", ex.Message); + Assert.Equal(FunctionArn, ex.FunctionName); + Assert.IsAssignableFrom(ex); + } + + [Fact] + public async Task InvokeAsync_ReplayStarted_ResuspendsWithoutRecheckpoint() + { + // Service hasn't reached terminal yet. The original START is still + // authoritative; do not re-emit, just suspend. + var (context, recorder, tm, _) = CreateContext(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = IdAt(1), + Type = OperationTypes.ChainedInvoke, + Status = OperationStatuses.Started, + Name = "still_running" + } + } + }); + + var task = context.InvokeAsync(FunctionArn, "x", name: "still_running"); + await Task.Delay(20); + + Assert.True(tm.IsTerminated); + Assert.False(task.IsCompleted); + + // Crucially: no checkpoint was emitted. Original START is authoritative. + Assert.Empty(recorder.Flushed); + } + + [Fact] + public async Task InvokeAsync_ReplayPending_ResuspendsWithoutRecheckpoint() + { + var (context, recorder, tm, _) = CreateContext(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = IdAt(1), + Type = OperationTypes.ChainedInvoke, + Status = OperationStatuses.Pending, + Name = "pending" + } + } + }); + + var task = context.InvokeAsync(FunctionArn, "x", name: "pending"); + await Task.Delay(20); + + Assert.True(tm.IsTerminated); + Assert.False(task.IsCompleted); + Assert.Empty(recorder.Flushed); + } + + [Fact] + public async Task InvokeAsync_ReplayUnknownStatus_ThrowsNonDeterministicException() + { + var (context, _, _, _) = CreateContext(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = IdAt(1), + Type = OperationTypes.ChainedInvoke, + Status = "TOTALLY_BOGUS", + Name = "mystery" + } + } + }); + + await Assert.ThrowsAsync(() => + context.InvokeAsync(FunctionArn, "x", name: "mystery")); + } + + [Fact] + public async Task InvokeAsync_ReplayTypeMismatch_ThrowsNonDeterministicException() + { + var (context, _, _, _) = CreateContext(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = IdAt(1), + Type = OperationTypes.Step, // wrong type + Status = OperationStatuses.Succeeded, + Name = "kept_consistent", + StepDetails = new StepDetails { Result = "\"x\"" } + } + } + }); + + var ex = await Assert.ThrowsAsync(() => + context.InvokeAsync(FunctionArn, "x", name: "kept_consistent")); + + Assert.Contains("expected type 'CHAINED_INVOKE'", ex.Message); + Assert.Contains("found 'STEP'", ex.Message); + } + + #endregion + + #region Serialization + + [Fact] + public async Task InvokeAsync_AotOverload_UsesCustomPayloadSerializer() + { + var (context, recorder, tm, _) = CreateContext(); + + var payloadSerializer = new RecordingPayloadSerializer(); + var resultSerializer = new RecordingResultSerializer(); + + var task = context.InvokeAsync( + FunctionArn, + new RequestPayload { Amount = 7, Currency = "EUR" }, + payloadSerializer, + resultSerializer, + name: "custom_payload"); + + await Task.Delay(20); + Assert.True(tm.IsTerminated); + Assert.False(task.IsCompleted); + + Assert.True(payloadSerializer.SerializeCalled); + Assert.False(resultSerializer.DeserializeCalled); + + await recorder.Batcher.DrainAsync(); + var start = recorder.Flushed.Single(); + Assert.Equal("7,EUR", start.Payload); + } + + [Fact] + public async Task InvokeAsync_AotOverload_UsesCustomResultSerializerOnReplay() + { + var (context, _, _, _) = CreateContext(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = IdAt(1), + Type = OperationTypes.ChainedInvoke, + Status = OperationStatuses.Succeeded, + Name = "custom_result", + ChainedInvokeDetails = new ChainedInvokeDetails + { + Result = "order-99,500" + } + } + } + }); + + var payloadSerializer = new RecordingPayloadSerializer(); + var resultSerializer = new RecordingResultSerializer(); + + var result = await context.InvokeAsync( + FunctionArn, + new RequestPayload { Amount = 1, Currency = "USD" }, + payloadSerializer, + resultSerializer, + name: "custom_result"); + + Assert.False(payloadSerializer.SerializeCalled); // replay, no fresh serialize + Assert.True(resultSerializer.DeserializeCalled); + Assert.Equal("order-99", result.OrderId); + Assert.Equal(500, result.Total); + } + + [Fact] + public async Task InvokeAsync_ReflectionSerializer_DeserializesResultViaJson() + { + var (context, _, _, _) = CreateContext(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = IdAt(1), + Type = OperationTypes.ChainedInvoke, + Status = OperationStatuses.Succeeded, + Name = "json_result", + ChainedInvokeDetails = new ChainedInvokeDetails + { + Result = "{\"OrderId\":\"o-7\",\"Total\":1024}" + } + } + } + }); + + var result = await context.InvokeAsync( + FunctionArn, + new RequestPayload { Amount = 1, Currency = "USD" }, + name: "json_result"); + + Assert.Equal("o-7", result.OrderId); + Assert.Equal(1024, result.Total); + } + + #endregion + + #region End-to-end suspension / resume parity + + [Fact] + public async Task EndToEnd_StepInvokeStep_FirstInvocation_SuspendsOnInvoke() + { + var tm = new TerminationManager(); + var state = new ExecutionState(); + state.LoadFromCheckpoint(null); + var idGen = new OperationIdGenerator(); + var lambdaContext = new TestLambdaContext(); + var batcher = new RecordingBatcher(); + var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext, batcher.Batcher); + + var result = await DurableExecutionHandler.RunAsync( + state, tm, + async () => + { + await context.StepAsync(async (_) => { await Task.CompletedTask; return "validated"; }, name: "validate"); + var paymentId = await context.InvokeAsync( + FunctionArn, "validated", name: "process_payment"); + return await context.StepAsync(async (_) => { await Task.CompletedTask; return paymentId + "-done"; }, name: "finalize"); + }); + + Assert.Equal(InvocationStatus.Pending, result.Status); + + await batcher.Batcher.DrainAsync(); + Assert.Contains(batcher.Flushed, o => o.Type == "CHAINED_INVOKE" && o.Action == "START"); + Assert.DoesNotContain(batcher.Flushed, o => o.Type == "STEP" && o.Name == "finalize"); + } + + [Fact] + public async Task EndToEnd_StepInvokeStep_SecondInvocation_ResumesAndCompletes() + { + var tm = new TerminationManager(); + var state = new ExecutionState(); + state.LoadFromCheckpoint(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = IdAt(1), + Type = OperationTypes.Step, + Status = OperationStatuses.Succeeded, + Name = "validate", + StepDetails = new StepDetails { Result = "\"validated\"" } + }, + new() + { + Id = IdAt(2), + Type = OperationTypes.ChainedInvoke, + Status = OperationStatuses.Succeeded, + Name = "process_payment", + ChainedInvokeDetails = new ChainedInvokeDetails { Result = "\"pmt-42\"" } + } + } + }); + + var idGen = new OperationIdGenerator(); + var lambdaContext = new TestLambdaContext(); + var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext); + var finalizeRan = false; + + var result = await DurableExecutionHandler.RunAsync( + state, tm, + async () => + { + var validated = await context.StepAsync(async (_) => { await Task.CompletedTask; return "fresh-validated"; }, name: "validate"); + Assert.Equal("validated", validated); // cached + + var paymentId = await context.InvokeAsync( + FunctionArn, validated, name: "process_payment"); + Assert.Equal("pmt-42", paymentId); // cached + + return await context.StepAsync(async (_) => + { + finalizeRan = true; + await Task.CompletedTask; + return paymentId + "-done"; + }, name: "finalize"); + }); + + Assert.Equal(InvocationStatus.Succeeded, result.Status); + Assert.Equal("pmt-42-done", result.Result); + Assert.True(finalizeRan); + } + + #endregion + + #region Test-only types + + private class RequestPayload + { + public int Amount { get; set; } + public string? Currency { get; set; } + } + + private class ResponsePayload + { + public string? OrderId { get; set; } + public long Total { get; set; } + } + + private sealed class StringPassthroughSerializer : ICheckpointSerializer + { + public string Serialize(string value, SerializationContext context) => value; + public string Deserialize(string data, SerializationContext context) => data; + } + + private sealed class RecordingPayloadSerializer : ICheckpointSerializer + { + public bool SerializeCalled { get; private set; } + public bool DeserializeCalled { get; private set; } + + public string Serialize(RequestPayload value, SerializationContext context) + { + SerializeCalled = true; + return $"{value.Amount},{value.Currency}"; + } + + public RequestPayload Deserialize(string data, SerializationContext context) + { + DeserializeCalled = true; + var inner = data.Replace("", "").Replace("", ""); + var parts = inner.Split(','); + return new RequestPayload { Amount = int.Parse(parts[0]), Currency = parts[1] }; + } + } + + private sealed class RecordingResultSerializer : ICheckpointSerializer + { + public bool SerializeCalled { get; private set; } + public bool DeserializeCalled { get; private set; } + + public string Serialize(ResponsePayload value, SerializationContext context) + { + SerializeCalled = true; + return $"{value.OrderId},{value.Total}"; + } + + public ResponsePayload Deserialize(string data, SerializationContext context) + { + DeserializeCalled = true; + var inner = data.Replace("", "").Replace("", ""); + var parts = inner.Split(','); + return new ResponsePayload { OrderId = parts[0], Total = long.Parse(parts[1]) }; + } + } + + #endregion +} diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/LambdaDurableServiceClientTests.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/LambdaDurableServiceClientTests.cs index 287937dec..e860f4805 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/LambdaDurableServiceClientTests.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/LambdaDurableServiceClientTests.cs @@ -231,6 +231,105 @@ public async Task GetExecutionStateAsync_CopiesContextDetailsResultAndError() Assert.Equal("boom", operations[1].ContextDetails!.Error!.ErrorMessage); } + [Fact] + public async Task GetExecutionStateAsync_MapFromSdkOperation_RoundTripsAllErrorFields() + { + // Pre-existing bug guard: MapFromSdkOperation used to drop ErrorData + // and StackTrace from the SDK error object, so the durable exception + // builders (StepException, ChildContextException, and the + // InvokeException tree) always saw nulls for those fields on + // real-service replay. This test pins down the fix for all three + // operation types that carry an error. + var stack = new List { "at Frame.One()", "at Frame.Two()" }; + + var mockClient = new MockLambdaClient + { + GetExecutionStateHandler = _ => new GetDurableExecutionStateResponse + { + Operations = new List + { + new Operation + { + Id = "step-1", + Type = "STEP", + Status = "FAILED", + StepDetails = new Amazon.Lambda.Model.StepDetails + { + Error = new SdkErrorObject + { + ErrorType = "System.InvalidOperationException", + ErrorMessage = "step blew up", + ErrorData = "{\"detail\":\"step\"}", + StackTrace = stack + } + } + }, + new Operation + { + Id = "ctx-1", + Type = "CONTEXT", + Status = "FAILED", + ContextDetails = new Amazon.Lambda.Model.ContextDetails + { + Error = new SdkErrorObject + { + ErrorType = "System.ArgumentException", + ErrorMessage = "ctx blew up", + ErrorData = "{\"detail\":\"ctx\"}", + StackTrace = stack + } + } + }, + new Operation + { + Id = "inv-1", + Type = "CHAINED_INVOKE", + Status = "FAILED", + ChainedInvokeDetails = new Amazon.Lambda.Model.ChainedInvokeDetails + { + Error = new SdkErrorObject + { + ErrorType = "System.TimeoutException", + ErrorMessage = "invoke blew up", + ErrorData = "{\"detail\":\"invoke\"}", + StackTrace = stack + } + } + } + } + } + }; + var client = new LambdaDurableServiceClient(mockClient); + + var (operations, _) = await client.GetExecutionStateAsync("arn", "tok", "marker"); + + Assert.Equal(3, operations.Count); + + // STEP — all four fields propagate. + var stepError = operations[0].StepDetails!.Error!; + Assert.Equal("System.InvalidOperationException", stepError.ErrorType); + Assert.Equal("step blew up", stepError.ErrorMessage); + Assert.Equal("{\"detail\":\"step\"}", stepError.ErrorData); + Assert.NotNull(stepError.StackTrace); + Assert.Equal(new[] { "at Frame.One()", "at Frame.Two()" }, stepError.StackTrace!); + + // CHILD CONTEXT — all four fields propagate. + var ctxError = operations[1].ContextDetails!.Error!; + Assert.Equal("System.ArgumentException", ctxError.ErrorType); + Assert.Equal("ctx blew up", ctxError.ErrorMessage); + Assert.Equal("{\"detail\":\"ctx\"}", ctxError.ErrorData); + Assert.NotNull(ctxError.StackTrace); + Assert.Equal(new[] { "at Frame.One()", "at Frame.Two()" }, ctxError.StackTrace!); + + // CHAINED_INVOKE — all four fields propagate. + var invError = operations[2].ChainedInvokeDetails!.Error!; + Assert.Equal("System.TimeoutException", invError.ErrorType); + Assert.Equal("invoke blew up", invError.ErrorMessage); + Assert.Equal("{\"detail\":\"invoke\"}", invError.ErrorData); + Assert.NotNull(invError.StackTrace); + Assert.Equal(new[] { "at Frame.One()", "at Frame.Two()" }, invError.StackTrace!); + } + [Fact] public async Task CheckpointAsync_ReturnsNewToken() {