Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions Docs/durable-execution-design.md
Original file line number Diff line number Diff line change
Expand Up @@ -1614,15 +1614,29 @@ public class CallbackException : DurableExecutionException
}

/// <summary>
/// Thrown when an invoked function fails.
/// Base exception for chained-invoke failures. Catch <c>InvokeException</c>
/// to handle every non-success terminal state uniformly, or pattern-match the
/// concrete subclasses (<c>InvokeFailedException</c>, <c>InvokeTimedOutException</c>,
/// <c>InvokeStoppedException</c>) to react differently to specific outcomes.
/// Mirrors the Java SDK's invoke exception tree.
/// </summary>
public class InvokeException : DurableExecutionException
{
public string? FunctionName { get; }
public string? ErrorType { get; }
public string? ErrorData { get; }
public string? FunctionName { get; init; }
public string? ErrorType { get; init; }
public string? ErrorData { get; init; }
public IReadOnlyList<string>? OriginalStackTrace { get; init; }
}

/// <summary>The chained function ran and threw.</summary>
public class InvokeFailedException : InvokeException { }

/// <summary>The chained function did not complete within the configured (or service) timeout.</summary>
public class InvokeTimedOutException : InvokeException { }

/// <summary>The chained execution was stopped by the service before reaching a normal terminal state.</summary>
public class InvokeStoppedException : InvokeException { }

/// <summary>
/// Thrown when a child context operation fails.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
namespace Amazon.Lambda.DurableExecution;

/// <summary>
/// Configuration for chained invoke operations.
/// </summary>
/// <remarks>
/// Use with <see cref="IDurableContext.InvokeAsync{TPayload, TResult}(string, TPayload, string?, InvokeConfig?, System.Threading.CancellationToken)"/>
/// (and the AOT-safe overload) to configure a single chained invocation. Custom
/// payload/result serializers are supplied via the AOT-safe <c>InvokeAsync</c>
/// overload — there are intentionally no serializer fields here, matching the
/// pattern established by <see cref="StepConfig"/>.
/// </remarks>
public sealed class InvokeConfig
{
/// <summary>
/// Reserved for a future SDK/service contract; not currently honored.
/// </summary>
/// <remarks>
/// The AWS Lambda model <c>ChainedInvokeOptions</c> in
/// <c>AWSSDK.Lambda</c> 4.0.13.1 does not expose a per-invocation timeout
/// field, so this property is not propagated to the durable execution
/// service today. It is kept on the configuration surface so that callers
/// who set it now will pick up the intended behavior automatically once a
/// wire field is added (or via a follow-up SDK update); the service's
/// own service-level timeout still applies in the meantime. Track at
/// <see href="https://issues.amazon.com/issues/DOTNET-8661"/>.
/// </remarks>
[Obsolete("Timeout is reserved for a future SDK/service contract; not currently honored. Tracked at https://issues.amazon.com/issues/DOTNET-8661.")]
public TimeSpan Timeout { get; set; } = TimeSpan.Zero;

/// <summary>
/// Optional tenant identifier propagated to the chained invocation via
/// <c>ChainedInvokeOptions.TenantId</c>. Used to route the invocation to a
/// tenant-isolated function. Matches the <c>tenantId</c> field on the
/// Python, JavaScript, and Java SDKs.
/// </summary>
public string? TenantId { get; set; }
}
59 changes: 59 additions & 0 deletions Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,65 @@ private Task<T> RunChildContext<T>(
_state, _terminationManager, _durableExecutionArn, _batcher);
return op.ExecuteAsync(cancellationToken);
}

[RequiresUnreferencedCode("Reflection-based JSON for TPayload/TResult. Use the ICheckpointSerializer overload for AOT/trimmed deployments.")]
[RequiresDynamicCode("Reflection-based JSON for TPayload/TResult. Use the ICheckpointSerializer overload for AOT/trimmed deployments.")]
public Task<TResult> InvokeAsync<TPayload, TResult>(
string functionName,
TPayload payload,
string? name = null,
InvokeConfig? config = null,
CancellationToken cancellationToken = default)
=> RunInvoke(
functionName, payload,
new ReflectionJsonCheckpointSerializer<TPayload>(),
new ReflectionJsonCheckpointSerializer<TResult>(),
name, config, cancellationToken);

public Task<TResult> InvokeAsync<TPayload, TResult>(
string functionName,
TPayload payload,
ICheckpointSerializer<TPayload> payloadSerializer,
ICheckpointSerializer<TResult> resultSerializer,
string? name = null,
InvokeConfig? config = null,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(payloadSerializer);
ArgumentNullException.ThrowIfNull(resultSerializer);
return RunInvoke(
functionName, payload,
payloadSerializer, resultSerializer,
name, config, cancellationToken);
}

private Task<TResult> RunInvoke<TPayload, TResult>(
string functionName,
TPayload payload,
ICheckpointSerializer<TPayload> payloadSerializer,
ICheckpointSerializer<TResult> resultSerializer,
string? name,
InvokeConfig? config,
CancellationToken cancellationToken)
{
// Argument validation runs synchronously at the call site (matches the
// .NET convention of failing fast for misuse). Match Python/JS/Java
// parity: only check for null/empty here; the durable execution service
// enforces the qualified-ARN rule and surfaces a precise error when an
// unqualified identifier is used.
ArgumentNullException.ThrowIfNull(functionName);
if (string.IsNullOrWhiteSpace(functionName))
throw new ArgumentException("Function name must not be empty or whitespace.", nameof(functionName));

cancellationToken.ThrowIfCancellationRequested();

var operationId = _idGenerator.NextId();
var op = new InvokeOperation<TPayload, TResult>(
operationId, name, functionName, payload, config,
payloadSerializer, resultSerializer,
_state, _terminationManager, _durableExecutionArn, _batcher);
return op.ExecuteAsync(cancellationToken);
}
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
namespace Amazon.Lambda.DurableExecution;

/// <summary>
/// Thrown when a chained invoke operation reaches a non-success terminal state.
/// </summary>
/// <remarks>
/// Base class for the invoke exception tree. Catch <see cref="InvokeException"/>
/// to handle every chained-invoke failure mode uniformly, or pattern-match the
/// concrete subclasses to react differently to specific outcomes:
/// <list type="bullet">
/// <item><see cref="InvokeFailedException"/> — the chained function threw.</item>
/// <item><see cref="InvokeTimedOutException"/> — the configured (or service)
/// timeout elapsed before completion.</item>
/// <item><see cref="InvokeStoppedException"/> — the chained execution was
/// stopped by the service or an operator.</item>
/// </list>
/// Mirrors the Java SDK's <c>InvokeException</c> / <c>InvokeFailedException</c>
/// / <c>InvokeTimedOutException</c> / <c>InvokeStoppedException</c> tree; the
/// .NET SDK keeps <see cref="InvokeException"/> non-abstract so callers can also
/// rethrow it directly when wrapping fallback logic.
/// </remarks>
public class InvokeException : DurableExecutionException
{
/// <summary>The fully-qualified name of the invoked function (ARN, alias, or version).</summary>
public string? FunctionName { get; init; }

/// <summary>The fully-qualified type name of the original exception, when known.</summary>
public string? ErrorType { get; init; }

/// <summary>Optional structured error data attached by the invoked function.</summary>
public string? ErrorData { get; init; }

/// <summary>Stack trace of the original exception, captured before serialization.</summary>
public IReadOnlyList<string>? OriginalStackTrace { get; init; }

/// <summary>Creates an empty <see cref="InvokeException"/>.</summary>
public InvokeException() { }
/// <summary>Creates an <see cref="InvokeException"/> with the given message.</summary>
public InvokeException(string message) : base(message) { }
/// <summary>Creates an <see cref="InvokeException"/> wrapping an inner exception.</summary>
public InvokeException(string message, Exception innerException) : base(message, innerException) { }
}

/// <summary>
/// Thrown when a chained invoke operation completes with status <c>FAILED</c> —
/// the invoked function ran and threw.
/// </summary>
public class InvokeFailedException : InvokeException
{
/// <summary>Creates an empty <see cref="InvokeFailedException"/>.</summary>
public InvokeFailedException() { }
/// <summary>Creates an <see cref="InvokeFailedException"/> with the given message.</summary>
public InvokeFailedException(string message) : base(message) { }
/// <summary>Creates an <see cref="InvokeFailedException"/> wrapping an inner exception.</summary>
public InvokeFailedException(string message, Exception innerException) : base(message, innerException) { }
}

/// <summary>
/// Thrown when a chained invoke operation completes with status <c>TIMED_OUT</c>
/// — the invocation did not complete within the service-level timeout.
/// </summary>
public class InvokeTimedOutException : InvokeException
{
/// <summary>Creates an empty <see cref="InvokeTimedOutException"/>.</summary>
public InvokeTimedOutException() { }
/// <summary>Creates an <see cref="InvokeTimedOutException"/> with the given message.</summary>
public InvokeTimedOutException(string message) : base(message) { }
/// <summary>Creates an <see cref="InvokeTimedOutException"/> wrapping an inner exception.</summary>
public InvokeTimedOutException(string message, Exception innerException) : base(message, innerException) { }
}

/// <summary>
/// Thrown when a chained invoke operation completes with status <c>STOPPED</c>
/// — the invocation was stopped administratively by the durable execution
/// service before reaching a normal terminal state.
/// </summary>
public class InvokeStoppedException : InvokeException
{
/// <summary>Creates an empty <see cref="InvokeStoppedException"/>.</summary>
public InvokeStoppedException() { }
/// <summary>Creates an <see cref="InvokeStoppedException"/> with the given message.</summary>
public InvokeStoppedException(string message) : base(message) { }
/// <summary>Creates an <see cref="InvokeStoppedException"/> wrapping an inner exception.</summary>
public InvokeStoppedException(string message, Exception innerException) : base(message, innerException) { }
}
71 changes: 71 additions & 0 deletions Libraries/src/Amazon.Lambda.DurableExecution/IDurableContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,77 @@ Task RunInChildContextAsync(
string? name = null,
ChildContextConfig? config = null,
CancellationToken cancellationToken = default);

/// <summary>
/// Invoke another durable Lambda function and await its result. The
/// invocation is checkpointed so it survives parent failures and is not
/// double-fired on replay.
/// </summary>
/// <remarks>
/// <para>
/// The chained function runs out-of-process: the SDK checkpoints a
/// <c>CHAINED_INVOKE START</c> with the supplied <paramref name="payload"/>
/// and suspends; the durable execution service runs the target function
/// asynchronously and re-invokes the parent workflow when it completes.
/// On resume, the cached result is deserialized and returned, or the
/// recorded failure surfaces as an <see cref="InvokeException"/> subclass.
/// </para>
/// <para>
/// <paramref name="functionName"/> must be a qualified identifier (version,
/// alias, or <c>$LATEST</c>). Unqualified ARNs are rejected by the durable
/// execution service. The SDK only validates that the value is non-null
/// and non-empty; ARN shape validation is left to the service to keep
/// behavior consistent with the Python, JavaScript, and Java SDKs.
/// </para>
/// <para>
/// The payload is serialized to a checkpoint using reflection-based
/// <c>System.Text.Json</c>; the result is deserialized the same way. For
/// NativeAOT or trimmed deployments, use the overload that takes
/// <see cref="ICheckpointSerializer{T}"/> parameters for both payload and
/// result.
/// </para>
/// </remarks>
/// <typeparam name="TPayload">The payload type sent to the chained function.</typeparam>
/// <typeparam name="TResult">The result type returned by the chained function.</typeparam>
/// <exception cref="ArgumentNullException"><paramref name="functionName"/> is null.</exception>
/// <exception cref="ArgumentException"><paramref name="functionName"/> is empty or whitespace.</exception>
/// <exception cref="InvokeFailedException">The chained function threw.</exception>
/// <exception cref="InvokeTimedOutException">The chained function did not complete within the configured timeout.</exception>
/// <exception cref="InvokeStoppedException">The chained execution was stopped by the service.</exception>
[RequiresUnreferencedCode("Reflection-based JSON for TPayload/TResult. Use the ICheckpointSerializer overload for AOT/trimmed deployments.")]
[RequiresDynamicCode("Reflection-based JSON for TPayload/TResult. Use the ICheckpointSerializer overload for AOT/trimmed deployments.")]
Task<TResult> InvokeAsync<TPayload, TResult>(
string functionName,
TPayload payload,
string? name = null,
InvokeConfig? config = null,
CancellationToken cancellationToken = default);

/// <summary>
/// Invoke another durable Lambda function with AOT-safe checkpoint
/// serialization. The supplied payload and result serializers are used in
/// place of reflection-based JSON.
/// </summary>
/// <remarks>
/// Two positional serializers are required (one for the payload, one for
/// the result) — mirroring the StepAsync AOT-safe overload's pattern of
/// taking <see cref="ICheckpointSerializer{T}"/> as a positional parameter.
/// </remarks>
/// <typeparam name="TPayload">The payload type sent to the chained function.</typeparam>
/// <typeparam name="TResult">The result type returned by the chained function.</typeparam>
/// <exception cref="ArgumentNullException"><paramref name="functionName"/>, <paramref name="payloadSerializer"/>, or <paramref name="resultSerializer"/> is null.</exception>
/// <exception cref="ArgumentException"><paramref name="functionName"/> is empty or whitespace.</exception>
/// <exception cref="InvokeFailedException">The chained function threw.</exception>
/// <exception cref="InvokeTimedOutException">The chained function did not complete within the configured timeout.</exception>
/// <exception cref="InvokeStoppedException">The chained execution was stopped by the service.</exception>
Task<TResult> InvokeAsync<TPayload, TResult>(
string functionName,
TPayload payload,
ICheckpointSerializer<TPayload> payloadSerializer,
ICheckpointSerializer<TResult> resultSerializer,
string? name = null,
InvokeConfig? config = null,
CancellationToken cancellationToken = default);
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,5 +130,6 @@ private static bool IsTerminalStatus(string? status) =>
status == OperationStatuses.Succeeded
|| status == OperationStatuses.Failed
|| status == OperationStatuses.Cancelled
|| status == OperationStatuses.Stopped;
|| status == OperationStatuses.Stopped
|| status == OperationStatuses.TimedOut;
}
Loading
Loading