Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
namespace Amazon.Lambda.DurableExecution;

/// <summary>
/// Configuration for a child context.
/// </summary>
/// <remarks>
/// A child context is a logical sub-workflow with its own deterministic
/// operation-ID space, persisted as a <c>CONTEXT</c> operation. Use
/// <see cref="IDurableContext.RunInChildContextAsync{T}(System.Func{IDurableContext, System.Threading.Tasks.Task{T}}, string?, ChildContextConfig?, System.Threading.CancellationToken)"/>
/// (and overloads) to run code inside one.
/// </remarks>
public sealed class ChildContextConfig
{
    /// <summary>
    /// Observability label describing what kind of child context this is
    /// (for example <c>"WaitForCallback"</c>). The value is copied onto the
    /// <c>SubType</c> field of every <c>OperationUpdate</c> emitted for the
    /// child context.
    /// </summary>
    public string? SubType { get; set; }

    /// <summary>
    /// Optional hook that converts a child-context failure into another
    /// exception before the caller sees it, e.g. to wrap a low-level error
    /// in a domain-specific exception type.
    /// </summary>
    /// <remarks>
    /// The hook receives a <see cref="ChildContextException"/> in two
    /// situations: when the user function throws during a live run, and
    /// when a child context recorded as <c>FAILED</c> is replayed. Whatever
    /// the hook returns is thrown from <c>RunInChildContextAsync</c>; if it
    /// returns <see langword="null"/>, the unmapped
    /// <see cref="ChildContextException"/> is thrown instead.
    /// </remarks>
    public Func<Exception, Exception>? ErrorMapping { get; set; }
}
54 changes: 54 additions & 0 deletions Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,60 @@ public Task WaitAsync(
_state, _terminationManager, _durableExecutionArn, _batcher);
return op.ExecuteAsync(cancellationToken);
}

/// <inheritdoc />
[RequiresUnreferencedCode("Reflection-based JSON for T. Use the ICheckpointSerializer<T> overload for AOT/trimmed deployments.")]
[RequiresDynamicCode("Reflection-based JSON for T. Use the ICheckpointSerializer<T> overload for AOT/trimmed deployments.")]
public Task<T> RunInChildContextAsync<T>(
    Func<IDurableContext, Task<T>> func,
    string? name = null,
    ChildContextConfig? config = null,
    CancellationToken cancellationToken = default)
{
    // Default overload: the child's result is checkpointed with the
    // reflection-based JSON serializer, hence the trimming/AOT attributes.
    var reflectionSerializer = new ReflectionJsonCheckpointSerializer<T>();
    return RunChildContext(func, reflectionSerializer, name, config, cancellationToken);
}

/// <inheritdoc />
public Task<T> RunInChildContextAsync<T>(
    Func<IDurableContext, Task<T>> func,
    ICheckpointSerializer<T> serializer,
    string? name = null,
    ChildContextConfig? config = null,
    CancellationToken cancellationToken = default)
{
    // Caller-supplied serializer: forward everything unchanged to the
    // shared implementation.
    return RunChildContext(func, serializer, name, config, cancellationToken);
}

/// <inheritdoc />
/// <exception cref="ArgumentNullException">
/// <paramref name="func"/> is <see langword="null"/>. Because this method is
/// <c>async</c>, the exception is delivered through the returned task.
/// </exception>
public async Task RunInChildContextAsync(
    Func<IDurableContext, Task> func,
    string? name = null,
    ChildContextConfig? config = null,
    CancellationToken cancellationToken = default)
{
    // Reject a null delegate before any durable work happens. Without this
    // guard the wrapper lambda below would dereference null inside the child
    // context, and the resulting NullReferenceException would be recorded as
    // a FAIL checkpoint for the child context.
    ArgumentNullException.ThrowIfNull(func);

    // Void child contexts don't carry a meaningful payload; wrap with a
    // null-only serializer that doesn't touch reflection.
    await RunChildContext<object?>(
        async (ctx) => { await func(ctx); return null; },
        NullCheckpointSerializer.Instance,
        name, config, cancellationToken);
}

/// <summary>
/// Shared implementation behind the <c>RunInChildContextAsync</c> overloads:
/// allocates the next operation ID from this context, builds a
/// <see cref="ChildContextOperation{T}"/> around <paramref name="func"/>, and
/// executes it.
/// </summary>
/// <typeparam name="T">Result type produced by <paramref name="func"/>.</typeparam>
/// <param name="func">User function to run inside the child context.</param>
/// <param name="serializer">Serializer used to checkpoint the result.</param>
/// <param name="name">Optional operation name.</param>
/// <param name="config">Optional child-context configuration.</param>
/// <param name="cancellationToken">Token forwarded to the operation.</param>
/// <returns>The task returned by the operation's <c>ExecuteAsync</c>.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="func"/> or <paramref name="serializer"/> is <see langword="null"/>.
/// </exception>
private Task<T> RunChildContext<T>(
    Func<IDurableContext, Task<T>> func,
    ICheckpointSerializer<T> serializer,
    string? name,
    ChildContextConfig? config,
    CancellationToken cancellationToken)
{
    // Validate before consuming an operation ID so a usage error does not
    // advance the ID generator. A null serializer would otherwise only fail
    // after the user function had already run, and a null func would be
    // recorded as a failed child context.
    ArgumentNullException.ThrowIfNull(func);
    ArgumentNullException.ThrowIfNull(serializer);

    var operationId = _idGenerator.NextId();

    // Capture this DurableContext's collaborators; the child shares state,
    // termination, batcher, ARN, and Lambda context — but uses a child
    // OperationIdGenerator so its operation IDs are deterministically
    // namespaced under the parent op ID.
    IDurableContext ChildFactory(string parentOpId) => new DurableContext(
        _state, _terminationManager, _idGenerator.CreateChild(parentOpId),
        _durableExecutionArn, LambdaContext, _batcher);

    var op = new ChildContextOperation<T>(
        operationId, name, func, config, serializer, ChildFactory,
        _state, _terminationManager, _durableExecutionArn, _batcher);
    return op.ExecuteAsync(cancellationToken);
}
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,31 @@ public StepException(string message) : base(message) { }
/// <summary>Creates a <see cref="StepException"/> wrapping an inner exception.</summary>
public StepException(string message, Exception innerException) : base(message, innerException) { }
}

/// <summary>
/// Raised by <c>RunInChildContextAsync</c> when the user function of a child
/// context fails. Details of the underlying error are exposed through
/// <see cref="ErrorType"/>, <see cref="ErrorData"/> and
/// <see cref="OriginalStackTrace"/>. To surface a domain-specific exception
/// instead, supply <see cref="ChildContextConfig.ErrorMapping"/>.
/// </summary>
public class ChildContextException : DurableExecutionException
{
    /// <summary>Initializes a <see cref="ChildContextException"/> with no message.</summary>
    public ChildContextException()
    {
    }

    /// <summary>Initializes a <see cref="ChildContextException"/> carrying <paramref name="message"/>.</summary>
    public ChildContextException(string message)
        : base(message)
    {
    }

    /// <summary>
    /// Initializes a <see cref="ChildContextException"/> carrying
    /// <paramref name="message"/> and wrapping <paramref name="innerException"/>.
    /// </summary>
    public ChildContextException(string message, Exception innerException)
        : base(message, innerException)
    {
    }

    /// <summary>
    /// The <see cref="ChildContextConfig.SubType"/> of the failed child
    /// context, when one was set.
    /// </summary>
    public string? SubType { get; init; }

    /// <summary>Fully-qualified type name of the original exception.</summary>
    public string? ErrorType { get; init; }

    /// <summary>Optional structured error data attached by the user.</summary>
    public string? ErrorData { get; init; }

    /// <summary>
    /// Stack trace of the original exception, captured before serialization.
    /// </summary>
    public IReadOnlyList<string>? OriginalStackTrace { get; init; }
}
46 changes: 46 additions & 0 deletions Libraries/src/Amazon.Lambda.DurableExecution/IDurableContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,52 @@ Task WaitAsync(
TimeSpan duration,
string? name = null,
CancellationToken cancellationToken = default);

/// <summary>
/// Run a user function inside a logical sub-workflow (a "child context").
/// The child has its own deterministic operation-ID space; its result is
/// checkpointed as a <c>CONTEXT</c> operation so subsequent invocations
/// replay the cached value without re-executing the func.
/// </summary>
/// <remarks>
/// Use child contexts to group related durable operations (e.g. a step plus
/// a wait plus a step) into a single observability/error-handling boundary.
/// On failure, surfaces as <see cref="ChildContextException"/>; supply
/// <see cref="ChildContextConfig.ErrorMapping"/> to remap into a
/// domain-specific exception.
/// The child context's return value is serialized to a checkpoint using
/// reflection-based <c>System.Text.Json</c>. For NativeAOT or trimmed
/// deployments, use the overload that takes an
/// <see cref="ICheckpointSerializer{T}"/>.
/// </remarks>
/// <typeparam name="T">Type of the value returned by <paramref name="func"/> and stored in the checkpoint.</typeparam>
/// <param name="func">User function to run; it receives the child <see cref="IDurableContext"/>.</param>
/// <param name="name">Optional name for the child-context operation.</param>
/// <param name="config">Optional settings such as the sub-type label and error mapping.</param>
/// <param name="cancellationToken">Token used to cancel the operation.</param>
/// <returns>
/// A task that yields the value returned by <paramref name="func"/>, or the
/// previously checkpointed value when the child context is replayed.
/// </returns>
[RequiresUnreferencedCode("Reflection-based JSON for T. Use the ICheckpointSerializer<T> overload for AOT/trimmed deployments.")]
[RequiresDynamicCode("Reflection-based JSON for T. Use the ICheckpointSerializer<T> overload for AOT/trimmed deployments.")]
Task<T> RunInChildContextAsync<T>(
Func<IDurableContext, Task<T>> func,
string? name = null,
ChildContextConfig? config = null,
CancellationToken cancellationToken = default);

/// <summary>
/// Run a user function inside a child context with AOT-safe checkpoint
/// serialization. The supplied <paramref name="serializer"/> is used in
/// place of reflection-based JSON.
/// </summary>
/// <typeparam name="T">Type of the value returned by <paramref name="func"/> and stored in the checkpoint.</typeparam>
/// <param name="func">User function to run; it receives the child <see cref="IDurableContext"/>.</param>
/// <param name="serializer">Serializer used to write the result to, and read it back from, the checkpoint.</param>
/// <param name="name">Optional name for the child-context operation.</param>
/// <param name="config">Optional settings such as the sub-type label and error mapping.</param>
/// <param name="cancellationToken">Token used to cancel the operation.</param>
/// <returns>
/// A task that yields the value returned by <paramref name="func"/>, or the
/// previously checkpointed value when the child context is replayed.
/// </returns>
Task<T> RunInChildContextAsync<T>(
Func<IDurableContext, Task<T>> func,
ICheckpointSerializer<T> serializer,
string? name = null,
ChildContextConfig? config = null,
CancellationToken cancellationToken = default);

/// <summary>
/// Run a user function inside a child context that returns no value.
/// </summary>
/// <param name="func">User function to run; it receives the child <see cref="IDurableContext"/>.</param>
/// <param name="name">Optional name for the child-context operation.</param>
/// <param name="config">Optional settings such as the sub-type label and error mapping.</param>
/// <param name="cancellationToken">Token used to cancel the operation.</param>
/// <returns>A task that completes when the child context has finished.</returns>
Task RunInChildContextAsync(
Func<IDurableContext, Task> func,
string? name = null,
ChildContextConfig? config = null,
CancellationToken cancellationToken = default);
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
using SdkErrorObject = Amazon.Lambda.Model.ErrorObject;
using SdkOperationUpdate = Amazon.Lambda.Model.OperationUpdate;

namespace Amazon.Lambda.DurableExecution.Internal;

/// <summary>
/// Durable child context operation. Runs a user-supplied function inside a
/// nested <see cref="DurableContext"/> with its own deterministic operation-ID
/// space, persisting the function's result so subsequent invocations replay
/// the cached value without re-executing.
/// </summary>
/// <remarks>
/// Replay branches — example: <c>await ctx.RunInChildContextAsync(child =&gt; ..., name: "phase")</c>
/// <list type="bullet">
/// <item><b>Fresh</b>: no prior state → sync-flush CONTEXT START → run user
/// func → on success emit CONTEXT SUCCEED → on failure emit CONTEXT FAIL
/// and throw <see cref="ChildContextException"/>.</item>
/// <item><b>SUCCEEDED</b>: return cached deserialized result; user func is
/// NOT re-executed.</item>
/// <item><b>FAILED</b>: throw <see cref="ChildContextException"/> with the
/// recorded error; if <see cref="ChildContextConfig.ErrorMapping"/> is
/// set, the mapped exception is thrown instead.</item>
/// <item><b>STARTED</b> / <b>PENDING</b>: re-run the user func without
/// re-checkpointing START. The child's own operations recover from their
/// own checkpoints, so this is replay propagation; if a wait/callback
/// inside the child is still pending, the user func re-suspends.</item>
/// </list>
/// Unlike <see cref="StepOperation{T}"/>, child contexts have no retry strategy:
/// failure is terminal and surfaces immediately via
/// <see cref="ChildContextException"/>.
/// </remarks>
internal sealed class ChildContextOperation<T> : DurableOperation<T>
{
    private readonly Func<IDurableContext, Task<T>> _func;
    private readonly ChildContextConfig? _config;
    private readonly ICheckpointSerializer<T> _serializer;
    private readonly Func<string, IDurableContext> _childContextFactory;

    /// <summary>
    /// Creates a child-context operation.
    /// </summary>
    /// <param name="operationId">ID of this CONTEXT operation; also passed to <paramref name="childContextFactory"/>.</param>
    /// <param name="name">Optional operation name, sent on every checkpoint update.</param>
    /// <param name="func">User function executed inside the child context.</param>
    /// <param name="config">Optional sub-type label and error mapping.</param>
    /// <param name="serializer">Serializer for the checkpointed result.</param>
    /// <param name="childContextFactory">Builds the child <see cref="IDurableContext"/> from this operation's ID.</param>
    /// <param name="state">Execution state forwarded to the base operation.</param>
    /// <param name="termination">Termination manager forwarded to the base operation.</param>
    /// <param name="durableExecutionArn">Durable execution ARN forwarded to the base operation.</param>
    /// <param name="batcher">Optional checkpoint batcher forwarded to the base operation.</param>
    public ChildContextOperation(
        string operationId,
        string? name,
        Func<IDurableContext, Task<T>> func,
        ChildContextConfig? config,
        ICheckpointSerializer<T> serializer,
        Func<string, IDurableContext> childContextFactory,
        ExecutionState state,
        TerminationManager termination,
        string durableExecutionArn,
        CheckpointBatcher? batcher = null)
        : base(operationId, name, state, termination, durableExecutionArn, batcher)
    {
        _func = func;
        _config = config;
        _serializer = serializer;
        _childContextFactory = childContextFactory;
    }

    /// <summary>Operation type reported for this operation: <c>CONTEXT</c>.</summary>
    protected override string OperationType => OperationTypes.Context;

    /// <summary>
    /// Fresh execution: checkpoints CONTEXT START, then runs the user function.
    /// </summary>
    protected override async Task<T> StartAsync(CancellationToken cancellationToken)
    {
        // Sync-flush CONTEXT START before user code so the service has a record
        // of the parent context if the inner func suspends (e.g. a Wait inside
        // the child terminates the workflow before SUCCEED is reached).
        await EnqueueAsync(new SdkOperationUpdate
        {
            Id = OperationId,
            Type = OperationTypes.Context,
            Action = "START",
            SubType = _config?.SubType,
            Name = Name
        }, cancellationToken);

        return await ExecuteFunc(cancellationToken);
    }

    /// <summary>
    /// Replay of a previously recorded CONTEXT operation. Returns the cached
    /// result for <c>SUCCEEDED</c>, throws the (optionally mapped) failure for
    /// <c>FAILED</c>, and re-runs the user function for <c>STARTED</c> /
    /// <c>PENDING</c>.
    /// </summary>
    /// <remarks>
    /// The method is <c>async</c> so that every failure (recorded error,
    /// deserialization error, unexpected status) is delivered through the
    /// returned task rather than thrown synchronously, matching how
    /// <see cref="StartAsync"/> reports failures.
    /// </remarks>
    /// <exception cref="NonDeterministicExecutionException">
    /// The recorded operation has a status this operation does not handle.
    /// </exception>
    protected override async Task<T> ReplayAsync(Operation existing, CancellationToken cancellationToken)
    {
        switch (existing.Status)
        {
            case OperationStatuses.Succeeded:
                // Side-effecting code runs at most once: replay returns the
                // cached result without invoking the user func.
                return DeserializeResult(existing.ContextDetails?.Result);

            case OperationStatuses.Failed:
                throw MapFailureException(BuildChildContextException(existing));

            case OperationStatuses.Started:
            case OperationStatuses.Pending:
                // Re-run the user func: the child's own operations replay from
                // their own checkpoints. Do NOT re-checkpoint START — the
                // original is still authoritative. If something inside the
                // child is still pending (Wait, callback, retry) the user func
                // will re-suspend on its own.
                return await ExecuteFunc(cancellationToken);

            default:
                throw new NonDeterministicExecutionException(
                    $"Child context operation '{Name ?? OperationId}' has unexpected status '{existing.Status}' on replay.");
        }
    }

    /// <summary>
    /// Runs the user function in a newly created child context and checkpoints
    /// the outcome: SUCCEED with the serialized result, or FAIL with the error
    /// followed by a (optionally mapped) <see cref="ChildContextException"/>.
    /// </summary>
    private async Task<T> ExecuteFunc(CancellationToken cancellationToken)
    {
        cancellationToken.ThrowIfCancellationRequested();

        var childContext = _childContextFactory(OperationId);

        T result;
        try
        {
            result = await _func(childContext);
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // Caller-requested cancellation is not a child-context failure:
            // propagate it without writing a FAIL checkpoint.
            throw;
        }
        catch (Exception ex)
        {
            // Split the stack trace once and use it both for the FAIL
            // checkpoint and for the exception thrown below, so a live failure
            // exposes OriginalStackTrace just like a replayed one does.
            var stackFrames = SplitStackTrace(ex);

            await EnqueueAsync(new SdkOperationUpdate
            {
                Id = OperationId,
                Type = OperationTypes.Context,
                Action = "FAIL",
                SubType = _config?.SubType,
                Name = Name,
                Error = ToSdkError(ex, stackFrames)
            }, cancellationToken);

            throw MapFailureException(new ChildContextException(ex.Message, ex)
            {
                SubType = _config?.SubType,
                ErrorType = ex.GetType().FullName,
                OriginalStackTrace = stackFrames
            });
        }

        await EnqueueAsync(new SdkOperationUpdate
        {
            Id = OperationId,
            Type = OperationTypes.Context,
            Action = "SUCCEED",
            SubType = _config?.SubType,
            Name = Name,
            Payload = SerializeResult(result)
        }, cancellationToken);

        return result;
    }

    /// <summary>
    /// Applies <see cref="ChildContextConfig.ErrorMapping"/> when configured.
    /// Returns <paramref name="ex"/> itself when no mapper is set or the mapper
    /// returns <see langword="null"/>.
    /// </summary>
    private Exception MapFailureException(ChildContextException ex)
    {
        var mapper = _config?.ErrorMapping;
        if (mapper == null) return ex;

        var mapped = mapper(ex);
        return mapped ?? ex;
    }

    /// <summary>
    /// Rebuilds a <see cref="ChildContextException"/> from the error recorded
    /// on a failed operation; falls back to a generic message when no error
    /// message was recorded.
    /// </summary>
    private ChildContextException BuildChildContextException(Operation failedOp)
    {
        var err = failedOp.ContextDetails?.Error;
        return new ChildContextException(err?.ErrorMessage ?? "Child context failed")
        {
            // Prefer the sub-type recorded on the operation; fall back to the
            // one configured for this invocation.
            SubType = failedOp.SubType ?? _config?.SubType,
            ErrorType = err?.ErrorType,
            ErrorData = err?.ErrorData,
            OriginalStackTrace = err?.StackTrace
        };
    }

    /// <summary>
    /// Deserializes a checkpointed result; a missing payload yields
    /// <c>default(T)</c> without invoking the serializer.
    /// </summary>
    private T DeserializeResult(string? serialized)
    {
        if (serialized == null) return default!;
        return _serializer.Deserialize(serialized, new SerializationContext(OperationId, DurableExecutionArn));
    }

    /// <summary>Serializes the user function's result for the SUCCEED checkpoint.</summary>
    private string SerializeResult(T value)
        => _serializer.Serialize(value, new SerializationContext(OperationId, DurableExecutionArn));

    /// <summary>
    /// Splits an exception's stack trace into non-empty lines, or returns
    /// <see langword="null"/> when the exception has no stack trace.
    /// </summary>
    private static List<string>? SplitStackTrace(Exception ex)
        => ex.StackTrace?.Split(new[] { '\r', '\n' }, StringSplitOptions.RemoveEmptyEntries).ToList();

    /// <summary>
    /// Builds the error object stored on the FAIL checkpoint from the
    /// exception's type name, message and pre-split stack trace.
    /// </summary>
    private static SdkErrorObject ToSdkError(Exception ex, List<string>? stackFrames) => new()
    {
        ErrorType = ex.GetType().FullName,
        ErrorMessage = ex.Message,
        StackTrace = stackFrames
    };
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,15 @@ private static Internal.Operation MapFromSdkOperation(SdkOperation sdkOp)
ExecutionDetails = sdkOp.ExecutionDetails != null ? new Internal.ExecutionDetails
{
InputPayload = sdkOp.ExecutionDetails.InputPayload
} : null,
ContextDetails = sdkOp.ContextDetails != null ? new Internal.ContextDetails
{
Result = sdkOp.ContextDetails.Result,
Error = sdkOp.ContextDetails.Error != null ? new ErrorObject
{
ErrorType = sdkOp.ContextDetails.Error.ErrorType,
ErrorMessage = sdkOp.ContextDetails.Error.ErrorMessage
} : null
} : null
};
}
Expand Down
Loading
Loading