diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Config/ChildContextConfig.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Config/ChildContextConfig.cs
new file mode 100644
index 000000000..7840211fc
--- /dev/null
+++ b/Libraries/src/Amazon.Lambda.DurableExecution/Config/ChildContextConfig.cs
@@ -0,0 +1,32 @@
+namespace Amazon.Lambda.DurableExecution;
+
+/// <summary>
+/// Configuration for a child context.
+/// </summary>
+/// <remarks>
+/// A child context is a logical sub-workflow with its own deterministic
+/// operation-ID space, persisted as a <c>CONTEXT</c> operation. Use
+/// <c>IDurableContext.RunInChildContextAsync</c>
+/// (and overloads) to run code inside one.
+/// </remarks>
+public sealed class ChildContextConfig
+{
+    /// <summary>
+    /// Operation sub-type label for observability (e.g. <c>"WaitForCallback"</c>).
+    /// Surfaces on the wire <c>OperationUpdate.SubType</c> field.
+    /// </summary>
+    public string? SubType { get; set; }
+
+    /// <summary>
+    /// Optional function to transform exceptions thrown by the child context's
+    /// user function before they surface to the caller. Useful for wrapping
+    /// low-level errors into domain-specific exceptions.
+    /// </summary>
+    /// <remarks>
+    /// Applied when the user function throws (the mapped exception propagates
+    /// to the caller of <c>RunInChildContextAsync</c>) and on replay of a
+    /// <c>FAILED</c> child context (the constructed <see cref="ChildContextException"/>
+    /// is mapped before being thrown).
+    /// </remarks>
+    public Func<ChildContextException, Exception>? ErrorMapping { get; set; }
+}
diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs b/Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs
index 87a874c2d..69e5e580c 100644
--- a/Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs
+++ b/Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs
@@ -108,6 +108,67 @@ public Task WaitAsync(
             _state, _terminationManager, _durableExecutionArn, _batcher);
         return op.ExecuteAsync(cancellationToken);
     }
+
+    [RequiresUnreferencedCode("Reflection-based JSON for T. Use the ICheckpointSerializer overload for AOT/trimmed deployments.")]
+    [RequiresDynamicCode("Reflection-based JSON for T. Use the ICheckpointSerializer overload for AOT/trimmed deployments.")]
+    public Task<T> RunInChildContextAsync<T>(
+        Func<IDurableContext, Task<T>> func,
+        string? name = null,
+        ChildContextConfig? config = null,
+        CancellationToken cancellationToken = default)
+        => RunChildContext(func, new ReflectionJsonCheckpointSerializer<T>(), name, config, cancellationToken);
+
+    public Task<T> RunInChildContextAsync<T>(
+        Func<IDurableContext, Task<T>> func,
+        ICheckpointSerializer<T> serializer,
+        string? name = null,
+        ChildContextConfig? config = null,
+        CancellationToken cancellationToken = default)
+        => RunChildContext(func, serializer, name, config, cancellationToken);
+
+    public async Task RunInChildContextAsync(
+        Func<IDurableContext, Task> func,
+        string? name = null,
+        ChildContextConfig? config = null,
+        CancellationToken cancellationToken = default)
+    {
+        if (func is null) throw new ArgumentNullException(nameof(func));
+
+        // Void child contexts don't carry a meaningful payload; wrap with a
+        // null-only serializer that doesn't touch reflection.
+        await RunChildContext(
+            async (ctx) => { await func(ctx); return null; },
+            NullCheckpointSerializer.Instance,
+            name, config, cancellationToken);
+    }
+
+    private Task<T> RunChildContext<T>(
+        Func<IDurableContext, Task<T>> func,
+        ICheckpointSerializer<T> serializer,
+        string? name,
+        ChildContextConfig? config,
+        CancellationToken cancellationToken)
+    {
+        // Validate before consuming an operation ID: a null delegate would
+        // otherwise be invoked inside the child and checkpointed as a FAIL.
+        if (func is null) throw new ArgumentNullException(nameof(func));
+        if (serializer is null) throw new ArgumentNullException(nameof(serializer));
+
+        var operationId = _idGenerator.NextId();
+
+        // Capture this DurableContext's collaborators; the child shares state,
+        // termination, batcher, ARN, and Lambda context — but uses a child
+        // OperationIdGenerator so its operation IDs are deterministically
+        // namespaced under the parent op ID.
+        IDurableContext ChildFactory(string parentOpId) => new DurableContext(
+            _state, _terminationManager, _idGenerator.CreateChild(parentOpId),
+            _durableExecutionArn, LambdaContext, _batcher);
+
+        var op = new ChildContextOperation<T>(
+            operationId, name, func, config, serializer, ChildFactory,
+            _state, _terminationManager, _durableExecutionArn, _batcher);
+        return op.ExecuteAsync(cancellationToken);
+    }
 }
 
 /// <summary>
diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Exceptions/DurableExecutionException.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Exceptions/DurableExecutionException.cs
index 0f724b4a2..d8124a367 100644
--- a/Libraries/src/Amazon.Lambda.DurableExecution/Exceptions/DurableExecutionException.cs
+++ b/Libraries/src/Amazon.Lambda.DurableExecution/Exceptions/DurableExecutionException.cs
@@ -47,3 +47,31 @@ public StepException(string message) : base(message) { }
     /// <summary>Creates a <see cref="StepException"/> wrapping an inner exception.</summary>
     public StepException(string message, Exception innerException) : base(message, innerException) { }
 }
+
+/// <summary>
+/// Thrown when a child context's user function fails. Surfaces from
+/// <c>RunInChildContextAsync</c>; the underlying error is preserved on the
+/// <see cref="ErrorType"/>/<see cref="ErrorData"/>/<see cref="OriginalStackTrace"/>
+/// fields. Use <see cref="ChildContextConfig.ErrorMapping"/> to remap into a
+/// domain-specific exception.
+/// </summary>
+public class ChildContextException : DurableExecutionException
+{
+    /// <summary>
+    /// The child context's <see cref="ChildContextConfig.SubType"/>, if any.
+    /// </summary>
+    public string? SubType { get; init; }
+    /// <summary>The fully-qualified type name of the original exception.</summary>
+    public string? ErrorType { get; init; }
+    /// <summary>Optional structured error data attached by the user.</summary>
+    public string? ErrorData { get; init; }
+    /// <summary>Stack trace of the original exception, captured before serialization.</summary>
+    public IReadOnlyList<string>? OriginalStackTrace { get; init; }
+
+    /// <summary>Creates an empty <see cref="ChildContextException"/>.</summary>
+    public ChildContextException() { }
+    /// <summary>Creates a <see cref="ChildContextException"/> with the given message.</summary>
+    public ChildContextException(string message) : base(message) { }
+    /// <summary>Creates a <see cref="ChildContextException"/> wrapping an inner exception.</summary>
+    public ChildContextException(string message, Exception innerException) : base(message, innerException) { }
+}
diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/IDurableContext.cs b/Libraries/src/Amazon.Lambda.DurableExecution/IDurableContext.cs
index ff18d1218..eb10a0ffe 100644
--- a/Libraries/src/Amazon.Lambda.DurableExecution/IDurableContext.cs
+++ b/Libraries/src/Amazon.Lambda.DurableExecution/IDurableContext.cs
@@ -73,6 +73,52 @@ Task WaitAsync(
         TimeSpan duration,
         string? name = null,
         CancellationToken cancellationToken = default);
+
+    /// <summary>
+    /// Run a user function inside a logical sub-workflow (a "child context").
+    /// The child has its own deterministic operation-ID space; its result is
+    /// checkpointed as a <c>CONTEXT</c> operation so subsequent invocations
+    /// replay the cached value without re-executing the func.
+    /// </summary>
+    /// <remarks>
+    /// Use child contexts to group related durable operations (e.g. a step plus
+    /// a wait plus a step) into a single observability/error-handling boundary.
+    /// On failure, surfaces as <see cref="ChildContextException"/>; supply
+    /// <see cref="ChildContextConfig.ErrorMapping"/> to remap into a
+    /// domain-specific exception.
+    /// The child context's return value is serialized to a checkpoint using
+    /// reflection-based <c>System.Text.Json</c>. For NativeAOT or trimmed
+    /// deployments, use the overload that takes an
+    /// <see cref="ICheckpointSerializer{T}"/>.
+    /// </remarks>
+    [RequiresUnreferencedCode("Reflection-based JSON for T. Use the ICheckpointSerializer overload for AOT/trimmed deployments.")]
+    [RequiresDynamicCode("Reflection-based JSON for T. Use the ICheckpointSerializer overload for AOT/trimmed deployments.")]
+    Task<T> RunInChildContextAsync<T>(
+        Func<IDurableContext, Task<T>> func,
+        string? name = null,
+        ChildContextConfig? config = null,
+        CancellationToken cancellationToken = default);
+
+    /// <summary>
+    /// Run a user function inside a child context with AOT-safe checkpoint
+    /// serialization. The supplied <paramref name="serializer"/> is used in
+    /// place of reflection-based JSON.
+    /// </summary>
+    Task<T> RunInChildContextAsync<T>(
+        Func<IDurableContext, Task<T>> func,
+        ICheckpointSerializer<T> serializer,
+        string? name = null,
+        ChildContextConfig? config = null,
+        CancellationToken cancellationToken = default);
+
+    /// <summary>
+    /// Run a user function inside a child context that returns no value.
+    /// </summary>
+    Task RunInChildContextAsync(
+        Func<IDurableContext, Task> func,
+        string? name = null,
+        ChildContextConfig? config = null,
+        CancellationToken cancellationToken = default);
 }
 
 /// <summary>
diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ChildContextOperation.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ChildContextOperation.cs
new file mode 100644
index 000000000..521798255
--- /dev/null
+++ b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ChildContextOperation.cs
@@ -0,0 +1,192 @@
+using SdkErrorObject = Amazon.Lambda.Model.ErrorObject;
+using SdkOperationUpdate = Amazon.Lambda.Model.OperationUpdate;
+
+namespace Amazon.Lambda.DurableExecution.Internal;
+
+/// <summary>
+/// Durable child context operation. Runs a user-supplied function inside a
+/// nested <see cref="IDurableContext"/> with its own deterministic operation-ID
+/// space, persisting the function's result so subsequent invocations replay
+/// the cached value without re-executing.
+/// </summary>
+/// <remarks>
+/// Replay branches — example: <c>await ctx.RunInChildContextAsync(child => ..., name: "phase")</c>
+/// <list type="bullet">
+///   <item><b>Fresh</b>: no prior state → sync-flush CONTEXT START → run user
+///     func → on success emit CONTEXT SUCCEED → on failure emit CONTEXT FAIL
+///     and throw <see cref="ChildContextException"/>.</item>
+///   <item><b>SUCCEEDED</b>: return cached deserialized result; user func is
+///     NOT re-executed.</item>
+///   <item><b>FAILED</b>: throw <see cref="ChildContextException"/> with the
+///     recorded error; if <see cref="ChildContextConfig.ErrorMapping"/> is
+///     set, the mapped exception is thrown instead.</item>
+///   <item><b>STARTED / PENDING</b>: re-run the user func without
+///     re-checkpointing START. The child's own operations recover from their
+///     own checkpoints, so this is replay propagation; if a wait/callback
+///     inside the child is still pending, the user func re-suspends.</item>
+/// </list>
+/// Unlike <see cref="StepOperation{T}"/>, child contexts have no retry strategy:
+/// failure is terminal and surfaces immediately via
+/// <see cref="ChildContextException"/>.
+/// </remarks>
+internal sealed class ChildContextOperation<T> : DurableOperation<T>
+{
+    private readonly Func<IDurableContext, Task<T>> _func;
+    private readonly ChildContextConfig? _config;
+    private readonly ICheckpointSerializer<T> _serializer;
+    private readonly Func<string, IDurableContext> _childContextFactory;
+
+    public ChildContextOperation(
+        string operationId,
+        string? name,
+        Func<IDurableContext, Task<T>> func,
+        ChildContextConfig? config,
+        ICheckpointSerializer<T> serializer,
+        Func<string, IDurableContext> childContextFactory,
+        ExecutionState state,
+        TerminationManager termination,
+        string durableExecutionArn,
+        CheckpointBatcher? batcher = null)
+        : base(operationId, name, state, termination, durableExecutionArn, batcher)
+    {
+        _func = func;
+        _config = config;
+        _serializer = serializer;
+        _childContextFactory = childContextFactory;
+    }
+
+    protected override string OperationType => OperationTypes.Context;
+
+    protected override async Task<T> StartAsync(CancellationToken cancellationToken)
+    {
+        // Sync-flush CONTEXT START before user code so the service has a record
+        // of the parent context if the inner func suspends (e.g. a Wait inside
+        // the child terminates the workflow before SUCCEED is reached).
+        await EnqueueAsync(new SdkOperationUpdate
+        {
+            Id = OperationId,
+            Type = OperationTypes.Context,
+            Action = "START",
+            SubType = _config?.SubType,
+            Name = Name
+        }, cancellationToken);
+
+        return await ExecuteFunc(cancellationToken);
+    }
+
+    protected override Task<T> ReplayAsync(Operation existing, CancellationToken cancellationToken)
+    {
+        switch (existing.Status)
+        {
+            case OperationStatuses.Succeeded:
+                // Side-effecting code runs at most once: replay returns the
+                // cached result without invoking the user func.
+                return Task.FromResult(DeserializeResult(existing.ContextDetails?.Result));
+
+            case OperationStatuses.Failed:
+                throw MapFailureException(BuildChildContextException(existing));
+
+            case OperationStatuses.Started:
+            case OperationStatuses.Pending:
+                // Re-run the user func: the child's own operations replay from
+                // their own checkpoints. Do NOT re-checkpoint START — the
+                // original is still authoritative. If something inside the
+                // child is still pending (Wait, callback, retry) the user func
+                // will re-suspend on its own.
+                return ExecuteFunc(cancellationToken);
+
+            default:
+                throw new NonDeterministicExecutionException(
+                    $"Child context operation '{Name ?? OperationId}' has unexpected status '{existing.Status}' on replay.");
+        }
+    }
+
+    private async Task<T> ExecuteFunc(CancellationToken cancellationToken)
+    {
+        cancellationToken.ThrowIfCancellationRequested();
+
+        var childContext = _childContextFactory(OperationId);
+
+        T result;
+        try
+        {
+            result = await _func(childContext);
+        }
+        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
+        {
+            throw;
+        }
+        catch (Exception ex)
+        {
+            // Build the wire error once so the checkpoint and the exception
+            // thrown now expose the same stack frames a later replay reads.
+            var sdkError = ToSdkError(ex);
+
+            await EnqueueAsync(new SdkOperationUpdate
+            {
+                Id = OperationId,
+                Type = OperationTypes.Context,
+                Action = "FAIL",
+                SubType = _config?.SubType,
+                Name = Name,
+                Error = sdkError
+            }, cancellationToken);
+
+            throw MapFailureException(new ChildContextException(ex.Message, ex)
+            {
+                SubType = _config?.SubType,
+                ErrorType = ex.GetType().FullName,
+                OriginalStackTrace = sdkError.StackTrace
+            });
+        }
+
+        await EnqueueAsync(new SdkOperationUpdate
+        {
+            Id = OperationId,
+            Type = OperationTypes.Context,
+            Action = "SUCCEED",
+            SubType = _config?.SubType,
+            Name = Name,
+            Payload = SerializeResult(result)
+        }, cancellationToken);
+
+        return result;
+    }
+
+    private Exception MapFailureException(ChildContextException ex)
+    {
+        var mapper = _config?.ErrorMapping;
+        if (mapper == null) return ex;
+
+        var mapped = mapper(ex);
+        return mapped ?? ex;
+    }
+
+    private ChildContextException BuildChildContextException(Operation failedOp)
+    {
+        var err = failedOp.ContextDetails?.Error;
+        return new ChildContextException(err?.ErrorMessage ?? "Child context failed")
+        {
+            SubType = failedOp.SubType ?? _config?.SubType,
+            ErrorType = err?.ErrorType,
+            ErrorData = err?.ErrorData,
+            OriginalStackTrace = err?.StackTrace
+        };
+    }
+
+    private T DeserializeResult(string? serialized)
+    {
+        if (serialized == null) return default!;
+        return _serializer.Deserialize(serialized, new SerializationContext(OperationId, DurableExecutionArn));
+    }
+
+    private string SerializeResult(T value)
+        => _serializer.Serialize(value, new SerializationContext(OperationId, DurableExecutionArn));
+
+    private static SdkErrorObject ToSdkError(Exception ex) => new()
+    {
+        ErrorType = ex.GetType().FullName,
+        ErrorMessage = ex.Message,
+        StackTrace = ex.StackTrace?.Split(new[] { '\r', '\n' }, StringSplitOptions.RemoveEmptyEntries).ToList()
+    };
+}
diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Services/LambdaDurableServiceClient.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Services/LambdaDurableServiceClient.cs
index 709341760..dba534f6b 100644
--- a/Libraries/src/Amazon.Lambda.DurableExecution/Services/LambdaDurableServiceClient.cs
+++ b/Libraries/src/Amazon.Lambda.DurableExecution/Services/LambdaDurableServiceClient.cs
@@ -102,6 +102,17 @@ private static Internal.Operation MapFromSdkOperation(SdkOperation sdkOp)
             ExecutionDetails = sdkOp.ExecutionDetails != null ? new Internal.ExecutionDetails
             {
                 InputPayload = sdkOp.ExecutionDetails.InputPayload
+            } : null,
+            ContextDetails = sdkOp.ContextDetails != null ? new Internal.ContextDetails
+            {
+                Result = sdkOp.ContextDetails.Result,
+                Error = sdkOp.ContextDetails.Error != null ? new ErrorObject
+                {
+                    ErrorType = sdkOp.ContextDetails.Error.ErrorType,
+                    ErrorMessage = sdkOp.ContextDetails.Error.ErrorMessage,
+                    ErrorData = sdkOp.ContextDetails.Error.ErrorData,
+                    StackTrace = sdkOp.ContextDetails.Error.StackTrace?.ToArray()
+                } : null
             } : null
         };
     }
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ChildContextOperationTests.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ChildContextOperationTests.cs
new file mode 100644
index 000000000..82cb8eda5
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ChildContextOperationTests.cs
@@ -0,0 +1,544 @@
+using Amazon.Lambda.DurableExecution;
+using Amazon.Lambda.DurableExecution.Internal;
+using Amazon.Lambda.TestUtilities;
+using Xunit;
+
+namespace Amazon.Lambda.DurableExecution.Tests;
+
+public class ChildContextOperationTests
+{
+    /// <summary>Reproduces the Id that <see cref="OperationIdGenerator"/> emits for the n-th root-level operation.</summary>
+    private static string IdAt(int position) => OperationIdGenerator.HashOperationId(position.ToString());
+
+    /// <summary>The hashed ID of the n-th child operation under <paramref name="parentOpId"/>.</summary>
+    private static string ChildIdAt(string parentOpId, int position) =>
+        OperationIdGenerator.HashOperationId($"{parentOpId}-{position}");
+
+    private static (DurableContext context, RecordingBatcher recorder, TerminationManager tm, ExecutionState state)
+        CreateContext(InitialExecutionState? initialState = null)
+    {
+        var state = new ExecutionState();
+        state.LoadFromCheckpoint(initialState);
+        var tm = new TerminationManager();
+        var idGen = new OperationIdGenerator();
+        var lambdaContext = new TestLambdaContext();
+        var recorder = new RecordingBatcher();
+        var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext, recorder.Batcher);
+        return (context, recorder, tm, state);
+    }
+
+    [Fact]
+    public async Task RunInChildContextAsync_FreshExecution_RunsFuncAndCheckpoints()
+    {
+        var (context, recorder, tm, _) = CreateContext();
+
+        var executed = false;
+        var result = await context.RunInChildContextAsync(
+            async (childCtx) =>
+            {
+                executed = true;
+                return await childCtx.StepAsync(async (_) => { await Task.CompletedTask; return "inner"; }, name: "inner_step");
+            },
+            name: "phase");
+
+        Assert.True(executed);
+        Assert.Equal("inner", result);
+        Assert.False(tm.IsTerminated);
+
+        // CONTEXT START → STEP START (fire-and-forget, but flushed before drain)
+        // → STEP SUCCEED → CONTEXT SUCCEED
+        await recorder.Batcher.DrainAsync();
+
+        var actions = recorder.Flushed.Select(o => $"{o.Type}:{o.Action}").ToArray();
+        Assert.Equal(new[]
+        {
+            "CONTEXT:START",
+            "STEP:START",
+            "STEP:SUCCEED",
+            "CONTEXT:SUCCEED"
+        }, actions);
+
+        var contextSucceed = recorder.Flushed.Single(o => o.Type == "CONTEXT" && o.Action == "SUCCEED");
+        Assert.Equal(IdAt(1), contextSucceed.Id);
+        Assert.Equal("phase", contextSucceed.Name);
+        Assert.Equal("\"inner\"", contextSucceed.Payload);
+    }
+
+    [Fact]
+    public async Task RunInChildContextAsync_FreshExecution_ChildOperationIdsDeterministic()
+    {
+        var (context, recorder, _, _) = CreateContext();
+
+        await context.RunInChildContextAsync(
+            async (childCtx) =>
+            {
+                await childCtx.StepAsync(async (_) => { await Task.CompletedTask; return "a"; }, name: "first");
+                await childCtx.StepAsync(async (_) => { await Task.CompletedTask; return "b"; }, name: "second");
+                return 0;
+            },
+            name: "phase");
+
+        await recorder.Batcher.DrainAsync();
+
+        var parentOpId = IdAt(1);
+        var firstChildOpId = ChildIdAt(parentOpId, 1);
+        var secondChildOpId = ChildIdAt(parentOpId, 2);
+
+        var stepStarts = recorder.Flushed.Where(o => o.Type == "STEP" && o.Action == "START").ToArray();
+        Assert.Equal(2, stepStarts.Length);
+        Assert.Equal(firstChildOpId, stepStarts[0].Id);
+        Assert.Equal(secondChildOpId, stepStarts[1].Id);
+    }
+
+    [Fact]
+    public async Task RunInChildContextAsync_ReplaySucceeded_ReturnsCachedAndDoesNotRun()
+    {
+        var (context, recorder, _, _) = CreateContext(new InitialExecutionState
+        {
+            Operations = new List<Operation>
+            {
+                new()
+                {
+                    Id = IdAt(1),
+                    Type = OperationTypes.Context,
+                    Status = OperationStatuses.Succeeded,
+                    Name = "phase",
+                    ContextDetails = new ContextDetails { Result = "\"cached\"" }
+                }
+            }
+        });
+
+        var executed = false;
+        var result = await context.RunInChildContextAsync(
+            async (childCtx) =>
+            {
+                executed = true;
+                await Task.CompletedTask;
+                return "fresh";
+            },
+            name: "phase");
+
+        Assert.False(executed);
+        Assert.Equal("cached", result);
+
+        await recorder.Batcher.DrainAsync();
+        Assert.Empty(recorder.Flushed);
+    }
+
+    [Fact]
+    public async Task RunInChildContextAsync_ReplayFailed_ThrowsChildContextException()
+    {
+        var (context, recorder, _, _) = CreateContext(new InitialExecutionState
+        {
+            Operations = new List<Operation>
+            {
+                new()
+                {
+                    Id = IdAt(1),
+                    Type = OperationTypes.Context,
+                    Status = OperationStatuses.Failed,
+                    Name = "phase",
+                    SubType = "WaitForCallback",
+                    ContextDetails = new ContextDetails
+                    {
+                        Error = new ErrorObject
+                        {
+                            ErrorType = "System.InvalidOperationException",
+                            ErrorMessage = "child went wrong",
+                            ErrorData = "{\"detail\":\"x\"}",
+                            StackTrace = new[] { "at A.B()", "at C.D()" }
+                        }
+                    }
+                }
+            }
+        });
+
+        var ex = await Assert.ThrowsAsync<ChildContextException>(() =>
+            context.RunInChildContextAsync(
+                async (_) => { await Task.CompletedTask; return "should not run"; },
+                name: "phase"));
+
+        Assert.Equal("child went wrong", ex.Message);
+        Assert.Equal("System.InvalidOperationException", ex.ErrorType);
+        Assert.Equal("{\"detail\":\"x\"}", ex.ErrorData);
+        Assert.Equal("WaitForCallback", ex.SubType);
+        Assert.NotNull(ex.OriginalStackTrace);
+        Assert.Equal(2, ex.OriginalStackTrace!.Count);
+
+        await recorder.Batcher.DrainAsync();
+        Assert.Empty(recorder.Flushed);
+    }
+
+    [Fact]
+    public async Task RunInChildContextAsync_ReplayFailed_AppliesErrorMapping()
+    {
+        var (context, _, _, _) = CreateContext(new InitialExecutionState
+        {
+            Operations = new List<Operation>
+            {
+                new()
+                {
+                    Id = IdAt(1),
+                    Type = OperationTypes.Context,
+                    Status = OperationStatuses.Failed,
+                    Name = "phase",
+                    ContextDetails = new ContextDetails
+                    {
+                        Error = new ErrorObject
+                        {
+                            ErrorType = "System.InvalidOperationException",
+                            ErrorMessage = "boom"
+                        }
+                    }
+                }
+            }
+        });
+
+        var ex = await Assert.ThrowsAsync<InvalidOperationException>(() =>
+            context.RunInChildContextAsync(
+                async (_) => { await Task.CompletedTask; return "x"; },
+                name: "phase",
+                config: new ChildContextConfig
+                {
+                    // Mapper sees the ChildContextException and remaps to a
+                    // domain-specific exception, preserving the original via
+                    // InnerException.
+                    ErrorMapping = e => new InvalidOperationException("mapped", e)
+                }));
+
+        Assert.Equal("mapped", ex.Message);
+        Assert.IsType<ChildContextException>(ex.InnerException);
+    }
+
+    [Fact]
+    public async Task RunInChildContextAsync_FuncThrows_CheckpointsFailAndThrows()
+    {
+        var (context, recorder, _, _) = CreateContext();
+
+        var ex = await Assert.ThrowsAsync<ChildContextException>(() =>
+            context.RunInChildContextAsync(
+                async (_) => { await Task.CompletedTask; throw new InvalidOperationException("inner boom"); },
+                name: "phase"));
+
+        Assert.Equal("inner boom", ex.Message);
+        Assert.Equal("System.InvalidOperationException", ex.ErrorType);
+
+        await recorder.Batcher.DrainAsync();
+        var contextActions = recorder.Flushed
+            .Where(o => o.Type == "CONTEXT")
+            .Select(o => o.Action.ToString())
+            .ToArray();
+        Assert.Equal(new[] { "START", "FAIL" }, contextActions);
+    }
+
+    [Fact]
+    public async Task RunInChildContextAsync_FuncThrows_AppliesErrorMapping()
+    {
+        var (context, _, _, _) = CreateContext();
+
+        var ex = await Assert.ThrowsAsync<InvalidOperationException>(() =>
+            context.RunInChildContextAsync(
+                async (_) => { await Task.CompletedTask; throw new TimeoutException("inner timeout"); },
+                name: "phase",
+                config: new ChildContextConfig
+                {
+                    ErrorMapping = e => new InvalidOperationException("mapped", e)
+                }));
+
+        Assert.Equal("mapped", ex.Message);
+        Assert.IsType<ChildContextException>(ex.InnerException);
+    }
+
+    [Fact]
+    public async Task RunInChildContextAsync_ChildSuspendsOnWait_TerminatesWithWaitScheduled()
+    {
+        var (context, recorder, tm, _) = CreateContext();
+
+        // Suspending child: the inner Wait flushes WAIT START sync, then
+        // returns a never-completing Task via TerminationManager.SuspendAndAwait.
+        // The outer ChildContextOperation awaits that and never reaches
+        // CONTEXT SUCCEED. DurableExecutionHandler.RunAsync's WhenAny race
+        // wins on the termination signal; the test below short-circuits via
+        // the same TerminationManager.IsTerminated check.
+        var task = context.RunInChildContextAsync(
+            async (childCtx) =>
+            {
+                await childCtx.WaitAsync(TimeSpan.FromSeconds(5), name: "wait_inside");
+                return "should not return";
+            },
+            name: "phase");
+
+        for (var i = 0; i < 200 && !tm.IsTerminated; i++) await Task.Delay(10); // bounded poll, not one fixed sleep
+
+        Assert.True(tm.IsTerminated);
+        Assert.False(task.IsCompleted);
+
+        // CONTEXT START + WAIT START have flushed; no SUCCEED/FAIL since the
+        // child is suspended.
+        var actions = recorder.Flushed.Select(o => $"{o.Type}:{o.Action}").ToArray();
+        Assert.Contains("CONTEXT:START", actions);
+        Assert.Contains("WAIT:START", actions);
+        Assert.DoesNotContain("CONTEXT:SUCCEED", actions);
+        Assert.DoesNotContain("CONTEXT:FAIL", actions);
+    }
+
+    [Fact]
+    public async Task RunInChildContextAsync_ReplayStarted_ReExecutesFuncWithInnerCacheReplay()
+    {
+        var parentOpId = IdAt(1);
+        var innerStepOpId = ChildIdAt(parentOpId, 1);
+
+        var (context, recorder, _, _) = CreateContext(new InitialExecutionState
+        {
+            Operations = new List<Operation>
+            {
+                new()
+                {
+                    Id = parentOpId,
+                    Type = OperationTypes.Context,
+                    Status = OperationStatuses.Started,
+                    Name = "phase"
+                },
+                new()
+                {
+                    Id = innerStepOpId,
+                    Type = OperationTypes.Step,
+                    Status = OperationStatuses.Succeeded,
+                    Name = "inner_step",
+                    StepDetails = new StepDetails { Result = "\"cached_inner\"" }
+                }
+            }
+        });
+
+        var innerExecuted = false;
+        var result = await context.RunInChildContextAsync(
+            async (childCtx) =>
+            {
+                return await childCtx.StepAsync(
+                    async (_) => { innerExecuted = true; await Task.CompletedTask; return "fresh_inner"; },
+                    name: "inner_step");
+            },
+            name: "phase");
+
+        // The user func re-runs (replay propagation), but its inner step
+        // replays the cached value without invoking the inner code.
+        Assert.False(innerExecuted);
+        Assert.Equal("cached_inner", result);
+
+        await recorder.Batcher.DrainAsync();
+
+        // Critical: do NOT re-checkpoint CONTEXT START on replay. The original
+        // STARTED checkpoint is still authoritative.
+        Assert.DoesNotContain(recorder.Flushed, o => o.Type == "CONTEXT" && o.Action == "START");
+
+        // The CONTEXT SUCCEED happens only this time, since the user func
+        // returned successfully.
+        Assert.Contains(recorder.Flushed, o => o.Type == "CONTEXT" && o.Action == "SUCCEED");
+    }
+
+    [Fact]
+    public async Task RunInChildContextAsync_VoidOverload_RunsAndCheckpoints()
+    {
+        var (context, recorder, _, _) = CreateContext();
+
+        var executed = false;
+        await context.RunInChildContextAsync(
+            async (childCtx) =>
+            {
+                await childCtx.StepAsync(
+                    async (_) => { executed = true; await Task.CompletedTask; },
+                    name: "inner_void");
+            },
+            name: "phase");
+
+        Assert.True(executed);
+
+        await recorder.Batcher.DrainAsync();
+
+        var actions = recorder.Flushed.Select(o => $"{o.Type}:{o.Action}").ToArray();
+        Assert.Equal(new[]
+        {
+            "CONTEXT:START",
+            "STEP:START",
+            "STEP:SUCCEED",
+            "CONTEXT:SUCCEED"
+        }, actions);
+
+        // Void overload uses NullCheckpointSerializer → "null" payload.
+        var contextSucceed = recorder.Flushed.Single(o => o.Type == "CONTEXT" && o.Action == "SUCCEED");
+        Assert.Equal("null", contextSucceed.Payload);
+    }
+
+    [Fact]
+    public async Task RunInChildContextAsync_CustomSerializer_UsedForReplay()
+    {
+        var (context, _, _, _) = CreateContext(new InitialExecutionState
+        {
+            Operations = new List<Operation>
+            {
+                new()
+                {
+                    Id = IdAt(1),
+                    Type = OperationTypes.Context,
+                    Status = OperationStatuses.Succeeded,
+                    Name = "phase",
+                    ContextDetails = new ContextDetails { Result = "Eve,21" }
+                }
+            }
+        });
+
+        var serializer = new RecordingPersonSerializer();
+        var result = await context.RunInChildContextAsync(
+            async (_) => { await Task.CompletedTask; return new TestPerson { Name = "ignored", Age = 0 }; },
+            serializer,
+            name: "phase");
+
+        Assert.True(serializer.DeserializeCalled);
+        Assert.Equal("Eve", result.Name);
+        Assert.Equal(21, result.Age);
+    }
+
+    [Fact]
+    public async Task RunInChildContextAsync_CustomSerializer_UsedForFreshSerialize()
+    {
+        var (context, recorder, _, _) = CreateContext();
+
+        var serializer = new RecordingPersonSerializer();
+        var result = await context.RunInChildContextAsync(
+            async (_) => { await Task.CompletedTask; return new TestPerson { Name = "Frank", Age = 33 }; },
+            serializer,
+            name: "phase");
+
+        Assert.Equal("Frank", result.Name);
+        Assert.True(serializer.SerializeCalled);
+
+        await recorder.Batcher.DrainAsync();
+
+        var contextSucceed = recorder.Flushed.Single(o => o.Type == "CONTEXT" && o.Action == "SUCCEED");
+        Assert.Equal("Frank,33", contextSucceed.Payload);
+    }
+
+    [Fact]
+    public async Task RunInChildContextAsync_ReplayTypeMismatch_ThrowsNonDeterministicException()
+    {
+        var (context, _, _, _) = CreateContext(new InitialExecutionState
+        {
+            Operations = new List<Operation>
+            {
+                new()
+                {
+                    Id = IdAt(1),
+                    Type = OperationTypes.Step, // wrong type — should be CONTEXT
+                    Status = OperationStatuses.Succeeded,
+                    Name = "phase",
+                    StepDetails = new StepDetails { Result = "\"x\"" }
+                }
+            }
+        });
+
+        var ex = await Assert.ThrowsAsync<NonDeterministicExecutionException>(() =>
+            context.RunInChildContextAsync(
+                async (_) => { await Task.CompletedTask; return "x"; },
+                name: "phase"));
+
+        Assert.Contains("expected type 'CONTEXT'", ex.Message);
+        Assert.Contains("found 'STEP'", ex.Message);
+    }
+
+    [Fact]
+    public async Task RunInChildContextAsync_ReplayNameMismatch_ThrowsNonDeterministicException()
+    {
+        var (context, _, _, _) = CreateContext(new InitialExecutionState
+        {
+            Operations = new List<Operation>
+            {
+                new()
+                {
+                    Id = IdAt(1),
+                    Type = OperationTypes.Context,
+                    Status = OperationStatuses.Succeeded,
+                    Name = "old_name",
+                    ContextDetails = new ContextDetails { Result = "\"x\"" }
+                }
+            }
+        });
+
+        var ex = await Assert.ThrowsAsync<NonDeterministicExecutionException>(() =>
+            context.RunInChildContextAsync(
+                async (_) => { await Task.CompletedTask; return "x"; },
+                name: "new_name"));
+
+        Assert.Contains("expected name 'new_name'", ex.Message);
+        Assert.Contains("found 'old_name'", ex.Message);
+    }
+
+    [Fact]
+    public async Task RunInChildContextAsync_ReplayUnknownStatus_ThrowsNonDeterministicException()
+    {
+        var (context, _, _, _) = CreateContext(new InitialExecutionState
+        {
+            Operations = new List<Operation>
+            {
+                new()
+                {
+                    Id = IdAt(1),
+                    Type = OperationTypes.Context,
+                    Status = "BOGUS",
+                    Name = "phase"
+                }
+            }
+        });
+
+        await Assert.ThrowsAsync<NonDeterministicExecutionException>(() =>
+            context.RunInChildContextAsync(
+                async (_) => { await Task.CompletedTask; return "x"; },
+                name: "phase"));
+    }
+
+    [Fact]
+    public async Task RunInChildContextAsync_SubTypeAndName_PropagateToCheckpoint()
+    {
+        var (context, recorder, _, _) = CreateContext();
+
+        await context.RunInChildContextAsync(
+            async (_) => { await Task.CompletedTask; return "ok"; },
+            name: "phase",
+            config: new ChildContextConfig { SubType = "WaitForCallback" });
+
+        await recorder.Batcher.DrainAsync();
+
+        var contextOps = recorder.Flushed.Where(o => o.Type == "CONTEXT").ToArray();
+        Assert.Equal(2, contextOps.Length);
+        foreach (var op in contextOps)
+        {
+            Assert.Equal("WaitForCallback", op.SubType);
+            Assert.Equal("phase", op.Name);
+        }
+    }
+
+    private class TestPerson
+    {
+        public string? Name { get; set; }
+        public int Age { get; set; }
+    }
+
+    private class RecordingPersonSerializer : ICheckpointSerializer<TestPerson>
+    {
+        public bool SerializeCalled { get; private set; }
+        public bool DeserializeCalled { get; private set; }
+
+        public string Serialize(TestPerson value, SerializationContext context)
+        {
+            SerializeCalled = true;
+            return $"{value.Name},{value.Age}";
+        }
+
+        public TestPerson Deserialize(string data, SerializationContext context)
+        {
+            DeserializeCalled = true;
+            var inner = data; // payload is already the bare "Name,Age" form written by Serialize
+            var parts = inner.Split(',');
+            return new TestPerson { Name = parts[0], Age = int.Parse(parts[1]) };
+        }
+    }
+}
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableContextTests.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableContextTests.cs
index cc3b9a460..cfef598ae 100644
--- a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableContextTests.cs
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableContextTests.cs
@@ -887,7 +887,7 @@ public async Task StepAsync_ReadyReplay_AdvancesAttemptAndExecutes()
         Assert.True(executed);
         Assert.Equal("ok", result);
         Assert.False(tm.IsTerminated);
-        Assert.Equal(ExecutionMode.Execution, state.Mode);
+        Assert.False(state.IsReplaying);
     }
 
     [Fact]
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/LambdaDurableServiceClientTests.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/LambdaDurableServiceClientTests.cs
index 2326f8544..287937dec 100644
--- a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/LambdaDurableServiceClientTests.cs
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/LambdaDurableServiceClientTests.cs
@@ -177,6 +177,60 @@ await client.CheckpointAsync(
         Assert.Equal("WAIT", call.Updates[1].Type);
     }
 
+    [Fact]
+    public async Task GetExecutionStateAsync_CopiesContextDetailsResultAndError()
+    {
+        var mockClient = new MockLambdaClient
+        {
+            GetExecutionStateHandler = _ => new GetDurableExecutionStateResponse
+            {
+                Operations = new List<Operation>
+                {
+                    new Operation
+                    {
+                        Id = "ctx-1",
+                        Type = "CONTEXT",
+                        Status = "SUCCEEDED",
+                        Name = "phase",
+                        ContextDetails = new Amazon.Lambda.Model.ContextDetails
+                        {
+                            Result = "\"ok\""
+                        }
+                    },
+                    new Operation
+                    {
+                        Id = "ctx-2",
+                        Type = "CONTEXT",
+                        Status = "FAILED",
+                        Name = "phase2",
+                        ContextDetails = new Amazon.Lambda.Model.ContextDetails
+                        {
+                            Error = new SdkErrorObject
+                            {
+                                ErrorType = "System.InvalidOperationException",
+                                ErrorMessage = "boom"
+                            }
+                        }
+                    }
+                }
+            }
+        };
+        var client = new LambdaDurableServiceClient(mockClient);
+
+        var (operations, _) = await client.GetExecutionStateAsync("arn", "tok", "marker");
+
+        Assert.Equal(2, operations.Count);
+
+        Assert.NotNull(operations[0].ContextDetails);
+        Assert.Equal("\"ok\"", operations[0].ContextDetails!.Result);
+        Assert.Null(operations[0].ContextDetails!.Error);
+
+        Assert.NotNull(operations[1].ContextDetails);
+        Assert.NotNull(operations[1].ContextDetails!.Error);
+        Assert.Equal("System.InvalidOperationException", operations[1].ContextDetails!.Error!.ErrorType);
+        Assert.Equal("boom", operations[1].ContextDetails!.Error!.ErrorMessage);
+    }
+
     [Fact]
     public async Task CheckpointAsync_ReturnsNewToken()
     {