From aa7534e323f5dedfd73ee03e0f6455cc77d8fa18 Mon Sep 17 00:00:00 2001 From: Luke Butters Date: Wed, 25 Mar 2026 20:48:02 +1100 Subject: [PATCH 01/13] . --- .../Protocol/MessageExchangeStreamFixture.cs | 238 ++++++++++++++++++ .../Protocol/MessageExchangeStream.cs | 9 +- 2 files changed, 246 insertions(+), 1 deletion(-) create mode 100644 source/Halibut.Tests/Transport/Protocol/MessageExchangeStreamFixture.cs diff --git a/source/Halibut.Tests/Transport/Protocol/MessageExchangeStreamFixture.cs b/source/Halibut.Tests/Transport/Protocol/MessageExchangeStreamFixture.cs new file mode 100644 index 000000000..e0aa26272 --- /dev/null +++ b/source/Halibut.Tests/Transport/Protocol/MessageExchangeStreamFixture.cs @@ -0,0 +1,238 @@ +using System; +using System.IO; +using System.Threading; +using System.Threading.Tasks; +using FluentAssertions; +using Halibut.Diagnostics; +using Halibut.Tests.Support; +using Halibut.Tests.Support.Logging; +using Halibut.Transport; +using Halibut.Transport.Observability; +using Halibut.Transport.Protocol; +using NSubstitute; +using NUnit.Framework; + +namespace Halibut.Tests.Transport.Protocol +{ + public class MessageExchangeStreamFixture : BaseTest + { + [Test] + public async Task ShouldLogErrorWhenDataStreamSendsSizeMismatchedData() + { + var inMemoryLog = new InMemoryLogWriter(); + var memoryStream = new MemoryStream(); + + var serializer = new MessageSerializerBuilder(new LogFactory()).Build(); + + var messageExchangeStream = new MessageExchangeStream( + memoryStream, + serializer, + new NoOpControlMessageObserver(), + new HalibutTimeoutsAndLimitsForTestsBuilder().Build(), + inMemoryLog); + + var actualData = new byte[100]; + new Random().NextBytes(actualData); + + var maliciousDataStream = new DataStream(10, async (stream, ct) => + { + await stream.WriteAsync(actualData, 0, actualData.Length, ct); + }); + + var requestMessage = new RequestMessage + { + Destination = new ServiceEndPoint(new Uri("https://example.com"), "ABC123", new HalibutTimeoutsAndLimitsForTestsBuilder().Build()), + MethodName = "Test", + ServiceName = "TestService", + Params = new object[] { maliciousDataStream } + }; + + await messageExchangeStream.SendAsync(requestMessage, CancellationToken.None); + + var logs = inMemoryLog.GetLogs(); + logs.Should().Contain(log => + log.Type == EventType.Error && + log.FormattedMessage.Contains("Data stream size mismatch detected during send") && + log.FormattedMessage.Contains("Declared length: 10") && + log.FormattedMessage.Contains("Actual bytes written: 100")); + } + + [Test] + public async Task ShouldLogErrorWhenDataStreamReceivesSizeMismatchedData() + { + var inMemoryLog = new InMemoryLogWriter(); + + var serializer = new MessageSerializerBuilder(new LogFactory()).Build(); + + var streamData = new MemoryStream(); + var writer = new BinaryWriter(streamData); + + var dataStreamId = Guid.NewGuid(); + writer.Write(dataStreamId.ToByteArray()); + writer.Write((long)10); + + var actualData = new byte[10]; + new Random().NextBytes(actualData); + writer.Write(actualData); + + writer.Write((long)100); + + streamData.Position = 0; + + var messageExchangeStream = new MessageExchangeStream( + streamData, + serializer, + new NoOpControlMessageObserver(), + new HalibutTimeoutsAndLimitsForTestsBuilder().Build(), + inMemoryLog); + + var deserializedDataStream = new DataStream(); + typeof(DataStream).GetProperty("Id")!.SetValue(deserializedDataStream, dataStreamId); + + Func act = async () => + { + var method = typeof(MessageExchangeStream).GetMethod("ReadStreamAsync", + System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance); + await (Task)method!.Invoke(messageExchangeStream, + new object[] { new[] { deserializedDataStream }, CancellationToken.None })!; + }; + + await act.Should().ThrowAsync() + .WithMessage("*length of the file was expected to be: 10*"); + + var logs = inMemoryLog.GetLogs(); + logs.Should().Contain(log => + log.Type == EventType.Error && + log.FormattedMessage.Contains("Data stream size mismatch detected") && + log.FormattedMessage.Contains("Expected length: 10") && + log.FormattedMessage.Contains("Actual length claimed at end: 100")); + } + + [Test] + public async Task ShouldLogErrorWhenDataStreamSendsTooFewBytes() + { + var inMemoryLog = new InMemoryLogWriter(); + var memoryStream = new MemoryStream(); + + var serializer = new MessageSerializerBuilder(new LogFactory()).Build(); + + var messageExchangeStream = new MessageExchangeStream( + memoryStream, + serializer, + new NoOpControlMessageObserver(), + new HalibutTimeoutsAndLimitsForTestsBuilder().Build(), + inMemoryLog); + + var actualData = new byte[10]; + new Random().NextBytes(actualData); + + var underSizedDataStream = new DataStream(100, async (stream, ct) => + { + await stream.WriteAsync(actualData, 0, actualData.Length, ct); + }); + + var requestMessage = new RequestMessage + { + Destination = new ServiceEndPoint(new Uri("https://example.com"), "ABC123", new HalibutTimeoutsAndLimitsForTestsBuilder().Build()), + MethodName = "Test", + ServiceName = "TestService", + Params = new object[] { underSizedDataStream } + }; + + await messageExchangeStream.SendAsync(requestMessage, CancellationToken.None); + + var logs = inMemoryLog.GetLogs(); + logs.Should().Contain(log => + log.Type == EventType.Error && + log.FormattedMessage.Contains("Data stream size mismatch detected during send") && + log.FormattedMessage.Contains("Declared length: 100") && + log.FormattedMessage.Contains("Actual bytes written: 10")); + } + + [Test] + public async Task ShouldLogErrorWhenDataStreamReceivesTooFewBytes() + { + var inMemoryLog = new InMemoryLogWriter(); + + var serializer = new MessageSerializerBuilder(new LogFactory()).Build(); + + var streamData = new MemoryStream(); + var writer = new BinaryWriter(streamData); + + var dataStreamId = Guid.NewGuid(); + writer.Write(dataStreamId.ToByteArray()); + writer.Write((long)100); + + var actualData = new byte[10]; + new Random().NextBytes(actualData); + writer.Write(actualData); + + streamData.Position = 0; + + var messageExchangeStream = new MessageExchangeStream( + streamData, + serializer, + new NoOpControlMessageObserver(), + new HalibutTimeoutsAndLimitsForTestsBuilder().Build(), + inMemoryLog); + + var deserializedDataStream = new DataStream(); + typeof(DataStream).GetProperty("Id")!.SetValue(deserializedDataStream, dataStreamId); + + Func act = async () => + { + var method = typeof(MessageExchangeStream).GetMethod("ReadStreamAsync", + System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance); + await (Task)method!.Invoke(messageExchangeStream, + new object[] { new[] { deserializedDataStream }, CancellationToken.None })!; + }; + + await act.Should().ThrowAsync() + .WithMessage("*Stream with length 100 was closed after only reading 10 bytes*"); + + var logs = inMemoryLog.GetLogs(); + logs.Should().NotContain(log => + log.Type == EventType.Error && + log.FormattedMessage.Contains("Data stream size mismatch")); + } + + [Test] + public async Task ShouldNotLogErrorWhenDataStreamSendsCorrectSize() + { + var inMemoryLog = new InMemoryLogWriter(); + var memoryStream = new MemoryStream(); + + var serializer = new MessageSerializerBuilder(new LogFactory()).Build(); + + var messageExchangeStream = new MessageExchangeStream( + memoryStream, + serializer, + new NoOpControlMessageObserver(), + new HalibutTimeoutsAndLimitsForTestsBuilder().Build(), + inMemoryLog); + + var actualData = new byte[10]; + new Random().NextBytes(actualData); + + var correctDataStream = new DataStream(10, async (stream, ct) => + { + await stream.WriteAsync(actualData, 0, actualData.Length, ct); + }); + + var requestMessage = new RequestMessage + { + Destination = new ServiceEndPoint(new Uri("https://example.com"), "ABC123", new HalibutTimeoutsAndLimitsForTestsBuilder().Build()), + MethodName = "Test", + ServiceName = "TestService", + Params = new object[] { correctDataStream } + }; + + await messageExchangeStream.SendAsync(requestMessage, CancellationToken.None); + + var logs = inMemoryLog.GetLogs(); + logs.Should().NotContain(log => + log.Type == EventType.Error && + log.FormattedMessage.Contains("Data stream size mismatch")); + } + } +} diff --git a/source/Halibut/Transport/Protocol/MessageExchangeStream.cs b/source/Halibut/Transport/Protocol/MessageExchangeStream.cs index 334462a4c..f11a46795 100644 --- a/source/Halibut/Transport/Protocol/MessageExchangeStream.cs +++ b/source/Halibut/Transport/Protocol/MessageExchangeStream.cs @@ -276,6 +276,7 @@ async Task ReadStreamAsync(IReadOnlyList deserializedStreams, Cancel var lengthAgain = await stream.ReadInt64Async(cancellationToken); if (lengthAgain != length) { + log.Write(EventType.Error, "Data stream size mismatch detected. Stream ID: {0}, Expected length: {1}, Actual length claimed at end: {2}", id, length, lengthAgain); throw new ProtocolException("There was a problem receiving a file stream: the length of the file was expected to be: " + length + " but less data was actually sent. This can happen if the remote party is sending a stream but the stream had already been partially read, or if the stream was being reused between calls."); } @@ -323,9 +324,15 @@ async Task WriteEachStreamAsync(IEnumerable streams, CancellationTok await stream.WriteLongAsync(dataStream.Length, cancellationToken); await stream.FlushAsync(cancellationToken); - await ((IDataStreamInternal)dataStream).TransmitAsync(stream, cancellationToken); + var byteCountingStream = new ByteCountingStream(stream, OnDispose.LeaveInputStreamOpen); + await ((IDataStreamInternal)dataStream).TransmitAsync(byteCountingStream, cancellationToken); await stream.FlushAsync(cancellationToken); + if (byteCountingStream.BytesWritten != dataStream.Length) + { + log.Write(EventType.Error, "Data stream size mismatch detected during send. Stream ID: {0}, Declared length: {1}, Actual bytes written: {2}", dataStream.Id, dataStream.Length, byteCountingStream.BytesWritten); + } + await stream.WriteLongAsync(dataStream.Length, cancellationToken); await stream.FlushAsync(cancellationToken); } From 75d06943a4059d210f5d264404e429281d3c9219 Mon Sep 17 00:00:00 2001 From: Luke Butters Date: Wed, 25 Mar 2026 20:57:54 +1100 Subject: [PATCH 02/13] . --- source/Halibut.Tests/DataStreamFixture.cs | 77 ++++++++++++++++++- .../Protocol/MessageExchangeStreamFixture.cs | 7 +- .../Protocol/MessageExchangeStream.cs | 15 +++- 3 files changed, 95 insertions(+), 4 deletions(-) diff --git a/source/Halibut.Tests/DataStreamFixture.cs b/source/Halibut.Tests/DataStreamFixture.cs index f7af6b13a..0924dbf23 100644 --- a/source/Halibut.Tests/DataStreamFixture.cs +++ b/source/Halibut.Tests/DataStreamFixture.cs @@ -1,7 +1,9 @@ -using System; +using System; using System.IO; +using System.Linq; using System.Threading.Tasks; using FluentAssertions; +using Halibut.Diagnostics; using Halibut.Tests.Support; using Halibut.Tests.Support.TestAttributes; using Halibut.Tests.Support.TestCases; @@ -50,5 +52,78 @@ public async Task ASyncDataStreamWriter_CanBeUsedInAsync() await ((IDataStreamInternal) ds).TransmitAsync(memoryStream, CancellationToken); memoryStream.ToArray().Should().BeEquivalentTo(data); } + + [Test] + [LatestClientAndLatestServiceTestCases(testNetworkConditions: false)] + public async Task EndToEnd_ShouldLogErrorWhenDataStreamSendsTooManyBytes(ClientAndServiceTestCase clientAndServiceTestCase) + { + await using (var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder() + .WithStandardServices() + .AsLatestClientAndLatestServiceBuilder() + .RecordingClientLogs(out var clientLogs) + .RecordingServiceLogs(out var serviceLogs) + .Build(CancellationToken)) + { + var readDataStreamService = clientAndService.CreateAsyncClient(); + + var actualData = new byte[100]; + new Random().NextBytes(actualData); + + var maliciousDataStream = new DataStream(10, async (stream, ct) => + { + await stream.WriteAsync(actualData, 0, actualData.Length, ct); + }); + + await AssertException.Throws(async () => + await readDataStreamService.SendDataAsync(maliciousDataStream)); + + var allClientLogs = clientLogs.Values.SelectMany(log => log.GetLogs()).ToList(); + allClientLogs.Should().Contain(log => + log.Type == EventType.Error && + log.FormattedMessage.Contains("Data stream size mismatch detected during send") && + log.FormattedMessage.Contains("Declared length: 10") && + log.FormattedMessage.Contains("Actual bytes written: 100")); + } + } + + [Test] + [LatestClientAndLatestServiceTestCases(testNetworkConditions: false)] + public async Task EndToEnd_ShouldLogErrorWhenDataStreamSendsTooFewBytes(ClientAndServiceTestCase clientAndServiceTestCase) + { + await using (var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder() + .WithStandardServices() + .AsLatestClientAndLatestServiceBuilder() + .RecordingClientLogs(out var clientLogs) + .RecordingServiceLogs(out var serviceLogs) + .Build(CancellationToken)) + { + var readDataStreamService = clientAndService.CreateAsyncClient(); + + var actualData = new byte[10]; + new Random().NextBytes(actualData); + + var underSizedDataStream = new DataStream(100, async (stream, ct) => + { + await stream.WriteAsync(actualData, 0, actualData.Length, ct); + }); + + await AssertException.Throws(async () => + await readDataStreamService.SendDataAsync(underSizedDataStream)); + + var allClientLogs = clientLogs.Values.SelectMany(log => log.GetLogs()).ToList(); + allClientLogs.Should().Contain(log => + log.Type == EventType.Error && + log.FormattedMessage.Contains("Data stream size mismatch detected during send") && + log.FormattedMessage.Contains("Declared length: 100") && + log.FormattedMessage.Contains("Actual bytes written: 10")); + + var allServiceLogs = serviceLogs.Values.SelectMany(log => log.GetLogs()).ToList(); + allServiceLogs.Should().Contain(log => + log.Type == EventType.Error && + log.FormattedMessage.Contains("Data stream reading failed") && + log.FormattedMessage.Contains("Expected length: 100") && + log.FormattedMessage.Contains("Actual bytes read before stream closed: 10")); + } + } } } \ No newline at end of file diff --git a/source/Halibut.Tests/Transport/Protocol/MessageExchangeStreamFixture.cs b/source/Halibut.Tests/Transport/Protocol/MessageExchangeStreamFixture.cs index e0aa26272..225c944cb 100644 --- a/source/Halibut.Tests/Transport/Protocol/MessageExchangeStreamFixture.cs +++ b/source/Halibut.Tests/Transport/Protocol/MessageExchangeStreamFixture.cs @@ -191,9 +191,12 @@ await act.Should().ThrowAsync() .WithMessage("*Stream with length 100 was closed after only reading 10 bytes*"); var logs = inMemoryLog.GetLogs(); - logs.Should().NotContain(log => + logs.Should().Contain(log => log.Type == EventType.Error && - log.FormattedMessage.Contains("Data stream size mismatch")); + log.FormattedMessage.Contains("Data stream reading failed") && + log.FormattedMessage.Contains($"Stream ID: {dataStreamId}") && + log.FormattedMessage.Contains("Expected length: 100") && + log.FormattedMessage.Contains("Actual bytes read before stream closed: 10")); } [Test] diff --git a/source/Halibut/Transport/Protocol/MessageExchangeStream.cs b/source/Halibut/Transport/Protocol/MessageExchangeStream.cs index f11a46795..46836eedd 100644 --- a/source/Halibut/Transport/Protocol/MessageExchangeStream.cs +++ b/source/Halibut/Transport/Protocol/MessageExchangeStream.cs @@ -272,7 +272,20 @@ async Task ReadStreamAsync(IReadOnlyList deserializedStreams, Cancel var id = new Guid(await stream.ReadBytesAsync(16, cancellationToken)); var length = await stream.ReadInt64Async(cancellationToken); var dataStream = FindStreamById(deserializedStreams, id); - var tempFile = await CopyStreamToFileAsync(id, length, stream, cancellationToken); + + TemporaryFileStream tempFile; + try + { + tempFile = await CopyStreamToFileAsync(id, length, stream, cancellationToken); + } + catch (ProtocolException ex) when (ex.Message.Contains("was closed after only reading")) + { + var bytesReadMatch = System.Text.RegularExpressions.Regex.Match(ex.Message, @"closed after only reading (\d+) bytes"); + var bytesRead = bytesReadMatch.Success ? long.Parse(bytesReadMatch.Groups[1].Value) : -1; + log.Write(EventType.Error, "Data stream reading failed. Stream ID: {0}, Expected length: {1}, Actual bytes read before stream closed: {2}", id, length, bytesRead); + throw; + } + var lengthAgain = await stream.ReadInt64Async(cancellationToken); if (lengthAgain != length) { From dbba0e29bbae971f6b5b1adba49b69640c219a21 Mon Sep 17 00:00:00 2001 From: Luke Butters Date: Wed, 25 Mar 2026 21:10:10 +1100 Subject: [PATCH 03/13] . --- source/Halibut.Tests/DataStreamFixture.cs | 7 ------- .../Transport/Protocol/MessageExchangeStreamFixture.cs | 2 +- source/Halibut/Transport/Protocol/MessageExchangeStream.cs | 4 ++-- 3 files changed, 3 insertions(+), 10 deletions(-) diff --git a/source/Halibut.Tests/DataStreamFixture.cs b/source/Halibut.Tests/DataStreamFixture.cs index 0924dbf23..694eee10c 100644 --- a/source/Halibut.Tests/DataStreamFixture.cs +++ b/source/Halibut.Tests/DataStreamFixture.cs @@ -116,13 +116,6 @@ await AssertException.Throws(async () => log.FormattedMessage.Contains("Data stream size mismatch detected during send") && log.FormattedMessage.Contains("Declared length: 100") && log.FormattedMessage.Contains("Actual bytes written: 10")); - - var allServiceLogs = serviceLogs.Values.SelectMany(log => log.GetLogs()).ToList(); - allServiceLogs.Should().Contain(log => - log.Type == EventType.Error && - log.FormattedMessage.Contains("Data stream reading failed") && - log.FormattedMessage.Contains("Expected length: 100") && - log.FormattedMessage.Contains("Actual bytes read before stream closed: 10")); } } } diff --git a/source/Halibut.Tests/Transport/Protocol/MessageExchangeStreamFixture.cs b/source/Halibut.Tests/Transport/Protocol/MessageExchangeStreamFixture.cs index 225c944cb..bc0f3ebcd 100644 --- a/source/Halibut.Tests/Transport/Protocol/MessageExchangeStreamFixture.cs +++ b/source/Halibut.Tests/Transport/Protocol/MessageExchangeStreamFixture.cs @@ -196,7 +196,7 @@ await act.Should().ThrowAsync() log.FormattedMessage.Contains("Data stream reading failed") && log.FormattedMessage.Contains($"Stream ID: {dataStreamId}") && log.FormattedMessage.Contains("Expected length: 100") && - log.FormattedMessage.Contains("Actual bytes read before stream closed: 10")); + log.FormattedMessage.Contains("Actual bytes read: 10")); } [Test] diff --git a/source/Halibut/Transport/Protocol/MessageExchangeStream.cs b/source/Halibut/Transport/Protocol/MessageExchangeStream.cs index 46836eedd..b1ddf46ad 100644 --- a/source/Halibut/Transport/Protocol/MessageExchangeStream.cs +++ b/source/Halibut/Transport/Protocol/MessageExchangeStream.cs @@ -278,11 +278,11 @@ async Task ReadStreamAsync(IReadOnlyList deserializedStreams, Cancel { tempFile = await CopyStreamToFileAsync(id, length, stream, cancellationToken); } - catch (ProtocolException ex) when (ex.Message.Contains("was closed after only reading")) + catch (Exception ex) { var bytesReadMatch = System.Text.RegularExpressions.Regex.Match(ex.Message, @"closed after only reading (\d+) bytes"); var bytesRead = bytesReadMatch.Success ? long.Parse(bytesReadMatch.Groups[1].Value) : -1; - log.Write(EventType.Error, "Data stream reading failed. Stream ID: {0}, Expected length: {1}, Actual bytes read before stream closed: {2}", id, length, bytesRead); + log.Write(EventType.Error, "Data stream reading failed. Stream ID: {0}, Expected length: {1}, Actual bytes read: {2}, Exception: {3}", id, length, bytesRead, ex.Message); throw; } From 55f408ca07ed38794c9e0c14465f639943c4f801 Mon Sep 17 00:00:00 2001 From: Luke Butters Date: Wed, 25 Mar 2026 21:19:18 +1100 Subject: [PATCH 04/13] . --- source/Halibut.Tests/DataStreamFixture.cs | 76 +++++++++++++++++++ .../Protocol/MessageExchangeStreamFixture.cs | 44 +++++++++++ .../Diagnostics/HalibutTimeoutsAndLimits.cs | 6 ++ .../Protocol/MessageExchangeStream.cs | 5 ++ 4 files changed, 131 insertions(+) diff --git a/source/Halibut.Tests/DataStreamFixture.cs b/source/Halibut.Tests/DataStreamFixture.cs index 694eee10c..3bf521290 100644 --- a/source/Halibut.Tests/DataStreamFixture.cs +++ b/source/Halibut.Tests/DataStreamFixture.cs @@ -118,5 +118,81 @@ await AssertException.Throws(async () => log.FormattedMessage.Contains("Actual bytes written: 10")); } } + + [Test] + [LatestClientAndLatestServiceTestCases(testNetworkConditions: false)] + public async Task EndToEnd_ShouldThrowWhenConfiguredAndDataStreamSendsTooManyBytes(ClientAndServiceTestCase clientAndServiceTestCase) + { + await using (var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder() + .WithStandardServices() + .AsLatestClientAndLatestServiceBuilder() + .WithHalibutTimeoutsAndLimits(new HalibutTimeoutsAndLimits + { + ThrowOnDataStreamSizeMismatch = true + }) + .RecordingClientLogs(out var clientLogs) + .Build(CancellationToken)) + { + var readDataStreamService = clientAndService.CreateAsyncClient(); + + var actualData = new byte[100]; + new Random().NextBytes(actualData); + + var maliciousDataStream = new DataStream(10, async (stream, ct) => + { + await stream.WriteAsync(actualData, 0, actualData.Length, ct); + }); + + var exception = await AssertException.Throws(async () => + await readDataStreamService.SendDataAsync(maliciousDataStream)); + + exception.And.Message.Should().Contain("Data stream size mismatch"); + + var allClientLogs = clientLogs.Values.SelectMany(log => log.GetLogs()).ToList(); + allClientLogs.Should().Contain(log => + log.Type == EventType.Error && + log.FormattedMessage.Contains("Data stream size mismatch detected during send") && + log.FormattedMessage.Contains("Declared length: 10") && + log.FormattedMessage.Contains("Actual bytes written: 100")); + } + } + + [Test] + [LatestClientAndLatestServiceTestCases(testNetworkConditions: false)] + public async Task EndToEnd_ShouldThrowWhenConfiguredAndDataStreamSendsTooFewBytes(ClientAndServiceTestCase clientAndServiceTestCase) + { + await using (var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder() + .WithStandardServices() + .AsLatestClientAndLatestServiceBuilder() + .WithHalibutTimeoutsAndLimits(new HalibutTimeoutsAndLimits + { + ThrowOnDataStreamSizeMismatch = true + }) + .RecordingClientLogs(out var clientLogs) + .Build(CancellationToken)) + { + var readDataStreamService = clientAndService.CreateAsyncClient(); + + var actualData = new byte[10]; + new Random().NextBytes(actualData); + + var underSizedDataStream = new DataStream(100, async (stream, ct) => + { + await stream.WriteAsync(actualData, 0, actualData.Length, ct); + }); + + var exception = await AssertException.Throws(async () => + await readDataStreamService.SendDataAsync(underSizedDataStream)); + + exception.And.Message.Should().Contain("Data stream size mismatch"); + + var allClientLogs = clientLogs.Values.SelectMany(log => log.GetLogs()).ToList(); + allClientLogs.Should().Contain(log => + log.Type == EventType.Error && + log.FormattedMessage.Contains("Data stream size mismatch detected during send") && + log.FormattedMessage.Contains("Declared length: 100") && + log.FormattedMessage.Contains("Actual bytes written: 10")); + } + } } } \ No newline at end of file diff --git a/source/Halibut.Tests/Transport/Protocol/MessageExchangeStreamFixture.cs b/source/Halibut.Tests/Transport/Protocol/MessageExchangeStreamFixture.cs index bc0f3ebcd..431043f37 100644 --- a/source/Halibut.Tests/Transport/Protocol/MessageExchangeStreamFixture.cs +++ b/source/Halibut.Tests/Transport/Protocol/MessageExchangeStreamFixture.cs @@ -199,6 +199,50 @@ await act.Should().ThrowAsync() log.FormattedMessage.Contains("Actual bytes read: 10")); } + [Test] + public async Task ShouldThrowExceptionWhenConfiguredAndDataStreamSendsSizeMismatch() + { + var inMemoryLog = new InMemoryLogWriter(); + var memoryStream = new MemoryStream(); + + var serializer = new MessageSerializerBuilder(new LogFactory()).Build(); + var limits = new HalibutTimeoutsAndLimitsForTestsBuilder().Build(); + limits.ThrowOnDataStreamSizeMismatch = true; + + var messageExchangeStream = new MessageExchangeStream( + memoryStream, + serializer, + new NoOpControlMessageObserver(), + limits, + inMemoryLog); + + var actualData = new byte[100]; + new Random().NextBytes(actualData); + + var maliciousDataStream = new DataStream(10, async (stream, ct) => + { + await stream.WriteAsync(actualData, 0, actualData.Length, ct); + }); + + var requestMessage = new RequestMessage + { + Destination = new ServiceEndPoint(new Uri("https://example.com"), "ABC123", new HalibutTimeoutsAndLimitsForTestsBuilder().Build()), + MethodName = "Test", + ServiceName = "TestService", + Params = new object[] { maliciousDataStream } + }; + + Func act = async () => await messageExchangeStream.SendAsync(requestMessage, CancellationToken.None); + + await act.Should().ThrowAsync() + .WithMessage("*Data stream size mismatch: Stream * declared length 10 but actually wrote 100 bytes*"); + + var logs = inMemoryLog.GetLogs(); + logs.Should().Contain(log => + log.Type == EventType.Error && + log.FormattedMessage.Contains("Data stream size mismatch detected during send")); + } + [Test] public async Task ShouldNotLogErrorWhenDataStreamSendsCorrectSize() { diff --git a/source/Halibut/Diagnostics/HalibutTimeoutsAndLimits.cs b/source/Halibut/Diagnostics/HalibutTimeoutsAndLimits.cs index 80d99653a..ab9861da0 100644 --- a/source/Halibut/Diagnostics/HalibutTimeoutsAndLimits.cs +++ b/source/Halibut/Diagnostics/HalibutTimeoutsAndLimits.cs @@ -171,6 +171,12 @@ public TimeSpan SafeTcpClientPooledConnectionTimeout /// public bool TcpNoDelay { get; set; } + /// + /// When true, an exception will be thrown if a data stream's actual size doesn't match the declared size. + /// When false (default), size mismatches are only logged at Error level. + /// + public bool ThrowOnDataStreamSizeMismatch { get; set; } = false; + /// /// In the future these will become the default /// diff --git a/source/Halibut/Transport/Protocol/MessageExchangeStream.cs b/source/Halibut/Transport/Protocol/MessageExchangeStream.cs index b1ddf46ad..0a2117930 100644 --- a/source/Halibut/Transport/Protocol/MessageExchangeStream.cs +++ b/source/Halibut/Transport/Protocol/MessageExchangeStream.cs @@ -344,6 +344,11 @@ async Task WriteEachStreamAsync(IEnumerable streams, CancellationTok if (byteCountingStream.BytesWritten != dataStream.Length) { log.Write(EventType.Error, "Data stream size mismatch detected during send. Stream ID: {0}, Declared length: {1}, Actual bytes written: {2}", dataStream.Id, dataStream.Length, byteCountingStream.BytesWritten); + + if (halibutTimeoutsAndLimits.ThrowOnDataStreamSizeMismatch) + { + throw new ProtocolException($"Data stream size mismatch: Stream {dataStream.Id} declared length {dataStream.Length} but actually wrote {byteCountingStream.BytesWritten} bytes."); + } } await stream.WriteLongAsync(dataStream.Length, cancellationToken); From e13d9aba74ff808e8ab0232dec3e707213073e55 Mon Sep 17 00:00:00 2001 From: Luke Butters Date: Wed, 25 Mar 2026 21:30:19 +1100 Subject: [PATCH 05/13] . --- source/Halibut.Tests/DataStreamFixture.cs | 59 +++++++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/source/Halibut.Tests/DataStreamFixture.cs b/source/Halibut.Tests/DataStreamFixture.cs index 3bf521290..6281ba14f 100644 --- a/source/Halibut.Tests/DataStreamFixture.cs +++ b/source/Halibut.Tests/DataStreamFixture.cs @@ -194,5 +194,64 @@ public async Task EndToEnd_ShouldThrowWhenConfiguredAndDataStreamSendsTooFewByte log.FormattedMessage.Contains("Actual bytes written: 10")); } } + + [Test] + [LatestClientAndLatestServiceTestCases(testNetworkConditions: false)] + public async Task EndToEnd_WithThrowOnMismatch_SecondRequestSucceedsQuicklyAfterFirstFails(ClientAndServiceTestCase clientAndServiceTestCase) + { + await using (var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder() + .WithStandardServices() + .AsLatestClientAndLatestServiceBuilder() + .WithHalibutTimeoutsAndLimits(new HalibutTimeoutsAndLimits + { + ThrowOnDataStreamSizeMismatch = true, + TcpClientReceiveResponseTimeout = TimeSpan.FromSeconds(60) + }) + .RecordingClientLogs(out var clientLogs) + .Build(CancellationToken)) + { + var readDataStreamService = clientAndService.CreateAsyncClient(); + + var underSizedData = new byte[10]; + new Random().NextBytes(underSizedData); + + var underSizedDataStream = new DataStream(100, async (stream, ct) => + { + await stream.WriteAsync(underSizedData, 0, underSizedData.Length, ct); + }); + + var stopwatch = System.Diagnostics.Stopwatch.StartNew(); + + await AssertException.Throws(async () => + await readDataStreamService.SendDataAsync(underSizedDataStream)); + + var firstRequestTime = stopwatch.Elapsed; + firstRequestTime.Should().BeLessThan(TimeSpan.FromSeconds(10), + "first request should fail quickly due to ThrowOnDataStreamSizeMismatch"); + + stopwatch.Restart(); + + var correctData = new byte[50]; + new Random().NextBytes(correctData); + var correctDataStream = new DataStream(50, async (stream, ct) => + { + await stream.WriteAsync(correctData, 0, correctData.Length, ct); + }); + + var received = await readDataStreamService.SendDataAsync(correctDataStream); + + var secondRequestTime = stopwatch.Elapsed; + received.Should().Be(50); + secondRequestTime.Should().BeLessThan(TimeSpan.FromSeconds(10), + "second request with correct stream should complete quickly (much faster than 60s timeout)"); + + var allClientLogs = clientLogs.Values.SelectMany(log => log.GetLogs()).ToList(); + allClientLogs.Should().Contain(log => + log.Type == EventType.Error && + log.FormattedMessage.Contains("Data stream size mismatch detected during send") && + log.FormattedMessage.Contains("Declared length: 100") && + log.FormattedMessage.Contains("Actual bytes written: 10")); + } + } } } \ No newline at end of file From 07cd30b09fd9b01fccc94806fde513b317601f2f Mon Sep 17 00:00:00 2001 From: Luke Butters Date: Wed, 25 Mar 2026 21:42:54 +1100 Subject: [PATCH 06/13] . --- .../Protocol/MessageExchangeStream.cs | 33 +++++++++++-------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/source/Halibut/Transport/Protocol/MessageExchangeStream.cs b/source/Halibut/Transport/Protocol/MessageExchangeStream.cs index 0a2117930..93da6a8e0 100644 --- a/source/Halibut/Transport/Protocol/MessageExchangeStream.cs +++ b/source/Halibut/Transport/Protocol/MessageExchangeStream.cs @@ -273,18 +273,7 @@ async Task ReadStreamAsync(IReadOnlyList deserializedStreams, Cancel var length = await stream.ReadInt64Async(cancellationToken); var dataStream = FindStreamById(deserializedStreams, id); - TemporaryFileStream tempFile; - try - { - tempFile = await CopyStreamToFileAsync(id, length, stream, cancellationToken); - } - catch (Exception ex) - { - var bytesReadMatch = System.Text.RegularExpressions.Regex.Match(ex.Message, @"closed after only reading (\d+) bytes"); - var bytesRead = bytesReadMatch.Success ? long.Parse(bytesReadMatch.Groups[1].Value) : -1; - log.Write(EventType.Error, "Data stream reading failed. Stream ID: {0}, Expected length: {1}, Actual bytes read: {2}, Exception: {3}", id, length, bytesRead, ex.Message); - throw; - } + var tempFile = await CopyStreamToFileAsync(id, length, stream, cancellationToken); var lengthAgain = await stream.ReadInt64Async(cancellationToken); if (lengthAgain != length) @@ -308,8 +297,24 @@ async Task CopyStreamToFileAsync(Guid id, long length, Stre var buffer = new byte[65*1024]; while (bytesLeftToRead > 0) { - var read = await stream.ReadAsync(buffer, 0, (int)Math.Min(buffer.Length, bytesLeftToRead), cancellationToken); - if (read == 0) throw new ProtocolException($"Stream with length {length} was closed after only reading {length - bytesLeftToRead} bytes."); + int read; + try + { + read = await stream.ReadAsync(buffer, 0, (int)Math.Min(buffer.Length, bytesLeftToRead), cancellationToken); + } + catch (Exception ex) + { + log.Write(EventType.Error, "Data stream reading failed. Stream ID: {0}, Expected length: {1}, Actual bytes read: {2}, Exception: {3}", id, length, length - bytesLeftToRead, ex.Message); + throw; + } + + if (read == 0) + { + var bytesRead = length - bytesLeftToRead; + log.Write(EventType.Error, "Data stream reading failed. Stream ID: {0}, Expected length: {1}, Actual bytes read: {2}, Exception: {3}", id, length, bytesRead, $"Stream with length {length} was closed after only reading {bytesRead} bytes."); + throw new ProtocolException($"Stream with length {length} was closed after only reading {bytesRead} bytes."); + } + bytesLeftToRead -= read; await fileStream.WriteAsync(buffer, 0, read, cancellationToken); } From f70ce157ac530e31e22d4af87c01e5afdf7ada5e Mon Sep 17 00:00:00 2001 From: Luke Butters Date: Thu, 26 Mar 2026 10:12:53 +1100 Subject: [PATCH 07/13] . --- source/Halibut.Tests/DataStreamFixture.cs | 63 +++++++++---------- .../Protocol/MessageExchangeStreamFixture.cs | 37 ++++++++--- .../Diagnostics/HalibutTimeoutsAndLimits.cs | 6 +- .../Transport/Protocol/IHalibutMessage.cs | 7 +++ .../Protocol/MessageExchangeStream.cs | 60 +++++++++++++----- .../Transport/Protocol/RequestMessage.cs | 2 +- .../Transport/Protocol/ResponseMessage.cs | 2 +- 7 files changed, 111 insertions(+), 66 deletions(-) create mode 100644 source/Halibut/Transport/Protocol/IHalibutMessage.cs diff --git a/source/Halibut.Tests/DataStreamFixture.cs b/source/Halibut.Tests/DataStreamFixture.cs index 6281ba14f..5b17956a5 100644 --- a/source/Halibut.Tests/DataStreamFixture.cs +++ b/source/Halibut.Tests/DataStreamFixture.cs @@ -1,4 +1,5 @@ using System; +using System.Diagnostics; using System.IO; using System.Linq; using System.Threading.Tasks; @@ -55,11 +56,15 @@ public async Task ASyncDataStreamWriter_CanBeUsedInAsync() [Test] [LatestClientAndLatestServiceTestCases(testNetworkConditions: false)] - public async Task EndToEnd_ShouldLogErrorWhenDataStreamSendsTooManyBytes(ClientAndServiceTestCase clientAndServiceTestCase) + public async Task WhenSendingADataStream_AndWeSendMoreDataThanWeShould_ThenADescriptiveErrorIsLogged(ClientAndServiceTestCase clientAndServiceTestCase) { await using (var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder() .WithStandardServices() .AsLatestClientAndLatestServiceBuilder() + .WithHalibutTimeoutsAndLimits(new HalibutTimeoutsAndLimits + { + ThrowOnDataStreamSizeMismatch = false + }) .RecordingClientLogs(out var clientLogs) .RecordingServiceLogs(out var serviceLogs) .Build(CancellationToken)) @@ -83,16 +88,30 @@ await AssertException.Throws(async () => log.FormattedMessage.Contains("Data stream size mismatch detected during send") && log.FormattedMessage.Contains("Declared length: 10") && log.FormattedMessage.Contains("Actual bytes written: 100")); + + var allServiceLogs = serviceLogs.Values.SelectMany(log => log.GetLogs()).ToList(); + allServiceLogs.Should().Contain(log => + log.Type == EventType.Error && + log.FormattedMessage.Contains("Data stream size mismatch detected") && + log.FormattedMessage.Contains("Message ID:") && + log.FormattedMessage.Contains("Stream ID:") && + log.FormattedMessage.Contains("Expected length: 10") && + log.FormattedMessage.Contains("Total length of all DataStreams")); + } } [Test] [LatestClientAndLatestServiceTestCases(testNetworkConditions: false)] - public async Task EndToEnd_ShouldLogErrorWhenDataStreamSendsTooFewBytes(ClientAndServiceTestCase clientAndServiceTestCase) + public async Task WhenSendingADataStream_AndWeSendLessDataThanWeShould_ThenADescriptiveErrorIsLogged(ClientAndServiceTestCase clientAndServiceTestCase) { await using (var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder() .WithStandardServices() .AsLatestClientAndLatestServiceBuilder() + .WithHalibutTimeoutsAndLimits(new HalibutTimeoutsAndLimits + { + ThrowOnDataStreamSizeMismatch = false + }) .RecordingClientLogs(out var clientLogs) .RecordingServiceLogs(out var serviceLogs) .Build(CancellationToken)) @@ -121,7 +140,7 @@ await AssertException.Throws(async () => [Test] [LatestClientAndLatestServiceTestCases(testNetworkConditions: false)] - public async Task EndToEnd_ShouldThrowWhenConfiguredAndDataStreamSendsTooManyBytes(ClientAndServiceTestCase clientAndServiceTestCase) + public async Task WhenSendingADataStream_AndWeSendMoreDataThanWeShould_AndThrowOnDataStreamSizeMismatchIsEnabled_ThenItThrows(ClientAndServiceTestCase clientAndServiceTestCase) { await using (var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder() .WithStandardServices() @@ -147,19 +166,12 @@ public async Task EndToEnd_ShouldThrowWhenConfiguredAndDataStreamSendsTooManyByt await readDataStreamService.SendDataAsync(maliciousDataStream)); exception.And.Message.Should().Contain("Data stream size mismatch"); - - var allClientLogs = clientLogs.Values.SelectMany(log => log.GetLogs()).ToList(); - allClientLogs.Should().Contain(log => - log.Type == EventType.Error && - log.FormattedMessage.Contains("Data stream size mismatch detected during send") && - log.FormattedMessage.Contains("Declared length: 10") && - log.FormattedMessage.Contains("Actual bytes written: 100")); } } [Test] [LatestClientAndLatestServiceTestCases(testNetworkConditions: false)] - public async Task EndToEnd_ShouldThrowWhenConfiguredAndDataStreamSendsTooFewBytes(ClientAndServiceTestCase clientAndServiceTestCase) + public async Task WhenSendingADataStream_AndWeSendLessDataThanWeShould_AndThrowOnDataStreamSizeMismatchIsEnabled_ThenItThrows(ClientAndServiceTestCase clientAndServiceTestCase) { await using (var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder() .WithStandardServices() @@ -185,19 +197,12 @@ public async Task EndToEnd_ShouldThrowWhenConfiguredAndDataStreamSendsTooFewByte await readDataStreamService.SendDataAsync(underSizedDataStream)); exception.And.Message.Should().Contain("Data stream size mismatch"); - - var allClientLogs = clientLogs.Values.SelectMany(log => log.GetLogs()).ToList(); - allClientLogs.Should().Contain(log => - log.Type == EventType.Error && - log.FormattedMessage.Contains("Data stream size mismatch detected during send") && - log.FormattedMessage.Contains("Declared length: 100") && - log.FormattedMessage.Contains("Actual bytes written: 10")); } } [Test] [LatestClientAndLatestServiceTestCases(testNetworkConditions: false)] - public async Task EndToEnd_WithThrowOnMismatch_SecondRequestSucceedsQuicklyAfterFirstFails(ClientAndServiceTestCase clientAndServiceTestCase) + public async Task WhenSendingADataStream_AndWeSendLessDataThanWeShould_AndThrowOnDataStreamSizeMismatchIsEnabled_ThenSecondRequestSucceedsQuickly(ClientAndServiceTestCase clientAndServiceTestCase) { await using (var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder() .WithStandardServices() @@ -219,17 +224,11 @@ public async Task EndToEnd_WithThrowOnMismatch_SecondRequestSucceedsQuicklyAfter { await stream.WriteAsync(underSizedData, 0, underSizedData.Length, ct); }); - - var stopwatch = System.Diagnostics.Stopwatch.StartNew(); await AssertException.Throws(async () => await readDataStreamService.SendDataAsync(underSizedDataStream)); - var firstRequestTime = stopwatch.Elapsed; - firstRequestTime.Should().BeLessThan(TimeSpan.FromSeconds(10), - "first request should fail quickly due to ThrowOnDataStreamSizeMismatch"); - - stopwatch.Restart(); + var stopwatch = Stopwatch.StartNew(); var correctData = new byte[50]; new Random().NextBytes(correctData); @@ -243,14 +242,10 @@ await AssertException.Throws(async () => var secondRequestTime = stopwatch.Elapsed; received.Should().Be(50); secondRequestTime.Should().BeLessThan(TimeSpan.FromSeconds(10), - "second request with correct stream should complete quickly (much faster than 60s timeout)"); - - var allClientLogs = clientLogs.Values.SelectMany(log => log.GetLogs()).ToList(); - allClientLogs.Should().Contain(log => - log.Type == EventType.Error && - log.FormattedMessage.Contains("Data stream size mismatch detected during send") && - log.FormattedMessage.Contains("Declared length: 100") && - log.FormattedMessage.Contains("Actual bytes written: 10")); + "second request with correct stream should complete quickly, since the sender" + + " detects an issue it will close the connection which will result in the receiver" + + " seeing an EOF which results in it entering into a reconnect." + + " Previously the sender would need to wait 60s before reconnecting."); } } } diff --git a/source/Halibut.Tests/Transport/Protocol/MessageExchangeStreamFixture.cs b/source/Halibut.Tests/Transport/Protocol/MessageExchangeStreamFixture.cs index 431043f37..492bab9e5 100644 --- a/source/Halibut.Tests/Transport/Protocol/MessageExchangeStreamFixture.cs +++ b/source/Halibut.Tests/Transport/Protocol/MessageExchangeStreamFixture.cs @@ -23,12 +23,14 @@ public async Task ShouldLogErrorWhenDataStreamSendsSizeMismatchedData() var memoryStream = new MemoryStream(); var serializer = new MessageSerializerBuilder(new LogFactory()).Build(); + var limits = new HalibutTimeoutsAndLimitsForTestsBuilder().Build(); + limits.ThrowOnDataStreamSizeMismatch = false; var messageExchangeStream = new MessageExchangeStream( memoryStream, serializer, new NoOpControlMessageObserver(), - new HalibutTimeoutsAndLimitsForTestsBuilder().Build(), + limits, inMemoryLog); var actualData = new byte[100]; @@ -53,8 +55,11 @@ public async Task ShouldLogErrorWhenDataStreamSendsSizeMismatchedData() logs.Should().Contain(log => log.Type == EventType.Error && log.FormattedMessage.Contains("Data stream size mismatch detected during send") && + log.FormattedMessage.Contains("Message ID:") && + log.FormattedMessage.Contains("Stream ID:") && log.FormattedMessage.Contains("Declared length: 10") && - log.FormattedMessage.Contains("Actual bytes written: 100")); + log.FormattedMessage.Contains("Actual bytes written: 100") && + log.FormattedMessage.Contains("Total length of all DataStreams")); } [Test] @@ -94,18 +99,20 @@ public async Task ShouldLogErrorWhenDataStreamReceivesSizeMismatchedData() var method = typeof(MessageExchangeStream).GetMethod("ReadStreamAsync", System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance); await (Task)method!.Invoke(messageExchangeStream, - new object[] { new[] { deserializedDataStream }, CancellationToken.None })!; + new object[] { "test-message-id", new[] { deserializedDataStream }, CancellationToken.None })!; }; await act.Should().ThrowAsync() - .WithMessage("*length of the file was expected to be: 10*"); + .WithMessage("*Data stream size mismatch detected*Message Id: test-message-id*Stream ID: *Expected length: 10*Actual length claimed at end: 100*"); var logs = inMemoryLog.GetLogs(); logs.Should().Contain(log => log.Type == EventType.Error && log.FormattedMessage.Contains("Data stream size mismatch detected") && + log.FormattedMessage.Contains("Stream ID:") && log.FormattedMessage.Contains("Expected length: 10") && - log.FormattedMessage.Contains("Actual length claimed at end: 100")); + log.FormattedMessage.Contains("Actual length claimed at end: 100") && + log.FormattedMessage.Contains("Total length of all DataStreams")); } [Test] @@ -115,12 +122,14 @@ public async Task ShouldLogErrorWhenDataStreamSendsTooFewBytes() var memoryStream = new MemoryStream(); var serializer = new MessageSerializerBuilder(new LogFactory()).Build(); + var limits = new HalibutTimeoutsAndLimitsForTestsBuilder().Build(); + limits.ThrowOnDataStreamSizeMismatch = false; var messageExchangeStream = new MessageExchangeStream( memoryStream, serializer, new NoOpControlMessageObserver(), - new HalibutTimeoutsAndLimitsForTestsBuilder().Build(), + limits, inMemoryLog); var actualData = new byte[10]; @@ -145,8 +154,11 @@ public async Task ShouldLogErrorWhenDataStreamSendsTooFewBytes() logs.Should().Contain(log => log.Type == EventType.Error && log.FormattedMessage.Contains("Data stream size mismatch detected during send") && + log.FormattedMessage.Contains("Message ID:") && + log.FormattedMessage.Contains("Stream ID:") && log.FormattedMessage.Contains("Declared length: 100") && - log.FormattedMessage.Contains("Actual bytes written: 10")); + log.FormattedMessage.Contains("Actual bytes written: 10") && + log.FormattedMessage.Contains("Total length of all DataStreams")); } [Test] @@ -184,7 +196,7 @@ public async Task ShouldLogErrorWhenDataStreamReceivesTooFewBytes() var method = typeof(MessageExchangeStream).GetMethod("ReadStreamAsync", System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance); await (Task)method!.Invoke(messageExchangeStream, - new object[] { new[] { deserializedDataStream }, CancellationToken.None })!; + new object[] { "test-message-id", new[] { deserializedDataStream }, CancellationToken.None })!; }; await act.Should().ThrowAsync() @@ -235,12 +247,17 @@ public async Task ShouldThrowExceptionWhenConfiguredAndDataStreamSendsSizeMismat Func act = async () => await messageExchangeStream.SendAsync(requestMessage, CancellationToken.None); await act.Should().ThrowAsync() - .WithMessage("*Data stream size mismatch: Stream * declared length 10 but actually wrote 100 bytes*"); + .WithMessage("*Data stream size mismatch detected during send*Stream ID:*Declared length: 10*Actual bytes written: 100*"); var logs = inMemoryLog.GetLogs(); logs.Should().Contain(log => log.Type == EventType.Error && - log.FormattedMessage.Contains("Data stream size mismatch detected during send")); + log.FormattedMessage.Contains("Data stream size mismatch detected during send") && + log.FormattedMessage.Contains("Message ID:") && + log.FormattedMessage.Contains("Stream ID:") && + log.FormattedMessage.Contains("Declared length: 10") && + log.FormattedMessage.Contains("Actual bytes written: 100") && + log.FormattedMessage.Contains("Total length of all DataStreams")); } [Test] diff --git a/source/Halibut/Diagnostics/HalibutTimeoutsAndLimits.cs b/source/Halibut/Diagnostics/HalibutTimeoutsAndLimits.cs index ab9861da0..9257838bc 100644 --- a/source/Halibut/Diagnostics/HalibutTimeoutsAndLimits.cs +++ b/source/Halibut/Diagnostics/HalibutTimeoutsAndLimits.cs @@ -172,10 +172,10 @@ public TimeSpan SafeTcpClientPooledConnectionTimeout public bool TcpNoDelay { get; set; } /// - /// When true, an exception will be thrown if a data stream's actual size doesn't match the declared size. - /// When false (default), size mismatches are only logged at Error level. + /// When true (default), an exception will be thrown if a data stream's actual size doesn't match the declared size. + /// When false, size mismatches are only logged at Error level. /// - public bool ThrowOnDataStreamSizeMismatch { get; set; } = false; + public bool ThrowOnDataStreamSizeMismatch { get; set; } = true; /// /// In the future these will become the default diff --git a/source/Halibut/Transport/Protocol/IHalibutMessage.cs b/source/Halibut/Transport/Protocol/IHalibutMessage.cs new file mode 100644 index 000000000..06abff703 --- /dev/null +++ b/source/Halibut/Transport/Protocol/IHalibutMessage.cs @@ -0,0 +1,7 @@ +namespace Halibut.Transport.Protocol +{ + public interface IHalibutMessage + { + public string Id { get; } + } +} \ No newline at end of file diff --git a/source/Halibut/Transport/Protocol/MessageExchangeStream.cs b/source/Halibut/Transport/Protocol/MessageExchangeStream.cs index 93da6a8e0..7d8704cf7 100644 --- a/source/Halibut/Transport/Protocol/MessageExchangeStream.cs +++ b/source/Halibut/Transport/Protocol/MessageExchangeStream.cs @@ -189,7 +189,8 @@ public async Task ReadRemoteIdentityAsync(CancellationToken canc public async Task SendAsync(T message, CancellationToken cancellationToken) { var serializedStreams = await serializer.WriteMessageAsync(stream, message, cancellationToken); - await WriteEachStreamAsync(serializedStreams, cancellationToken); + var messageId = (message as IHalibutMessage)?.Id ?? "unknown"; + await WriteEachStreamAsync(messageId, serializedStreams, cancellationToken); log.Write(EventType.Diagnostic, "Sent message"); } @@ -214,9 +215,10 @@ await stream.WithReadTimeout( } async Task ReceiveAsync(CancellationToken cancellationToken) + where T : IHalibutMessage { var (result, dataStreams) = await serializer.ReadMessageAsync(stream, cancellationToken); - await ReadStreamsAsync(dataStreams, cancellationToken); + await ReadStreamsAsync(result.Id, dataStreams, cancellationToken); log.Write(EventType.Diagnostic, "Received Message"); return result; } @@ -256,39 +258,45 @@ async Task ExpectServerIdentityAsync(CancellationToken cancellationToken) } } - async Task ReadStreamsAsync(IReadOnlyList deserializedStreams, CancellationToken cancellationToken) + async Task ReadStreamsAsync(string messageId, IReadOnlyList deserializedStreams, CancellationToken cancellationToken) { var expected = deserializedStreams.Count; for (var i = 0; i < expected; i++) { - await ReadStreamAsync(deserializedStreams, cancellationToken); + await ReadStreamAsync(messageId, deserializedStreams, cancellationToken); } } - async Task ReadStreamAsync(IReadOnlyList deserializedStreams, CancellationToken cancellationToken) + async Task ReadStreamAsync(string messageId, IReadOnlyList deserializedStreams, CancellationToken cancellationToken) { var id = new Guid(await stream.ReadBytesAsync(16, cancellationToken)); var length = await stream.ReadInt64Async(cancellationToken); var dataStream = FindStreamById(deserializedStreams, id); - var tempFile = await CopyStreamToFileAsync(id, length, stream, cancellationToken); + var tempFile = await CopyStreamToFileAsync(messageId, id, length, stream, deserializedStreams, cancellationToken); var lengthAgain = await stream.ReadInt64Async(cancellationToken); if (lengthAgain != length) { - log.Write(EventType.Error, "Data stream size mismatch detected. Stream ID: {0}, Expected length: {1}, Actual length claimed at end: {2}", id, length, lengthAgain); - throw new ProtocolException("There was a problem receiving a file stream: the length of the file was expected to be: " + length + " but less data was actually sent. This can happen if the remote party is sending a stream but the stream had already been partially read, or if the stream was being reused between calls."); + log.Write(EventType.Error, "Data stream size mismatch detected. Message ID: {0}, Stream ID: {1}, " + + "Expected length: {2}, Actual length claimed at end: {3}. " + + "Total length of all DataStreams to be sent is {4}", + messageId, id, length, lengthAgain, deserializedStreams.Select(d => d.Length).Sum()); + throw new ProtocolException($"Data stream size mismatch detected. Message Id: {messageId}, Stream ID: {id}, " + + $"Expected length: {length}, Actual length claimed at end: {lengthAgain}. " + + $"Total length of all DataStreams to be sent is {deserializedStreams.Select(d => d.Length).Sum()}"); } ((IDataStreamInternal)dataStream).Received(tempFile); } - async Task CopyStreamToFileAsync(Guid id, long length, Stream stream, CancellationToken cancellationToken) + async Task CopyStreamToFileAsync(string messageId, Guid id, long length, Stream stream, IReadOnlyList deserializedStreams, CancellationToken cancellationToken) { var path = Path.Combine(Path.GetTempPath(), string.Format("{0}_{1}", id.ToString(), Interlocked.Increment(ref streamCount))); long bytesLeftToRead = length; + var totalDataStreamLength = deserializedStreams.Select(d => d.Length).Sum(); #if !NETFRAMEWORK await #endif @@ -297,22 +305,32 @@ async Task CopyStreamToFileAsync(Guid id, long length, Stre var buffer = new byte[65*1024]; while (bytesLeftToRead > 0) { - int read; + int read = 0; try { read = await stream.ReadAsync(buffer, 0, (int)Math.Min(buffer.Length, bytesLeftToRead), cancellationToken); } catch (Exception ex) { - log.Write(EventType.Error, "Data stream reading failed. Stream ID: {0}, Expected length: {1}, Actual bytes read: {2}, Exception: {3}", id, length, length - bytesLeftToRead, ex.Message); + log.WriteException(EventType.Error, "Data stream reading failed. Message ID: {0}, Stream ID: {1}, " + + "Expected length: {2}, Actual bytes read: {3}. " + + "Total length of all DataStreams to be sent is {4}. Exception: {5}", + ex, + messageId, id, length, length - bytesLeftToRead, totalDataStreamLength, ex.Message); throw; } if (read == 0) { var bytesRead = length - bytesLeftToRead; - log.Write(EventType.Error, "Data stream reading failed. Stream ID: {0}, Expected length: {1}, Actual bytes read: {2}, Exception: {3}", id, length, bytesRead, $"Stream with length {length} was closed after only reading {bytesRead} bytes."); - throw new ProtocolException($"Stream with length {length} was closed after only reading {bytesRead} bytes."); + log.Write(EventType.Error, "Data stream reading failed. Message ID: {0}, Stream ID: {1}, " + + "Expected length: {2}, Actual bytes read: {3}. " + + "Total length of all DataStreams to be sent is {4}. Exception: Stream closed prematurely.", + messageId, id, length, bytesRead, totalDataStreamLength); + throw new ProtocolException($"Data stream reading failed. Message Id: {messageId}, Stream ID: {id}, " + + $"Expected length: {length}, Actual bytes read: {bytesRead}. " + + $"Total length of all DataStreams to be sent is {totalDataStreamLength}. " + + $"Stream with length {length} was closed after only reading {bytesRead} bytes."); } bytesLeftToRead -= read; @@ -334,9 +352,12 @@ static DataStream FindStreamById(IReadOnlyList deserializedStreams, return dataStream; } - async Task WriteEachStreamAsync(IEnumerable streams, CancellationToken cancellationToken) + async Task WriteEachStreamAsync(string messageId, IEnumerable streams, CancellationToken cancellationToken) { - foreach (var dataStream in streams) + var streamsList = streams.ToList(); + var totalDataStreamLength = streamsList.Select(d => d.Length).Sum(); + + foreach (var dataStream in streamsList) { await stream.WriteByteArrayAsync(dataStream.Id.ToByteArray(), cancellationToken); await stream.WriteLongAsync(dataStream.Length, cancellationToken); @@ -348,11 +369,16 @@ async Task WriteEachStreamAsync(IEnumerable streams, CancellationTok if (byteCountingStream.BytesWritten != dataStream.Length) { - log.Write(EventType.Error, "Data stream size mismatch detected during send. Stream ID: {0}, Declared length: {1}, Actual bytes written: {2}", dataStream.Id, dataStream.Length, byteCountingStream.BytesWritten); + log.Write(EventType.Error, "Data stream size mismatch detected during send. Message ID: {0}, Stream ID: {1}, " + + "Declared length: {2}, Actual bytes written: {3}. " + + "Total length of all DataStreams to be sent is {4}", + messageId, dataStream.Id, dataStream.Length, byteCountingStream.BytesWritten, totalDataStreamLength); if (halibutTimeoutsAndLimits.ThrowOnDataStreamSizeMismatch) { - throw new ProtocolException($"Data stream size mismatch: Stream {dataStream.Id} declared length {dataStream.Length} but actually wrote {byteCountingStream.BytesWritten} bytes."); + throw new ProtocolException($"Data stream size mismatch detected during send. Message Id: {messageId}, Stream ID: {dataStream.Id}, " + + $"Declared length: {dataStream.Length}, Actual bytes written: {byteCountingStream.BytesWritten}. " + + $"Total length of all DataStreams to be sent is {totalDataStreamLength}."); } } diff --git a/source/Halibut/Transport/Protocol/RequestMessage.cs b/source/Halibut/Transport/Protocol/RequestMessage.cs index 25daec614..d591c5bb9 100644 --- a/source/Halibut/Transport/Protocol/RequestMessage.cs +++ b/source/Halibut/Transport/Protocol/RequestMessage.cs @@ -3,7 +3,7 @@ namespace Halibut.Transport.Protocol { - public class RequestMessage + public class RequestMessage : IHalibutMessage { #pragma warning disable CS8618 // Non-nullable field must contain a non-null value when exiting constructor. Consider declaring as nullable. [JsonProperty("id")] diff --git a/source/Halibut/Transport/Protocol/ResponseMessage.cs b/source/Halibut/Transport/Protocol/ResponseMessage.cs index f73dc6ca8..ee7102aea 100644 --- a/source/Halibut/Transport/Protocol/ResponseMessage.cs +++ b/source/Halibut/Transport/Protocol/ResponseMessage.cs @@ -5,7 +5,7 @@ namespace Halibut.Transport.Protocol { - public class ResponseMessage + public class ResponseMessage : IHalibutMessage { [JsonProperty("id")] #pragma warning disable CS8618 // Non-nullable field must contain a non-null value when exiting constructor. Consider declaring as nullable. From 72d01066df73e0f648dfe3cb43ea6b2042f08b04 Mon Sep 17 00:00:00 2001 From: Luke Butters Date: Thu, 26 Mar 2026 10:23:42 +1100 Subject: [PATCH 08/13] Message can be null --- source/Halibut.Tests/DataStreamFixture.cs | 14 ++++++-------- .../Transport/Protocol/IMessageSerializer.cs | 2 +- .../Transport/Protocol/MessageExchangeStream.cs | 5 ++++- 3 files changed, 11 insertions(+), 10 deletions(-) diff --git a/source/Halibut.Tests/DataStreamFixture.cs b/source/Halibut.Tests/DataStreamFixture.cs index 5b17956a5..4a87a3dcb 100644 --- a/source/Halibut.Tests/DataStreamFixture.cs +++ b/source/Halibut.Tests/DataStreamFixture.cs @@ -61,10 +61,9 @@ public async Task WhenSendingADataStream_AndWeSendMoreDataThanWeShould_ThenADesc await using (var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder() .WithStandardServices() .AsLatestClientAndLatestServiceBuilder() - .WithHalibutTimeoutsAndLimits(new HalibutTimeoutsAndLimits - { - ThrowOnDataStreamSizeMismatch = false - }) + .WithHalibutTimeoutsAndLimits(new HalibutTimeoutsAndLimitsForTestsBuilder() + .Build() + .Apply(h => h.ThrowOnDataStreamSizeMismatch = false)) .RecordingClientLogs(out var clientLogs) .RecordingServiceLogs(out var serviceLogs) .Build(CancellationToken)) @@ -108,10 +107,9 @@ public async Task WhenSendingADataStream_AndWeSendLessDataThanWeShould_ThenADesc await using (var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder() .WithStandardServices() .AsLatestClientAndLatestServiceBuilder() - .WithHalibutTimeoutsAndLimits(new HalibutTimeoutsAndLimits - { - ThrowOnDataStreamSizeMismatch = false - }) + .WithHalibutTimeoutsAndLimits(new HalibutTimeoutsAndLimitsForTestsBuilder() + .Build() + .Apply(h => h.ThrowOnDataStreamSizeMismatch = false)) .RecordingClientLogs(out var clientLogs) .RecordingServiceLogs(out var serviceLogs) .Build(CancellationToken)) diff --git a/source/Halibut/Transport/Protocol/IMessageSerializer.cs b/source/Halibut/Transport/Protocol/IMessageSerializer.cs index 84a048eb4..474d1cb0c 100644 --- a/source/Halibut/Transport/Protocol/IMessageSerializer.cs +++ b/source/Halibut/Transport/Protocol/IMessageSerializer.cs @@ -10,6 +10,6 @@ namespace Halibut.Transport.Protocol public interface IMessageSerializer { Task> WriteMessageAsync(Stream stream, T message, CancellationToken cancellationToken); - Task<(T Message, IReadOnlyList DataStreams)> ReadMessageAsync(RewindableBufferStream stream, CancellationToken cancellationToken); + Task<(T? Message, IReadOnlyList DataStreams)> ReadMessageAsync(RewindableBufferStream stream, CancellationToken cancellationToken); } } \ No newline at end of file diff --git a/source/Halibut/Transport/Protocol/MessageExchangeStream.cs b/source/Halibut/Transport/Protocol/MessageExchangeStream.cs index 7d8704cf7..9274e4918 100644 --- a/source/Halibut/Transport/Protocol/MessageExchangeStream.cs +++ b/source/Halibut/Transport/Protocol/MessageExchangeStream.cs @@ -218,7 +218,10 @@ await stream.WithReadTimeout( where T : IHalibutMessage { var (result, dataStreams) = await serializer.ReadMessageAsync(stream, cancellationToken); - await ReadStreamsAsync(result.Id, dataStreams, cancellationToken); + if (dataStreams.Count > 0) + { + await ReadStreamsAsync(result?.Id??"", dataStreams, cancellationToken); + } log.Write(EventType.Diagnostic, "Received Message"); return result; } From 3bc86c68c741d696e0daa83c248de0c137853516 Mon Sep 17 00:00:00 2001 From: Luke Butters Date: Thu, 26 Mar 2026 10:26:16 +1100 Subject: [PATCH 09/13] . --- .../Transport/Protocol/MessageSerializerFixture.cs | 4 ++-- source/Halibut/Transport/Protocol/MessageSerializer.cs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/source/Halibut.Tests/Transport/Protocol/MessageSerializerFixture.cs b/source/Halibut.Tests/Transport/Protocol/MessageSerializerFixture.cs index fe3ec7afe..585baad4a 100644 --- a/source/Halibut.Tests/Transport/Protocol/MessageSerializerFixture.cs +++ b/source/Halibut.Tests/Transport/Protocol/MessageSerializerFixture.cs @@ -104,7 +104,7 @@ public async Task BackwardsCompatibility_ExtraParametersInServerErrorAreIgnored( await using (var stream = new RewindableBufferStream(new MemoryStream(Convert.FromBase64String(base64Bson)))) { var result = await ReadMessage(sut, stream); - result.Error.Should().NotBeNull(); + result!.Error.Should().NotBeNull(); result.Error!.Message = "foo"; result.Error.HalibutErrorType = "MethodNotFoundHalibutClientException"; } @@ -303,7 +303,7 @@ static byte[] DeflateString(string s) } } - async Task ReadMessage(MessageSerializer messageSerializer, RewindableBufferStream rewindableBufferStream) + async Task ReadMessage(MessageSerializer messageSerializer, RewindableBufferStream rewindableBufferStream) { return (await messageSerializer.ReadMessageAsync(rewindableBufferStream, CancellationToken)).Message; } diff --git a/source/Halibut/Transport/Protocol/MessageSerializer.cs b/source/Halibut/Transport/Protocol/MessageSerializer.cs index 201b182d6..f9de44442 100644 --- a/source/Halibut/Transport/Protocol/MessageSerializer.cs +++ b/source/Halibut/Transport/Protocol/MessageSerializer.cs @@ -64,7 +64,7 @@ public async Task> WriteMessageAsync(Stream stream, return serializedStreams; } - public async Task<(T Message, IReadOnlyList DataStreams)> ReadMessageAsync(RewindableBufferStream stream, CancellationToken cancellationToken) + public async Task<(T? Message, IReadOnlyList DataStreams)> ReadMessageAsync(RewindableBufferStream stream, CancellationToken cancellationToken) { await using (var errorRecordingStream = new ErrorRecordingStream(stream, closeInner: false)) { From 83b56c141ddf2a39164911bc9a50a30883ecd441 Mon Sep 17 00:00:00 2001 From: Luke Butters Date: Thu, 26 Mar 2026 10:47:33 +1100 Subject: [PATCH 10/13] Allow more tests to pass on mac --- source/Halibut.Tests/DataStreamFixture.cs | 2 +- ...ExceptionReturnedByHalibutProxyExtensionMethodFixture.cs | 3 ++- source/Halibut.Tests/ExceptionContractFixture.cs | 1 + source/Halibut.Tests/FailureModesFixture.cs | 6 +++++- source/Halibut.Tests/PollingServiceTimeoutsFixture.cs | 1 + source/Halibut.Tests/ProxyFixture.cs | 1 + .../Halibut.Tests/Timeouts/ReceiveResponseTimeoutTests.cs | 3 +++ .../SendingAndReceivingRequestMessagesTimeoutsFixture.cs | 2 ++ .../Transport/Streams/NetworkTimeoutStreamFixture.cs | 4 ++++ .../ExceptionReturnedByHalibutProxyExtensionMethod.cs | 2 ++ 10 files changed, 22 insertions(+), 3 deletions(-) diff --git a/source/Halibut.Tests/DataStreamFixture.cs b/source/Halibut.Tests/DataStreamFixture.cs index 4a87a3dcb..718bcf4d5 100644 --- a/source/Halibut.Tests/DataStreamFixture.cs +++ b/source/Halibut.Tests/DataStreamFixture.cs @@ -239,7 +239,7 @@ await AssertException.Throws(async () => var secondRequestTime = stopwatch.Elapsed; received.Should().Be(50); - secondRequestTime.Should().BeLessThan(TimeSpan.FromSeconds(10), + secondRequestTime.Should().BeLessThan(TimeSpan.FromSeconds(30), "second request with correct stream should complete quickly, since the sender" + " detects an issue it will close the connection which will result in the receiver" + " seeing an EOF which results in it entering into a reconnect." + diff --git a/source/Halibut.Tests/Diagnostics/ExceptionReturnedByHalibutProxyExtensionMethodFixture.cs b/source/Halibut.Tests/Diagnostics/ExceptionReturnedByHalibutProxyExtensionMethodFixture.cs index e6482d5de..35934db94 100644 --- a/source/Halibut.Tests/Diagnostics/ExceptionReturnedByHalibutProxyExtensionMethodFixture.cs +++ b/source/Halibut.Tests/Diagnostics/ExceptionReturnedByHalibutProxyExtensionMethodFixture.cs @@ -122,7 +122,8 @@ public async Task WhenTheConnectionPausesWaitingForAResponse(ClientAndServiceTes .Be(HalibutNetworkExceptionType.IsNetworkError); exception.Message.Should().ContainAny( - "Unable to read data from the transport connection: Connection timed out.", + "Unable to read data from the transport connection: Connection timed out.", + "Unable to read data from the transport connection: Operation timed out.", "Unable to read data from the transport connection: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond."); } } diff --git a/source/Halibut.Tests/ExceptionContractFixture.cs b/source/Halibut.Tests/ExceptionContractFixture.cs index 9222f01ed..3589e9766 100644 --- a/source/Halibut.Tests/ExceptionContractFixture.cs +++ b/source/Halibut.Tests/ExceptionContractFixture.cs @@ -60,6 +60,7 @@ public async Task WhenThePollingRequestHasBegunTransfer_AndATcpTimeoutIsReached_ (await AssertException.Throws(async () => await doSomeActionClient.ActionAsync(new(CancellationToken)))) .And.Message.Should().ContainAny( "Unable to read data from the transport connection: Connection timed out.", + "Unable to read data from the transport connection: Operation timed out.", "Unable to read data from the transport connection: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond"); waitSemaphore.Release(); diff --git a/source/Halibut.Tests/FailureModesFixture.cs b/source/Halibut.Tests/FailureModesFixture.cs index 312de19a1..e074c024a 100644 --- a/source/Halibut.Tests/FailureModesFixture.cs +++ b/source/Halibut.Tests/FailureModesFixture.cs @@ -131,7 +131,11 @@ public async Task FailOnInvalidHostname() else { // Failed with: An error occurred when sending a request to 'https://sduj08ud9382ujd98dw9fh934hdj2389u982:8000/', before the request could begin: Name or service not known, but found False. - new [] {"No such device or address", "Resource temporarily unavailable", "Name or service not known"}.Any(message.Contains).Should().BeTrue($"Message does not match known strings: {message}"); + new [] {"No such device or address", + "Resource temporarily unavailable", + "Name or service not known", + "nodename nor servname provided, or not known" + }.Any(message.Contains).Should().BeTrue($"Message does not match known strings: {message}"); } } } diff --git a/source/Halibut.Tests/PollingServiceTimeoutsFixture.cs b/source/Halibut.Tests/PollingServiceTimeoutsFixture.cs index d793ff02e..54f988c78 100644 --- a/source/Halibut.Tests/PollingServiceTimeoutsFixture.cs +++ b/source/Halibut.Tests/PollingServiceTimeoutsFixture.cs @@ -120,6 +120,7 @@ public async Task WhenThePollingRequestHasBegunTransfer_AndWeTimeoutWaitingForTh var stopwatch = Stopwatch.StartNew(); var exception = (await AssertException.Throws(async () => await doSomeActionClient.ActionAsync(new(CancellationToken)))).And; exception.Message.Should().ContainAny( + "Unable to read data from the transport connection: Operation timed out.", "Unable to read data from the transport connection: Connection timed out.", "Unable to read data from the transport connection: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond"); exception.ConnectionState.Should().Be(ConnectionState.Unknown); diff --git a/source/Halibut.Tests/ProxyFixture.cs b/source/Halibut.Tests/ProxyFixture.cs index 142656563..907057d93 100644 --- a/source/Halibut.Tests/ProxyFixture.cs +++ b/source/Halibut.Tests/ProxyFixture.cs @@ -85,6 +85,7 @@ public async Task ClientTimesOutConnectingToAProxy_WhenTheProxyHangsDuringConnec "No connection could be made because the target machine actively refused it", "the polling endpoint did not collect the request within the allowed time", "Unable to read data from the transport connection: Connection timed out.", + "Unable to read data from the transport connection: Operation timed out.", "A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond.", "A timeout while waiting for the proxy server at"); ; diff --git a/source/Halibut.Tests/Timeouts/ReceiveResponseTimeoutTests.cs b/source/Halibut.Tests/Timeouts/ReceiveResponseTimeoutTests.cs index de4f7478c..b67006783 100644 --- a/source/Halibut.Tests/Timeouts/ReceiveResponseTimeoutTests.cs +++ b/source/Halibut.Tests/Timeouts/ReceiveResponseTimeoutTests.cs @@ -39,6 +39,7 @@ public async Task WhenRpcExecutionExceedsReceiveResponseTimeout_ThenInitialDataR (await AssertionExtensions.Should(() => doSomeActionClient.ActionAsync()).ThrowAsync()) .And.Message.Should().ContainAny( "Connection timed out.", + "Operation timed out.", "A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond"); } } @@ -85,6 +86,7 @@ public async Task WhenRpcExecutionIsWithinReceiveResponseTimeout_ButSubsequentDa (await AssertionExtensions.Should(() => lastServiceClient.GetListAsync()).ThrowAsync()) .And.Message.Should().ContainAny( "Connection timed out.", + "Operation timed out.", "A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond", "Unable to read data from the transport connection: Connection reset by peer"); } @@ -117,6 +119,7 @@ public async Task WhenRpcExecutionIsWithinReceiveResponseTimeout_ButDataStreamDa Logger.Information(e, "The received expected exception, we were expecting one"); e.Message.Should().ContainAny( "Connection timed out.", + "Operation timed out.", "A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond"); } } diff --git a/source/Halibut.Tests/Timeouts/SendingAndReceivingRequestMessagesTimeoutsFixture.cs b/source/Halibut.Tests/Timeouts/SendingAndReceivingRequestMessagesTimeoutsFixture.cs index 419ef6c10..bde4abe0f 100644 --- a/source/Halibut.Tests/Timeouts/SendingAndReceivingRequestMessagesTimeoutsFixture.cs +++ b/source/Halibut.Tests/Timeouts/SendingAndReceivingRequestMessagesTimeoutsFixture.cs @@ -202,6 +202,7 @@ static void AssertExceptionLooksLikeAWriteTimeout(HalibutClientException? e) { e!.Message.Should().ContainAny( "Unable to write data to the transport connection: Connection timed out.", + "Unable to write data to the transport connection: Operation timed out.", " Unable to write data to the transport connection: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond"); e.IsNetworkError().Should().Be(HalibutNetworkExceptionType.IsNetworkError); @@ -210,6 +211,7 @@ static void AssertExceptionLooksLikeAWriteTimeout(HalibutClientException? e) static void AssertExceptionMessageLooksLikeAReadTimeout(HalibutClientException? e) { e!.Message.Should().ContainAny( + "Unable to read data from the transport connection: Operation timed out.", "Unable to read data from the transport connection: Connection timed out.", "Unable to read data from the transport connection: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond."); diff --git a/source/Halibut.Tests/Transport/Streams/NetworkTimeoutStreamFixture.cs b/source/Halibut.Tests/Transport/Streams/NetworkTimeoutStreamFixture.cs index 987bc8b55..275097c09 100644 --- a/source/Halibut.Tests/Transport/Streams/NetworkTimeoutStreamFixture.cs +++ b/source/Halibut.Tests/Transport/Streams/NetworkTimeoutStreamFixture.cs @@ -66,6 +66,7 @@ public async Task ReadCalls_ShouldTimeout_AndCloseTheStream_AndThrowExceptionTha actualException.Should().NotBeNull().And.BeOfType(); actualException!.Message.Should().ContainAny( "Unable to read data from the transport connection: Connection timed out.", + "Unable to read data from the transport connection: Operation timed out.", "Unable to read data from the transport connection: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond."); stopWatch.Elapsed.Should().BeLessThan(TimeSpan.FromSeconds(10)); @@ -177,6 +178,7 @@ public async Task WriteCalls_ShouldTimeout_AndCloseTheStream_AndThrowExceptionTh actualException.Should().NotBeNull().And.BeOfType(); actualException!.Message.Should().ContainAny( "Unable to write data to the transport connection: Connection timed out.", + "Unable to write data to the transport connection: Operation timed out.", "Unable to write data to the transport connection: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond."); stopWatch.Elapsed.Should().BeLessThan(TimeSpan.FromSeconds(10)); @@ -311,6 +313,7 @@ public async Task FlushAsync_ShouldTimeout_AndCloseTheStream_AndThrowExceptionTh (await AssertException.Throws(async () => await sut.FlushAsync(CancellationToken))) .And.Message.Should().ContainAny( "Unable to write data to the transport connection: Connection timed out.", + "Unable to write data to the transport connection: Operation timed out.", "Unable to write data to the transport connection: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond."); AssertStreamWasClosed(StreamMethod.Async, callCountingStream); @@ -444,6 +447,7 @@ public async Task CopyToCalls_ShouldTimeout_AndCloseTheStream_AndThrowExceptionT actualException.Should().NotBeNull().And.BeOfType(); actualException!.Message.Should().ContainAny( "Unable to read data from the transport connection: Connection timed out.", + "Unable to read data from the transport connection: Operation timed out.", "Unable to read data from the transport connection: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond."); stopWatch.Elapsed.Should().BeLessThan(TimeSpan.FromSeconds(10)); diff --git a/source/Halibut/Diagnostics/ExceptionReturnedByHalibutProxyExtensionMethod.cs b/source/Halibut/Diagnostics/ExceptionReturnedByHalibutProxyExtensionMethod.cs index 011eefa37..9c0a60148 100644 --- a/source/Halibut/Diagnostics/ExceptionReturnedByHalibutProxyExtensionMethod.cs +++ b/source/Halibut/Diagnostics/ExceptionReturnedByHalibutProxyExtensionMethod.cs @@ -113,6 +113,8 @@ public static HalibutNetworkExceptionType IsNetworkError(this Exception exceptio if (exception.Message.Contains("The I/O operation has been aborted because of either a thread exit or an application request")) return HalibutNetworkExceptionType.IsNetworkError; if (exception.Message.Contains("A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond.")) return HalibutNetworkExceptionType.IsNetworkError; if (exception.Message.Contains("The remote party closed the WebSocket connection without completing the close handshake.")) return HalibutNetworkExceptionType.IsNetworkError; + if (exception.Message.Contains("Unable to read data from the transport connection: Operation timed out.")) return HalibutNetworkExceptionType.IsNetworkError; + if (exception.Message.Contains("Unable to write data to the transport connection: Operation timed out.")) return HalibutNetworkExceptionType.IsNetworkError; } From 232778a707d9e25d919daf7f76aebcf2049a56ae Mon Sep 17 00:00:00 2001 From: Luke Butters Date: Thu, 26 Mar 2026 11:02:13 +1100 Subject: [PATCH 11/13] . --- source/Halibut.Tests/DataStreamFixture.cs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/source/Halibut.Tests/DataStreamFixture.cs b/source/Halibut.Tests/DataStreamFixture.cs index 718bcf4d5..645a3f0e6 100644 --- a/source/Halibut.Tests/DataStreamFixture.cs +++ b/source/Halibut.Tests/DataStreamFixture.cs @@ -223,11 +223,11 @@ public async Task WhenSendingADataStream_AndWeSendLessDataThanWeShould_AndThrowO await stream.WriteAsync(underSizedData, 0, underSizedData.Length, ct); }); + var stopwatch = Stopwatch.StartNew(); + await AssertException.Throws(async () => await readDataStreamService.SendDataAsync(underSizedDataStream)); - var stopwatch = Stopwatch.StartNew(); - var correctData = new byte[50]; new Random().NextBytes(correctData); var correctDataStream = new DataStream(50, async (stream, ct) => @@ -239,11 +239,11 @@ await AssertException.Throws(async () => var secondRequestTime = stopwatch.Elapsed; received.Should().Be(50); + Logger.Information("All requests took: {Time}s", secondRequestTime); secondRequestTime.Should().BeLessThan(TimeSpan.FromSeconds(30), - "second request with correct stream should complete quickly, since the sender" + - " detects an issue it will close the connection which will result in the receiver" + - " seeing an EOF which results in it entering into a reconnect." + - " Previously the sender would need to wait 60s before reconnecting."); + "We should detect the wrong size DataStream, this results in an immediate exception being raised." + + " This will result in the client killing the connection, which will result in the client also receiving a EOF. " + + "This will result in both sides quickly re-connecting allowing another RPC to be re-attempted quickly."); } } } From f19a338f1384e4b7785f60b2b976e2d1df3abb9d Mon Sep 17 00:00:00 2001 From: Luke Butters Date: Thu, 26 Mar 2026 11:07:43 +1100 Subject: [PATCH 12/13] . --- .../Protocol/MessageExchangeStreamFixture.cs | 302 ------------------ 1 file changed, 302 deletions(-) delete mode 100644 source/Halibut.Tests/Transport/Protocol/MessageExchangeStreamFixture.cs diff --git a/source/Halibut.Tests/Transport/Protocol/MessageExchangeStreamFixture.cs b/source/Halibut.Tests/Transport/Protocol/MessageExchangeStreamFixture.cs deleted file mode 100644 index 492bab9e5..000000000 --- a/source/Halibut.Tests/Transport/Protocol/MessageExchangeStreamFixture.cs +++ /dev/null @@ -1,302 +0,0 @@ -using System; -using System.IO; -using System.Threading; -using System.Threading.Tasks; -using FluentAssertions; -using Halibut.Diagnostics; -using Halibut.Tests.Support; -using Halibut.Tests.Support.Logging; -using Halibut.Transport; -using Halibut.Transport.Observability; -using Halibut.Transport.Protocol; -using NSubstitute; -using NUnit.Framework; - -namespace Halibut.Tests.Transport.Protocol -{ - public class MessageExchangeStreamFixture : BaseTest - { - [Test] - public async Task ShouldLogErrorWhenDataStreamSendsSizeMismatchedData() - { - var inMemoryLog = new InMemoryLogWriter(); - var memoryStream = new MemoryStream(); - - var serializer = new MessageSerializerBuilder(new LogFactory()).Build(); - var limits = new HalibutTimeoutsAndLimitsForTestsBuilder().Build(); - limits.ThrowOnDataStreamSizeMismatch = false; - - var messageExchangeStream = new MessageExchangeStream( - memoryStream, - serializer, - new NoOpControlMessageObserver(), - limits, - inMemoryLog); - - var actualData = new byte[100]; - new Random().NextBytes(actualData); - - var maliciousDataStream = new DataStream(10, async (stream, ct) => - { - await stream.WriteAsync(actualData, 0, actualData.Length, ct); - }); - - var requestMessage = new RequestMessage - { - Destination = new ServiceEndPoint(new Uri("https://example.com"), "ABC123", new HalibutTimeoutsAndLimitsForTestsBuilder().Build()), - MethodName = "Test", - ServiceName = "TestService", - Params = new object[] { maliciousDataStream } - }; - - await messageExchangeStream.SendAsync(requestMessage, CancellationToken.None); - - var logs = inMemoryLog.GetLogs(); - logs.Should().Contain(log => - log.Type == EventType.Error && - log.FormattedMessage.Contains("Data stream size mismatch detected during send") && - log.FormattedMessage.Contains("Message ID:") && - log.FormattedMessage.Contains("Stream ID:") && - log.FormattedMessage.Contains("Declared length: 10") && - log.FormattedMessage.Contains("Actual bytes written: 100") && - log.FormattedMessage.Contains("Total length of all DataStreams")); - } - - [Test] - public async Task ShouldLogErrorWhenDataStreamReceivesSizeMismatchedData() - { - var inMemoryLog = new InMemoryLogWriter(); - - var serializer = new MessageSerializerBuilder(new LogFactory()).Build(); - - var streamData = new MemoryStream(); - var writer = new BinaryWriter(streamData); - - var dataStreamId = Guid.NewGuid(); - writer.Write(dataStreamId.ToByteArray()); - writer.Write((long)10); - - var actualData = new byte[10]; - new Random().NextBytes(actualData); - writer.Write(actualData); - - writer.Write((long)100); - - streamData.Position = 0; - - var messageExchangeStream = new MessageExchangeStream( - streamData, - serializer, - new NoOpControlMessageObserver(), - new HalibutTimeoutsAndLimitsForTestsBuilder().Build(), - inMemoryLog); - - var deserializedDataStream = new DataStream(); - typeof(DataStream).GetProperty("Id")!.SetValue(deserializedDataStream, dataStreamId); - - Func act = async () => - { - var method = typeof(MessageExchangeStream).GetMethod("ReadStreamAsync", - System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance); - await (Task)method!.Invoke(messageExchangeStream, - new object[] { "test-message-id", new[] { deserializedDataStream }, CancellationToken.None })!; - }; - - await act.Should().ThrowAsync() - .WithMessage("*Data stream size mismatch detected*Message Id: test-message-id*Stream ID: *Expected length: 10*Actual length claimed at end: 100*"); - - var logs = inMemoryLog.GetLogs(); - logs.Should().Contain(log => - log.Type == EventType.Error && - log.FormattedMessage.Contains("Data stream size mismatch detected") && - log.FormattedMessage.Contains("Stream ID:") && - log.FormattedMessage.Contains("Expected length: 10") && - log.FormattedMessage.Contains("Actual length claimed at end: 100") && - log.FormattedMessage.Contains("Total length of all DataStreams")); - } - - [Test] - public async Task ShouldLogErrorWhenDataStreamSendsTooFewBytes() - { - var inMemoryLog = new InMemoryLogWriter(); - var memoryStream = new MemoryStream(); - - var serializer = new MessageSerializerBuilder(new LogFactory()).Build(); - var limits = new HalibutTimeoutsAndLimitsForTestsBuilder().Build(); - limits.ThrowOnDataStreamSizeMismatch = false; - - var messageExchangeStream = new MessageExchangeStream( - memoryStream, - serializer, - new NoOpControlMessageObserver(), - limits, - inMemoryLog); - - var actualData = new byte[10]; - new Random().NextBytes(actualData); - - var underSizedDataStream = new DataStream(100, async (stream, ct) => - { - await stream.WriteAsync(actualData, 0, actualData.Length, ct); - }); - - var requestMessage = new RequestMessage - { - Destination = new ServiceEndPoint(new Uri("https://example.com"), "ABC123", new HalibutTimeoutsAndLimitsForTestsBuilder().Build()), - MethodName = "Test", - ServiceName = "TestService", - Params = new object[] { underSizedDataStream } - }; - - await messageExchangeStream.SendAsync(requestMessage, CancellationToken.None); - - var logs = inMemoryLog.GetLogs(); - logs.Should().Contain(log => - log.Type == EventType.Error && - log.FormattedMessage.Contains("Data stream size mismatch detected during send") && - log.FormattedMessage.Contains("Message ID:") && - log.FormattedMessage.Contains("Stream ID:") && - log.FormattedMessage.Contains("Declared length: 100") && - log.FormattedMessage.Contains("Actual bytes written: 10") && - log.FormattedMessage.Contains("Total length of all DataStreams")); - } - - [Test] - public async Task ShouldLogErrorWhenDataStreamReceivesTooFewBytes() - { - var inMemoryLog = new InMemoryLogWriter(); - - var serializer = new MessageSerializerBuilder(new LogFactory()).Build(); - - var streamData = new MemoryStream(); - var writer = new BinaryWriter(streamData); - - var dataStreamId = Guid.NewGuid(); - writer.Write(dataStreamId.ToByteArray()); - writer.Write((long)100); - - var actualData = new byte[10]; - new Random().NextBytes(actualData); - writer.Write(actualData); - - streamData.Position = 0; - - var messageExchangeStream = new MessageExchangeStream( - streamData, - serializer, - new NoOpControlMessageObserver(), - new HalibutTimeoutsAndLimitsForTestsBuilder().Build(), - inMemoryLog); - - var deserializedDataStream = new DataStream(); - typeof(DataStream).GetProperty("Id")!.SetValue(deserializedDataStream, dataStreamId); - - Func act = async () => - { - var method = typeof(MessageExchangeStream).GetMethod("ReadStreamAsync", - System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance); - await (Task)method!.Invoke(messageExchangeStream, - new object[] { "test-message-id", new[] { deserializedDataStream }, CancellationToken.None })!; - }; - - await act.Should().ThrowAsync() - .WithMessage("*Stream with length 100 was closed after only reading 10 bytes*"); - - var logs = inMemoryLog.GetLogs(); - logs.Should().Contain(log => - log.Type == EventType.Error && - log.FormattedMessage.Contains("Data stream reading failed") && - log.FormattedMessage.Contains($"Stream ID: {dataStreamId}") && - log.FormattedMessage.Contains("Expected length: 100") && - log.FormattedMessage.Contains("Actual bytes read: 10")); - } - - [Test] - public async Task ShouldThrowExceptionWhenConfiguredAndDataStreamSendsSizeMismatch() - { - var inMemoryLog = new InMemoryLogWriter(); - var memoryStream = new MemoryStream(); - - var serializer = new MessageSerializerBuilder(new LogFactory()).Build(); - var limits = new HalibutTimeoutsAndLimitsForTestsBuilder().Build(); - limits.ThrowOnDataStreamSizeMismatch = true; - - var messageExchangeStream = new MessageExchangeStream( - memoryStream, - serializer, - new NoOpControlMessageObserver(), - limits, - inMemoryLog); - - var actualData = new byte[100]; - new Random().NextBytes(actualData); - - var maliciousDataStream = new DataStream(10, async (stream, ct) => - { - await stream.WriteAsync(actualData, 0, actualData.Length, ct); - }); - - var requestMessage = new RequestMessage - { - Destination = new ServiceEndPoint(new Uri("https://example.com"), "ABC123", new HalibutTimeoutsAndLimitsForTestsBuilder().Build()), - MethodName = "Test", - ServiceName = "TestService", - Params = new object[] { maliciousDataStream } - }; - - Func act = async () => await messageExchangeStream.SendAsync(requestMessage, CancellationToken.None); - - await act.Should().ThrowAsync() - .WithMessage("*Data stream size mismatch detected during send*Stream ID:*Declared length: 10*Actual bytes written: 100*"); - - var logs = inMemoryLog.GetLogs(); - logs.Should().Contain(log => - log.Type == EventType.Error && - log.FormattedMessage.Contains("Data stream size mismatch detected during send") && - log.FormattedMessage.Contains("Message ID:") && - log.FormattedMessage.Contains("Stream ID:") && - log.FormattedMessage.Contains("Declared length: 10") && - log.FormattedMessage.Contains("Actual bytes written: 100") && - log.FormattedMessage.Contains("Total length of all DataStreams")); - } - - [Test] - public async Task ShouldNotLogErrorWhenDataStreamSendsCorrectSize() - { - var inMemoryLog = new InMemoryLogWriter(); - var memoryStream = new MemoryStream(); - - var serializer = new MessageSerializerBuilder(new LogFactory()).Build(); - - var messageExchangeStream = new MessageExchangeStream( - memoryStream, - serializer, - new NoOpControlMessageObserver(), - new HalibutTimeoutsAndLimitsForTestsBuilder().Build(), - inMemoryLog); - - var actualData = new byte[10]; - new Random().NextBytes(actualData); - - var correctDataStream = new DataStream(10, async (stream, ct) => - { - await stream.WriteAsync(actualData, 0, actualData.Length, ct); - }); - - var requestMessage = new RequestMessage - { - Destination = new ServiceEndPoint(new Uri("https://example.com"), "ABC123", new HalibutTimeoutsAndLimitsForTestsBuilder().Build()), - MethodName = "Test", - ServiceName = "TestService", - Params = new object[] { correctDataStream } - }; - - await messageExchangeStream.SendAsync(requestMessage, CancellationToken.None); - - var logs = inMemoryLog.GetLogs(); - logs.Should().NotContain(log => - log.Type == EventType.Error && - log.FormattedMessage.Contains("Data stream size mismatch")); - } - } -} From 964bfac7f2220bc508173b3c49ab45ce377c0bc9 Mon Sep 17 00:00:00 2001 From: Luke Butters Date: Thu, 26 Mar 2026 12:39:22 +1100 Subject: [PATCH 13/13] . --- .../Protocol/MessageSerializerFixture.cs | 1 + .../Protocol/MessageExchangeStream.cs | 41 ++++++++++--------- 2 files changed, 22 insertions(+), 20 deletions(-) diff --git a/source/Halibut.Tests/Transport/Protocol/MessageSerializerFixture.cs b/source/Halibut.Tests/Transport/Protocol/MessageSerializerFixture.cs index 585baad4a..c36acdd85 100644 --- a/source/Halibut.Tests/Transport/Protocol/MessageSerializerFixture.cs +++ b/source/Halibut.Tests/Transport/Protocol/MessageSerializerFixture.cs @@ -104,6 +104,7 @@ public async Task BackwardsCompatibility_ExtraParametersInServerErrorAreIgnored( await using (var stream = new RewindableBufferStream(new MemoryStream(Convert.FromBase64String(base64Bson)))) { var result = await ReadMessage(sut, stream); + result.Should().NotBeNull(); result!.Error.Should().NotBeNull(); result.Error!.Message = "foo"; result.Error.HalibutErrorType = "MethodNotFoundHalibutClientException"; diff --git a/source/Halibut/Transport/Protocol/MessageExchangeStream.cs b/source/Halibut/Transport/Protocol/MessageExchangeStream.cs index 9274e4918..dd1b26b8c 100644 --- a/source/Halibut/Transport/Protocol/MessageExchangeStream.cs +++ b/source/Halibut/Transport/Protocol/MessageExchangeStream.cs @@ -220,7 +220,7 @@ await stream.WithReadTimeout( var (result, dataStreams) = await serializer.ReadMessageAsync(stream, cancellationToken); if (dataStreams.Count > 0) { - await ReadStreamsAsync(result?.Id??"", dataStreams, cancellationToken); + await ReadStreamsAsync(result?.Id ?? "", dataStreams, cancellationToken); } log.Write(EventType.Diagnostic, "Received Message"); return result; @@ -277,8 +277,9 @@ async Task ReadStreamAsync(string messageId, IReadOnlyList deseriali var id = new Guid(await stream.ReadBytesAsync(16, cancellationToken)); var length = await stream.ReadInt64Async(cancellationToken); var dataStream = FindStreamById(deserializedStreams, id); + long totalSizeOfAllDataStreams = deserializedStreams.Select(d => d.Length).Sum(); - var tempFile = await CopyStreamToFileAsync(messageId, id, length, stream, deserializedStreams, cancellationToken); + var tempFile = await CopyStreamToFileAsync(id, length, stream, messageId, totalSizeOfAllDataStreams, cancellationToken); var lengthAgain = await stream.ReadInt64Async(cancellationToken); if (lengthAgain != length) @@ -286,20 +287,19 @@ async Task ReadStreamAsync(string messageId, IReadOnlyList deseriali log.Write(EventType.Error, "Data stream size mismatch detected. Message ID: {0}, Stream ID: {1}, " + "Expected length: {2}, Actual length claimed at end: {3}. " + "Total length of all DataStreams to be sent is {4}", - messageId, id, length, lengthAgain, deserializedStreams.Select(d => d.Length).Sum()); + messageId, id, length, lengthAgain, totalSizeOfAllDataStreams); throw new ProtocolException($"Data stream size mismatch detected. Message Id: {messageId}, Stream ID: {id}, " + $"Expected length: {length}, Actual length claimed at end: {lengthAgain}. " + - $"Total length of all DataStreams to be sent is {deserializedStreams.Select(d => d.Length).Sum()}"); + $"Total length of all DataStreams to be sent is {totalSizeOfAllDataStreams}"); } ((IDataStreamInternal)dataStream).Received(tempFile); } - async Task CopyStreamToFileAsync(string messageId, Guid id, long length, Stream stream, IReadOnlyList deserializedStreams, CancellationToken cancellationToken) + async Task CopyStreamToFileAsync(Guid dataStreamId, long dataSteamLength, Stream networkStream, string messageId, long totalSizeOfAllDataStreams, CancellationToken cancellationToken) { - var path = Path.Combine(Path.GetTempPath(), string.Format("{0}_{1}", id.ToString(), Interlocked.Increment(ref streamCount))); - long bytesLeftToRead = length; - var totalDataStreamLength = deserializedStreams.Select(d => d.Length).Sum(); + var path = Path.Combine(Path.GetTempPath(), string.Format("{0}_{1}", dataStreamId.ToString(), Interlocked.Increment(ref streamCount))); + long bytesLeftToRead = dataSteamLength; #if !NETFRAMEWORK await #endif @@ -311,29 +311,30 @@ async Task CopyStreamToFileAsync(string messageId, Guid id, int read = 0; try { - read = await stream.ReadAsync(buffer, 0, (int)Math.Min(buffer.Length, bytesLeftToRead), cancellationToken); + read = await networkStream.ReadAsync(buffer, 0, (int)Math.Min(buffer.Length, bytesLeftToRead), cancellationToken); } catch (Exception ex) { log.WriteException(EventType.Error, "Data stream reading failed. Message ID: {0}, Stream ID: {1}, " + "Expected length: {2}, Actual bytes read: {3}. " + - "Total length of all DataStreams to be sent is {4}. Exception: {5}", + "Total length of all DataStreams to be sent is {4}.", ex, - messageId, id, length, length - bytesLeftToRead, totalDataStreamLength, ex.Message); + messageId, dataStreamId, dataSteamLength, dataSteamLength - bytesLeftToRead, totalSizeOfAllDataStreams); throw; } if (read == 0) { - var bytesRead = length - bytesLeftToRead; - log.Write(EventType.Error, "Data stream reading failed. Message ID: {0}, Stream ID: {1}, " + + var bytesRead = dataSteamLength - bytesLeftToRead; + log.Write(EventType.Error, "Data stream reading failed, we read zero bytes from the stream which implies EOF." + + "Message ID: {0}, Stream ID: {1}, " + "Expected length: {2}, Actual bytes read: {3}. " + - "Total length of all DataStreams to be sent is {4}. Exception: Stream closed prematurely.", - messageId, id, length, bytesRead, totalDataStreamLength); - throw new ProtocolException($"Data stream reading failed. Message Id: {messageId}, Stream ID: {id}, " + - $"Expected length: {length}, Actual bytes read: {bytesRead}. " + - $"Total length of all DataStreams to be sent is {totalDataStreamLength}. " + - $"Stream with length {length} was closed after only reading {bytesRead} bytes."); + "Total length of all DataStreams to be sent is {4}.", + messageId, dataStreamId, dataSteamLength, bytesRead, totalSizeOfAllDataStreams); + throw new ProtocolException($"Data stream reading failed. Message Id: {messageId}, Stream ID: {dataStreamId}, " + + $"Expected length: {dataSteamLength}, Actual bytes read: {bytesRead}. " + + $"Total length of all DataStreams to be sent is {totalSizeOfAllDataStreams}. " + + $"Stream with length {dataSteamLength} was closed after only reading {bytesRead} bytes."); } bytesLeftToRead -= read; @@ -366,7 +367,7 @@ async Task WriteEachStreamAsync(string messageId, IEnumerable stream await stream.WriteLongAsync(dataStream.Length, cancellationToken); await stream.FlushAsync(cancellationToken); - var byteCountingStream = new ByteCountingStream(stream, OnDispose.LeaveInputStreamOpen); + await using var byteCountingStream = new ByteCountingStream(stream, OnDispose.LeaveInputStreamOpen); await ((IDataStreamInternal)dataStream).TransmitAsync(byteCountingStream, cancellationToken); await stream.FlushAsync(cancellationToken);