Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions .autover/changes/c27a62e6-91ca-4a59-9406-394866cdfa62.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"Projects": [
{
"Name": "Amazon.Lambda.RuntimeSupport",
"Type": "Minor",
"ChangelogMessages": [
"(Preview) Add response streaming support"
]
},
{
"Name": "Amazon.Lambda.Core",
"Type": "Minor",
"ChangelogMessages": [
"(Preview) Add response streaming support"
]
}
]
}
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
*.suo
*.user

**/.kiro/

####################
# Build/Test folders
####################
Expand Down
19 changes: 17 additions & 2 deletions Libraries/Libraries.sln
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 17
VisualStudioVersion = 17.0.31717.71
# Visual Studio Version 18
VisualStudioVersion = 18.3.11512.155 d18.3
MinimumVisualStudioVersion = 10.0.40219.1
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{AAB54E74-20B1-42ED-BC3D-CE9F7BC7FD12}"
EndProject
Expand Down Expand Up @@ -151,6 +151,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TestCustomAuthorizerApp.Int
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TestCustomAuthorizerApp", "test\TestCustomAuthorizerApp\TestCustomAuthorizerApp.csproj", "{3BFA4B73-BA61-4578-833B-C5B3A16EDA9E}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ResponseStreamingFunctionHandlers", "test\Amazon.Lambda.RuntimeSupport.Tests\ResponseStreamingFunctionHandlers\ResponseStreamingFunctionHandlers.csproj", "{E404A7AC-812B-BC03-CA76-02C0BC2BA7F9}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -941,6 +943,18 @@ Global
{3BFA4B73-BA61-4578-833B-C5B3A16EDA9E}.Release|x64.Build.0 = Release|Any CPU
{3BFA4B73-BA61-4578-833B-C5B3A16EDA9E}.Release|x86.ActiveCfg = Release|Any CPU
{3BFA4B73-BA61-4578-833B-C5B3A16EDA9E}.Release|x86.Build.0 = Release|Any CPU
{E404A7AC-812B-BC03-CA76-02C0BC2BA7F9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{E404A7AC-812B-BC03-CA76-02C0BC2BA7F9}.Debug|Any CPU.Build.0 = Debug|Any CPU
{E404A7AC-812B-BC03-CA76-02C0BC2BA7F9}.Debug|x64.ActiveCfg = Debug|Any CPU
{E404A7AC-812B-BC03-CA76-02C0BC2BA7F9}.Debug|x64.Build.0 = Debug|Any CPU
{E404A7AC-812B-BC03-CA76-02C0BC2BA7F9}.Debug|x86.ActiveCfg = Debug|Any CPU
{E404A7AC-812B-BC03-CA76-02C0BC2BA7F9}.Debug|x86.Build.0 = Debug|Any CPU
{E404A7AC-812B-BC03-CA76-02C0BC2BA7F9}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E404A7AC-812B-BC03-CA76-02C0BC2BA7F9}.Release|Any CPU.Build.0 = Release|Any CPU
{E404A7AC-812B-BC03-CA76-02C0BC2BA7F9}.Release|x64.ActiveCfg = Release|Any CPU
{E404A7AC-812B-BC03-CA76-02C0BC2BA7F9}.Release|x64.Build.0 = Release|Any CPU
{E404A7AC-812B-BC03-CA76-02C0BC2BA7F9}.Release|x86.ActiveCfg = Release|Any CPU
{E404A7AC-812B-BC03-CA76-02C0BC2BA7F9}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -1015,6 +1029,7 @@ Global
{8D03BDF3-7078-4B46-A3F1-C73BE6D6CE0D} = {1DE4EE60-45BA-4EF7-BE00-B9EB861E4C69}
{8EEDD576-7FC4-4FAC-A5A2-F58562753A53} = {1DE4EE60-45BA-4EF7-BE00-B9EB861E4C69}
{3BFA4B73-BA61-4578-833B-C5B3A16EDA9E} = {1DE4EE60-45BA-4EF7-BE00-B9EB861E4C69}
{E404A7AC-812B-BC03-CA76-02C0BC2BA7F9} = {B5BD0336-7D08-492C-8489-42C987E29B39}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {503678A4-B8D1-4486-8915-405A3E9CF0EB}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
#if NET8_0_OR_GREATER
using System.Collections.Generic;
using System.Net;
using System.Runtime.Versioning;
using System.Text.Json;

namespace Amazon.Lambda.Core.ResponseStreaming
{
/// <summary>
/// The HTTP response prelude to be sent as the first chunk of a streaming response when using <see cref="LambdaResponseStreamFactory.CreateHttpStream"/>.
/// </summary>
[RequiresPreviewFeatures(LambdaResponseStreamFactory.PreviewMessage)]
public class HttpResponseStreamPrelude
{
/// <summary>
/// The Http status code to include in the response prelude.
/// </summary>
public HttpStatusCode? StatusCode { get; set; }

/// <summary>
/// The response headers to include in the response prelude. This collection supports setting single value for the same headers.
/// </summary>
public IDictionary<string, string> Headers { get; set; } = new Dictionary<string, string>();

/// <summary>
/// The response headers to include in the response prelude. This collection supports setting multiple values for the same headers.
/// </summary>
public IDictionary<string, IList<string>> MultiValueHeaders { get; set; } = new Dictionary<string, IList<string>>();

/// <summary>
/// The list of cookies to include in the response prelude. This is used for Lambda Function URL responses, which support a separate "cookies" field in the response JSON for setting cookies, rather than requiring cookies to be set via the "Set-Cookie" header.
/// </summary>
public IList<string> Cookies { get; set; } = new List<string>();

internal byte[] ToByteArray()
{
var bufferWriter = new System.Buffers.ArrayBufferWriter<byte>();
using (var writer = new Utf8JsonWriter(bufferWriter))
{
writer.WriteStartObject();

if (StatusCode.HasValue)
writer.WriteNumber("statusCode", (int)StatusCode);

if (Headers?.Count > 0)
{
writer.WriteStartObject("headers");
foreach (var header in Headers)
{
writer.WriteString(header.Key, header.Value);
}
writer.WriteEndObject();
}

if (MultiValueHeaders?.Count > 0)
{
writer.WriteStartObject("multiValueHeaders");
foreach (var header in MultiValueHeaders)
{
writer.WriteStartArray(header.Key);
foreach (var value in header.Value)
{
writer.WriteStringValue(value);
}
writer.WriteEndArray();
}
writer.WriteEndObject();
}

if (Cookies?.Count > 0)
{
writer.WriteStartArray("cookies");
foreach (var cookie in Cookies)
{
writer.WriteStringValue(cookie);
}
writer.WriteEndArray();
}

writer.WriteEndObject();
}

return bufferWriter.WrittenSpan.ToArray();
}
}
}
#endif
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
#if NET8_0_OR_GREATER
using System;
using System.Threading;
using System.Threading.Tasks;

namespace Amazon.Lambda.Core.ResponseStreaming
{
/// <summary>
/// Interface for writing streaming responses in AWS Lambda functions.
/// Obtained by calling <see cref="LambdaResponseStreamFactory.CreateStream"/> within a handler.
/// </summary>
internal interface ILambdaResponseStream : IDisposable
{
/// <summary>
/// Asynchronously writes a portion of a byte array to the response stream.
/// </summary>
/// <param name="buffer">The byte array containing data to write.</param>
/// <param name="offset">The zero-based byte offset in buffer at which to begin copying bytes.</param>
/// <param name="count">The number of bytes to write.</param>
/// <param name="cancellationToken">Optional cancellation token.</param>
/// <returns>A task representing the asynchronous operation.</returns>
/// <exception cref="InvalidOperationException">Thrown if the stream is already completed or an error has been reported.</exception>
Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken = default);


/// <summary>
/// Gets the total number of bytes written to the stream so far.
/// </summary>
long BytesWritten { get; }


/// <summary>
/// Gets whether an error has been reported.
/// </summary>
bool HasError { get; }
}
}
#endif
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
#if NET8_0_OR_GREATER

using System;
using System.IO;
using System.Runtime.Versioning;
using System.Threading;
using System.Threading.Tasks;

namespace Amazon.Lambda.Core.ResponseStreaming
{
/// <summary>
/// A write-only, non-seekable <see cref="Stream"/> subclass that streams response data
/// to the Lambda Runtime API. Returned by <see cref="LambdaResponseStreamFactory.CreateStream"/>.
/// Integrates with standard .NET stream consumers such as <see cref="System.IO.StreamWriter"/>.
/// </summary>
[RequiresPreviewFeatures(LambdaResponseStreamFactory.PreviewMessage)]
public class LambdaResponseStream : Stream
{
private readonly ILambdaResponseStream _responseStream;

internal LambdaResponseStream(ILambdaResponseStream responseStream)
{
_responseStream = responseStream;
}

/// <summary>
/// The number of bytes written to the Lambda response stream so far.
/// </summary>
public long BytesWritten => _responseStream.BytesWritten;

/// <summary>
/// Asynchronously writes a byte array to the response stream.
/// </summary>
/// <param name="buffer">The byte array to write.</param>
/// <param name="cancellationToken">Optional cancellation token.</param>
/// <returns>A task representing the asynchronous operation.</returns>
/// <exception cref="InvalidOperationException">Thrown if the stream is already completed or an error has been reported.</exception>
public async Task WriteAsync(byte[] buffer, CancellationToken cancellationToken = default)
{
if (buffer == null)
throw new ArgumentNullException(nameof(buffer));

await WriteAsync(buffer, 0, buffer.Length, cancellationToken);
}

/// <summary>
/// Asynchronously writes a portion of a byte array to the response stream.
/// </summary>
/// <param name="buffer">The byte array containing data to write.</param>
/// <param name="offset">The zero-based byte offset in buffer at which to begin copying bytes.</param>
/// <param name="count">The number of bytes to write.</param>
/// <param name="cancellationToken">Optional cancellation token.</param>
/// <returns>A task representing the asynchronous operation.</returns>
/// <exception cref="InvalidOperationException">Thrown if the stream is already completed or an error has been reported.</exception>
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken = default)
{
await _responseStream.WriteAsync(buffer, offset, count, cancellationToken);
}

#region Noop Overrides

/// <summary>Gets a value indicating whether the stream supports reading. Always <c>false</c>.</summary>
public override bool CanRead => false;

/// <summary>Gets a value indicating whether the stream supports seeking. Always <c>false</c>.</summary>
public override bool CanSeek => false;

/// <summary>Gets a value indicating whether the stream supports writing. Always <c>true</c>.</summary>
public override bool CanWrite => true;

/// <summary>
/// Gets the total number of bytes written to the stream so far.
/// Equivalent to <see cref="BytesWritten"/>.
/// </summary>
public override long Length => BytesWritten;

/// <summary>
/// Getting or setting the position is not supported.
/// </summary>
/// <exception cref="NotSupportedException">Always thrown.</exception>
public override long Position
{
get => throw new NotSupportedException("LambdaResponseStream does not support seeking.");
set => throw new NotSupportedException("LambdaResponseStream does not support seeking.");
}

/// <summary>Not supported.</summary>
/// <exception cref="NotImplementedException">Always thrown.</exception>
public override long Seek(long offset, SeekOrigin origin)
=> throw new NotImplementedException("LambdaResponseStream does not support seeking.");

/// <summary>Not supported.</summary>
/// <exception cref="NotImplementedException">Always thrown.</exception>
public override int Read(byte[] buffer, int offset, int count)
=> throw new NotImplementedException("LambdaResponseStream does not support reading.");

/// <summary>Not supported.</summary>
/// <exception cref="NotImplementedException">Always thrown.</exception>
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
=> throw new NotImplementedException("LambdaResponseStream does not support reading.");

/// <summary>
/// Writes a sequence of bytes to the stream. Delegates to the async path synchronously.
/// Prefer <see cref="WriteAsync(byte[], int, int, CancellationToken)"/> to avoid blocking.
/// </summary>
public override void Write(byte[] buffer, int offset, int count)
=> WriteAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult();

/// <summary>
/// Flush is a no-op; data is sent to the Runtime API immediately on each write.
/// </summary>
public override void Flush() { }

/// <summary>Not supported.</summary>
/// <exception cref="NotSupportedException">Always thrown.</exception>
public override void SetLength(long value)
=> throw new NotSupportedException("LambdaResponseStream does not support SetLength.");
#endregion
}
}
#endif
Loading
Loading