Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
238 changes: 196 additions & 42 deletions Docs/durable-execution-design.md

Large diffs are not rendered by default.

62 changes: 61 additions & 1 deletion Libraries/Libraries.sln
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 18
VisualStudioVersion = 18.5.11709.299 stable
VisualStudioVersion = 18.5.11709.299
MinimumVisualStudioVersion = 10.0.40219.1
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{AAB54E74-20B1-42ED-BC3D-CE9F7BC7FD12}"
EndProject
Expand Down Expand Up @@ -155,6 +155,14 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ResponseStreamingFunctionHa
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "AspNetCoreStreamingApiGatewayTest", "test\Amazon.Lambda.RuntimeSupport.Tests\AspNetCoreStreamingApiGatewayTest\AspNetCoreStreamingApiGatewayTest.csproj", "{0768FA72-CF49-2B59-BC4C-E4CE579E5D93}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Amazon.Lambda.DurableExecution", "src\Amazon.Lambda.DurableExecution\Amazon.Lambda.DurableExecution.csproj", "{9097B5A4-E100-47FD-A676-0B666A36FAFF}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Amazon.Lambda.DurableExecution.Tests", "test\Amazon.Lambda.DurableExecution.Tests\Amazon.Lambda.DurableExecution.Tests.csproj", "{57150BA6-3826-431F-8F58-B1D11FAFC5D4}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Amazon.Lambda.DurableExecution.IntegrationTests", "test\Amazon.Lambda.DurableExecution.IntegrationTests\Amazon.Lambda.DurableExecution.IntegrationTests.csproj", "{CA132CAB-FF4F-4312-B3A3-66DE9D360F27}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Amazon.Lambda.DurableExecution.AotPublishTest", "test\Amazon.Lambda.DurableExecution.AotPublishTest\Amazon.Lambda.DurableExecution.AotPublishTest.csproj", "{16B1B1CC-3AFC-4DC7-8DB6-D14AE12924A2}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -969,6 +977,54 @@ Global
{0768FA72-CF49-2B59-BC4C-E4CE579E5D93}.Release|x64.Build.0 = Release|Any CPU
{0768FA72-CF49-2B59-BC4C-E4CE579E5D93}.Release|x86.ActiveCfg = Release|Any CPU
{0768FA72-CF49-2B59-BC4C-E4CE579E5D93}.Release|x86.Build.0 = Release|Any CPU
{9097B5A4-E100-47FD-A676-0B666A36FAFF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{9097B5A4-E100-47FD-A676-0B666A36FAFF}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9097B5A4-E100-47FD-A676-0B666A36FAFF}.Debug|x64.ActiveCfg = Debug|Any CPU
{9097B5A4-E100-47FD-A676-0B666A36FAFF}.Debug|x64.Build.0 = Debug|Any CPU
{9097B5A4-E100-47FD-A676-0B666A36FAFF}.Debug|x86.ActiveCfg = Debug|Any CPU
{9097B5A4-E100-47FD-A676-0B666A36FAFF}.Debug|x86.Build.0 = Debug|Any CPU
{9097B5A4-E100-47FD-A676-0B666A36FAFF}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9097B5A4-E100-47FD-A676-0B666A36FAFF}.Release|Any CPU.Build.0 = Release|Any CPU
{9097B5A4-E100-47FD-A676-0B666A36FAFF}.Release|x64.ActiveCfg = Release|Any CPU
{9097B5A4-E100-47FD-A676-0B666A36FAFF}.Release|x64.Build.0 = Release|Any CPU
{9097B5A4-E100-47FD-A676-0B666A36FAFF}.Release|x86.ActiveCfg = Release|Any CPU
{9097B5A4-E100-47FD-A676-0B666A36FAFF}.Release|x86.Build.0 = Release|Any CPU
{57150BA6-3826-431F-8F58-B1D11FAFC5D4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{57150BA6-3826-431F-8F58-B1D11FAFC5D4}.Debug|Any CPU.Build.0 = Debug|Any CPU
{57150BA6-3826-431F-8F58-B1D11FAFC5D4}.Debug|x64.ActiveCfg = Debug|Any CPU
{57150BA6-3826-431F-8F58-B1D11FAFC5D4}.Debug|x64.Build.0 = Debug|Any CPU
{57150BA6-3826-431F-8F58-B1D11FAFC5D4}.Debug|x86.ActiveCfg = Debug|Any CPU
{57150BA6-3826-431F-8F58-B1D11FAFC5D4}.Debug|x86.Build.0 = Debug|Any CPU
{57150BA6-3826-431F-8F58-B1D11FAFC5D4}.Release|Any CPU.ActiveCfg = Release|Any CPU
{57150BA6-3826-431F-8F58-B1D11FAFC5D4}.Release|Any CPU.Build.0 = Release|Any CPU
{57150BA6-3826-431F-8F58-B1D11FAFC5D4}.Release|x64.ActiveCfg = Release|Any CPU
{57150BA6-3826-431F-8F58-B1D11FAFC5D4}.Release|x64.Build.0 = Release|Any CPU
{57150BA6-3826-431F-8F58-B1D11FAFC5D4}.Release|x86.ActiveCfg = Release|Any CPU
{57150BA6-3826-431F-8F58-B1D11FAFC5D4}.Release|x86.Build.0 = Release|Any CPU
{CA132CAB-FF4F-4312-B3A3-66DE9D360F27}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{CA132CAB-FF4F-4312-B3A3-66DE9D360F27}.Debug|Any CPU.Build.0 = Debug|Any CPU
{CA132CAB-FF4F-4312-B3A3-66DE9D360F27}.Debug|x64.ActiveCfg = Debug|Any CPU
{CA132CAB-FF4F-4312-B3A3-66DE9D360F27}.Debug|x64.Build.0 = Debug|Any CPU
{CA132CAB-FF4F-4312-B3A3-66DE9D360F27}.Debug|x86.ActiveCfg = Debug|Any CPU
{CA132CAB-FF4F-4312-B3A3-66DE9D360F27}.Debug|x86.Build.0 = Debug|Any CPU
{CA132CAB-FF4F-4312-B3A3-66DE9D360F27}.Release|Any CPU.ActiveCfg = Release|Any CPU
{CA132CAB-FF4F-4312-B3A3-66DE9D360F27}.Release|Any CPU.Build.0 = Release|Any CPU
{CA132CAB-FF4F-4312-B3A3-66DE9D360F27}.Release|x64.ActiveCfg = Release|Any CPU
{CA132CAB-FF4F-4312-B3A3-66DE9D360F27}.Release|x64.Build.0 = Release|Any CPU
{CA132CAB-FF4F-4312-B3A3-66DE9D360F27}.Release|x86.ActiveCfg = Release|Any CPU
{CA132CAB-FF4F-4312-B3A3-66DE9D360F27}.Release|x86.Build.0 = Release|Any CPU
{16B1B1CC-3AFC-4DC7-8DB6-D14AE12924A2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{16B1B1CC-3AFC-4DC7-8DB6-D14AE12924A2}.Debug|Any CPU.Build.0 = Debug|Any CPU
{16B1B1CC-3AFC-4DC7-8DB6-D14AE12924A2}.Debug|x64.ActiveCfg = Debug|Any CPU
{16B1B1CC-3AFC-4DC7-8DB6-D14AE12924A2}.Debug|x64.Build.0 = Debug|Any CPU
{16B1B1CC-3AFC-4DC7-8DB6-D14AE12924A2}.Debug|x86.ActiveCfg = Debug|Any CPU
{16B1B1CC-3AFC-4DC7-8DB6-D14AE12924A2}.Debug|x86.Build.0 = Debug|Any CPU
{16B1B1CC-3AFC-4DC7-8DB6-D14AE12924A2}.Release|Any CPU.ActiveCfg = Release|Any CPU
{16B1B1CC-3AFC-4DC7-8DB6-D14AE12924A2}.Release|Any CPU.Build.0 = Release|Any CPU
{16B1B1CC-3AFC-4DC7-8DB6-D14AE12924A2}.Release|x64.ActiveCfg = Release|Any CPU
{16B1B1CC-3AFC-4DC7-8DB6-D14AE12924A2}.Release|x64.Build.0 = Release|Any CPU
{16B1B1CC-3AFC-4DC7-8DB6-D14AE12924A2}.Release|x86.ActiveCfg = Release|Any CPU
{16B1B1CC-3AFC-4DC7-8DB6-D14AE12924A2}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -1045,6 +1101,10 @@ Global
{80594C21-C6EB-469E-83CC-68F9F661CA5E} = {1DE4EE60-45BA-4EF7-BE00-B9EB861E4C69}
{E404A7AC-812B-BC03-CA76-02C0BC2BA7F9} = {B5BD0336-7D08-492C-8489-42C987E29B39}
{0768FA72-CF49-2B59-BC4C-E4CE579E5D93} = {B5BD0336-7D08-492C-8489-42C987E29B39}
{9097B5A4-E100-47FD-A676-0B666A36FAFF} = {AAB54E74-20B1-42ED-BC3D-CE9F7BC7FD12}
{57150BA6-3826-431F-8F58-B1D11FAFC5D4} = {1DE4EE60-45BA-4EF7-BE00-B9EB861E4C69}
{CA132CAB-FF4F-4312-B3A3-66DE9D360F27} = {1DE4EE60-45BA-4EF7-BE00-B9EB861E4C69}
{16B1B1CC-3AFC-4DC7-8DB6-D14AE12924A2} = {1DE4EE60-45BA-4EF7-BE00-B9EB861E4C69}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {503678A4-B8D1-4486-8915-405A3E9CF0EB}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
<EnableTrimAnalyzer>true</EnableTrimAnalyzer>
<Nullable>enable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
<WarningsAsErrors>IL2026,IL2067,IL2075,IL3050</WarningsAsErrors>
</PropertyGroup>

<ItemGroup>
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
namespace Amazon.Lambda.DurableExecution;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would rather avoid the pattern of putting classes in folders that don't match their namespace. Eventually somebody will add a file to the folder and the IDE will default its namespace to match the folder.

For this file I'm indifferent whether you move it to the parent folder or update the using statement, especially since the file isn't really defined yet, so I'm not sure how commonly you expect users to use it.


/// <summary>
/// Configuration for step execution. The class currently declares no settings;
/// it is an empty, sealed options type.
/// </summary>
public sealed class StepConfig
{
// TODO: Retry support is deferred to a follow-up PR. When added, this is
// where RetryStrategy and Semantics (AtLeastOncePerRetry / AtMostOncePerRetry)
// will live. The follow-up needs to use service-mediated retries (checkpoint
// a RETRY operation + suspend the Lambda) rather than an in-process Task.Delay
// loop, to avoid billing Lambda compute time during retry backoff.
}
147 changes: 147 additions & 0 deletions Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
using System.Diagnostics.CodeAnalysis;
using Amazon.Lambda.Core;
using Amazon.Lambda.DurableExecution.Internal;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;

namespace Amazon.Lambda.DurableExecution;

/// <summary>
/// Implementation of <see cref="IDurableContext"/>. Constructs and dispatches
/// per-operation classes (<see cref="StepOperation{T}"/>, <see cref="WaitOperation"/>);
/// the replay logic lives in those classes.
/// </summary>
internal sealed class DurableContext : IDurableContext
{
private readonly ExecutionState _state;
private readonly TerminationManager _terminationManager;
private readonly OperationIdGenerator _idGenerator;
private readonly string _durableExecutionArn;
private readonly CheckpointBatcher? _batcher;

/// <summary>
/// Stores the collaborators that every step and wait operation created by this
/// context is constructed with.
/// </summary>
/// <param name="state">Execution state handed to each operation.</param>
/// <param name="terminationManager">Termination manager handed to each operation.</param>
/// <param name="idGenerator">Source of the operation id assigned to each new operation.</param>
/// <param name="durableExecutionArn">ARN of the durable execution; also exposed via <see cref="ExecutionContext"/>.</param>
/// <param name="lambdaContext">Lambda invocation context exposed via <see cref="LambdaContext"/>.</param>
/// <param name="batcher">Optional checkpoint batcher handed to each operation; may be <c>null</c>.</param>
public DurableContext(
    ExecutionState state,
    TerminationManager terminationManager,
    OperationIdGenerator idGenerator,
    string durableExecutionArn,
    ILambdaContext lambdaContext,
    CheckpointBatcher? batcher = null)
{
    (_state, _terminationManager, _idGenerator) = (state, terminationManager, idGenerator);
    (_durableExecutionArn, _batcher) = (durableExecutionArn, batcher);
    LambdaContext = lambdaContext;
}

// Placeholder: always the shared no-op logger. The replay-safe logger ships in
// a follow-up PR; see the IDurableContext.Logger doc.
public ILogger Logger { get; } = NullLogger.Instance;
Comment thread
GarrettBeatty marked this conversation as resolved.
// Backing store for ExecutionContext. Created on first read so repeated reads
// return the same instance instead of allocating a new wrapper each time.
// A concurrent first read may construct two equivalent wrappers; either one
// is valid because the wrapper only carries the ARN.
private IExecutionContext? _executionContext;

/// <summary>
/// Execution metadata exposing the durable execution ARN that was supplied to
/// the constructor.
/// </summary>
public IExecutionContext ExecutionContext
    => _executionContext ??= new DurableExecutionContext(_durableExecutionArn);
/// <summary>The Lambda invocation context passed to the constructor, exposed unchanged.</summary>
public ILambdaContext LambdaContext { get; }
Comment thread
GarrettBeatty marked this conversation as resolved.

/// <summary>
/// Runs <paramref name="func"/> as a step whose result is checkpointed with a
/// <see cref="ReflectionJsonCheckpointSerializer{T}"/>.
/// </summary>
/// <typeparam name="T">The step's result type.</typeparam>
/// <param name="func">The step body.</param>
/// <param name="name">Optional step name passed to the operation.</param>
/// <param name="config">Optional step configuration passed to the operation.</param>
/// <param name="cancellationToken">Token forwarded to the operation's execution.</param>
/// <returns>The task produced by the step operation.</returns>
[RequiresUnreferencedCode("Reflection-based JSON for T. Use the ICheckpointSerializer<T> overload for AOT/trimmed deployments.")]
[RequiresDynamicCode("Reflection-based JSON for T. Use the ICheckpointSerializer<T> overload for AOT/trimmed deployments.")]
public Task<T> StepAsync<T>(
    Func<IStepContext, Task<T>> func,
    string? name = null,
    StepConfig? config = null,
    CancellationToken cancellationToken = default)
{
    var reflectionSerializer = new ReflectionJsonCheckpointSerializer<T>();
    return RunStep(func, reflectionSerializer, name, config, cancellationToken);
}

/// <summary>
/// Runs a step that produces no value. The delegate is adapted to the generic
/// step pipeline by reporting a <c>null</c> result, paired with
/// <see cref="NullCheckpointSerializer"/> so no reflection-based serialization
/// is involved.
/// </summary>
/// <param name="func">The step body.</param>
/// <param name="name">Optional step name passed to the operation.</param>
/// <param name="config">Optional step configuration passed to the operation.</param>
/// <param name="cancellationToken">Token forwarded to the operation's execution.</param>
public async Task StepAsync(
    Func<IStepContext, Task> func,
    string? name = null,
    StepConfig? config = null,
    CancellationToken cancellationToken = default)
{
    // Adapter: await the caller's delegate, then hand back the null payload
    // that the void overload checkpoints.
    async Task<object?> InvokeAndReturnNull(IStepContext stepContext)
    {
        await func(stepContext);
        return null;
    }

    await RunStep<object?>(
        InvokeAndReturnNull,
        NullCheckpointSerializer.Instance,
        name,
        config,
        cancellationToken);
}

/// <summary>
/// Runs <paramref name="func"/> as a step whose result is checkpointed with the
/// caller-supplied <paramref name="serializer"/>.
/// </summary>
/// <typeparam name="T">The step's result type.</typeparam>
/// <param name="func">The step body.</param>
/// <param name="serializer">Serializer used for the step's checkpointed result.</param>
/// <param name="name">Optional step name passed to the operation.</param>
/// <param name="config">Optional step configuration passed to the operation.</param>
/// <param name="cancellationToken">Token forwarded to the operation's execution.</param>
/// <returns>The task produced by the step operation.</returns>
public Task<T> StepAsync<T>(
    Func<IStepContext, Task<T>> func,
    ICheckpointSerializer<T> serializer,
    string? name = null,
    StepConfig? config = null,
    CancellationToken cancellationToken = default)
{
    return RunStep(func, serializer, name, config, cancellationToken);
}


// Shared implementation behind every StepAsync overload: draws the next
// operation id, builds a StepOperation<T> from this context's collaborators,
// and starts it. The id is drawn first, before any other argument is read.
private Task<T> RunStep<T>(
    Func<IStepContext, Task<T>> func,
    ICheckpointSerializer<T> serializer,
    string? name,
    StepConfig? config,
    CancellationToken cancellationToken)
    => new StepOperation<T>(
            _idGenerator.NextId(), name, func, config, serializer, Logger,
            _state, _terminationManager, _durableExecutionArn, _batcher)
        .ExecuteAsync(cancellationToken);

/// <summary>
/// Creates a wait operation for <paramref name="duration"/>, rounded up to
/// whole seconds, and starts it.
/// </summary>
/// <param name="duration">How long to wait; must be between 1 second and 31,622,400 seconds inclusive.</param>
/// <param name="name">Optional operation name passed to the wait operation.</param>
/// <param name="cancellationToken">Checked before the operation is created, then forwarded to its execution.</param>
/// <returns>The task produced by the wait operation.</returns>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="duration"/> is outside the accepted range.</exception>
/// <exception cref="OperationCanceledException"><paramref name="cancellationToken"/> is already cancelled.</exception>
public Task WaitAsync(
    TimeSpan duration,
    string? name = null,
    CancellationToken cancellationToken = default)
{
    // Service timer granularity is 1 second; sub-second waits would round to 0.
    // WaitOptions.WaitSeconds is integer in [1, 31_622_400] (1 second to ~1 year).
    // NOTE(review): an open review question asks whether the upper bound should
    // be validated on our end at all — confirm before relying on this check.
    var shortestWait = TimeSpan.FromSeconds(1);
    var longestWait = TimeSpan.FromSeconds(31_622_400);

    if (duration < shortestWait)
    {
        throw new ArgumentOutOfRangeException(nameof(duration), duration, "Wait duration must be at least 1 second.");
    }

    if (duration > longestWait)
    {
        throw new ArgumentOutOfRangeException(nameof(duration), duration, "Wait duration must be at most 31,622,400 seconds (~1 year).");
    }

    // Argument errors take precedence over cancellation; no operation id is
    // consumed when the token is already cancelled.
    cancellationToken.ThrowIfCancellationRequested();

    var operationId = _idGenerator.NextId();

    // Fractional seconds round up; the clamp keeps the value at 1 or more.
    var wholeSeconds = (int)Math.Max(1, Math.Ceiling(duration.TotalSeconds));

    var wait = new WaitOperation(
        operationId, name, wholeSeconds,
        _state, _terminationManager, _durableExecutionArn, _batcher);
    return wait.ExecuteAsync(cancellationToken);
}
}

/// <summary>
/// Trim-safe serializer used by the void <c>StepAsync</c> overload, which never
/// carries a meaningful payload. It ignores its inputs: serializing always
/// yields the literal <c>"null"</c> and deserializing always yields <c>null</c>.
/// </summary>
internal sealed class NullCheckpointSerializer : ICheckpointSerializer<object?>
{
    private static readonly NullCheckpointSerializer _shared = new();

    /// <summary>The shared instance.</summary>
    public static NullCheckpointSerializer Instance => _shared;

    /// <summary>Returns <c>"null"</c> regardless of <paramref name="value"/>.</summary>
    public string Serialize(object? value, SerializationContext context)
    {
        return "null";
    }

    /// <summary>Returns <c>null</c> regardless of <paramref name="data"/>.</summary>
    public object? Deserialize(string data, SerializationContext context)
    {
        return null;
    }
}

/// <summary>
/// <see cref="IExecutionContext"/> implementation that carries the ARN of a
/// durable execution.
/// </summary>
internal sealed class DurableExecutionContext : IExecutionContext
{
    /// <summary>Creates a context for the given durable execution ARN.</summary>
    /// <param name="durableExecutionArn">The ARN to expose.</param>
    public DurableExecutionContext(string durableExecutionArn)
        => DurableExecutionArn = durableExecutionArn;

    /// <summary>The ARN supplied at construction.</summary>
    public string DurableExecutionArn { get; }
}

/// <summary>
/// Immutable <see cref="IStepContext"/> implementation holding the values
/// supplied at construction.
/// </summary>
internal sealed class StepContext : IStepContext
{
    /// <summary>Creates a step context from its three values.</summary>
    /// <param name="operationId">Id of the operation this step belongs to.</param>
    /// <param name="attemptNumber">Attempt number reported to the step body.</param>
    /// <param name="logger">Logger exposed to the step body.</param>
    public StepContext(string operationId, int attemptNumber, ILogger logger)
        => (OperationId, AttemptNumber, Logger) = (operationId, attemptNumber, logger);

    /// <summary>The operation id supplied at construction.</summary>
    public string OperationId { get; }

    /// <summary>The attempt number supplied at construction.</summary>
    public int AttemptNumber { get; }

    /// <summary>The logger supplied at construction.</summary>
    public ILogger Logger { get; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
using Amazon.Lambda.DurableExecution.Internal;

namespace Amazon.Lambda.DurableExecution;

/// <summary>
/// The result of running a durable execution handler.
/// </summary>
/// <typeparam name="TResult">The workflow return type.</typeparam>
internal sealed class HandlerResult<TResult>
{
/// <summary>Outcome of the invocation. Required: every initializer must set it.</summary>
public required InvocationStatus Status { get; init; }
/// <summary>The workflow's return value. <c>DurableExecutionHandler.RunAsync</c> sets it only on the succeeded path.</summary>
public TResult? Result { get; init; }
/// <summary>On failure, the exception's message; on a pending result, the termination message. Not set on success.</summary>
public string? Message { get; init; }
/// <summary>The exception behind a failed result. <c>DurableExecutionHandler.RunAsync</c> leaves it unset otherwise.</summary>
public Exception? Exception { get; init; }
}

/// <summary>
/// Core orchestration engine for durable execution. Races user code against
/// a termination signal using <c>Task.WhenAny</c>. When user code completes, returns
/// SUCCEEDED/FAILED. When termination wins (wait, callback, invoke), returns PENDING,
/// or FAILED if the termination result carries an exception.
/// </summary>
internal static class DurableExecutionHandler
{
    /// <summary>
    /// Runs the user's workflow function within the durable execution engine.
    /// </summary>
    /// <remarks>
    /// <para>
    /// Suspension flow — example: <c>await ctx.WaitAsync(TimeSpan.FromSeconds(5))</c>:
    /// </para>
    /// <code>
    /// user code            DurableContext         TerminationMgr        RunAsync
    /// ─────────            ──────────────         ──────────────        ────────
    /// WaitAsync(5s) ─────► queue WAIT START
    ///                      checkpoint
    ///                      Terminate() ──────►    TerminationTask
    ///                                             completes
    ///               ◄────── new TCS().Task
    ///                      (never completes)
    /// await blocks
    /// forever                                                           WhenAny:
    ///                                                                   ── termination wins
    ///                                                                   ── userTask abandoned
    ///                                                                   ── return Pending
    /// </code>
    /// <para>
    /// Key insight: when a wait has to suspend, <c>WaitAsync</c> does not return a
    /// completed Task — it hands back the Task of a TaskCompletionSource that is
    /// never resolved, so the user's <c>await</c> blocks indefinitely. The escape
    /// signal is <c>terminationManager.Terminate()</c>, which <c>Task.WhenAny</c>
    /// picks up. We return Pending; the dangling user Task is GC'd. The service
    /// flushes checkpoints, fires the wait timer, then re-invokes Lambda — on
    /// replay, <c>WaitAsync</c> sees the matching SUCCEED checkpoint and returns
    /// <c>Task.CompletedTask</c> normally.
    /// </para>
    /// <para>
    /// The same pattern applies to retries (<c>RetryScheduled</c>), callbacks
    /// (<c>CallbackPending</c>), and chained invokes (<c>InvokePending</c>).
    /// </para>
    /// </remarks>
    /// <typeparam name="TResult">The workflow return type.</typeparam>
    /// <param name="executionState">Hydrated execution state from prior invocations. Not read by this method's body.</param>
    /// <param name="terminationManager">Manages the suspension signal.</param>
    /// <param name="userHandler">The user's workflow function; it is started on the thread pool.</param>
    /// <returns>The handler result indicating SUCCEEDED, FAILED, or PENDING.</returns>
    internal static async Task<HandlerResult<TResult>> RunAsync<TResult>(
        ExecutionState executionState,
        TerminationManager terminationManager,
        Func<Task<TResult>> userHandler)
    {
        // Run user code on a threadpool thread so it executes independently of
        // the termination signal. Calling userHandler() directly would run the
        // workflow synchronously up to its first real await; a workflow that
        // sleeps, blocks on synchronous I/O, or does a long computation before
        // awaiting would keep this method stuck inside that call and delay the
        // WhenAny race below.
        var userTask = Task.Run(userHandler);

        // If termination wins the race, userTask is abandoned and never awaited.
        // Observe a later fault here so it is not reported through
        // TaskScheduler.UnobservedTaskException. The continuation runs only if
        // the task faults and has no effect on the paths below.
        _ = userTask.ContinueWith(
            static abandoned => { _ = abandoned.Exception; },
            CancellationToken.None,
            TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously,
            TaskScheduler.Default);

        // Race: user code completing vs. termination signal (wait/callback/retry).
        var winner = await Task.WhenAny(userTask, terminationManager.TerminationTask);

        if (winner == terminationManager.TerminationTask)
        {
            // Already completed, so this await only unwraps the result.
            var terminationResult = await terminationManager.TerminationTask;

            if (terminationResult.Exception != null)
            {
                return new HandlerResult<TResult>
                {
                    Status = InvocationStatus.Failed,
                    Message = terminationResult.Exception.Message,
                    Exception = terminationResult.Exception
                };
            }

            return new HandlerResult<TResult>
            {
                Status = InvocationStatus.Pending,
                Message = terminationResult.Message
            };
        }

        try
        {
            // userTask won the race; awaiting it yields the result or rethrows
            // the workflow's exception.
            var result = await userTask;
            return new HandlerResult<TResult>
            {
                Status = InvocationStatus.Succeeded,
                Result = result
            };
        }
        catch (Exception ex)
        {
            // Any exception from user code is reported as a failed invocation
            // rather than propagated to the caller.
            return new HandlerResult<TResult>
            {
                Status = InvocationStatus.Failed,
                Message = ex.Message,
                Exception = ex
            };
        }
    }
}
Loading
Loading