diff --git a/source/Halibut.Tests/DataStreamFixture.cs b/source/Halibut.Tests/DataStreamFixture.cs index f7af6b13a..645a3f0e6 100644 --- a/source/Halibut.Tests/DataStreamFixture.cs +++ b/source/Halibut.Tests/DataStreamFixture.cs @@ -1,7 +1,10 @@ -using System; +using System; +using System.Diagnostics; using System.IO; +using System.Linq; using System.Threading.Tasks; using FluentAssertions; +using Halibut.Diagnostics; using Halibut.Tests.Support; using Halibut.Tests.Support.TestAttributes; using Halibut.Tests.Support.TestCases; @@ -50,5 +53,198 @@ public async Task ASyncDataStreamWriter_CanBeUsedInAsync() await ((IDataStreamInternal) ds).TransmitAsync(memoryStream, CancellationToken); memoryStream.ToArray().Should().BeEquivalentTo(data); } + + [Test] + [LatestClientAndLatestServiceTestCases(testNetworkConditions: false)] + public async Task WhenSendingADataStream_AndWeSendMoreDataThanWeShould_ThenADescriptiveErrorIsLogged(ClientAndServiceTestCase clientAndServiceTestCase) + { + await using (var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder() + .WithStandardServices() + .AsLatestClientAndLatestServiceBuilder() + .WithHalibutTimeoutsAndLimits(new HalibutTimeoutsAndLimitsForTestsBuilder() + .Build() + .Apply(h => h.ThrowOnDataStreamSizeMismatch = false)) + .RecordingClientLogs(out var clientLogs) + .RecordingServiceLogs(out var serviceLogs) + .Build(CancellationToken)) + { + var readDataStreamService = clientAndService.CreateAsyncClient(); + + var actualData = new byte[100]; + new Random().NextBytes(actualData); + + var maliciousDataStream = new DataStream(10, async (stream, ct) => + { + await stream.WriteAsync(actualData, 0, actualData.Length, ct); + }); + + await AssertException.Throws(async () => + await readDataStreamService.SendDataAsync(maliciousDataStream)); + + var allClientLogs = clientLogs.Values.SelectMany(log => log.GetLogs()).ToList(); + allClientLogs.Should().Contain(log => + log.Type == EventType.Error && + log.FormattedMessage.Contains("Data stream size mismatch detected during send") && + log.FormattedMessage.Contains("Declared length: 10") && + log.FormattedMessage.Contains("Actual bytes written: 100")); + + var allServiceLogs = serviceLogs.Values.SelectMany(log => log.GetLogs()).ToList(); + allServiceLogs.Should().Contain(log => + log.Type == EventType.Error && + log.FormattedMessage.Contains("Data stream size mismatch detected") && + log.FormattedMessage.Contains("Message ID:") && + log.FormattedMessage.Contains("Stream ID:") && + log.FormattedMessage.Contains("Expected length: 10") && + log.FormattedMessage.Contains("Total length of all DataStreams")); + + } + } + + [Test] + [LatestClientAndLatestServiceTestCases(testNetworkConditions: false)] + public async Task WhenSendingADataStream_AndWeSendLessDataThanWeShould_ThenADescriptiveErrorIsLogged(ClientAndServiceTestCase clientAndServiceTestCase) + { + await using (var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder() + .WithStandardServices() + .AsLatestClientAndLatestServiceBuilder() + .WithHalibutTimeoutsAndLimits(new HalibutTimeoutsAndLimitsForTestsBuilder() + .Build() + .Apply(h => h.ThrowOnDataStreamSizeMismatch = false)) + .RecordingClientLogs(out var clientLogs) + .RecordingServiceLogs(out var serviceLogs) + .Build(CancellationToken)) + { + var readDataStreamService = clientAndService.CreateAsyncClient(); + + var actualData = new byte[10]; + new Random().NextBytes(actualData); + + var underSizedDataStream = new DataStream(100, async (stream, ct) => + { + await stream.WriteAsync(actualData, 0, actualData.Length, ct); + }); + + await AssertException.Throws(async () => + await readDataStreamService.SendDataAsync(underSizedDataStream)); + + var allClientLogs = clientLogs.Values.SelectMany(log => log.GetLogs()).ToList(); + allClientLogs.Should().Contain(log => + log.Type == EventType.Error && + log.FormattedMessage.Contains("Data stream size mismatch detected during send") && + log.FormattedMessage.Contains("Declared length: 100") && + log.FormattedMessage.Contains("Actual bytes written: 10")); + } + } + + [Test] + [LatestClientAndLatestServiceTestCases(testNetworkConditions: false)] + public async Task WhenSendingADataStream_AndWeSendMoreDataThanWeShould_AndThrowOnDataStreamSizeMismatchIsEnabled_ThenItThrows(ClientAndServiceTestCase clientAndServiceTestCase) + { + await using (var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder() + .WithStandardServices() + .AsLatestClientAndLatestServiceBuilder() + .WithHalibutTimeoutsAndLimits(new HalibutTimeoutsAndLimits + { + ThrowOnDataStreamSizeMismatch = true + }) + .RecordingClientLogs(out var clientLogs) + .Build(CancellationToken)) + { + var readDataStreamService = clientAndService.CreateAsyncClient(); + + var actualData = new byte[100]; + new Random().NextBytes(actualData); + + var maliciousDataStream = new DataStream(10, async (stream, ct) => + { + await stream.WriteAsync(actualData, 0, actualData.Length, ct); + }); + + var exception = await AssertException.Throws(async () => + await readDataStreamService.SendDataAsync(maliciousDataStream)); + + exception.And.Message.Should().Contain("Data stream size mismatch"); + } + } + + [Test] + [LatestClientAndLatestServiceTestCases(testNetworkConditions: false)] + public async Task WhenSendingADataStream_AndWeSendLessDataThanWeShould_AndThrowOnDataStreamSizeMismatchIsEnabled_ThenItThrows(ClientAndServiceTestCase clientAndServiceTestCase) + { + await using (var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder() + .WithStandardServices() + .AsLatestClientAndLatestServiceBuilder() + .WithHalibutTimeoutsAndLimits(new HalibutTimeoutsAndLimits + { + ThrowOnDataStreamSizeMismatch = true + }) + .RecordingClientLogs(out var clientLogs) + .Build(CancellationToken)) + { + var readDataStreamService = clientAndService.CreateAsyncClient(); + + var actualData = new byte[10]; + new Random().NextBytes(actualData); + + var underSizedDataStream = new DataStream(100, async (stream, ct) => + { + await stream.WriteAsync(actualData, 0, actualData.Length, ct); + }); + + var exception = await AssertException.Throws(async () => + await readDataStreamService.SendDataAsync(underSizedDataStream)); + + exception.And.Message.Should().Contain("Data stream size mismatch"); + } + } + + [Test] + [LatestClientAndLatestServiceTestCases(testNetworkConditions: false)] + public async Task WhenSendingADataStream_AndWeSendLessDataThanWeShould_AndThrowOnDataStreamSizeMismatchIsEnabled_ThenSecondRequestSucceedsQuickly(ClientAndServiceTestCase clientAndServiceTestCase) + { + await using (var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder() + .WithStandardServices() + .AsLatestClientAndLatestServiceBuilder() + .WithHalibutTimeoutsAndLimits(new HalibutTimeoutsAndLimits + { + ThrowOnDataStreamSizeMismatch = true, + TcpClientReceiveResponseTimeout = TimeSpan.FromSeconds(60) + }) + .RecordingClientLogs(out var clientLogs) + .Build(CancellationToken)) + { + var readDataStreamService = clientAndService.CreateAsyncClient(); + + var underSizedData = new byte[10]; + new Random().NextBytes(underSizedData); + + var underSizedDataStream = new DataStream(100, async (stream, ct) => + { + await stream.WriteAsync(underSizedData, 0, underSizedData.Length, ct); + }); + + var stopwatch = Stopwatch.StartNew(); + + await AssertException.Throws(async () => + await readDataStreamService.SendDataAsync(underSizedDataStream)); + + var correctData = new byte[50]; + new Random().NextBytes(correctData); + var correctDataStream = new DataStream(50, async (stream, ct) => + { + await stream.WriteAsync(correctData, 0, correctData.Length, ct); + }); + + var received = await readDataStreamService.SendDataAsync(correctDataStream); + + var secondRequestTime = stopwatch.Elapsed; + received.Should().Be(50); + Logger.Information("All requests took: {Time}s", secondRequestTime); + secondRequestTime.Should().BeLessThan(TimeSpan.FromSeconds(30), + "We should detect the wrong size DataStream, this results in an immediate exception being raised." + + " This will result in the client killing the connection, which will result in the client also receiving a EOF. " + + "This will result in both sides quickly re-connecting allowing another RPC to be re-attempted quickly."); + } + } } } \ No newline at end of file diff --git a/source/Halibut.Tests/Diagnostics/ExceptionReturnedByHalibutProxyExtensionMethodFixture.cs b/source/Halibut.Tests/Diagnostics/ExceptionReturnedByHalibutProxyExtensionMethodFixture.cs index e6482d5de..35934db94 100644 --- a/source/Halibut.Tests/Diagnostics/ExceptionReturnedByHalibutProxyExtensionMethodFixture.cs +++ b/source/Halibut.Tests/Diagnostics/ExceptionReturnedByHalibutProxyExtensionMethodFixture.cs @@ -122,7 +122,8 @@ public async Task WhenTheConnectionPausesWaitingForAResponse(ClientAndServiceTes .Be(HalibutNetworkExceptionType.IsNetworkError); exception.Message.Should().ContainAny( - "Unable to read data from the transport connection: Connection timed out.", + "Unable to read data from the transport connection: Connection timed out.", + "Unable to read data from the transport connection: Operation timed out.", "Unable to read data from the transport connection: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond."); } } diff --git a/source/Halibut.Tests/ExceptionContractFixture.cs b/source/Halibut.Tests/ExceptionContractFixture.cs index 9222f01ed..3589e9766 100644 --- a/source/Halibut.Tests/ExceptionContractFixture.cs +++ b/source/Halibut.Tests/ExceptionContractFixture.cs @@ -60,6 +60,7 @@ public async Task WhenThePollingRequestHasBegunTransfer_AndATcpTimeoutIsReached_ (await AssertException.Throws(async () => await doSomeActionClient.ActionAsync(new(CancellationToken)))) .And.Message.Should().ContainAny( "Unable to read data from the transport connection: Connection timed out.", + "Unable to read data from the transport connection: Operation timed out.", "Unable to read data from the transport connection: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond"); waitSemaphore.Release(); diff --git a/source/Halibut.Tests/FailureModesFixture.cs b/source/Halibut.Tests/FailureModesFixture.cs index 312de19a1..e074c024a 100644 --- a/source/Halibut.Tests/FailureModesFixture.cs +++ b/source/Halibut.Tests/FailureModesFixture.cs @@ -131,7 +131,11 @@ public async Task FailOnInvalidHostname() else { // Failed with: An error occurred when sending a request to 'https://sduj08ud9382ujd98dw9fh934hdj2389u982:8000/', before the request could begin: Name or service not known, but found False. - new [] {"No such device or address", "Resource temporarily unavailable", "Name or service not known"}.Any(message.Contains).Should().BeTrue($"Message does not match known strings: {message}"); + new [] {"No such device or address", + "Resource temporarily unavailable", + "Name or service not known", + "nodename nor servname provided, or not known" + }.Any(message.Contains).Should().BeTrue($"Message does not match known strings: {message}"); } } } diff --git a/source/Halibut.Tests/PollingServiceTimeoutsFixture.cs b/source/Halibut.Tests/PollingServiceTimeoutsFixture.cs index d793ff02e..54f988c78 100644 --- a/source/Halibut.Tests/PollingServiceTimeoutsFixture.cs +++ b/source/Halibut.Tests/PollingServiceTimeoutsFixture.cs @@ -120,6 +120,7 @@ public async Task WhenThePollingRequestHasBegunTransfer_AndWeTimeoutWaitingForTh var stopwatch = Stopwatch.StartNew(); var exception = (await AssertException.Throws(async () => await doSomeActionClient.ActionAsync(new(CancellationToken)))).And; exception.Message.Should().ContainAny( + "Unable to read data from the transport connection: Operation timed out.", "Unable to read data from the transport connection: Connection timed out.", "Unable to read data from the transport connection: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond"); exception.ConnectionState.Should().Be(ConnectionState.Unknown); diff --git a/source/Halibut.Tests/ProxyFixture.cs b/source/Halibut.Tests/ProxyFixture.cs index 142656563..907057d93 100644 --- a/source/Halibut.Tests/ProxyFixture.cs +++ b/source/Halibut.Tests/ProxyFixture.cs @@ -85,6 +85,7 @@ public async Task ClientTimesOutConnectingToAProxy_WhenTheProxyHangsDuringConnec "No connection could be made because the target machine actively refused it", "the polling endpoint did not collect the request within the allowed time", "Unable to read data from the transport connection: Connection timed out.", + "Unable to read data from the transport connection: Operation timed out.", "A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond.", "A timeout while waiting for the proxy server at"); ; diff --git a/source/Halibut.Tests/Timeouts/ReceiveResponseTimeoutTests.cs b/source/Halibut.Tests/Timeouts/ReceiveResponseTimeoutTests.cs index de4f7478c..b67006783 100644 --- a/source/Halibut.Tests/Timeouts/ReceiveResponseTimeoutTests.cs +++ b/source/Halibut.Tests/Timeouts/ReceiveResponseTimeoutTests.cs @@ -39,6 +39,7 @@ public async Task WhenRpcExecutionExceedsReceiveResponseTimeout_ThenInitialDataR (await AssertionExtensions.Should(() => doSomeActionClient.ActionAsync()).ThrowAsync()) .And.Message.Should().ContainAny( "Connection timed out.", + "Operation timed out.", "A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond"); } } @@ -85,6 +86,7 @@ public async Task WhenRpcExecutionIsWithinReceiveResponseTimeout_ButSubsequentDa (await AssertionExtensions.Should(() => lastServiceClient.GetListAsync()).ThrowAsync()) .And.Message.Should().ContainAny( "Connection timed out.", + "Operation timed out.", "A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond", "Unable to read data from the transport connection: Connection reset by peer"); } @@ -117,6 +119,7 @@ public async Task WhenRpcExecutionIsWithinReceiveResponseTimeout_ButDataStreamDa Logger.Information(e, "The received expected exception, we were expecting one"); e.Message.Should().ContainAny( "Connection timed out.", + "Operation timed out.", "A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond"); } } diff --git a/source/Halibut.Tests/Timeouts/SendingAndReceivingRequestMessagesTimeoutsFixture.cs b/source/Halibut.Tests/Timeouts/SendingAndReceivingRequestMessagesTimeoutsFixture.cs index 419ef6c10..bde4abe0f 100644 --- a/source/Halibut.Tests/Timeouts/SendingAndReceivingRequestMessagesTimeoutsFixture.cs +++ b/source/Halibut.Tests/Timeouts/SendingAndReceivingRequestMessagesTimeoutsFixture.cs @@ -202,6 +202,7 @@ static void AssertExceptionLooksLikeAWriteTimeout(HalibutClientException? e) { e!.Message.Should().ContainAny( "Unable to write data to the transport connection: Connection timed out.", + "Unable to write data to the transport connection: Operation timed out.", " Unable to write data to the transport connection: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond"); e.IsNetworkError().Should().Be(HalibutNetworkExceptionType.IsNetworkError); @@ -210,6 +211,7 @@ static void AssertExceptionLooksLikeAWriteTimeout(HalibutClientException? e) static void AssertExceptionMessageLooksLikeAReadTimeout(HalibutClientException? e) { e!.Message.Should().ContainAny( + "Unable to read data from the transport connection: Operation timed out.", "Unable to read data from the transport connection: Connection timed out.", "Unable to read data from the transport connection: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond."); diff --git a/source/Halibut.Tests/Transport/Protocol/MessageSerializerFixture.cs b/source/Halibut.Tests/Transport/Protocol/MessageSerializerFixture.cs index fe3ec7afe..c36acdd85 100644 --- a/source/Halibut.Tests/Transport/Protocol/MessageSerializerFixture.cs +++ b/source/Halibut.Tests/Transport/Protocol/MessageSerializerFixture.cs @@ -104,7 +104,8 @@ public async Task BackwardsCompatibility_ExtraParametersInServerErrorAreIgnored( await using (var stream = new RewindableBufferStream(new MemoryStream(Convert.FromBase64String(base64Bson)))) { var result = await ReadMessage(sut, stream); - result.Error.Should().NotBeNull(); + result.Should().NotBeNull(); + result!.Error.Should().NotBeNull(); result.Error!.Message = "foo"; result.Error.HalibutErrorType = "MethodNotFoundHalibutClientException"; } @@ -303,7 +304,7 @@ static byte[] DeflateString(string s) } } - async Task ReadMessage(MessageSerializer messageSerializer, RewindableBufferStream rewindableBufferStream) + async Task ReadMessage(MessageSerializer messageSerializer, RewindableBufferStream rewindableBufferStream) { return (await messageSerializer.ReadMessageAsync(rewindableBufferStream, CancellationToken)).Message; } diff --git a/source/Halibut.Tests/Transport/Streams/NetworkTimeoutStreamFixture.cs b/source/Halibut.Tests/Transport/Streams/NetworkTimeoutStreamFixture.cs index 987bc8b55..275097c09 100644 --- a/source/Halibut.Tests/Transport/Streams/NetworkTimeoutStreamFixture.cs +++ b/source/Halibut.Tests/Transport/Streams/NetworkTimeoutStreamFixture.cs @@ -66,6 +66,7 @@ public async Task ReadCalls_ShouldTimeout_AndCloseTheStream_AndThrowExceptionTha actualException.Should().NotBeNull().And.BeOfType(); actualException!.Message.Should().ContainAny( "Unable to read data from the transport connection: Connection timed out.", + "Unable to read data from the transport connection: Operation timed out.", "Unable to read data from the transport connection: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond."); stopWatch.Elapsed.Should().BeLessThan(TimeSpan.FromSeconds(10)); @@ -177,6 +178,7 @@ public async Task WriteCalls_ShouldTimeout_AndCloseTheStream_AndThrowExceptionTh actualException.Should().NotBeNull().And.BeOfType(); actualException!.Message.Should().ContainAny( "Unable to write data to the transport connection: Connection timed out.", + "Unable to write data to the transport connection: Operation timed out.", "Unable to write data to the transport connection: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond."); stopWatch.Elapsed.Should().BeLessThan(TimeSpan.FromSeconds(10)); @@ -311,6 +313,7 @@ public async Task FlushAsync_ShouldTimeout_AndCloseTheStream_AndThrowExceptionTh (await AssertException.Throws(async () => await sut.FlushAsync(CancellationToken))) .And.Message.Should().ContainAny( "Unable to write data to the transport connection: Connection timed out.", + "Unable to write data to the transport connection: Operation timed out.", "Unable to write data to the transport connection: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond."); AssertStreamWasClosed(StreamMethod.Async, callCountingStream); @@ -444,6 +447,7 @@ public async Task CopyToCalls_ShouldTimeout_AndCloseTheStream_AndThrowExceptionT actualException.Should().NotBeNull().And.BeOfType(); actualException!.Message.Should().ContainAny( "Unable to read data from the transport connection: Connection timed out.", + "Unable to read data from the transport connection: Operation timed out.", "Unable to read data from the transport connection: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond."); stopWatch.Elapsed.Should().BeLessThan(TimeSpan.FromSeconds(10)); diff --git a/source/Halibut/Diagnostics/ExceptionReturnedByHalibutProxyExtensionMethod.cs b/source/Halibut/Diagnostics/ExceptionReturnedByHalibutProxyExtensionMethod.cs index 011eefa37..9c0a60148 100644 --- a/source/Halibut/Diagnostics/ExceptionReturnedByHalibutProxyExtensionMethod.cs +++ b/source/Halibut/Diagnostics/ExceptionReturnedByHalibutProxyExtensionMethod.cs @@ -113,6 +113,8 @@ public static HalibutNetworkExceptionType IsNetworkError(this Exception exceptio if (exception.Message.Contains("The I/O operation has been aborted because of either a thread exit or an application request")) return HalibutNetworkExceptionType.IsNetworkError; if (exception.Message.Contains("A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond.")) return HalibutNetworkExceptionType.IsNetworkError; if (exception.Message.Contains("The remote party closed the WebSocket connection without completing the close handshake.")) return HalibutNetworkExceptionType.IsNetworkError; + if (exception.Message.Contains("Unable to read data from the transport connection: Operation timed out.")) return HalibutNetworkExceptionType.IsNetworkError; + if (exception.Message.Contains("Unable to write data to the transport connection: Operation timed out.")) return HalibutNetworkExceptionType.IsNetworkError; } diff --git a/source/Halibut/Diagnostics/HalibutTimeoutsAndLimits.cs b/source/Halibut/Diagnostics/HalibutTimeoutsAndLimits.cs index 80d99653a..9257838bc 100644 --- a/source/Halibut/Diagnostics/HalibutTimeoutsAndLimits.cs +++ b/source/Halibut/Diagnostics/HalibutTimeoutsAndLimits.cs @@ -171,6 +171,12 @@ public TimeSpan SafeTcpClientPooledConnectionTimeout /// public bool TcpNoDelay { get; set; } + /// + /// When true (default), an exception will be thrown if a data stream's actual size doesn't match the declared size. + /// When false, size mismatches are only logged at Error level. + /// + public bool ThrowOnDataStreamSizeMismatch { get; set; } = true; + /// /// In the future these will become the default /// diff --git a/source/Halibut/Transport/Protocol/IHalibutMessage.cs b/source/Halibut/Transport/Protocol/IHalibutMessage.cs new file mode 100644 index 000000000..06abff703 --- /dev/null +++ b/source/Halibut/Transport/Protocol/IHalibutMessage.cs @@ -0,0 +1,7 @@ +namespace Halibut.Transport.Protocol +{ + public interface IHalibutMessage + { + public string Id { get; } + } +} \ No newline at end of file diff --git a/source/Halibut/Transport/Protocol/IMessageSerializer.cs b/source/Halibut/Transport/Protocol/IMessageSerializer.cs index 84a048eb4..474d1cb0c 100644 --- a/source/Halibut/Transport/Protocol/IMessageSerializer.cs +++ b/source/Halibut/Transport/Protocol/IMessageSerializer.cs @@ -10,6 +10,6 @@ namespace Halibut.Transport.Protocol public interface IMessageSerializer { Task> WriteMessageAsync(Stream stream, T message, CancellationToken cancellationToken); - Task<(T Message, IReadOnlyList DataStreams)> ReadMessageAsync(RewindableBufferStream stream, CancellationToken cancellationToken); + Task<(T? Message, IReadOnlyList DataStreams)> ReadMessageAsync(RewindableBufferStream stream, CancellationToken cancellationToken); } } \ No newline at end of file diff --git a/source/Halibut/Transport/Protocol/MessageExchangeStream.cs b/source/Halibut/Transport/Protocol/MessageExchangeStream.cs index 334462a4c..dd1b26b8c 100644 --- a/source/Halibut/Transport/Protocol/MessageExchangeStream.cs +++ b/source/Halibut/Transport/Protocol/MessageExchangeStream.cs @@ -189,7 +189,8 @@ public async Task ReadRemoteIdentityAsync(CancellationToken canc public async Task SendAsync(T message, CancellationToken cancellationToken) { var serializedStreams = await serializer.WriteMessageAsync(stream, message, cancellationToken); - await WriteEachStreamAsync(serializedStreams, cancellationToken); + var messageId = (message as IHalibutMessage)?.Id ?? "unknown"; + await WriteEachStreamAsync(messageId, serializedStreams, cancellationToken); log.Write(EventType.Diagnostic, "Sent message"); } @@ -214,9 +215,13 @@ await stream.WithReadTimeout( } async Task ReceiveAsync(CancellationToken cancellationToken) + where T : IHalibutMessage { var (result, dataStreams) = await serializer.ReadMessageAsync(stream, cancellationToken); - await ReadStreamsAsync(dataStreams, cancellationToken); + if (dataStreams.Count > 0) + { + await ReadStreamsAsync(result?.Id ?? "", dataStreams, cancellationToken); + } log.Write(EventType.Diagnostic, "Received Message"); return result; } @@ -256,36 +261,45 @@ async Task ExpectServerIdentityAsync(CancellationToken cancellationToken) } } - async Task ReadStreamsAsync(IReadOnlyList deserializedStreams, CancellationToken cancellationToken) + async Task ReadStreamsAsync(string messageId, IReadOnlyList deserializedStreams, CancellationToken cancellationToken) { var expected = deserializedStreams.Count; for (var i = 0; i < expected; i++) { - await ReadStreamAsync(deserializedStreams, cancellationToken); + await ReadStreamAsync(messageId, deserializedStreams, cancellationToken); } } - async Task ReadStreamAsync(IReadOnlyList deserializedStreams, CancellationToken cancellationToken) + async Task ReadStreamAsync(string messageId, IReadOnlyList deserializedStreams, CancellationToken cancellationToken) { var id = new Guid(await stream.ReadBytesAsync(16, cancellationToken)); var length = await stream.ReadInt64Async(cancellationToken); var dataStream = FindStreamById(deserializedStreams, id); - var tempFile = await CopyStreamToFileAsync(id, length, stream, cancellationToken); + long totalSizeOfAllDataStreams = deserializedStreams.Select(d => d.Length).Sum(); + + var tempFile = await CopyStreamToFileAsync(id, length, stream, messageId, totalSizeOfAllDataStreams, cancellationToken); + var lengthAgain = await stream.ReadInt64Async(cancellationToken); if (lengthAgain != length) { - throw new ProtocolException("There was a problem receiving a file stream: the length of the file was expected to be: " + length + " but less data was actually sent. This can happen if the remote party is sending a stream but the stream had already been partially read, or if the stream was being reused between calls."); + log.Write(EventType.Error, "Data stream size mismatch detected. Message ID: {0}, Stream ID: {1}, " + + "Expected length: {2}, Actual length claimed at end: {3}. " + + "Total length of all DataStreams to be sent is {4}", + messageId, id, length, lengthAgain, totalSizeOfAllDataStreams); + throw new ProtocolException($"Data stream size mismatch detected. Message Id: {messageId}, Stream ID: {id}, " + + $"Expected length: {length}, Actual length claimed at end: {lengthAgain}. " + + $"Total length of all DataStreams to be sent is {totalSizeOfAllDataStreams}"); } ((IDataStreamInternal)dataStream).Received(tempFile); } - async Task CopyStreamToFileAsync(Guid id, long length, Stream stream, CancellationToken cancellationToken) + async Task CopyStreamToFileAsync(Guid dataStreamId, long dataSteamLength, Stream networkStream, string messageId, long totalSizeOfAllDataStreams, CancellationToken cancellationToken) { - var path = Path.Combine(Path.GetTempPath(), string.Format("{0}_{1}", id.ToString(), Interlocked.Increment(ref streamCount))); - long bytesLeftToRead = length; + var path = Path.Combine(Path.GetTempPath(), string.Format("{0}_{1}", dataStreamId.ToString(), Interlocked.Increment(ref streamCount))); + long bytesLeftToRead = dataSteamLength; #if !NETFRAMEWORK await #endif @@ -294,8 +308,35 @@ async Task CopyStreamToFileAsync(Guid id, long length, Stre var buffer = new byte[65*1024]; while (bytesLeftToRead > 0) { - var read = await stream.ReadAsync(buffer, 0, (int)Math.Min(buffer.Length, bytesLeftToRead), cancellationToken); - if (read == 0) throw new ProtocolException($"Stream with length {length} was closed after only reading {length - bytesLeftToRead} bytes."); + int read = 0; + try + { + read = await networkStream.ReadAsync(buffer, 0, (int)Math.Min(buffer.Length, bytesLeftToRead), cancellationToken); + } + catch (Exception ex) + { + log.WriteException(EventType.Error, "Data stream reading failed. Message ID: {0}, Stream ID: {1}, " + + "Expected length: {2}, Actual bytes read: {3}. " + + "Total length of all DataStreams to be sent is {4}.", + ex, + messageId, dataStreamId, dataSteamLength, dataSteamLength - bytesLeftToRead, totalSizeOfAllDataStreams); + throw; + } + + if (read == 0) + { + var bytesRead = dataSteamLength - bytesLeftToRead; + log.Write(EventType.Error, "Data stream reading failed, we read zero bytes from the stream which implies EOF." + + "Message ID: {0}, Stream ID: {1}, " + + "Expected length: {2}, Actual bytes read: {3}. " + + "Total length of all DataStreams to be sent is {4}.", + messageId, dataStreamId, dataSteamLength, bytesRead, totalSizeOfAllDataStreams); + throw new ProtocolException($"Data stream reading failed. Message Id: {messageId}, Stream ID: {dataStreamId}, " + + $"Expected length: {dataSteamLength}, Actual bytes read: {bytesRead}. " + + $"Total length of all DataStreams to be sent is {totalSizeOfAllDataStreams}. " + + $"Stream with length {dataSteamLength} was closed after only reading {bytesRead} bytes."); + } + bytesLeftToRead -= read; await fileStream.WriteAsync(buffer, 0, read, cancellationToken); } @@ -315,17 +356,36 @@ static DataStream FindStreamById(IReadOnlyList deserializedStreams, return dataStream; } - async Task WriteEachStreamAsync(IEnumerable streams, CancellationToken cancellationToken) + async Task WriteEachStreamAsync(string messageId, IEnumerable streams, CancellationToken cancellationToken) { - foreach (var dataStream in streams) + var streamsList = streams.ToList(); + var totalDataStreamLength = streamsList.Select(d => d.Length).Sum(); + + foreach (var dataStream in streamsList) { await stream.WriteByteArrayAsync(dataStream.Id.ToByteArray(), cancellationToken); await stream.WriteLongAsync(dataStream.Length, cancellationToken); await stream.FlushAsync(cancellationToken); - await ((IDataStreamInternal)dataStream).TransmitAsync(stream, cancellationToken); + await using var byteCountingStream = new ByteCountingStream(stream, OnDispose.LeaveInputStreamOpen); + await ((IDataStreamInternal)dataStream).TransmitAsync(byteCountingStream, cancellationToken); await stream.FlushAsync(cancellationToken); + if (byteCountingStream.BytesWritten != dataStream.Length) + { + log.Write(EventType.Error, "Data stream size mismatch detected during send. Message ID: {0}, Stream ID: {1}, " + + "Declared length: {2}, Actual bytes written: {3}. " + + "Total length of all DataStreams to be sent is {4}", + messageId, dataStream.Id, dataStream.Length, byteCountingStream.BytesWritten, totalDataStreamLength); + + if (halibutTimeoutsAndLimits.ThrowOnDataStreamSizeMismatch) + { + throw new ProtocolException($"Data stream size mismatch detected during send. Message Id: {messageId}, Stream ID: {dataStream.Id}, " + + $"Declared length: {dataStream.Length}, Actual bytes written: {byteCountingStream.BytesWritten}. " + + $"Total length of all DataStreams to be sent is {totalDataStreamLength}."); + } + } + await stream.WriteLongAsync(dataStream.Length, cancellationToken); await stream.FlushAsync(cancellationToken); } diff --git a/source/Halibut/Transport/Protocol/MessageSerializer.cs b/source/Halibut/Transport/Protocol/MessageSerializer.cs index 201b182d6..f9de44442 100644 --- a/source/Halibut/Transport/Protocol/MessageSerializer.cs +++ b/source/Halibut/Transport/Protocol/MessageSerializer.cs @@ -64,7 +64,7 @@ public async Task> WriteMessageAsync(Stream stream, return serializedStreams; } - public async Task<(T Message, IReadOnlyList DataStreams)> ReadMessageAsync(RewindableBufferStream stream, CancellationToken cancellationToken) + public async Task<(T? Message, IReadOnlyList DataStreams)> ReadMessageAsync(RewindableBufferStream stream, CancellationToken cancellationToken) { await using (var errorRecordingStream = new ErrorRecordingStream(stream, closeInner: false)) { diff --git a/source/Halibut/Transport/Protocol/RequestMessage.cs b/source/Halibut/Transport/Protocol/RequestMessage.cs index 25daec614..d591c5bb9 100644 --- a/source/Halibut/Transport/Protocol/RequestMessage.cs +++ b/source/Halibut/Transport/Protocol/RequestMessage.cs @@ -3,7 +3,7 @@ namespace Halibut.Transport.Protocol { - public class RequestMessage + public class RequestMessage : IHalibutMessage { #pragma warning disable CS8618 // Non-nullable field must contain a non-null value when exiting constructor. Consider declaring as nullable. [JsonProperty("id")] diff --git a/source/Halibut/Transport/Protocol/ResponseMessage.cs b/source/Halibut/Transport/Protocol/ResponseMessage.cs index f73dc6ca8..ee7102aea 100644 --- a/source/Halibut/Transport/Protocol/ResponseMessage.cs +++ b/source/Halibut/Transport/Protocol/ResponseMessage.cs @@ -5,7 +5,7 @@ namespace Halibut.Transport.Protocol { - public class ResponseMessage + public class ResponseMessage : IHalibutMessage { [JsonProperty("id")] #pragma warning disable CS8618 // Non-nullable field must contain a non-null value when exiting constructor. Consider declaring as nullable.