diff --git a/.autover/changes/f0d5a912-bcfa-4244-96cb-ac3c847f877c.json b/.autover/changes/f0d5a912-bcfa-4244-96cb-ac3c847f877c.json new file mode 100644 index 000000000..efbffc4c0 --- /dev/null +++ b/.autover/changes/f0d5a912-bcfa-4244-96cb-ac3c847f877c.json @@ -0,0 +1,27 @@ +{ + "Projects": [ + { + "Name": "Amazon.Lambda.AspNetCoreServer", + "Type": "Major", + "ChangelogMessages": [ + "[Breaking] Update build targets from .NET 6 and 8 to .NET 8 and 10", + "[Preview] Add support Lambda Response Streaming enabled by setting the EnableResponseStreaming property from the base class AbstractAspNetCoreFunction" + ] + }, + { + "Name": "Amazon.Lambda.AspNetCoreServer.Hosting", + "Type": "Major", + "ChangelogMessages": [ + "[Breaking] Update build targets from .NET 6 and 8 to .NET 8 and 10", + "[Preview] Add support Lambda Response Streaming enabled by setting the EnableResponseStreaming property on the HostingOptions object passed into the AddAWSLambdaHosting method" + ] + }, + { + "Name": "Amazon.Lambda.Logging.AspNetCore", + "Type": "Major", + "ChangelogMessages": [ + "[Breaking] Update build targets from .NET 6 and 8 to .NET 8 and 10" + ] + } + ] +} \ No newline at end of file diff --git a/Libraries/src/Amazon.Lambda.AspNetCoreServer.Hosting/Amazon.Lambda.AspNetCoreServer.Hosting.csproj b/Libraries/src/Amazon.Lambda.AspNetCoreServer.Hosting/Amazon.Lambda.AspNetCoreServer.Hosting.csproj index 2ed732314..a22fd248c 100644 --- a/Libraries/src/Amazon.Lambda.AspNetCoreServer.Hosting/Amazon.Lambda.AspNetCoreServer.Hosting.csproj +++ b/Libraries/src/Amazon.Lambda.AspNetCoreServer.Hosting/Amazon.Lambda.AspNetCoreServer.Hosting.csproj @@ -4,7 +4,7 @@ Package for running ASP.NET Core applications using the Minimal API style as a AWS Lambda function. - net6.0;net8.0 + net8.0;net10.0 enable enable 1.10.0 diff --git a/Libraries/src/Amazon.Lambda.AspNetCoreServer.Hosting/HostingOptions.cs b/Libraries/src/Amazon.Lambda.AspNetCoreServer.Hosting/HostingOptions.cs index d4fd7937c..45040b6fc 100644 --- a/Libraries/src/Amazon.Lambda.AspNetCoreServer.Hosting/HostingOptions.cs +++ b/Libraries/src/Amazon.Lambda.AspNetCoreServer.Hosting/HostingOptions.cs @@ -9,6 +9,12 @@ namespace Amazon.Lambda.AspNetCoreServer.Hosting; /// public class HostingOptions { + internal const string ParameterizedPreviewMessage = + "Response streaming is in preview till a new version of .NET Lambda runtime client that supports response streaming " + + "has been deployed to the .NET Lambda managed runtime. Till deployment has been made the feature can be used by deploying as an " + + "executable including the latest version of Amazon.Lambda.RuntimeSupport and setting the \"EnablePreviewFeatures\" in the Lambda " + + "project file to \"true\""; + /// /// The ILambdaSerializer used by Lambda to convert the incoming event JSON into the .NET event type and serialize the .NET response type /// back to JSON to return to Lambda. @@ -27,6 +33,14 @@ public class HostingOptions /// public bool IncludeUnhandledExceptionDetailInResponse { get; set; } = false; + /// + /// When true, the Lambda hosting server will invoke StreamingFunctionHandlerAsync + /// instead of FunctionHandlerAsync, enabling Lambda response streaming. + /// Requires net8.0 or later. + /// + [System.Runtime.Versioning.RequiresPreviewFeatures(ParameterizedPreviewMessage)] + public bool EnableResponseStreaming { get; set; } = false; + /// /// Callback invoked after request marshalling to customize the HTTP request feature. /// Receives the IHttpRequestFeature, Lambda request object, and ILambdaContext. diff --git a/Libraries/src/Amazon.Lambda.AspNetCoreServer.Hosting/Internal/GetBeforeSnapshotRequestsCollector.cs b/Libraries/src/Amazon.Lambda.AspNetCoreServer.Hosting/Internal/GetBeforeSnapshotRequestsCollector.cs index 8cbb12d8f..1d9ee854f 100644 --- a/Libraries/src/Amazon.Lambda.AspNetCoreServer.Hosting/Internal/GetBeforeSnapshotRequestsCollector.cs +++ b/Libraries/src/Amazon.Lambda.AspNetCoreServer.Hosting/Internal/GetBeforeSnapshotRequestsCollector.cs @@ -5,7 +5,6 @@ namespace Amazon.Lambda.AspNetCoreServer.Hosting.Internal; -#if NET8_0_OR_GREATER /// /// Helper class for storing Requests for /// @@ -14,4 +13,3 @@ internal class GetBeforeSnapshotRequestsCollector { public HttpRequestMessage? Request { get; set; } } -#endif diff --git a/Libraries/src/Amazon.Lambda.AspNetCoreServer.Hosting/Internal/LambdaRuntimeSupportServer.cs b/Libraries/src/Amazon.Lambda.AspNetCoreServer.Hosting/Internal/LambdaRuntimeSupportServer.cs index f50a37f7b..4218d463e 100644 --- a/Libraries/src/Amazon.Lambda.AspNetCoreServer.Hosting/Internal/LambdaRuntimeSupportServer.cs +++ b/Libraries/src/Amazon.Lambda.AspNetCoreServer.Hosting/Internal/LambdaRuntimeSupportServer.cs @@ -78,8 +78,13 @@ public APIGatewayHttpApiV2LambdaRuntimeSupportServer(IServiceProvider servicePro /// protected override HandlerWrapper CreateHandlerWrapper(IServiceProvider serviceProvider) { - var handler = new APIGatewayHttpApiV2MinimalApi(serviceProvider).FunctionHandlerAsync; - return HandlerWrapper.GetHandlerWrapper(handler, this.Serializer); + var handler = new APIGatewayHttpApiV2MinimalApi(serviceProvider); +#pragma warning disable CA2252 + var hostingOptions = serviceProvider.GetService(); + handler.EnableResponseStreaming = hostingOptions?.EnableResponseStreaming ?? false; +#pragma warning restore CA2252 + Func> bufferedHandler = handler.FunctionHandlerAsync; + return HandlerWrapper.GetHandlerWrapper(bufferedHandler, this.Serializer); } /// @@ -87,9 +92,7 @@ protected override HandlerWrapper CreateHandlerWrapper(IServiceProvider serviceP /// public class APIGatewayHttpApiV2MinimalApi : APIGatewayHttpApiV2ProxyFunction { - #if NET8_0_OR_GREATER private readonly IEnumerable _beforeSnapshotRequestsCollectors; - #endif private readonly HostingOptions? _hostingOptions; /// @@ -99,9 +102,7 @@ public class APIGatewayHttpApiV2MinimalApi : APIGatewayHttpApiV2ProxyFunction public APIGatewayHttpApiV2MinimalApi(IServiceProvider serviceProvider) : base(serviceProvider) { - #if NET8_0_OR_GREATER _beforeSnapshotRequestsCollectors = serviceProvider.GetServices(); - #endif // Retrieve HostingOptions from service provider (may be null for backward compatibility) _hostingOptions = serviceProvider.GetService(); @@ -127,14 +128,12 @@ public APIGatewayHttpApiV2MinimalApi(IServiceProvider serviceProvider) } } - #if NET8_0_OR_GREATER protected override IEnumerable GetBeforeSnapshotRequests() { foreach (var collector in _beforeSnapshotRequestsCollectors) if (collector.Request != null) yield return collector.Request; } - #endif protected override void PostMarshallRequestFeature(IHttpRequestFeature aspNetCoreRequestFeature, APIGatewayEvents.APIGatewayHttpApiV2ProxyRequest lambdaRequest, ILambdaContext lambdaContext) { @@ -208,8 +207,13 @@ public APIGatewayRestApiLambdaRuntimeSupportServer(IServiceProvider serviceProvi /// protected override HandlerWrapper CreateHandlerWrapper(IServiceProvider serviceProvider) { - var handler = new APIGatewayRestApiMinimalApi(serviceProvider).FunctionHandlerAsync; - return HandlerWrapper.GetHandlerWrapper(handler, this.Serializer); + var handler = new APIGatewayRestApiMinimalApi(serviceProvider); +#pragma warning disable CA2252 + var hostingOptions = serviceProvider.GetService(); + handler.EnableResponseStreaming = hostingOptions?.EnableResponseStreaming ?? false; +#pragma warning restore CA2252 + Func> bufferedHandler = handler.FunctionHandlerAsync; + return HandlerWrapper.GetHandlerWrapper(bufferedHandler, this.Serializer); } /// @@ -217,9 +221,7 @@ protected override HandlerWrapper CreateHandlerWrapper(IServiceProvider serviceP /// public class APIGatewayRestApiMinimalApi : APIGatewayProxyFunction { - #if NET8_0_OR_GREATER private readonly IEnumerable _beforeSnapshotRequestsCollectors; - #endif private readonly HostingOptions? _hostingOptions; /// @@ -229,9 +231,7 @@ public class APIGatewayRestApiMinimalApi : APIGatewayProxyFunction public APIGatewayRestApiMinimalApi(IServiceProvider serviceProvider) : base(serviceProvider) { - #if NET8_0_OR_GREATER _beforeSnapshotRequestsCollectors = serviceProvider.GetServices(); - #endif // Retrieve HostingOptions from service provider (may be null for backward compatibility) _hostingOptions = serviceProvider.GetService(); @@ -257,14 +257,12 @@ public APIGatewayRestApiMinimalApi(IServiceProvider serviceProvider) } } - #if NET8_0_OR_GREATER protected override IEnumerable GetBeforeSnapshotRequests() { foreach (var collector in _beforeSnapshotRequestsCollectors) if (collector.Request != null) yield return collector.Request; } - #endif protected override void PostMarshallRequestFeature(IHttpRequestFeature aspNetCoreRequestFeature, APIGatewayEvents.APIGatewayProxyRequest lambdaRequest, ILambdaContext lambdaContext) { @@ -338,8 +336,13 @@ public ApplicationLoadBalancerLambdaRuntimeSupportServer(IServiceProvider servic /// protected override HandlerWrapper CreateHandlerWrapper(IServiceProvider serviceProvider) { - var handler = new ApplicationLoadBalancerMinimalApi(serviceProvider).FunctionHandlerAsync; - return HandlerWrapper.GetHandlerWrapper(handler, this.Serializer); + var handler = new ApplicationLoadBalancerMinimalApi(serviceProvider); +#pragma warning disable CA2252 + var hostingOptions = serviceProvider.GetService(); + handler.EnableResponseStreaming = hostingOptions?.EnableResponseStreaming ?? false; +#pragma warning restore CA2252 + Func> bufferedHandler = handler.FunctionHandlerAsync; + return HandlerWrapper.GetHandlerWrapper(bufferedHandler, this.Serializer); } /// @@ -347,9 +350,7 @@ protected override HandlerWrapper CreateHandlerWrapper(IServiceProvider serviceP /// public class ApplicationLoadBalancerMinimalApi : ApplicationLoadBalancerFunction { - #if NET8_0_OR_GREATER private readonly IEnumerable _beforeSnapshotRequestsCollectors; - #endif private readonly HostingOptions? _hostingOptions; /// @@ -359,9 +360,7 @@ public class ApplicationLoadBalancerMinimalApi : ApplicationLoadBalancerFunction public ApplicationLoadBalancerMinimalApi(IServiceProvider serviceProvider) : base(serviceProvider) { - #if NET8_0_OR_GREATER _beforeSnapshotRequestsCollectors = serviceProvider.GetServices(); - #endif // Retrieve HostingOptions from service provider (may be null for backward compatibility) _hostingOptions = serviceProvider.GetService(); @@ -387,14 +386,12 @@ public ApplicationLoadBalancerMinimalApi(IServiceProvider serviceProvider) } } - #if NET8_0_OR_GREATER protected override IEnumerable GetBeforeSnapshotRequests() { foreach (var collector in _beforeSnapshotRequestsCollectors) if (collector.Request != null) yield return collector.Request; } - #endif protected override void PostMarshallRequestFeature(IHttpRequestFeature aspNetCoreRequestFeature, ApplicationLoadBalancerEvents.ApplicationLoadBalancerRequest lambdaRequest, ILambdaContext lambdaContext) { diff --git a/Libraries/src/Amazon.Lambda.AspNetCoreServer.Hosting/ServiceCollectionExtensions.cs b/Libraries/src/Amazon.Lambda.AspNetCoreServer.Hosting/ServiceCollectionExtensions.cs index aa952bc54..bd4089df0 100644 --- a/Libraries/src/Amazon.Lambda.AspNetCoreServer.Hosting/ServiceCollectionExtensions.cs +++ b/Libraries/src/Amazon.Lambda.AspNetCoreServer.Hosting/ServiceCollectionExtensions.cs @@ -88,7 +88,6 @@ public static IServiceCollection AddAWSLambdaHosting(this IServiceCollection ser return services; } - #if NET8_0_OR_GREATER /// /// Adds a > that will be used to invoke /// Routes in your lambda function in order to initialize the ASP.NET Core and Lambda pipelines @@ -142,7 +141,6 @@ public static IServiceCollection AddAWSLambdaBeforeSnapshotRequest(this IService return services; } - #endif private static bool TryLambdaSetup(IServiceCollection services, LambdaEventSource eventSource, Action? configure, out HostingOptions? hostingOptions) { diff --git a/Libraries/src/Amazon.Lambda.AspNetCoreServer/AbstractAspNetCoreFunction.cs b/Libraries/src/Amazon.Lambda.AspNetCoreServer/AbstractAspNetCoreFunction.cs index b24a9fd61..0cfe1a97e 100644 --- a/Libraries/src/Amazon.Lambda.AspNetCoreServer/AbstractAspNetCoreFunction.cs +++ b/Libraries/src/Amazon.Lambda.AspNetCoreServer/AbstractAspNetCoreFunction.cs @@ -24,6 +24,12 @@ namespace Amazon.Lambda.AspNetCoreServer /// public abstract class AbstractAspNetCoreFunction { + internal const string ParameterizedPreviewMessage = + "Response streaming is in preview till a new version of .NET Lambda runtime client that supports response streaming " + + "has been deployed to the .NET Lambda managed runtime. Till deployment has been made the feature can be used by deploying as an " + + "executable including the latest version of Amazon.Lambda.RuntimeSupport and setting the \"EnablePreviewFeatures\" in the Lambda " + + "project file to \"true\""; + /// /// Key to access the ILambdaContext object from the HttpContext.Items collection. /// @@ -194,6 +200,15 @@ public void RegisterResponseContentEncodingForContentEncoding(string contentEnco /// public bool IncludeUnhandledExceptionDetailInResponse { get; set; } + /// + /// When true, writes the response directly to a + /// instead of + /// buffering it and returning a typed response object (which will be null). + /// Requires net8.0 or later. + /// + [System.Runtime.Versioning.RequiresPreviewFeatures(ParameterizedPreviewMessage)] + public virtual bool EnableResponseStreaming { get; set; } = false; + /// /// Method to initialize the web builder before starting the web host. In a typical Web API this is similar to the main function. @@ -255,7 +270,6 @@ protected virtual IHostBuilder CreateHostBuilder() return builder; } - #if NET8_0_OR_GREATER /// /// Return one or more s that will be used to invoke /// Routes in your lambda function in order to initialize the ASP.NET Core and Lambda pipelines @@ -294,7 +308,6 @@ protected virtual IHostBuilder CreateHostBuilder() /// protected virtual IEnumerable GetBeforeSnapshotRequests() => Enumerable.Empty(); - #endif private protected bool IsStarted { @@ -306,8 +319,6 @@ private protected bool IsStarted private void AddRegisterBeforeSnapshot() { - #if NET8_0_OR_GREATER - Amazon.Lambda.Core.SnapshotRestore.RegisterBeforeSnapshot(async () => { var beforeSnapstartRequests = GetBeforeSnapshotRequests(); @@ -339,8 +350,6 @@ private void AddRegisterBeforeSnapshot() } } }); - - #endif } /// @@ -475,6 +484,14 @@ public virtual async Task FunctionHandlerAsync(TREQUEST request, ILam PostMarshallItemsFeatureFeature(itemFeatures, request, lambdaContext); } +#pragma warning disable CA2252 + if (EnableResponseStreaming) + { + await ExecuteStreamingRequestAsync(features, request, lambdaContext); + return default; + } +#pragma warning restore CA2252 + var scope = this._hostServices.CreateScope(); try { @@ -509,41 +526,7 @@ protected async Task ProcessRequest(ILambdaContext lambdaContext, obj { try { - await this._server.Application.ProcessRequestAsync(context); - } - catch (AggregateException agex) - { - ex = agex; - _logger.LogError(agex, $"Caught AggregateException: '{agex}'"); - var sb = new StringBuilder(); - foreach (var newEx in agex.InnerExceptions) - { - sb.AppendLine(this.ErrorReport(newEx)); - } - - _logger.LogError(sb.ToString()); - ((IHttpResponseFeature)features).StatusCode = 500; - } - catch (ReflectionTypeLoadException rex) - { - ex = rex; - _logger.LogError(rex, $"Caught ReflectionTypeLoadException: '{rex}'"); - var sb = new StringBuilder(); - foreach (var loaderException in rex.LoaderExceptions) - { - var fileNotFoundException = loaderException as FileNotFoundException; - if (fileNotFoundException != null && !string.IsNullOrEmpty(fileNotFoundException.FileName)) - { - sb.AppendLine($"Missing file: {fileNotFoundException.FileName}"); - } - else - { - sb.AppendLine(this.ErrorReport(loaderException)); - } - } - - _logger.LogError(sb.ToString()); - ((IHttpResponseFeature)features).StatusCode = 500; + await RunPipelineAsync(context, features); } catch (Exception e) { @@ -697,5 +680,179 @@ protected virtual void PostMarshallResponseFeature(IHttpResponseFeature aspNetCo /// /// protected abstract TRESPONSE MarshallResponse(IHttpResponseFeature responseFeatures, ILambdaContext lambdaContext, int statusCodeIfNotSet = 200); + + /// + /// Builds an from the current + /// ASP.NET Core response feature. The status code defaults to 200 when + /// is 0. Set-Cookie header values are moved to ; + /// all other headers are placed in . + /// + /// The ASP.NET Core response feature for the current invocation. + /// A populated . + [System.Runtime.Versioning.RequiresPreviewFeatures(ParameterizedPreviewMessage)] + protected virtual Amazon.Lambda.Core.ResponseStreaming.HttpResponseStreamPrelude BuildStreamingPrelude(IHttpResponseFeature responseFeature) + { + var prelude = new Amazon.Lambda.Core.ResponseStreaming.HttpResponseStreamPrelude + { + StatusCode = (System.Net.HttpStatusCode)(responseFeature.StatusCode != 0 ? responseFeature.StatusCode : 200) + }; + + foreach (var kvp in responseFeature.Headers) + { + if (string.Equals(kvp.Key, "Set-Cookie", StringComparison.OrdinalIgnoreCase)) + { + foreach (var value in kvp.Value) + { + prelude.Cookies.Add(value); + } + } + else + { + prelude.MultiValueHeaders[kvp.Key] = kvp.Value.ToArray(); + } + } + + return prelude; + } + + /// + /// Creates a for writing the streaming Lambda response. + /// The default implementation calls . + /// Subclasses may override this method to substitute a different stream (e.g. a + /// in unit tests). + /// + /// The HTTP response prelude containing status code and headers. + /// A writable for the response body. + [System.Runtime.Versioning.RequiresPreviewFeatures(ParameterizedPreviewMessage)] + protected virtual System.IO.Stream CreateLambdaResponseStream( + Amazon.Lambda.Core.ResponseStreaming.HttpResponseStreamPrelude prelude) + { + return Amazon.Lambda.Core.ResponseStreaming.LambdaResponseStreamFactory.CreateHttpStream(prelude); + } + + /// + /// Executes the streaming response path. Called by when + /// is true. Writes the response directly to a + /// . + /// + [System.Runtime.Versioning.RequiresPreviewFeatures(ParameterizedPreviewMessage)] + private async Task ExecuteStreamingRequestAsync(InvokeFeatures features, TREQUEST request, ILambdaContext lambdaContext) + { + var responseFeature = (IHttpResponseFeature)features; + System.IO.Stream lambdaStream = null; + bool streamOpened = false; + + async Task OpenStream() + { + var prelude = BuildStreamingPrelude(responseFeature); + _logger.LogDebug("Opening Lambda response stream with Status code {StatusCode}", prelude.StatusCode); + var stream = CreateLambdaResponseStream(prelude); + lambdaStream = stream; + streamOpened = true; + return stream; + } + + var streamingBodyFeature = new Internal.StreamingResponseBodyFeature(_logger, responseFeature, OpenStream); + features[typeof(IHttpResponseBodyFeature)] = streamingBodyFeature; + + var scope = this._hostServices.CreateScope(); + Exception pipelineException = null; + try + { + ((IServiceProvidersFeature)features).RequestServices = scope.ServiceProvider; + + var context = this.CreateContext(features); + try + { + try + { + await RunPipelineAsync(context, features); + await streamingBodyFeature.CompleteAsync(); + } + catch (Exception e) + { + pipelineException = e; + + if (!streamOpened && IncludeUnhandledExceptionDetailInResponse) + { + var errorPrelude = new Amazon.Lambda.Core.ResponseStreaming.HttpResponseStreamPrelude + { + StatusCode = System.Net.HttpStatusCode.InternalServerError + }; + var errorStream = CreateLambdaResponseStream(errorPrelude); + lambdaStream = errorStream; + streamOpened = true; + var errorBytes = System.Text.Encoding.UTF8.GetBytes(ErrorReport(e)); + await errorStream.WriteAsync(errorBytes, 0, errorBytes.Length); + } + else if (streamOpened) + { + _logger.LogError(e, $"Unhandled exception after response stream was opened: {ErrorReport(e)}"); + } + else + { + _logger.LogError(e, $"Unknown error responding to request: {ErrorReport(e)}"); + } + } + } + finally + { + if (lambdaStream != null) + { + lambdaStream.Dispose(); + } + + if (features.ResponseCompletedEvents != null) + { + await features.ResponseCompletedEvents.ExecuteAsync(); + } + + this._server.Application.DisposeContext(context, pipelineException); + } + } + finally + { + scope.Dispose(); + } + } + + /// + /// Invokes the ASP.NET Core pipeline for the given context, handling + /// and with + /// detailed logging. Any other exception is rethrown to the caller. + /// + private async Task RunPipelineAsync(object context, InvokeFeatures features) + { + try + { + await this._server.Application.ProcessRequestAsync(context); + } + catch (AggregateException agex) + { + _logger.LogError(agex, $"Caught AggregateException: '{agex}'"); + var sb = new StringBuilder(); + foreach (var newEx in agex.InnerExceptions) + sb.AppendLine(this.ErrorReport(newEx)); + _logger.LogError(sb.ToString()); + ((IHttpResponseFeature)features).StatusCode = 500; + throw; + } + catch (ReflectionTypeLoadException rex) + { + _logger.LogError(rex, $"Caught ReflectionTypeLoadException: '{rex}'"); + var sb = new StringBuilder(); + foreach (var loaderException in rex.LoaderExceptions) + { + var fileNotFoundException = loaderException as FileNotFoundException; + if (fileNotFoundException != null && !string.IsNullOrEmpty(fileNotFoundException.FileName)) + sb.AppendLine($"Missing file: {fileNotFoundException.FileName}"); + else + sb.AppendLine(this.ErrorReport(loaderException)); + } + _logger.LogError(sb.ToString()); + ((IHttpResponseFeature)features).StatusCode = 500; + throw; + } + } } } diff --git a/Libraries/src/Amazon.Lambda.AspNetCoreServer/Amazon.Lambda.AspNetCoreServer.csproj b/Libraries/src/Amazon.Lambda.AspNetCoreServer/Amazon.Lambda.AspNetCoreServer.csproj index 561616cd6..ea382d609 100644 --- a/Libraries/src/Amazon.Lambda.AspNetCoreServer/Amazon.Lambda.AspNetCoreServer.csproj +++ b/Libraries/src/Amazon.Lambda.AspNetCoreServer/Amazon.Lambda.AspNetCoreServer.csproj @@ -4,7 +4,7 @@ Amazon.Lambda.AspNetCoreServer makes it easy to run ASP.NET Core Web API applications as AWS Lambda functions. - net6.0;net8.0 + net8.0;net10.0 Amazon.Lambda.AspNetCoreServer 9.2.1 Amazon.Lambda.AspNetCoreServer @@ -27,7 +27,11 @@ - + + + + + diff --git a/Libraries/src/Amazon.Lambda.AspNetCoreServer/Internal/HttpRequestMessageConverter.cs b/Libraries/src/Amazon.Lambda.AspNetCoreServer/Internal/HttpRequestMessageConverter.cs index 285fb3898..016f3dabb 100644 --- a/Libraries/src/Amazon.Lambda.AspNetCoreServer/Internal/HttpRequestMessageConverter.cs +++ b/Libraries/src/Amazon.Lambda.AspNetCoreServer/Internal/HttpRequestMessageConverter.cs @@ -1,17 +1,10 @@ -#if NET8_0_OR_GREATER using System; -using System.Collections.Generic; using System.Linq; using System.Net.Http; -using System.Text; -using System.Text.Json; -using System.Text.Json.Serialization; using System.Threading.Tasks; using Amazon.Lambda.APIGatewayEvents; using Amazon.Lambda.ApplicationLoadBalancerEvents; -using Microsoft.AspNetCore.Identity.Data; using Microsoft.AspNetCore.WebUtilities; -using Microsoft.Extensions.Primitives; namespace Amazon.Lambda.AspNetCoreServer.Internal { @@ -118,4 +111,3 @@ private static async Task ReadContent(HttpRequestMessage r) } } } -#endif diff --git a/Libraries/src/Amazon.Lambda.AspNetCoreServer/Internal/InvokeFeatures.cs b/Libraries/src/Amazon.Lambda.AspNetCoreServer/Internal/InvokeFeatures.cs index 398817af2..987878311 100644 --- a/Libraries/src/Amazon.Lambda.AspNetCoreServer/Internal/InvokeFeatures.cs +++ b/Libraries/src/Amazon.Lambda.AspNetCoreServer/Internal/InvokeFeatures.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Collections; using System.Collections.Generic; using System.Diagnostics; @@ -28,13 +28,9 @@ public class InvokeFeatures : IFeatureCollection, IServiceProvidersFeature, ITlsConnectionFeature, IHttpRequestIdentifierFeature, - IHttpResponseBodyFeature - -#if NET6_0_OR_GREATER ,IHttpRequestBodyDetectionFeature ,IHttpActivityFeature -#endif /* , IHttpUpgradeFeature, @@ -54,11 +50,8 @@ public InvokeFeatures() this[typeof(ITlsConnectionFeature)] = this; this[typeof(IHttpResponseBodyFeature)] = this; this[typeof(IHttpRequestIdentifierFeature)] = this; - -#if NET6_0_OR_GREATER this[typeof(IHttpRequestBodyDetectionFeature)] = this; this[typeof(IHttpActivityFeature)] = this; -#endif } #region IFeatureCollection @@ -385,7 +378,6 @@ string IHttpRequestIdentifierFeature.TraceIdentifier #endregion -#if NET6_0_OR_GREATER bool IHttpRequestBodyDetectionFeature.CanHaveBody { get @@ -396,6 +388,5 @@ bool IHttpRequestBodyDetectionFeature.CanHaveBody } Activity IHttpActivityFeature.Activity { get; set; } -#endif } } diff --git a/Libraries/src/Amazon.Lambda.AspNetCoreServer/Internal/StreamingResponseBodyFeature.cs b/Libraries/src/Amazon.Lambda.AspNetCoreServer/Internal/StreamingResponseBodyFeature.cs new file mode 100644 index 000000000..9ee190d74 --- /dev/null +++ b/Libraries/src/Amazon.Lambda.AspNetCoreServer/Internal/StreamingResponseBodyFeature.cs @@ -0,0 +1,234 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +using System; +using System.IO; +using System.IO.Pipelines; +using System.Runtime.Versioning; +using System.Threading; +using System.Threading.Tasks; + +using Microsoft.AspNetCore.Http.Features; + +using Amazon.Lambda.Core.ResponseStreaming; +using Microsoft.Extensions.Logging; + +namespace Amazon.Lambda.AspNetCoreServer.Internal +{ + /// + /// An implementation that supports Lambda response streaming. + /// Uses a two-phase approach: bytes written before are buffered in a + /// ; after all writes go directly to the + /// obtained from the stream opener delegate. + /// + [RequiresPreviewFeatures(AbstractAspNetCoreFunction.ParameterizedPreviewMessage)] + internal class StreamingResponseBodyFeature : IHttpResponseBodyFeature + { + private readonly ILogger _logger; + private readonly IHttpResponseFeature _responseFeature; + private readonly Func> _streamOpener; + + private Stream _lambdaStream; // null until StartAsync completes + private MemoryStream _preStartBuffer; // accumulates bytes written before StartAsync + private bool _started; + private PipeWriter _pipeWriter; // lazily created; always wraps the live Stream + + /// + /// Initializes a new instance of . + /// + /// + /// The for the current invocation. Used to fire + /// OnStarting callbacks when is called. + /// + /// + /// A delegate that, when invoked, builds the from + /// the response headers and calls + /// to obtain the . + /// + public StreamingResponseBodyFeature( + ILogger logger, + IHttpResponseFeature responseFeature, + Func> streamOpener) + { + _logger = logger; + _responseFeature = responseFeature ?? throw new ArgumentNullException(nameof(responseFeature)); + _streamOpener = streamOpener ?? throw new ArgumentNullException(nameof(streamOpener)); + } + + /// + /// Initializes a new instance without a logger (for use in tests). + /// + internal StreamingResponseBodyFeature( + IHttpResponseFeature responseFeature, + Func> streamOpener) + : this(null, responseFeature, streamOpener) { } + + /// + /// + /// Returns the once has been + /// called; otherwise returns a lazy-initialized that buffers + /// bytes until the stream is opened. + /// + public Stream Stream => _lambdaStream ?? (_preStartBuffer ??= new MemoryStream()); + + /// + /// + /// Returns a that calls on first + /// flush/write so that the Lambda stream is opened (and the HTTP prelude is sent) + /// as soon as the application first flushes, rather than waiting until the end. + /// + public PipeWriter Writer => _pipeWriter ??= new StartOnFlushPipeWriter(this); + + /// + /// + /// Fires all registered OnStarting callbacks, then calls the stream opener delegate + /// to obtain the , and finally flushes any bytes that + /// were buffered before this method was called. + /// + public async Task StartAsync(CancellationToken cancellationToken = default) + { + _logger?.LogInformation("Starting response streaming"); + + if (_started) return; + _started = true; + + // Fire OnStarting callbacks registered on the response feature. + // InvokeFeatures (which implements IHttpResponseFeature) stores these in + // ResponseStartingEvents, which is internal to this assembly. + if (_responseFeature is InvokeFeatures invokeFeatures && + invokeFeatures.ResponseStartingEvents != null) + { + await invokeFeatures.ResponseStartingEvents.ExecuteAsync(); + } + + // Open the Lambda response stream (this writes the HTTP prelude). + _lambdaStream = await _streamOpener(); + + // Flush any bytes that were written before StartAsync was called. + if (_preStartBuffer != null && _preStartBuffer.Length > 0) + { + _preStartBuffer.Position = 0; + await _preStartBuffer.CopyToAsync(_lambdaStream, cancellationToken); + } + } + + /// + public async Task CompleteAsync() + { + await StartAsync(); + + if (_pipeWriter != null) + { + await _pipeWriter.FlushAsync(); + } + } + + /// + /// No-op: the stream is already unbuffered once opened. + public void DisableBuffering() + { + // Intentional no-op per design: the Lambda response stream is already unbuffered. + } + + /// + /// + /// Calls to ensure the stream is open, then reads the specified + /// byte range from the file and writes it to the . + /// + public async Task SendFileAsync( + string path, + long offset, + long? count, + CancellationToken cancellationToken = default) + { + await StartAsync(cancellationToken); + + var fileInfo = new FileInfo(path); + if (offset < 0 || offset > fileInfo.Length) + throw new ArgumentOutOfRangeException(nameof(offset), offset, string.Empty); + if (count.HasValue && (count.Value < 0 || count.Value > fileInfo.Length - offset)) + throw new ArgumentOutOfRangeException(nameof(count), count, string.Empty); + + cancellationToken.ThrowIfCancellationRequested(); + + const int bufferSize = 1024 * 16; + var fileStream = new FileStream( + path, + FileMode.Open, + FileAccess.Read, + FileShare.ReadWrite, + bufferSize: bufferSize, + options: FileOptions.Asynchronous | FileOptions.SequentialScan); + + using (fileStream) + { + fileStream.Seek(offset, SeekOrigin.Begin); + await Utilities.CopyToAsync(fileStream, _lambdaStream, count, bufferSize, cancellationToken); + } + } + + // ----------------------------------------------------------------------- + // StartOnFlushPipeWriter + // + // A PipeWriter wrapper that ensures StartAsync is called (opening the Lambda + // stream and sending the HTTP prelude) the first time the application flushes + // or completes the writer — not just at the very end of the request. + // + // The inner PipeWriter is created lazily against the *live* Stream property + // so it always targets the correct underlying stream (Lambda stream after + // StartAsync, pre-start buffer before). + // ----------------------------------------------------------------------- + private sealed class StartOnFlushPipeWriter : PipeWriter + { + private readonly StreamingResponseBodyFeature _feature; + private PipeWriter _inner; + + // The inner writer must be recreated after StartAsync because Stream + // switches from _preStartBuffer to _lambdaStream at that point. + private PipeWriter Inner => _inner ??= PipeWriter.Create(_feature.Stream); + + public StartOnFlushPipeWriter(StreamingResponseBodyFeature feature) + { + _feature = feature; + } + + public override void Advance(int bytes) => Inner.Advance(bytes); + + public override Memory GetMemory(int sizeHint = 0) => Inner.GetMemory(sizeHint); + + public override Span GetSpan(int sizeHint = 0) => Inner.GetSpan(sizeHint); + + public override async ValueTask FlushAsync(CancellationToken cancellationToken = default) + { + if (!_feature._started) + { + // Flush buffered bytes into the pre-start buffer first, then open the stream. + await Inner.FlushAsync(cancellationToken); + // Recreate inner writer against the Lambda stream after StartAsync. + _inner = null; + await _feature.StartAsync(cancellationToken); + // Inner now wraps _lambdaStream; nothing extra to flush (StartAsync already + // copied the pre-start buffer across). + return new FlushResult(isCanceled: false, isCompleted: false); + } + + return await Inner.FlushAsync(cancellationToken); + } + + public override async ValueTask CompleteAsync(Exception exception = null) + { + if (!_feature._started) + { + await Inner.FlushAsync(); + _inner = null; + await _feature.StartAsync(); + } + await Inner.CompleteAsync(exception); + } + + // Complete (sync) — delegate + public override void Complete(Exception exception = null) => Inner.Complete(exception); + + public override void CancelPendingFlush() => Inner.CancelPendingFlush(); + } + } +} diff --git a/Libraries/src/Amazon.Lambda.Logging.AspNetCore/Amazon.Lambda.Logging.AspNetCore.csproj b/Libraries/src/Amazon.Lambda.Logging.AspNetCore/Amazon.Lambda.Logging.AspNetCore.csproj index 673a9ca30..7db23986c 100644 --- a/Libraries/src/Amazon.Lambda.Logging.AspNetCore/Amazon.Lambda.Logging.AspNetCore.csproj +++ b/Libraries/src/Amazon.Lambda.Logging.AspNetCore/Amazon.Lambda.Logging.AspNetCore.csproj @@ -4,7 +4,7 @@ Amazon Lambda .NET Core support - Logging ASP.NET Core package. - net6.0;net8.0 + net8.0;net10.0 Amazon.Lambda.Logging.AspNetCore 4.1.1 Amazon.Lambda.Logging.AspNetCore diff --git a/Libraries/test/Amazon.Lambda.AspNetCoreServer.Hosting.Tests/AddAWSLambdaBeforeSnapshotRequestTests.cs b/Libraries/test/Amazon.Lambda.AspNetCoreServer.Hosting.Tests/AddAWSLambdaBeforeSnapshotRequestTests.cs index b4419b1a7..3505d8bb3 100644 --- a/Libraries/test/Amazon.Lambda.AspNetCoreServer.Hosting.Tests/AddAWSLambdaBeforeSnapshotRequestTests.cs +++ b/Libraries/test/Amazon.Lambda.AspNetCoreServer.Hosting.Tests/AddAWSLambdaBeforeSnapshotRequestTests.cs @@ -14,7 +14,6 @@ namespace Amazon.Lambda.AspNetCoreServer.Hosting.Tests; /// public class AddAWSLambdaBeforeSnapshotRequestTests { - #if NET8_0_OR_GREATER [Theory] [InlineData(LambdaEventSource.HttpApi)] [InlineData(LambdaEventSource.RestApi)] @@ -55,5 +54,4 @@ await Task.WhenAny( Assert.True(callbackDidTheCallback); } - #endif } diff --git a/Libraries/test/Amazon.Lambda.AspNetCoreServer.Hosting.Tests/ResponseStreamingHostingTests.cs b/Libraries/test/Amazon.Lambda.AspNetCoreServer.Hosting.Tests/ResponseStreamingHostingTests.cs new file mode 100644 index 000000000..f70f91629 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.AspNetCoreServer.Hosting.Tests/ResponseStreamingHostingTests.cs @@ -0,0 +1,254 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + + +using System.Runtime.Versioning; +using Amazon.Lambda.AspNetCoreServer.Hosting.Internal; +using Amazon.Lambda.AspNetCoreServer.Test; +using Amazon.Lambda.Core; +using Amazon.Lambda.RuntimeSupport; +using Amazon.Lambda.Serialization.SystemTextJson; +using Microsoft.Extensions.DependencyInjection; +using Xunit; + +namespace Amazon.Lambda.AspNetCoreServer.Hosting.Tests; + +/// +/// Tests for response streaming integration in hosting (Requirement 10). +/// +[RequiresPreviewFeatures] +public class ResponseStreamingHostingTests +{ + [Fact] + public void EnableResponseStreaming_DefaultsToFalse() + { + var options = new HostingOptions(); + Assert.False(options.EnableResponseStreaming); + } + + [Fact] + public void EnableResponseStreaming_CanBeSetToTrue() + { + var options = new HostingOptions { EnableResponseStreaming = true }; + Assert.True(options.EnableResponseStreaming); + } + + [Fact] + public void AddAWSLambdaHosting_ConfigureCallback_CanSetEnableResponseStreamingTrue() + { + var services = new ServiceCollection(); + using var envHelper = new EnvironmentVariableHelper("AWS_LAMBDA_FUNCTION_NAME", "test-function"); + +#pragma warning disable CA2252 + services.AddAWSLambdaHosting(LambdaEventSource.HttpApi, options => + { + options.EnableResponseStreaming = true; + }); +#pragma warning restore CA2252 + + var sp = services.BuildServiceProvider(); + var hostingOptions = sp.GetService(); + + Assert.NotNull(hostingOptions); + Assert.True(hostingOptions.EnableResponseStreaming); + } + + [Fact] + public void AddAWSLambdaHosting_WithoutCallback_EnableResponseStreamingRemainsDefault() + { + var services = new ServiceCollection(); + using var envHelper = new EnvironmentVariableHelper("AWS_LAMBDA_FUNCTION_NAME", "test-function"); + + services.AddAWSLambdaHosting(LambdaEventSource.HttpApi); + + var sp = services.BuildServiceProvider(); + var hostingOptions = sp.GetService(); + + Assert.NotNull(hostingOptions); + Assert.False(hostingOptions.EnableResponseStreaming); + } + + + // Helper: build a minimal IServiceProvider with the given HostingOptions + private static IServiceProvider BuildServiceProvider(HostingOptions hostingOptions) + { + var services = new ServiceCollection(); + services.AddSingleton(hostingOptions); + services.AddSingleton(new DefaultLambdaJsonSerializer()); + services.AddLogging(); + return services.BuildServiceProvider(); + } + + // ---- APIGatewayHttpApiV2 ---- + + [Fact] + public void HttpApiV2_CreateHandlerWrapper_StreamingFalse_TargetsFunctionHandlerAsync() + { + var options = new HostingOptions { EnableResponseStreaming = false }; + var sp = BuildServiceProvider(options); + + var server = new TestableHttpApiV2Server(sp); + var wrapper = server.PublicCreateHandlerWrapper(sp); + + // The handler delegate target method should be FunctionHandlerAsync (not streaming) + var methodName = GetHandlerDelegateMethodName(wrapper); + Assert.Contains("FunctionHandlerAsync", methodName); + Assert.DoesNotContain("Streaming", methodName); + } + + [Fact] + public void HttpApiV2_CreateHandlerWrapper_StreamingTrue_TargetsFunctionHandlerAsync() + { + var options = new HostingOptions { EnableResponseStreaming = true }; + var sp = BuildServiceProvider(options); + + var server = new TestableHttpApiV2Server(sp); + var wrapper = server.PublicCreateHandlerWrapper(sp); + + var methodName = GetHandlerDelegateMethodName(wrapper); + Assert.Contains("FunctionHandlerAsync", methodName); + } + + // ---- APIGatewayRestApi ---- + + [Fact] + public void RestApi_CreateHandlerWrapper_StreamingFalse_TargetsFunctionHandlerAsync() + { + var options = new HostingOptions { EnableResponseStreaming = false }; + var sp = BuildServiceProvider(options); + + var server = new TestableRestApiServer(sp); + var wrapper = server.PublicCreateHandlerWrapper(sp); + + var methodName = GetHandlerDelegateMethodName(wrapper); + Assert.Contains("FunctionHandlerAsync", methodName); + Assert.DoesNotContain("Streaming", methodName); + } + + [Fact] + public void RestApi_CreateHandlerWrapper_StreamingTrue_TargetsFunctionHandlerAsync() + { + var options = new HostingOptions { EnableResponseStreaming = true }; + var sp = BuildServiceProvider(options); + + var server = new TestableRestApiServer(sp); + var wrapper = server.PublicCreateHandlerWrapper(sp); + + var methodName = GetHandlerDelegateMethodName(wrapper); + Assert.Contains("FunctionHandlerAsync", methodName); + } + + // ---- ApplicationLoadBalancer ---- + + [Fact] + public void Alb_CreateHandlerWrapper_StreamingFalse_TargetsFunctionHandlerAsync() + { + var options = new HostingOptions { EnableResponseStreaming = false }; + var sp = BuildServiceProvider(options); + + var server = new TestableAlbServer(sp); + var wrapper = server.PublicCreateHandlerWrapper(sp); + + var methodName = GetHandlerDelegateMethodName(wrapper); + Assert.Contains("FunctionHandlerAsync", methodName); + Assert.DoesNotContain("Streaming", methodName); + } + + [Fact] + public void Alb_CreateHandlerWrapper_StreamingTrue_TargetsFunctionHandlerAsync() + { + var options = new HostingOptions { EnableResponseStreaming = true }; + var sp = BuildServiceProvider(options); + + var server = new TestableAlbServer(sp); + var wrapper = server.PublicCreateHandlerWrapper(sp); + + var methodName = GetHandlerDelegateMethodName(wrapper); + Assert.Contains("FunctionHandlerAsync", methodName); + } + + // ------------------------------------------------------------------------- + // Helpers + // ------------------------------------------------------------------------- + + /// + /// Extracts the method name from the delegate stored inside a HandlerWrapper. + /// HandlerWrapper.Handler is a LambdaBootstrapHandler (a delegate). The actual + /// user-supplied delegate is captured in a closure, so we walk the closure's + /// fields to find the innermost Func/delegate and read its Method.Name. + /// + private static string GetHandlerDelegateMethodName(HandlerWrapper wrapper) + { + // HandlerWrapper.Handler is the LambdaBootstrapHandler delegate. + // It is an async lambda that closes over the user-supplied handler delegate. + // We use reflection to dig through the closure chain until we find a field + // whose type is a delegate with a Method.Name we can inspect. + var handler = wrapper.Handler; + return FindDelegateMethodName(handler.Target, visited: new HashSet(ReferenceEqualityComparer.Instance)); + } + + private static string FindDelegateMethodName(object? target, HashSet visited) + { + if (target == null || !visited.Add(target)) + return string.Empty; + + foreach (var field in target.GetType().GetFields( + System.Reflection.BindingFlags.Instance | + System.Reflection.BindingFlags.NonPublic | + System.Reflection.BindingFlags.Public)) + { + var value = field.GetValue(target); + if (value == null) continue; + + if (value is Delegate d) + { + var name = d.Method.Name; + // Skip compiler-generated method names (lambdas / state machines) + if (!name.StartsWith("<") && !name.Contains("MoveNext")) + return name; + + // Recurse into the delegate's own closure + var inner = FindDelegateMethodName(d.Target, visited); + if (!string.IsNullOrEmpty(inner)) + return inner; + } + else if (value.GetType().IsClass && !value.GetType().IsPrimitive + && value.GetType().Namespace?.StartsWith("System") == false) + { + var inner = FindDelegateMethodName(value, visited); + if (!string.IsNullOrEmpty(inner)) + return inner; + } + } + + return string.Empty; + } + + // ------------------------------------------------------------------------- + // Testable server subclasses that expose CreateHandlerWrapper publicly + // ------------------------------------------------------------------------- + + private class TestableHttpApiV2Server : APIGatewayHttpApiV2LambdaRuntimeSupportServer + { + public TestableHttpApiV2Server(IServiceProvider sp) : base(sp) { } + + public HandlerWrapper PublicCreateHandlerWrapper(IServiceProvider sp) + => CreateHandlerWrapper(sp); + } + + private class TestableRestApiServer : APIGatewayRestApiLambdaRuntimeSupportServer + { + public TestableRestApiServer(IServiceProvider sp) : base(sp) { } + + public HandlerWrapper PublicCreateHandlerWrapper(IServiceProvider sp) + => CreateHandlerWrapper(sp); + } + + private class TestableAlbServer : ApplicationLoadBalancerLambdaRuntimeSupportServer + { + public TestableAlbServer(IServiceProvider sp) : base(sp) { } + + public HandlerWrapper PublicCreateHandlerWrapper(IServiceProvider sp) + => CreateHandlerWrapper(sp); + } +} diff --git a/Libraries/test/Amazon.Lambda.AspNetCoreServer.Hosting.Tests/ResponseStreamingPropertyTests.cs b/Libraries/test/Amazon.Lambda.AspNetCoreServer.Hosting.Tests/ResponseStreamingPropertyTests.cs new file mode 100644 index 000000000..43ebc4dd4 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.AspNetCoreServer.Hosting.Tests/ResponseStreamingPropertyTests.cs @@ -0,0 +1,129 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +using System.Runtime.Versioning; + +using Amazon.Lambda.AspNetCoreServer.Hosting.Internal; +using Amazon.Lambda.Core; +using Amazon.Lambda.RuntimeSupport; +using Amazon.Lambda.Serialization.SystemTextJson; + +using Microsoft.Extensions.DependencyInjection; + +using Xunit; + +namespace Amazon.Lambda.AspNetCoreServer.Hosting.Tests; + +[RequiresPreviewFeatures] +public class ResponseStreamingPropertyTests +{ + private static IServiceProvider BuildServiceProvider(HostingOptions hostingOptions) + { + var services = new ServiceCollection(); + services.AddSingleton(hostingOptions); + services.AddSingleton(new DefaultLambdaJsonSerializer()); + services.AddLogging(); + return services.BuildServiceProvider(); + } + + private static string GetHandlerDelegateMethodName(HandlerWrapper wrapper) + { + var handler = wrapper.Handler; + return FindDelegateMethodName(handler.Target, new HashSet(ReferenceEqualityComparer.Instance)); + } + + private static string FindDelegateMethodName(object? target, HashSet visited) + { + if (target == null || !visited.Add(target)) + return string.Empty; + + foreach (var field in target.GetType().GetFields( + System.Reflection.BindingFlags.Instance | + System.Reflection.BindingFlags.NonPublic | + System.Reflection.BindingFlags.Public)) + { + var value = field.GetValue(target); + if (value == null) continue; + + if (value is Delegate d) + { + var name = d.Method.Name; + if (!name.StartsWith("<") && !name.Contains("MoveNext")) + return name; + var inner = FindDelegateMethodName(d.Target, visited); + if (!string.IsNullOrEmpty(inner)) return inner; + } + else if (value.GetType().IsClass && !value.GetType().IsPrimitive + && value.GetType().Namespace?.StartsWith("System") == false) + { + var inner = FindDelegateMethodName(value, visited); + if (!string.IsNullOrEmpty(inner)) return inner; + } + } + + return string.Empty; + } + + private class TestableHttpApiV2Server : APIGatewayHttpApiV2LambdaRuntimeSupportServer + { + public TestableHttpApiV2Server(IServiceProvider sp) : base(sp) { } + public HandlerWrapper PublicCreateHandlerWrapper(IServiceProvider sp) => CreateHandlerWrapper(sp); + } + + private class TestableRestApiServer : APIGatewayRestApiLambdaRuntimeSupportServer + { + public TestableRestApiServer(IServiceProvider sp) : base(sp) { } + public HandlerWrapper PublicCreateHandlerWrapper(IServiceProvider sp) => CreateHandlerWrapper(sp); + } + + private class TestableAlbServer : ApplicationLoadBalancerLambdaRuntimeSupportServer + { + public TestableAlbServer(IServiceProvider sp) : base(sp) { } + public HandlerWrapper PublicCreateHandlerWrapper(IServiceProvider sp) => CreateHandlerWrapper(sp); + } + + [Theory] + [InlineData(true)] + [InlineData(false)] + public void Property9_HttpApiV2_StreamingFlag_RoutesCorrectly(bool enableStreaming) + { + var options = new HostingOptions { EnableResponseStreaming = enableStreaming }; + var sp = BuildServiceProvider(options); + var server = new TestableHttpApiV2Server(sp); + var wrapper = server.PublicCreateHandlerWrapper(sp); + var methodName = GetHandlerDelegateMethodName(wrapper); + + Assert.Contains("FunctionHandlerAsync", methodName); + Assert.DoesNotContain("StreamingFunctionHandlerAsync", methodName); + } + + [Theory] + [InlineData(true)] + [InlineData(false)] + public void Property9_RestApi_StreamingFlag_RoutesCorrectly(bool enableStreaming) + { + var options = new HostingOptions { EnableResponseStreaming = enableStreaming }; + var sp = BuildServiceProvider(options); + var server = new TestableRestApiServer(sp); + var wrapper = server.PublicCreateHandlerWrapper(sp); + var methodName = GetHandlerDelegateMethodName(wrapper); + + Assert.Contains("FunctionHandlerAsync", methodName); + Assert.DoesNotContain("StreamingFunctionHandlerAsync", methodName); + } + + [Theory] + [InlineData(true)] + [InlineData(false)] + public void Property9_Alb_StreamingFlag_RoutesCorrectly(bool enableStreaming) + { + var options = new HostingOptions { EnableResponseStreaming = enableStreaming }; + var sp = BuildServiceProvider(options); + var server = new TestableAlbServer(sp); + var wrapper = server.PublicCreateHandlerWrapper(sp); + var methodName = GetHandlerDelegateMethodName(wrapper); + + Assert.Contains("FunctionHandlerAsync", methodName); + Assert.DoesNotContain("StreamingFunctionHandlerAsync", methodName); + } +} diff --git a/Libraries/test/Amazon.Lambda.AspNetCoreServer.Test/Amazon.Lambda.AspNetCoreServer.Test.csproj b/Libraries/test/Amazon.Lambda.AspNetCoreServer.Test/Amazon.Lambda.AspNetCoreServer.Test.csproj index 9ace52777..6d2a4d012 100644 --- a/Libraries/test/Amazon.Lambda.AspNetCoreServer.Test/Amazon.Lambda.AspNetCoreServer.Test.csproj +++ b/Libraries/test/Amazon.Lambda.AspNetCoreServer.Test/Amazon.Lambda.AspNetCoreServer.Test.csproj @@ -1,7 +1,9 @@  + + - net6.0;net8.0 + net8.0;net10.0 Amazon.Lambda.AspNetCoreServer.Test Library Amazon.Lambda.AspNetCoreServer.Test @@ -55,5 +57,6 @@ + diff --git a/Libraries/test/Amazon.Lambda.AspNetCoreServer.Test/BuildStreamingPreludeTests.cs b/Libraries/test/Amazon.Lambda.AspNetCoreServer.Test/BuildStreamingPreludeTests.cs new file mode 100644 index 000000000..3504d0dd5 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.AspNetCoreServer.Test/BuildStreamingPreludeTests.cs @@ -0,0 +1,232 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +using System.Net; +using System.Runtime.Versioning; + +using Amazon.Lambda.AspNetCoreServer.Internal; +using Microsoft.AspNetCore.Http.Features; +using Xunit; + +namespace Amazon.Lambda.AspNetCoreServer.Test +{ + [RequiresPreviewFeatures] + public class BuildStreamingPreludeTests + { + // Subclass that skips host startup entirely and + // just exposes BuildStreamingPrelude directly without needing a running host. + private class StandalonePreludeBuilder : APIGatewayHttpApiV2ProxyFunction + { + // Use the StartupMode.FirstRequest constructor so no host is started eagerly. + public StandalonePreludeBuilder() + : base(StartupMode.FirstRequest) { } + + public Amazon.Lambda.Core.ResponseStreaming.HttpResponseStreamPrelude + InvokeBuildStreamingPrelude(IHttpResponseFeature responseFeature) + => BuildStreamingPrelude(responseFeature); + } + + private static StandalonePreludeBuilder CreateBuilder() => new StandalonePreludeBuilder(); + + // Helper: create an InvokeFeatures, set StatusCode and Headers, return as IHttpResponseFeature. + private static IHttpResponseFeature MakeResponseFeature(int statusCode, System.Collections.Generic.Dictionary headers = null) + { + var features = new InvokeFeatures(); + var rf = (IHttpResponseFeature)features; + rf.StatusCode = statusCode; + if (headers != null) + { + foreach (var kvp in headers) + rf.Headers[kvp.Key] = new Microsoft.Extensions.Primitives.StringValues(kvp.Value); + } + return rf; + } + + // ----------------------------------------------------------------------- + // 6.1 Status code is copied correctly for values 100–599 + // ----------------------------------------------------------------------- + [Theory] + [InlineData(100)] + [InlineData(200)] + [InlineData(201)] + [InlineData(204)] + [InlineData(301)] + [InlineData(302)] + [InlineData(400)] + [InlineData(401)] + [InlineData(403)] + [InlineData(404)] + [InlineData(500)] + [InlineData(503)] + [InlineData(599)] + public void StatusCode_IsCopiedCorrectly(int statusCode) + { + var builder = CreateBuilder(); + var rf = MakeResponseFeature(statusCode); + + var prelude = builder.InvokeBuildStreamingPrelude(rf); + + Assert.Equal((HttpStatusCode)statusCode, prelude.StatusCode); + } + + // ----------------------------------------------------------------------- + // 6.2 Status code defaults to 200 when IHttpResponseFeature.StatusCode is 0 + // ----------------------------------------------------------------------- + [Fact] + public void StatusCode_DefaultsTo200_WhenFeatureStatusCodeIsZero() + { + var builder = CreateBuilder(); + var rf = MakeResponseFeature(0); + + var prelude = builder.InvokeBuildStreamingPrelude(rf); + + Assert.Equal(HttpStatusCode.OK, prelude.StatusCode); + } + + // ----------------------------------------------------------------------- + // 6.3 Non-Set-Cookie headers appear in MultiValueHeaders with all values preserved + // ----------------------------------------------------------------------- + [Fact] + public void NonSetCookieHeaders_AppearInMultiValueHeaders_WithAllValuesPreserved() + { + var builder = CreateBuilder(); + var rf = MakeResponseFeature(200, new System.Collections.Generic.Dictionary + { + ["Content-Type"] = new[] { "application/json" }, + ["X-Custom"] = new[] { "val1", "val2" }, + ["Cache-Control"] = new[] { "no-cache", "no-store" } + }); + + var prelude = builder.InvokeBuildStreamingPrelude(rf); + + Assert.True(prelude.MultiValueHeaders.ContainsKey("Content-Type")); + Assert.Equal(new[] { "application/json" }, prelude.MultiValueHeaders["Content-Type"]); + + Assert.True(prelude.MultiValueHeaders.ContainsKey("X-Custom")); + Assert.Equal(new[] { "val1", "val2" }, prelude.MultiValueHeaders["X-Custom"]); + + Assert.True(prelude.MultiValueHeaders.ContainsKey("Cache-Control")); + Assert.Equal(new[] { "no-cache", "no-store" }, prelude.MultiValueHeaders["Cache-Control"]); + } + + [Fact] + public void NonSetCookieHeaders_MultiValueHeaders_PreservesMultipleValues() + { + var builder = CreateBuilder(); + var rf = MakeResponseFeature(200, new System.Collections.Generic.Dictionary + { + ["Accept"] = new[] { "text/html", "application/xhtml+xml", "application/xml" } + }); + + var prelude = builder.InvokeBuildStreamingPrelude(rf); + + Assert.Equal(new[] { "text/html", "application/xhtml+xml", "application/xml" }, + prelude.MultiValueHeaders["Accept"]); + } + + // ----------------------------------------------------------------------- + // 6.4 Set-Cookie header values are moved to Cookies and absent from MultiValueHeaders + // ----------------------------------------------------------------------- + [Fact] + public void SetCookieHeader_MovedToCookies_AbsentFromMultiValueHeaders() + { + var builder = CreateBuilder(); + var rf = MakeResponseFeature(200, new System.Collections.Generic.Dictionary + { + ["Set-Cookie"] = new[] { "session=abc123; Path=/; HttpOnly" }, + ["Content-Type"] = new[] { "text/html" } + }); + + var prelude = builder.InvokeBuildStreamingPrelude(rf); + + // Cookie value is in Cookies + Assert.Contains("session=abc123; Path=/; HttpOnly", prelude.Cookies); + + // Set-Cookie is NOT in MultiValueHeaders + Assert.False(prelude.MultiValueHeaders.ContainsKey("Set-Cookie")); + Assert.False(prelude.MultiValueHeaders.ContainsKey("set-cookie")); + + // Other headers are still present + Assert.True(prelude.MultiValueHeaders.ContainsKey("Content-Type")); + } + + [Fact] + public void SetCookieHeader_IsCaseInsensitive() + { + // The implementation uses StringComparison.OrdinalIgnoreCase, so + // "set-cookie" (lowercase) should also be routed to Cookies. + var builder = CreateBuilder(); + var features = new InvokeFeatures(); + var rf = (IHttpResponseFeature)features; + rf.StatusCode = 200; + // HeaderDictionary is case-insensitive, so "set-cookie" and "Set-Cookie" are the same key. + rf.Headers["set-cookie"] = new Microsoft.Extensions.Primitives.StringValues("id=xyz; Path=/"); + + var prelude = builder.InvokeBuildStreamingPrelude(rf); + + Assert.Contains("id=xyz; Path=/", prelude.Cookies); + Assert.False(prelude.MultiValueHeaders.ContainsKey("Set-Cookie")); + Assert.False(prelude.MultiValueHeaders.ContainsKey("set-cookie")); + } + + // ----------------------------------------------------------------------- + // 6.5 Multiple Set-Cookie values all appear in Cookies + // ----------------------------------------------------------------------- + [Fact] + public void MultipleSetCookieValues_AllAppearInCookies() + { + var builder = CreateBuilder(); + var rf = MakeResponseFeature(200, new System.Collections.Generic.Dictionary + { + ["Set-Cookie"] = new[] + { + "session=abc; Path=/; HttpOnly", + "theme=dark; Path=/", + "lang=en; Path=/; SameSite=Strict" + } + }); + + var prelude = builder.InvokeBuildStreamingPrelude(rf); + + Assert.Equal(3, prelude.Cookies.Count); + Assert.Contains("session=abc; Path=/; HttpOnly", prelude.Cookies); + Assert.Contains("theme=dark; Path=/", prelude.Cookies); + Assert.Contains("lang=en; Path=/; SameSite=Strict", prelude.Cookies); + + // None of them should be in MultiValueHeaders + Assert.False(prelude.MultiValueHeaders.ContainsKey("Set-Cookie")); + } + + [Fact] + public void MultipleSetCookieValues_WithOtherHeaders_CookiesAndHeadersAreSeparated() + { + var builder = CreateBuilder(); + var rf = MakeResponseFeature(201, new System.Collections.Generic.Dictionary + { + ["Set-Cookie"] = new[] { "a=1", "b=2" }, + ["Location"] = new[] { "/new-resource" } + }); + + var prelude = builder.InvokeBuildStreamingPrelude(rf); + + Assert.Equal((HttpStatusCode)201, prelude.StatusCode); + Assert.Equal(2, prelude.Cookies.Count); + Assert.Contains("a=1", prelude.Cookies); + Assert.Contains("b=2", prelude.Cookies); + Assert.True(prelude.MultiValueHeaders.ContainsKey("Location")); + Assert.False(prelude.MultiValueHeaders.ContainsKey("Set-Cookie")); + } + + [Fact] + public void EmptyHeaders_ProducesEmptyMultiValueHeadersAndCookies() + { + var builder = CreateBuilder(); + var rf = MakeResponseFeature(204); + + var prelude = builder.InvokeBuildStreamingPrelude(rf); + + Assert.Equal(HttpStatusCode.NoContent, prelude.StatusCode); + Assert.Empty(prelude.MultiValueHeaders); + Assert.Empty(prelude.Cookies); + } + } +} diff --git a/Libraries/test/Amazon.Lambda.AspNetCoreServer.Test/ResponseStreamingPropertyTests.cs b/Libraries/test/Amazon.Lambda.AspNetCoreServer.Test/ResponseStreamingPropertyTests.cs new file mode 100644 index 000000000..d10149f07 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.AspNetCoreServer.Test/ResponseStreamingPropertyTests.cs @@ -0,0 +1,478 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Runtime.Versioning; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +using Amazon.Lambda.APIGatewayEvents; +using Amazon.Lambda.AspNetCoreServer.Internal; +using Amazon.Lambda.Core; +using Amazon.Lambda.TestUtilities; + +using Microsoft.AspNetCore.Http.Features; + +using Xunit; + +namespace Amazon.Lambda.AspNetCoreServer.Test +{ + [RequiresPreviewFeatures] + public class ResponseStreamingPropertyTests + { + // ----------------------------------------------------------------------- + // Shared test infrastructure + // ----------------------------------------------------------------------- + + private class PropertyTestStreamingFunction : APIGatewayHttpApiV2ProxyFunction + { + public InvokeFeatures CapturedFeatures { get; private set; } + public MemoryStream CapturedLambdaStream { get; private set; } + public bool MarshallResponseCalled { get; private set; } + + public PropertyTestStreamingFunction() + : base(StartupMode.FirstRequest) + { + EnableResponseStreaming = true; + } + + public void PublicMarshallRequest(InvokeFeatures features, + APIGatewayHttpApiV2ProxyRequest request, ILambdaContext context) + => MarshallRequest(features, request, context); + + protected override void PostMarshallItemsFeatureFeature( + IItemsFeature aspNetCoreItemFeature, + APIGatewayHttpApiV2ProxyRequest lambdaRequest, + ILambdaContext lambdaContext) + { + CapturedFeatures = aspNetCoreItemFeature as InvokeFeatures; + base.PostMarshallItemsFeatureFeature(aspNetCoreItemFeature, lambdaRequest, lambdaContext); + } + + [RequiresPreviewFeatures] + protected override Stream CreateLambdaResponseStream( + Amazon.Lambda.Core.ResponseStreaming.HttpResponseStreamPrelude prelude) + { + var ms = new MemoryStream(); + CapturedLambdaStream = ms; + return ms; + } + + protected override APIGatewayHttpApiV2ProxyResponse MarshallResponse( + IHttpResponseFeature responseFeatures, + ILambdaContext lambdaContext, + int statusCodeIfNotSet = 200) + { + MarshallResponseCalled = true; + return base.MarshallResponse(responseFeatures, lambdaContext, statusCodeIfNotSet); + } + } + + private class StandalonePreludeBuilder : APIGatewayHttpApiV2ProxyFunction + { + public StandalonePreludeBuilder() : base(StartupMode.FirstRequest) { } + + public Amazon.Lambda.Core.ResponseStreaming.HttpResponseStreamPrelude + InvokeBuildStreamingPrelude(IHttpResponseFeature responseFeature) + => BuildStreamingPrelude(responseFeature); + } + + private static APIGatewayHttpApiV2ProxyRequest MakeRequest( + string method = "GET", string path = "/api/values", + Dictionary headers = null, string body = null) + => new APIGatewayHttpApiV2ProxyRequest + { + RequestContext = new APIGatewayHttpApiV2ProxyRequest.ProxyRequestContext + { + Http = new APIGatewayHttpApiV2ProxyRequest.HttpDescription { Method = method, Path = path }, + Stage = "$default" + }, + RawPath = path, + Headers = headers ?? new Dictionary { ["accept"] = "application/json" }, + Body = body + }; + + + public static IEnumerable RequestMarshallingCases() => + [ + ["GET", "/api/values", null, null], + ["POST", "/api/values", new Dictionary{["content-type"]="application/json"}, "{\"k\":\"v\"}"], + ["PUT", "/api/items/42", new Dictionary{["x-custom-header"]="abc"}, null], + ["DELETE", "/api/items/1", null, null], + ["PATCH", "/api/values", new Dictionary{["accept"]="text/html"}, null], + ]; + + [Theory] + [MemberData(nameof(RequestMarshallingCases))] + public void Property1_RequestMarshalling_IdenticalInStreamingAndBufferedModes( + string method, string path, Dictionary headers, string body) + { + var function = new PropertyTestStreamingFunction(); + var context = new TestLambdaContext(); + + // Warm up so the host is started + function.FunctionHandlerAsync(MakeRequest(), context).GetAwaiter().GetResult(); + + var request = MakeRequest(method, path, headers, body); + function.FunctionHandlerAsync(request, context).GetAwaiter().GetResult(); + var streamingReq = (IHttpRequestFeature)function.CapturedFeatures; + + var bufferedFeatures = new InvokeFeatures(); + function.PublicMarshallRequest(bufferedFeatures, request, context); + var bufferedReq = (IHttpRequestFeature)bufferedFeatures; + + Assert.NotNull(streamingReq); + Assert.Equal(bufferedReq.Method, streamingReq.Method); + Assert.Equal(bufferedReq.Path, streamingReq.Path); + Assert.Equal(bufferedReq.PathBase, streamingReq.PathBase); + Assert.Equal(bufferedReq.QueryString, streamingReq.QueryString); + Assert.Equal(bufferedReq.Scheme, streamingReq.Scheme); + + foreach (var key in bufferedReq.Headers.Keys) + { + Assert.True(streamingReq.Headers.ContainsKey(key), + $"Streaming features missing header '{key}'"); + Assert.Equal(bufferedReq.Headers[key], streamingReq.Headers[key]); + } + } + + + public static IEnumerable BufferedModeCases() => + [ + ["GET", "/api/values", null, null], + ["POST", "/api/values", null, "{\"key\":\"value\"}"], + ["PUT", "/api/items/5", null, null], + ["DELETE", "/api/items/5", null, null], + ["GET", "/api/values", new Dictionary{["accept"]="text/html"}, null], + ]; + + [Theory] + [MemberData(nameof(BufferedModeCases))] + public void Property2_BufferedMode_Unaffected( + string method, string path, Dictionary headers, string body) + { + // Use a fresh function with streaming OFF + var function = new PropertyTestStreamingFunction(); + function.EnableResponseStreaming = false; + var context = new TestLambdaContext(); + + var response = function.FunctionHandlerAsync(MakeRequest(method, path, headers, body), context) + .GetAwaiter().GetResult(); + + Assert.NotNull(response); + Assert.True(function.MarshallResponseCalled, "MarshallResponse must be called in buffered mode"); + Assert.IsType(response); + Assert.True(response.StatusCode >= 100 && response.StatusCode <= 599, + $"Status code {response.StatusCode} out of valid range"); + } + + + public static IEnumerable PreludeStatusAndHeaderCases() => + [ + // (statusCode, headerKey, headerValues[]) + [0, "accept", new[] { "application/json" }], + [200, "content-type", new[] { "text/plain" }], + [201, "x-request-id", new[] { "abc-123" }], + [404, "cache-control", new[] { "no-cache", "no-store" }], + [500, "x-custom-header", new[] { "val1", "val2", "val3" }], + ]; + + [Theory] + [MemberData(nameof(PreludeStatusAndHeaderCases))] + public void Property3_Prelude_StatusCodeAndNonCookieHeaders_Correct( + int statusCode, string headerKey, string[] headerValues) + { + var builder = new StandalonePreludeBuilder(); + var features = new InvokeFeatures(); + var rf = (IHttpResponseFeature)features; + rf.StatusCode = statusCode; + rf.Headers[headerKey] = new Microsoft.Extensions.Primitives.StringValues(headerValues); + + var prelude = builder.InvokeBuildStreamingPrelude(rf); + + int expectedStatus = statusCode == 0 ? 200 : statusCode; + Assert.Equal((System.Net.HttpStatusCode)expectedStatus, prelude.StatusCode); + + Assert.True(prelude.MultiValueHeaders.ContainsKey(headerKey), + $"Header '{headerKey}' missing from MultiValueHeaders"); + Assert.Equal(headerValues, prelude.MultiValueHeaders[headerKey].ToArray()); + + Assert.False(prelude.MultiValueHeaders.ContainsKey("Set-Cookie")); + Assert.False(prelude.MultiValueHeaders.ContainsKey("set-cookie")); + } + + + public static IEnumerable SetCookieCases() => + [ + [new[] { "session=abc; Path=/" }], + [new[] { "a=1; Path=/", "b=2; Path=/" }], + [new[] { "x=foo; Path=/", "y=bar; Path=/", "z=baz; Path=/" }], + ]; + + [Theory] + [MemberData(nameof(SetCookieCases))] + public void Property4_SetCookieHeaders_MovedToCookies_AbsentFromMultiValueHeaders(string[] cookies) + { + var builder = new StandalonePreludeBuilder(); + var features = new InvokeFeatures(); + var rf = (IHttpResponseFeature)features; + rf.StatusCode = 200; + rf.Headers["Set-Cookie"] = new Microsoft.Extensions.Primitives.StringValues(cookies); + rf.Headers["content-type"] = "application/json"; + + var prelude = builder.InvokeBuildStreamingPrelude(rf); + + foreach (var cookie in cookies) + Assert.Contains(cookie, prelude.Cookies); + + Assert.False(prelude.MultiValueHeaders.ContainsKey("Set-Cookie"), + "Set-Cookie must not appear in MultiValueHeaders"); + Assert.False(prelude.MultiValueHeaders.ContainsKey("set-cookie"), + "set-cookie must not appear in MultiValueHeaders"); + + Assert.True(prelude.MultiValueHeaders.ContainsKey("content-type")); + } + + + public static IEnumerable BodyBytesCases() => + [ + [new[] { new byte[] { 1, 2, 3 } }], + [new[] { new byte[] { 10, 20 }, new byte[] { 30, 40, 50 } }], + [new[] { new byte[] { 0xFF }, new byte[] { 0x00 }, new byte[] { 0xAB, 0xCD } }], + [new[] { Encoding.UTF8.GetBytes("hello "), Encoding.UTF8.GetBytes("world") }], + ]; + + [Theory] + [MemberData(nameof(BodyBytesCases))] + public async Task Property5_BodyBytes_ForwardedToLambdaResponseStream_InOrder(byte[][] chunks) + { + var lambdaStream = new MemoryStream(); + var invokeFeatures = new InvokeFeatures(); + var feature = new StreamingResponseBodyFeature( + (IHttpResponseFeature)invokeFeatures, + () => Task.FromResult(lambdaStream)); + + await feature.StartAsync(); + + foreach (var chunk in chunks) + await feature.Stream.WriteAsync(chunk, 0, chunk.Length); + + lambdaStream.Position = 0; + var actual = lambdaStream.ToArray(); + var expected = chunks.SelectMany(c => c).ToArray(); + + Assert.Equal(expected, actual); + } + + + [Theory] + [InlineData(1)] + [InlineData(2)] + [InlineData(3)] + [InlineData(5)] + public async Task Property6_OnStartingCallbacks_FireBeforeFirstByte(int cbCount) + { + int sequenceCounter = 0; + var callbackSequences = new List(); + int firstWriteSequence = -1; + + var trackingStream = new WriteTrackingStream(() => firstWriteSequence = sequenceCounter++); + var invokeFeatures = new InvokeFeatures(); + var responseFeature = (IHttpResponseFeature)invokeFeatures; + + for (int i = 0; i < cbCount; i++) + { + responseFeature.OnStarting(_ => + { + callbackSequences.Add(sequenceCounter++); + return Task.CompletedTask; + }, null); + } + + var feature = new StreamingResponseBodyFeature( + responseFeature, + () => Task.FromResult(trackingStream)); + + await feature.StartAsync(); + var bytes = new byte[] { 1, 2, 3 }; + await feature.Stream.WriteAsync(bytes, 0, bytes.Length); + + Assert.Equal(cbCount, callbackSequences.Count); + Assert.True(firstWriteSequence >= 0, "No write reached the lambda stream"); + foreach (var seq in callbackSequences) + Assert.True(seq < firstWriteSequence, + $"Callback (seq={seq}) did not fire before first write (seq={firstWriteSequence})"); + } + + + public static IEnumerable FileRangeCases() => + [ + // (fileBytes, offset, count) — null count means read to end + [new byte[] { 1, 2, 3, 4, 5, 6, 7, 8 }, 0L, (long?)8L], + [new byte[] { 1, 2, 3, 4, 5, 6, 7, 8 }, 2L, (long?)4L], + [new byte[] { 1, 2, 3, 4, 5, 6, 7, 8 }, 0L, (long?)null], + [new byte[] { 1, 2, 3, 4, 5, 6, 7, 8 }, 5L, (long?)null], + [new byte[] { 0xAA, 0xBB, 0xCC, 0xDD }, 1L, (long?)2L], + ]; + + [Theory] + [MemberData(nameof(FileRangeCases))] + public async Task Property7_SendFileAsync_WritesCorrectByteRange( + byte[] fileBytes, long offset, long? count) + { + var lambdaStream = new MemoryStream(); + var invokeFeatures = new InvokeFeatures(); + var feature = new StreamingResponseBodyFeature( + (IHttpResponseFeature)invokeFeatures, + () => Task.FromResult(lambdaStream)); + + var tempFile = Path.GetTempFileName(); + try + { + await File.WriteAllBytesAsync(tempFile, fileBytes); + await feature.SendFileAsync(tempFile, offset, count); + + lambdaStream.Position = 0; + var actual = lambdaStream.ToArray(); + + long actualCount = count ?? (fileBytes.Length - offset); + var expected = fileBytes.Skip((int)offset).Take((int)actualCount).ToArray(); + + Assert.Equal(expected, actual); + } + finally + { + File.Delete(tempFile); + } + } + + + [Theory] + [InlineData(1)] + [InlineData(2)] + [InlineData(3)] + [InlineData(5)] + public void Property8_OnCompletedCallbacks_FireAfterStreamClose(int cbCount) + { + int sequenceCounter = 0; + var completedSequences = new List(); + int streamClosedSequence = -1; + + var function = new OnCompletedTrackingFunction( + cbCount: cbCount, + completedSequences: completedSequences, + getAndIncrementCounter: () => sequenceCounter++, + onStreamClosed: () => streamClosedSequence = sequenceCounter++); + + var context = new TestLambdaContext(); + var request = MakeRequest(); + + function.FunctionHandlerAsync(request, context).GetAwaiter().GetResult(); + + Assert.Equal(cbCount, completedSequences.Count); + Assert.True(streamClosedSequence >= 0, "Stream was never closed"); + foreach (var seq in completedSequences) + Assert.True(seq > streamClosedSequence, + $"OnCompleted callback (seq={seq}) fired before stream closed (seq={streamClosedSequence})"); + } + + // ----------------------------------------------------------------------- + // Helpers + // ----------------------------------------------------------------------- + + private class WriteTrackingStream : MemoryStream + { + private readonly Action _onFirstWrite; + private bool _fired; + + public WriteTrackingStream(Action onFirstWrite) => _onFirstWrite = onFirstWrite; + + public override void Write(byte[] buffer, int offset, int count) + { + FireOnce(); + base.Write(buffer, offset, count); + } + + public override Task WriteAsync(byte[] buffer, int offset, int count, + CancellationToken cancellationToken) + { + FireOnce(); + return base.WriteAsync(buffer, offset, count, cancellationToken); + } + + private void FireOnce() + { + if (!_fired) { _fired = true; _onFirstWrite?.Invoke(); } + } + } + + private class OnCompletedTrackingFunction : APIGatewayHttpApiV2ProxyFunction + { + private readonly int _cbCount; + private readonly List _completedSequences; + private readonly Func _getAndIncrementCounter; + private readonly Action _onStreamClosed; + + public OnCompletedTrackingFunction( + int cbCount, + List completedSequences, + Func getAndIncrementCounter, + Action onStreamClosed) + : base(StartupMode.FirstRequest) + { + EnableResponseStreaming = true; + _cbCount = cbCount; + _completedSequences = completedSequences; + _getAndIncrementCounter = getAndIncrementCounter; + _onStreamClosed = onStreamClosed; + } + + protected override void PostMarshallItemsFeatureFeature( + IItemsFeature aspNetCoreItemFeature, + APIGatewayHttpApiV2ProxyRequest lambdaRequest, + ILambdaContext lambdaContext) + { + var responseFeature = (IHttpResponseFeature)aspNetCoreItemFeature; + for (int i = 0; i < _cbCount; i++) + { + responseFeature.OnCompleted(_ => + { + _completedSequences.Add(_getAndIncrementCounter()); + return Task.CompletedTask; + }, null); + } + base.PostMarshallItemsFeatureFeature(aspNetCoreItemFeature, lambdaRequest, lambdaContext); + } + + [RequiresPreviewFeatures] + protected override Stream CreateLambdaResponseStream( + Amazon.Lambda.Core.ResponseStreaming.HttpResponseStreamPrelude prelude) + { + return new CloseTrackingStream(_onStreamClosed); + } + } + + private class CloseTrackingStream : MemoryStream + { + private readonly Action _onClose; + private bool _closed; + + public CloseTrackingStream(Action onClose) => _onClose = onClose; + + protected override void Dispose(bool disposing) + { + if (!_closed) { _closed = true; _onClose?.Invoke(); } + base.Dispose(disposing); + } + + public override ValueTask DisposeAsync() + { + if (!_closed) { _closed = true; _onClose?.Invoke(); } + return base.DisposeAsync(); + } + } + } +} diff --git a/Libraries/test/Amazon.Lambda.AspNetCoreServer.Test/StreamingFunctionHandlerAsyncTests.cs b/Libraries/test/Amazon.Lambda.AspNetCoreServer.Test/StreamingFunctionHandlerAsyncTests.cs new file mode 100644 index 000000000..b006ec11f --- /dev/null +++ b/Libraries/test/Amazon.Lambda.AspNetCoreServer.Test/StreamingFunctionHandlerAsyncTests.cs @@ -0,0 +1,703 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +using System; +using System.Collections.Generic; +using System.IO; +using System.Reflection; +using System.Runtime.Versioning; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +using Amazon.Lambda.APIGatewayEvents; +using Amazon.Lambda.AspNetCoreServer.Internal; +using Amazon.Lambda.Core; +using Amazon.Lambda.TestUtilities; + +using Microsoft.AspNetCore.Http.Features; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; + +using Xunit; + +namespace Amazon.Lambda.AspNetCoreServer.Test +{ + /// + /// Unit tests for the streaming path in + /// when EnableResponseStreaming is true. + /// + /// overrides CreateLambdaResponseStream to inject + /// a instead of calling LambdaResponseStreamFactory.CreateHttpStream, + /// allowing tests to run without the Lambda runtime. + /// + [RequiresPreviewFeatures] + public class StreamingFunctionHandlerAsyncTests + { + // ----------------------------------------------------------------------- + // Base testable subclass — overrides CreateLambdaResponseStream + // ----------------------------------------------------------------------- + + private class TestableStreamingFunction : APIGatewayHttpApiV2ProxyFunction + { + // Captured in PostMarshallItemsFeatureFeature — the InvokeFeatures after MarshallRequest + public InvokeFeatures CapturedFeatures { get; private set; } + + // The MemoryStream used as the Lambda response stream + public MemoryStream CapturedLambdaStream { get; private set; } + + // Whether CreateLambdaResponseStream was called (stream was opened) + public bool StreamOpened { get; private set; } + + // Whether MarshallResponse was called (buffered mode check) + public bool MarshallResponseCalled { get; private set; } + + // Optional setup action invoked inside PostMarshallItemsFeatureFeature + public Func PipelineSetupAction { get; set; } + + public TestableStreamingFunction() + : base(StartupMode.FirstRequest) + { + EnableResponseStreaming = true; + } + + // Expose MarshallRequest publicly so tests can call it after the host is started + public void PublicMarshallRequest(InvokeFeatures features, + APIGatewayHttpApiV2ProxyRequest request, ILambdaContext context) + => MarshallRequest(features, request, context); + + protected override void PostMarshallItemsFeatureFeature( + IItemsFeature aspNetCoreItemFeature, + APIGatewayHttpApiV2ProxyRequest lambdaRequest, + ILambdaContext lambdaContext) + { + CapturedFeatures = aspNetCoreItemFeature as InvokeFeatures; + PipelineSetupAction?.Invoke(CapturedFeatures); + base.PostMarshallItemsFeatureFeature(aspNetCoreItemFeature, lambdaRequest, lambdaContext); + } + + [RequiresPreviewFeatures] + protected override Stream CreateLambdaResponseStream( + Amazon.Lambda.Core.ResponseStreaming.HttpResponseStreamPrelude prelude) + { + var ms = new MemoryStream(); + CapturedLambdaStream = ms; + StreamOpened = true; + return ms; + } + + protected override APIGatewayHttpApiV2ProxyResponse MarshallResponse( + IHttpResponseFeature responseFeatures, + ILambdaContext lambdaContext, + int statusCodeIfNotSet = 200) + { + MarshallResponseCalled = true; + return base.MarshallResponse(responseFeatures, lambdaContext, statusCodeIfNotSet); + } + } + + // ----------------------------------------------------------------------- + // Helper: build a minimal APIGatewayHttpApiV2ProxyRequest + // ----------------------------------------------------------------------- + private static APIGatewayHttpApiV2ProxyRequest MakeRequest( + string method = "GET", + string path = "/api/values", + Dictionary headers = null, + string body = null) + { + return new APIGatewayHttpApiV2ProxyRequest + { + RequestContext = new APIGatewayHttpApiV2ProxyRequest.ProxyRequestContext + { + Http = new APIGatewayHttpApiV2ProxyRequest.HttpDescription + { + Method = method, + Path = path + }, + Stage = "$default" + }, + RawPath = path, + Headers = headers ?? new Dictionary + { + ["accept"] = "application/json" + }, + Body = body + }; + } + + [Fact] + public async Task RequestMarshalling_ProducesSameHttpRequestFeatureState_AsBufferedMode() + { + var function = new TestableStreamingFunction(); + var context = new TestLambdaContext(); + var request = MakeRequest( + method: "POST", + path: "/api/values", + headers: new Dictionary + { + ["content-type"] = "application/json", + ["x-custom-header"] = "test-value" + }, + body: "{\"key\":\"value\"}" + ); + + // Run the streaming path first — this starts the host and captures features + await function.FunctionHandlerAsync(request, context); + var streamingReq = (IHttpRequestFeature)function.CapturedFeatures; + + // Now call MarshallRequest directly (host is started, _logger is initialized) + var bufferedFeatures = new InvokeFeatures(); + function.PublicMarshallRequest(bufferedFeatures, request, context); + var bufferedReq = (IHttpRequestFeature)bufferedFeatures; + + Assert.NotNull(streamingReq); + Assert.Equal(bufferedReq.Method, streamingReq.Method); + Assert.Equal(bufferedReq.Path, streamingReq.Path); + Assert.Equal(bufferedReq.PathBase, streamingReq.PathBase); + Assert.Equal(bufferedReq.QueryString, streamingReq.QueryString); + Assert.Equal(bufferedReq.Scheme, streamingReq.Scheme); + } + + [Fact] + public async Task RequestMarshalling_PreservesHeaders_InStreamingMode() + { + var function = new TestableStreamingFunction(); + var context = new TestLambdaContext(); + var request = MakeRequest( + headers: new Dictionary + { + ["x-forwarded-for"] = "1.2.3.4", + ["accept"] = "text/html" + } + ); + + // Run streaming path first to start the host + await function.FunctionHandlerAsync(request, context); + var streamingReq = (IHttpRequestFeature)function.CapturedFeatures; + + // Compare with buffered path + var bufferedFeatures = new InvokeFeatures(); + function.PublicMarshallRequest(bufferedFeatures, request, context); + var bufferedReq = (IHttpRequestFeature)bufferedFeatures; + + foreach (var key in bufferedReq.Headers.Keys) + { + Assert.True(streamingReq.Headers.ContainsKey(key), + $"Streaming features missing header '{key}' that buffered features has"); + Assert.Equal(bufferedReq.Headers[key], streamingReq.Headers[key]); + } + } + + [Fact] + public async Task AfterSetup_BodyFeature_IsStreamingResponseBodyFeature() + { + IHttpResponseBodyFeature capturedBodyFeature = null; + + var function = new TestableStreamingFunction(); + function.PipelineSetupAction = features => + { + var responseFeature = (IHttpResponseFeature)features; + responseFeature.OnStarting(_ => + { + capturedBodyFeature = (IHttpResponseBodyFeature)features[typeof(IHttpResponseBodyFeature)]; + return Task.CompletedTask; + }, null); + return Task.CompletedTask; + }; + + var context = new TestLambdaContext(); + var request = MakeRequest(); + + await function.FunctionHandlerAsync(request, context); + + // Verify via CapturedFeatures directly — the body feature was replaced before pipeline ran + var bodyFeatureFromCapture = function.CapturedFeatures[typeof(IHttpResponseBodyFeature)]; + Assert.IsType(bodyFeatureFromCapture); + } + + [Fact] + public async Task AfterSetup_BodyFeature_IsStreamingResponseBodyFeature_ViaOnStarting() + { + IHttpResponseBodyFeature capturedBodyFeature = null; + + var function = new TestableStreamingFunction(); + function.PipelineSetupAction = features => + { + var responseFeature = (IHttpResponseFeature)features; + responseFeature.OnStarting(_ => + { + capturedBodyFeature = (IHttpResponseBodyFeature)features[typeof(IHttpResponseBodyFeature)]; + return Task.CompletedTask; + }, null); + return Task.CompletedTask; + }; + + var context = new TestLambdaContext(); + var request = MakeRequest(); + + await function.FunctionHandlerAsync(request, context); + + if (capturedBodyFeature != null) + { + Assert.IsType(capturedBodyFeature); + } + else + { + var bodyFeature = function.CapturedFeatures[typeof(IHttpResponseBodyFeature)]; + Assert.IsType(bodyFeature); + } + } + + [Fact] + public async Task FunctionHandlerAsync_BufferedMode_StillReturnsResponse_ViaMarshallResponse() + { + // Buffered mode: EnableResponseStreaming defaults to false + var function = new TestableStreamingFunction(); + function.EnableResponseStreaming = false; + var context = new TestLambdaContext(); + var request = MakeRequest(); + + var response = await function.FunctionHandlerAsync(request, context); + + Assert.NotNull(response); + Assert.True(function.MarshallResponseCalled, + "MarshallResponse should have been called in buffered mode"); + Assert.IsType(response); + } + + [Fact] + public async Task FunctionHandlerAsync_BufferedMode_ReturnsStatusCode_FromPipeline() + { + var function = new TestableStreamingFunction(); + function.EnableResponseStreaming = false; + var context = new TestLambdaContext(); + var request = MakeRequest(path: "/api/values"); + + var response = await function.FunctionHandlerAsync(request, context); + + Assert.Equal(200, response.StatusCode); + } + + [Fact] + public async Task FunctionHandlerAsync_BufferedMode_DoesNotOpenLambdaStream() + { + var function = new TestableStreamingFunction(); + function.EnableResponseStreaming = false; + var context = new TestLambdaContext(); + var request = MakeRequest(); + + await function.FunctionHandlerAsync(request, context); + + Assert.False(function.StreamOpened, + "FunctionHandlerAsync (buffered mode) should not open the Lambda response stream"); + } + + // ----------------------------------------------------------------------- + // 7.4 OnCompleted callbacks fire after LambdaResponseStream is closed + // on success path + // ----------------------------------------------------------------------- + [Fact] + public async Task OnCompleted_FiresAfterStreamClosed_OnSuccessPath() + { + bool callbackFired = false; + + var function = new TestableStreamingFunction(); + function.PipelineSetupAction = features => + { + var responseFeature = (IHttpResponseFeature)features; + responseFeature.OnCompleted(_ => + { + callbackFired = true; + return Task.CompletedTask; + }, null); + return Task.CompletedTask; + }; + + var context = new TestLambdaContext(); + var request = MakeRequest(); + + await function.FunctionHandlerAsync(request, context); + + Assert.True(callbackFired, "OnCompleted callback should have fired on the success path"); + } + + [Fact] + public async Task OnCompleted_MultipleCallbacks_AllFire() + { + int firedCount = 0; + + var function = new TestableStreamingFunction(); + function.PipelineSetupAction = features => + { + var responseFeature = (IHttpResponseFeature)features; + for (int i = 0; i < 3; i++) + { + responseFeature.OnCompleted(_ => + { + firedCount++; + return Task.CompletedTask; + }, null); + } + return Task.CompletedTask; + }; + + var context = new TestLambdaContext(); + var request = MakeRequest(); + + await function.FunctionHandlerAsync(request, context); + + Assert.Equal(3, firedCount); + } + + [Fact] + public async Task ExceptionBeforeStreamOpen_StreamClosedCleanly_OnCompletedFires() + { + bool onCompletedFired = false; + + var function = new ThrowingBeforeStreamOpenFunction( + onCompleted: () => onCompletedFired = true); + + var context = new TestLambdaContext(); + var request = MakeRequest(); + + await function.FunctionHandlerAsync(request, context); + + Assert.False(function.StreamOpened, + "Stream should not have been opened when exception occurs before stream open"); + Assert.True(onCompletedFired, + "OnCompleted should fire even when exception occurs before stream open"); + } + + [Fact] + public async Task ExceptionBeforeStreamOpen_WithIncludeExceptionDetail_Writes500ErrorBody() + { + const string exceptionMessage = "Deliberate test failure for 500 response"; + + var function = new ThrowingBeforeStreamOpenFunction( + exceptionMessage: exceptionMessage, + onCompleted: null) + { + IncludeUnhandledExceptionDetailInResponse = true + }; + + var context = new TestLambdaContext(); + var request = MakeRequest(); + + await function.FunctionHandlerAsync(request, context); + + Assert.True(function.StreamOpened, + "An error stream should have been opened for the 500 response"); + Assert.NotNull(function.CapturedLambdaStream); + + var errorBody = Encoding.UTF8.GetString(function.CapturedLambdaStream.ToArray()); + Assert.Contains(exceptionMessage, errorBody); + } + + [Fact] + public async Task ExceptionBeforeStreamOpen_WithoutIncludeExceptionDetail_NoStreamOpened() + { + var function = new ThrowingBeforeStreamOpenFunction( + exceptionMessage: "Should not appear in response", + onCompleted: null) + { + IncludeUnhandledExceptionDetailInResponse = false + }; + + var context = new TestLambdaContext(); + var request = MakeRequest(); + + await function.FunctionHandlerAsync(request, context); + + Assert.False(function.StreamOpened, + "Stream should not be opened when IncludeUnhandledExceptionDetailInResponse=false"); + } + + // ----------------------------------------------------------------------- + // 7.7 Exception after stream open → stream closed after logging, OnCompleted fires + // ----------------------------------------------------------------------- + [Fact] + public async Task ExceptionAfterStreamOpen_StreamClosedAfterLogging_OnCompletedFires() + { + bool onCompletedFired = false; + + var function = new ThrowingAfterStreamOpenFunction( + onCompleted: () => onCompletedFired = true); + + var context = new TestLambdaContext(); + var request = MakeRequest(); + + await function.FunctionHandlerAsync(request, context); + + Assert.True(function.StreamOpened, + "Stream should have been opened before the exception"); + Assert.True(onCompletedFired, + "OnCompleted should fire even when exception occurs after stream open"); + } + + [Fact] + public async Task ExceptionAfterStreamOpen_DoesNotWriteNewErrorBody() + { + var function = new ThrowingAfterStreamOpenFunction(onCompleted: null) + { + IncludeUnhandledExceptionDetailInResponse = true + }; + + var context = new TestLambdaContext(); + var request = MakeRequest(); + + await function.FunctionHandlerAsync(request, context); + + Assert.True(function.StreamOpened); + var streamContent = function.CapturedLambdaStream.ToArray(); + var bodyText = Encoding.UTF8.GetString(streamContent); + Assert.DoesNotContain("InvalidOperationException", bodyText); + } + + [Fact] + public void FunctionHandlerAsync_HasLambdaSerializerAttribute() + { + var method = typeof(APIGatewayHttpApiV2ProxyFunction) + .GetMethod(nameof(APIGatewayHttpApiV2ProxyFunction.FunctionHandlerAsync)); + + Assert.NotNull(method); + + var attr = method.GetCustomAttribute(); + Assert.NotNull(attr); + Assert.Equal( + typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer), + attr.SerializerType); + } + + [Fact] + public void EnableResponseStreaming_Property_HasRequiresPreviewFeaturesAttribute() + { + var prop = typeof(APIGatewayHttpApiV2ProxyFunction) + .GetProperty(nameof(APIGatewayHttpApiV2ProxyFunction.EnableResponseStreaming)); + + Assert.NotNull(prop); + + var attr = prop.GetCustomAttribute(); + Assert.NotNull(attr); + } + + [Fact] + public void EnableResponseStreaming_Property_DefaultsToFalse() + { + var function = new TestableStreamingFunction(); + function.EnableResponseStreaming = false; // reset to default + Assert.False(function.EnableResponseStreaming); + } + + [Fact] + public void FunctionHandlerAsync_ReturnsTaskOfT() + { + var method = typeof(APIGatewayHttpApiV2ProxyFunction) + .GetMethod(nameof(APIGatewayHttpApiV2ProxyFunction.FunctionHandlerAsync)); + + Assert.NotNull(method); + Assert.True(method.ReturnType.IsGenericType); + Assert.Equal(typeof(Task<>), method.ReturnType.GetGenericTypeDefinition()); + } + + [Fact] + public void FunctionHandlerAsync_IsPublicVirtual() + { + var method = typeof(APIGatewayHttpApiV2ProxyFunction) + .GetMethod(nameof(APIGatewayHttpApiV2ProxyFunction.FunctionHandlerAsync)); + + Assert.NotNull(method); + Assert.True(method.IsPublic); + Assert.True(method.IsVirtual); + } + + // ----------------------------------------------------------------------- + // Helper subclasses for exception-path tests + // ----------------------------------------------------------------------- + + /// + /// Base class for exception-path tests. Overrides ExecuteStreamingRequestAsync + /// indirectly by overriding the pipeline via a custom ProcessRequest-equivalent. + /// Uses EnableResponseStreaming = true so FunctionHandlerAsync takes the + /// streaming path, then injects custom pipeline logic via . + /// + private abstract class CustomPipelineStreamingFunction + : APIGatewayHttpApiV2ProxyFunction + { + public MemoryStream CapturedLambdaStream { get; protected set; } + public bool StreamOpened { get; protected set; } + + protected CustomPipelineStreamingFunction() + : base(StartupMode.FirstRequest) + { + EnableResponseStreaming = true; + } + + [RequiresPreviewFeatures] + protected override Stream CreateLambdaResponseStream( + Amazon.Lambda.Core.ResponseStreaming.HttpResponseStreamPrelude prelude) + { + var ms = new MemoryStream(); + CapturedLambdaStream = ms; + StreamOpened = true; + return ms; + } + + // Override FunctionHandlerAsync to inject custom pipeline logic. + // We replicate the streaming setup from ExecuteStreamingRequestAsync so we can + // call RunPipelineAsync instead of the real ASP.NET Core pipeline. + public override async Task FunctionHandlerAsync( + APIGatewayHttpApiV2ProxyRequest request, + ILambdaContext lambdaContext) + { + if (!IsStarted) Start(); + + var features = new InvokeFeatures(); + MarshallRequest(features, request, lambdaContext); + + var itemFeatures = (IItemsFeature)features; + itemFeatures.Items = new System.Collections.Generic.Dictionary(); + itemFeatures.Items[LAMBDA_CONTEXT] = lambdaContext; + itemFeatures.Items[LAMBDA_REQUEST_OBJECT] = request; + PostMarshallItemsFeatureFeature(itemFeatures, request, lambdaContext); + + var responseFeature = (IHttpResponseFeature)features; + + async Task OpenStream() + { + var prelude = BuildStreamingPrelude(responseFeature); + return CreateLambdaResponseStream(prelude); + } + + var streamingBodyFeature = new StreamingResponseBodyFeature(_logger, responseFeature, OpenStream); + features[typeof(IHttpResponseBodyFeature)] = streamingBodyFeature; + + var scope = this._hostServices.CreateScope(); + Exception pipelineException = null; + try + { + ((IServiceProvidersFeature)features).RequestServices = scope.ServiceProvider; + + try + { + try + { + await RunPipelineAsync(features, streamingBodyFeature); + } + catch (Exception e) + { + pipelineException = e; + + if (!StreamOpened && IncludeUnhandledExceptionDetailInResponse) + { + var errorPrelude = new Amazon.Lambda.Core.ResponseStreaming.HttpResponseStreamPrelude + { + StatusCode = System.Net.HttpStatusCode.InternalServerError + }; + var errorStream = CreateLambdaResponseStream(errorPrelude); + var errorBytes = Encoding.UTF8.GetBytes(ErrorReport(e)); + await errorStream.WriteAsync(errorBytes, 0, errorBytes.Length); + } + else if (StreamOpened) + { + _logger.LogError(e, $"Unhandled exception after response stream was opened: {ErrorReport(e)}"); + } + else + { + _logger.LogError(e, $"Unknown error responding to request: {ErrorReport(e)}"); + } + } + } + finally + { + if (features.ResponseCompletedEvents != null) + { + await features.ResponseCompletedEvents.ExecuteAsync(); + } + } + } + finally + { + scope.Dispose(); + } + + return default; + } + + protected abstract Task RunPipelineAsync( + InvokeFeatures features, + StreamingResponseBodyFeature bodyFeature); + } + + private class ThrowingBeforeStreamOpenFunction : CustomPipelineStreamingFunction + { + private readonly string _exceptionMessage; + private readonly Action _onCompleted; + + public ThrowingBeforeStreamOpenFunction( + string exceptionMessage = "Test exception before stream open", + Action onCompleted = null) + { + _exceptionMessage = exceptionMessage; + _onCompleted = onCompleted; + } + + protected override void PostMarshallItemsFeatureFeature( + IItemsFeature aspNetCoreItemFeature, + APIGatewayHttpApiV2ProxyRequest lambdaRequest, + ILambdaContext lambdaContext) + { + if (_onCompleted != null) + { + ((IHttpResponseFeature)aspNetCoreItemFeature).OnCompleted(_ => + { + _onCompleted(); + return Task.CompletedTask; + }, null); + } + base.PostMarshallItemsFeatureFeature(aspNetCoreItemFeature, lambdaRequest, lambdaContext); + } + + protected override Task RunPipelineAsync( + InvokeFeatures features, + StreamingResponseBodyFeature bodyFeature) + { + throw new InvalidOperationException(_exceptionMessage); + } + } + + private class ThrowingAfterStreamOpenFunction : CustomPipelineStreamingFunction + { + private readonly Action _onCompleted; + + public ThrowingAfterStreamOpenFunction(Action onCompleted = null) + { + _onCompleted = onCompleted; + } + + protected override void PostMarshallItemsFeatureFeature( + IItemsFeature aspNetCoreItemFeature, + APIGatewayHttpApiV2ProxyRequest lambdaRequest, + ILambdaContext lambdaContext) + { + if (_onCompleted != null) + { + ((IHttpResponseFeature)aspNetCoreItemFeature).OnCompleted(_ => + { + _onCompleted(); + return Task.CompletedTask; + }, null); + } + base.PostMarshallItemsFeatureFeature(aspNetCoreItemFeature, lambdaRequest, lambdaContext); + } + + protected override async Task RunPipelineAsync( + InvokeFeatures features, + StreamingResponseBodyFeature bodyFeature) + { + await bodyFeature.StartAsync(); + var partial = Encoding.UTF8.GetBytes("partial"); + await bodyFeature.Stream.WriteAsync(partial, 0, partial.Length); + throw new InvalidOperationException("Test exception after stream open"); + } + } + } +} diff --git a/Libraries/test/Amazon.Lambda.AspNetCoreServer.Test/StreamingResponseBodyFeatureTests.cs b/Libraries/test/Amazon.Lambda.AspNetCoreServer.Test/StreamingResponseBodyFeatureTests.cs new file mode 100644 index 000000000..cdbd403e4 --- /dev/null +++ b/Libraries/test/Amazon.Lambda.AspNetCoreServer.Test/StreamingResponseBodyFeatureTests.cs @@ -0,0 +1,286 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +using System; +using System.IO; +using System.Runtime.Versioning; +using System.Threading.Tasks; + +using Amazon.Lambda.AspNetCoreServer.Internal; +using Microsoft.AspNetCore.Http.Features; +using Xunit; + +namespace Amazon.Lambda.AspNetCoreServer.Test +{ + [RequiresPreviewFeatures] + public class StreamingResponseBodyFeatureTests + { + // Helper: creates a StreamingResponseBodyFeature backed by a MemoryStream stand-in. + // Returns the feature and the MemoryStream that acts as the LambdaResponseStream. + private static (StreamingResponseBodyFeature feature, MemoryStream lambdaStream, InvokeFeatures invokeFeatures) + CreateFeature() + { + var lambdaStream = new MemoryStream(); + var invokeFeatures = new InvokeFeatures(); + var feature = new StreamingResponseBodyFeature( + (IHttpResponseFeature)invokeFeatures, + () => Task.FromResult(lambdaStream)); + return (feature, lambdaStream, invokeFeatures); + } + + [Fact] + public async Task PreStartBytes_AreBuffered_ThenFlushedToLambdaStream_OnStartAsync() + { + var (feature, lambdaStream, _) = CreateFeature(); + + // Write before StartAsync — should go to the pre-start buffer, NOT to lambdaStream yet. + var preBytes = new byte[] { 1, 2, 3 }; + await feature.Stream.WriteAsync(preBytes, 0, preBytes.Length); + + Assert.Equal(0, lambdaStream.Length); // nothing in lambda stream yet + + // Now call StartAsync — buffered bytes should be flushed. + await feature.StartAsync(); + + lambdaStream.Position = 0; + var result = lambdaStream.ToArray(); + Assert.Equal(preBytes, result); + } + + [Fact] + public async Task PostStartBytes_GoDirectlyToLambdaStream() + { + var (feature, lambdaStream, _) = CreateFeature(); + + await feature.StartAsync(); + + var postBytes = new byte[] { 10, 20, 30, 40 }; + await feature.Stream.WriteAsync(postBytes, 0, postBytes.Length); + + lambdaStream.Position = 0; + var result = lambdaStream.ToArray(); + Assert.Equal(postBytes, result); + } + + [Fact] + public async Task OnStartingCallbacks_FireBeforeFirstByteReachesLambdaStream() + { + var lambdaStream = new SequenceTrackingStream(); + var invokeFeatures = new InvokeFeatures(); + var responseFeature = (IHttpResponseFeature)invokeFeatures; + + int callbackSequence = -1; + int writeSequence = -1; + int sequenceCounter = 0; + + // Register an OnStarting callback that records its sequence number. + responseFeature.OnStarting(_ => + { + callbackSequence = sequenceCounter++; + return Task.CompletedTask; + }, null); + + // The stream opener records the sequence when the stream is first written to. + var feature = new StreamingResponseBodyFeature( + responseFeature, + () => + { + lambdaStream.OnFirstWrite = () => writeSequence = sequenceCounter++; + return Task.FromResult(lambdaStream); + }); + + // Write a byte — this should trigger StartAsync internally (via Stream property + // returning the pre-start buffer), but we explicitly call StartAsync here. + await feature.StartAsync(); + + // Write after start to trigger the first actual write to lambdaStream. + var bytes = new byte[] { 0xFF }; + await feature.Stream.WriteAsync(bytes, 0, bytes.Length); + + Assert.True(callbackSequence >= 0, "OnStarting callback was never called"); + Assert.True(writeSequence >= 0, "No write reached the lambda stream"); + Assert.True(callbackSequence < writeSequence, + $"OnStarting callback (seq={callbackSequence}) should fire before first write (seq={writeSequence})"); + } + + [Fact] + public async Task DisableBuffering_IsNoOp_DoesNotThrow_DoesNotChangeBehavior() + { + var (feature, lambdaStream, _) = CreateFeature(); + + // Should not throw. + feature.DisableBuffering(); + + // Behavior should be unchanged: bytes still flow through normally. + await feature.StartAsync(); + var bytes = new byte[] { 7, 8, 9 }; + await feature.Stream.WriteAsync(bytes, 0, bytes.Length); + + lambdaStream.Position = 0; + Assert.Equal(bytes, lambdaStream.ToArray()); + } + + [Fact] + public void DisableBuffering_BeforeStart_DoesNotThrow() + { + var (feature, _, _) = CreateFeature(); + var ex = Record.Exception(() => feature.DisableBuffering()); + Assert.Null(ex); + } + + [Fact] + public async Task SendFileAsync_WritesFullFile_WhenNoOffsetOrCount() + { + var (feature, lambdaStream, _) = CreateFeature(); + + var fileBytes = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8 }; + var tempFile = Path.GetTempFileName(); + try + { + await File.WriteAllBytesAsync(tempFile, fileBytes); + + await feature.SendFileAsync(tempFile, 0, null); + + lambdaStream.Position = 0; + Assert.Equal(fileBytes, lambdaStream.ToArray()); + } + finally + { + File.Delete(tempFile); + } + } + + [Fact] + public async Task SendFileAsync_WritesCorrectByteRange_WithOffsetAndCount() + { + var (feature, lambdaStream, _) = CreateFeature(); + + var fileBytes = new byte[] { 10, 20, 30, 40, 50, 60, 70, 80 }; + var tempFile = Path.GetTempFileName(); + try + { + await File.WriteAllBytesAsync(tempFile, fileBytes); + + // Read bytes at offset=2, count=4 → should get [30, 40, 50, 60] + await feature.SendFileAsync(tempFile, offset: 2, count: 4); + + lambdaStream.Position = 0; + Assert.Equal(new byte[] { 30, 40, 50, 60 }, lambdaStream.ToArray()); + } + finally + { + File.Delete(tempFile); + } + } + + [Fact] + public async Task SendFileAsync_WithOffset_SkipsLeadingBytes() + { + var (feature, lambdaStream, _) = CreateFeature(); + + var fileBytes = new byte[] { 1, 2, 3, 4, 5 }; + var tempFile = Path.GetTempFileName(); + try + { + await File.WriteAllBytesAsync(tempFile, fileBytes); + + // offset=3, count=null → should get [4, 5] + await feature.SendFileAsync(tempFile, offset: 3, count: null); + + lambdaStream.Position = 0; + Assert.Equal(new byte[] { 4, 5 }, lambdaStream.ToArray()); + } + finally + { + File.Delete(tempFile); + } + } + + [Fact] + public async Task CompleteAsync_CallsStartAsync_WhenNotYetStarted() + { + bool streamOpenerCalled = false; + var lambdaStream = new MemoryStream(); + var invokeFeatures = new InvokeFeatures(); + + var feature = new StreamingResponseBodyFeature( + (IHttpResponseFeature)invokeFeatures, + () => + { + streamOpenerCalled = true; + return Task.FromResult(lambdaStream); + }); + + Assert.False(streamOpenerCalled); + + await feature.CompleteAsync(); + + Assert.True(streamOpenerCalled, "CompleteAsync should have triggered StartAsync which calls the stream opener"); + } + + [Fact] + public async Task CompleteAsync_WhenAlreadyStarted_DoesNotCallStreamOpenerAgain() + { + int streamOpenerCallCount = 0; + var lambdaStream = new MemoryStream(); + var invokeFeatures = new InvokeFeatures(); + + var feature = new StreamingResponseBodyFeature( + (IHttpResponseFeature)invokeFeatures, + () => + { + streamOpenerCallCount++; + return Task.FromResult(lambdaStream); + }); + + await feature.StartAsync(); + await feature.CompleteAsync(); + + Assert.Equal(1, streamOpenerCallCount); + } + + [Fact] + public async Task PreAndPostStartBytes_AreForwardedInOrder() + { + var (feature, lambdaStream, _) = CreateFeature(); + + var preBytes = new byte[] { 1, 2, 3 }; + var postBytes = new byte[] { 4, 5, 6 }; + + await feature.Stream.WriteAsync(preBytes, 0, preBytes.Length); + await feature.StartAsync(); + await feature.Stream.WriteAsync(postBytes, 0, postBytes.Length); + + lambdaStream.Position = 0; + var result = lambdaStream.ToArray(); + Assert.Equal(new byte[] { 1, 2, 3, 4, 5, 6 }, result); + } + + private class SequenceTrackingStream : MemoryStream + { + public Action OnFirstWrite { get; set; } + private bool _firstWriteDone; + + public override void Write(byte[] buffer, int offset, int count) + { + FireFirstWrite(); + base.Write(buffer, offset, count); + } + + public override Task WriteAsync(byte[] buffer, int offset, int count, + System.Threading.CancellationToken cancellationToken) + { + FireFirstWrite(); + return base.WriteAsync(buffer, offset, count, cancellationToken); + } + + private void FireFirstWrite() + { + if (!_firstWriteDone) + { + _firstWriteDone = true; + OnFirstWrite?.Invoke(); + } + } + } + } +} diff --git a/Libraries/test/Amazon.Lambda.AspNetCoreServer.Test/TestApiGatewayHttpApiV2Calls.cs b/Libraries/test/Amazon.Lambda.AspNetCoreServer.Test/TestApiGatewayHttpApiV2Calls.cs index 1b844bf1e..e6a80d38c 100644 --- a/Libraries/test/Amazon.Lambda.AspNetCoreServer.Test/TestApiGatewayHttpApiV2Calls.cs +++ b/Libraries/test/Amazon.Lambda.AspNetCoreServer.Test/TestApiGatewayHttpApiV2Calls.cs @@ -1,11 +1,8 @@ using System; -using System.Collections.Generic; using System.IO; -using System.IO.Compression; using System.Linq; using System.Net; using System.Reflection; -using System.Text; using System.Threading; using System.Threading.Tasks; @@ -285,7 +282,6 @@ public async Task TestTraceIdSetFromLambdaContext() } } - #if NET8_0_OR_GREATER /// /// Verifies that is invoked during startup. /// @@ -313,7 +309,6 @@ public async Task TestSnapStartInitialization() Assert.True(SnapStartController.Invoked); } - #endif private async Task InvokeAPIGatewayRequest(string fileName, bool configureApiToReturnExceptionDetail = false) { diff --git a/Libraries/test/Amazon.Lambda.Logging.AspNetCore.Tests/Amazon.Lambda.Logging.AspNetCore.Tests.csproj b/Libraries/test/Amazon.Lambda.Logging.AspNetCore.Tests/Amazon.Lambda.Logging.AspNetCore.Tests.csproj index df332b1a5..148cb7155 100644 --- a/Libraries/test/Amazon.Lambda.Logging.AspNetCore.Tests/Amazon.Lambda.Logging.AspNetCore.Tests.csproj +++ b/Libraries/test/Amazon.Lambda.Logging.AspNetCore.Tests/Amazon.Lambda.Logging.AspNetCore.Tests.csproj @@ -1,7 +1,7 @@  - net6.0 + net10.0 Amazon.Lambda.Logging.AspNetCore.Tests Amazon.Lambda.Logging.AspNetCore.Tests true diff --git a/Libraries/test/TestMinimalAPIApp/TestMinimalAPIApp.csproj b/Libraries/test/TestMinimalAPIApp/TestMinimalAPIApp.csproj index 60080ae84..94bb22bb2 100644 --- a/Libraries/test/TestMinimalAPIApp/TestMinimalAPIApp.csproj +++ b/Libraries/test/TestMinimalAPIApp/TestMinimalAPIApp.csproj @@ -1,7 +1,7 @@  - net6.0 + net10.0 enable enable diff --git a/Libraries/test/TestWebApp/TestWebApp.csproj b/Libraries/test/TestWebApp/TestWebApp.csproj index e5607beb2..37a8b67fa 100644 --- a/Libraries/test/TestWebApp/TestWebApp.csproj +++ b/Libraries/test/TestWebApp/TestWebApp.csproj @@ -1,7 +1,7 @@  - net6.0;net8.0 + net8.0;net10.0 true TestWebApp Exe