Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
using Amazon.Lambda.Core;
using Amazon.Lambda.DurableExecution.Internal;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;

namespace Amazon.Lambda.DurableExecution;

Expand All @@ -18,6 +17,7 @@ internal sealed class DurableContext : IDurableContext
private readonly OperationIdGenerator _idGenerator;
private readonly string _durableExecutionArn;
private readonly CheckpointBatcher? _batcher;
private ILogger _logger;

public DurableContext(
ExecutionState state,
Expand All @@ -33,13 +33,25 @@ public DurableContext(
_durableExecutionArn = durableExecutionArn;
_batcher = batcher;
LambdaContext = lambdaContext;
_logger = new ReplayAwareLogger(new LambdaCoreLogger(), state, modeAware: true);
}

// Replay-safe logger ships in a follow-up PR; see IDurableContext.Logger doc.
public ILogger Logger => NullLogger.Instance;
public ILogger Logger => _logger;
public IExecutionContext ExecutionContext => new DurableExecutionContext(_durableExecutionArn);
public ILambdaContext LambdaContext { get; }

public void ConfigureLogger(LoggerConfig config)
{
if (config == null) throw new ArgumentNullException(nameof(config));

// If the user supplies a CustomLogger, wrap it. Otherwise re-wrap the
// existing inner logger (unwrapping if it was already a ReplayAwareLogger)
// so toggling ModeAware works without losing the previous custom logger.
var inner = config.CustomLogger
?? (_logger is ReplayAwareLogger existing ? existing.Inner : _logger);
_logger = new ReplayAwareLogger(inner, _state, config.ModeAware);
}

[RequiresUnreferencedCode("Reflection-based JSON for T. Use the ICheckpointSerializer<T> overload for AOT/trimmed deployments.")]
[RequiresDynamicCode("Reflection-based JSON for T. Use the ICheckpointSerializer<T> overload for AOT/trimmed deployments.")]
public Task<T> StepAsync<T>(
Expand Down
18 changes: 15 additions & 3 deletions Libraries/src/Amazon.Lambda.DurableExecution/DurableFunction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using Amazon.Lambda.DurableExecution.Services;
using Amazon.Lambda.Model;
using Amazon.Runtime;
using Microsoft.Extensions.Logging;

namespace Amazon.Lambda.DurableExecution;

Expand Down Expand Up @@ -180,9 +181,20 @@ private static async Task<DurableExecutionInvocationOutput> WrapAsyncCore<TInput
HandlerResult<TOutput> result;
try
{
result = await DurableExecutionHandler.RunAsync<TOutput>(
state, terminationManager,
async () => await workflow(userPayload, context));
// Push execution-level metadata into a logging scope so structured
// providers (the runtime's JSON formatter, Serilog, Powertools,
// etc.) tag every log line emitted by user code with the
// execution ARN and request id.
using (context.Logger.BeginScope(new Dictionary<string, object>
{
["durableExecutionArn"] = invocationInput.DurableExecutionArn,
["awsRequestId"] = lambdaContext.AwsRequestId ?? string.Empty,
}))
{
result = await DurableExecutionHandler.RunAsync<TOutput>(
state, terminationManager,
async () => await workflow(userPayload, context));
}

await batcher.DrainAsync();
}
Expand Down
21 changes: 16 additions & 5 deletions Libraries/src/Amazon.Lambda.DurableExecution/IDurableContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,21 @@ namespace Amazon.Lambda.DurableExecution;
public interface IDurableContext
{
/// <summary>
/// A logger scoped to the durable execution. Currently returns
/// <see cref="Microsoft.Extensions.Logging.Abstractions.NullLogger.Instance"/>;
/// the replay-safe <c>DurableLogger</c> (suppresses messages during replay)
/// ships in a follow-up PR.
/// Replay-safe logger. Messages emitted while the workflow is re-deriving
/// prior operations from checkpointed state are suppressed by default, so
/// a 30-step workflow re-invoked 30 times still emits each line once.
/// Use this instead of <c>Console.WriteLine</c> or other ambient loggers,
/// which will repeat on every replay. Replace the underlying logger or
/// disable replay-aware filtering via <see cref="ConfigureLogger"/>.
/// </summary>
ILogger Logger { get; }

/// <summary>
/// Swap the underlying logger or toggle replay-aware filtering. Idempotent —
/// later calls overwrite earlier configuration.
/// </summary>
void ConfigureLogger(LoggerConfig config);

/// <summary>
/// Metadata about the current durable execution.
/// </summary>
Expand Down Expand Up @@ -81,7 +89,10 @@ Task WaitAsync(
public interface IStepContext
{
/// <summary>
/// Logger scoped to this step.
/// Logger scoped to this step. Same instance as
/// <see cref="IDurableContext.Logger"/>; emits within an
/// <see cref="ILogger.BeginScope{TState}"/> that carries the step's
/// <c>operationId</c>, <c>operationName</c>, and <c>attempt</c>.
/// </summary>
ILogger Logger { get; }

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
using Microsoft.Extensions.Logging;
using CoreLambdaLogger = Amazon.Lambda.Core.LambdaLogger;

namespace Amazon.Lambda.DurableExecution.Internal;

/// <summary>
/// Default <see cref="ILogger"/> for <see cref="DurableContext"/>. Routes log
/// records through <see cref="CoreLambdaLogger"/> so they flow into the same
/// pipeline used by the rest of the AWS Lambda for .NET runtime — the runtime
/// host installs a redirector that produces structured JSON when
/// <c>AWS_LAMBDA_LOG_FORMAT=JSON</c> and honors <c>AWS_LAMBDA_LOG_LEVEL</c>.
/// </summary>
/// <remarks>
/// In-package adapter to avoid forcing a dependency on
/// <c>Amazon.Lambda.Logging.AspNetCore</c>; users who want a richer experience
/// (Serilog, Powertools, etc.) can swap their own logger via
/// <see cref="IDurableContext.ConfigureLogger"/>.
///
/// Implementation note: we always pass the pre-rendered message to
/// <see cref="CoreLambdaLogger.Log(string, string, object[])"/> rather than the
/// raw template plus args. The runtime's text-mode formatter feeds args through
/// <c>string.Format</c>, which throws on named placeholders (<c>{OrderId}</c>);
/// rendering up-front via the supplied formatter substitutes them correctly
/// regardless of the runtime log format.
/// </remarks>
internal sealed class LambdaCoreLogger : ILogger
{
private static readonly NullScope SharedScope = new();

public IDisposable? BeginScope<TState>(TState state) where TState : notnull => SharedScope;

// Level filtering is performed by the runtime layer (AWS_LAMBDA_LOG_LEVEL).
public bool IsEnabled(LogLevel logLevel) => logLevel != LogLevel.None;

public void Log<TState>(
LogLevel logLevel,
EventId eventId,
TState state,
Exception? exception,
Func<TState, Exception?, string> formatter)
{
if (!IsEnabled(logLevel)) return;

var message = formatter(state, exception);
var levelName = logLevel.ToString();

if (exception != null)
{
CoreLambdaLogger.Log(levelName, exception, message);
}
else
{
CoreLambdaLogger.Log(levelName, message);
}
}

private sealed class NullScope : IDisposable
{
public void Dispose() { }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
using Microsoft.Extensions.Logging;

namespace Amazon.Lambda.DurableExecution.Internal;

/// <summary>
/// <see cref="ILogger"/> decorator that suppresses messages while the workflow
/// is replaying prior operations. Reads <see cref="ExecutionState.IsReplaying"/>
/// on every call so it correctly transitions to passthrough the moment the
/// state's per-operation tracker decides we've caught up to fresh execution.
/// </summary>
/// <remarks>
/// Mirrors the suppression behavior of the Python and Java durable execution
/// SDKs: replay <see cref="Log{TState}"/> calls return without invoking the
/// inner logger. <see cref="BeginScope{TState}"/> always delegates so scopes
/// stay balanced — suppression only applies at log emission.
/// </remarks>
internal sealed class ReplayAwareLogger : ILogger
{
private readonly ILogger _inner;
private readonly ExecutionState _state;
private readonly bool _modeAware;

public ReplayAwareLogger(ILogger inner, ExecutionState state, bool modeAware)
{
_inner = inner;
_state = state;
_modeAware = modeAware;
}

/// <summary>The wrapped logger; exposed so <c>ConfigureLogger</c> can rewrap without losing it.</summary>
public ILogger Inner => _inner;

/// <summary>Whether replay suppression is active.</summary>
public bool ModeAware => _modeAware;

public IDisposable? BeginScope<TState>(TState state) where TState : notnull
=> _inner.BeginScope(state);

public bool IsEnabled(LogLevel logLevel)
{
if (ShouldSuppress()) return false;
return _inner.IsEnabled(logLevel);
}

public void Log<TState>(
LogLevel logLevel,
EventId eventId,
TState state,
Exception? exception,
Func<TState, Exception?, string> formatter)
{
if (ShouldSuppress()) return;
_inner.Log(logLevel, eventId, state, exception, formatter);
}

private bool ShouldSuppress() => _modeAware && _state.IsReplaying;
}
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,21 @@ private async Task<T> ExecuteFunc(int attemptNumber, CancellationToken cancellat
try
{
var stepContext = new StepContext(OperationId, attemptNumber, _logger);
var result = await _func(stepContext);

// Step-scoped metadata so structured log providers tag user code
// lines with the operation id, name, and current attempt. Wrap
// only the user-func call — checkpoint emission shouldn't carry
// step metadata into any side-channel logging.
T result;
using (_logger.BeginScope(new Dictionary<string, object>
{
["operationId"] = OperationId,
["operationName"] = Name ?? string.Empty,
["attempt"] = attemptNumber,
}))
{
result = await _func(stepContext);
}

await EnqueueAsync(new SdkOperationUpdate
{
Expand Down
24 changes: 24 additions & 0 deletions Libraries/src/Amazon.Lambda.DurableExecution/LoggerConfig.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using Microsoft.Extensions.Logging;

namespace Amazon.Lambda.DurableExecution;

/// <summary>
/// Configuration for <see cref="IDurableContext.ConfigureLogger"/>. Lets users
/// swap the underlying <see cref="ILogger"/> (e.g. Serilog, AWS Lambda Powertools)
/// or disable replay-aware filtering for debugging.
/// </summary>
public sealed class LoggerConfig
{
/// <summary>
/// Optional <see cref="ILogger"/> to use instead of the SDK default. When
/// null, the durable context keeps its existing inner logger.
/// </summary>
public ILogger? CustomLogger { get; init; }

/// <summary>
/// When true (default), messages are suppressed while the workflow is
/// re-deriving prior operations from checkpointed state. Set to false to
/// see every log line on every replay (useful for local debugging).
/// </summary>
public bool ModeAware { get; init; } = true;
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
<PackageReference Include="AWSSDK.ECR" Version="4.0.7" />
<PackageReference Include="AWSSDK.Lambda" Version="4.0.13.1" />
<PackageReference Include="AWSSDK.SecurityToken" Version="4.0.6.3" />
<PackageReference Include="AWSSDK.CloudWatchLogs" Version="4.0.20" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="18.5.1" />
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.3" />
Expand Down
Loading
Loading