diff --git a/source/Halibut/Transport/Protocol/MessageExchangeStream.cs b/source/Halibut/Transport/Protocol/MessageExchangeStream.cs index dd1b26b8..5ad4fc5c 100644 --- a/source/Halibut/Transport/Protocol/MessageExchangeStream.cs +++ b/source/Halibut/Transport/Protocol/MessageExchangeStream.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.IO; using System.Linq; using System.Security.Authentication; @@ -220,7 +221,8 @@ await stream.WithReadTimeout( var (result, dataStreams) = await serializer.ReadMessageAsync(stream, cancellationToken); if (dataStreams.Count > 0) { - await ReadStreamsAsync(result?.Id ?? "", dataStreams, cancellationToken); + var messageId = result?.Id ?? "Unknown"; + await ReadStreamsAsync(messageId, dataStreams, cancellationToken); } log.Write(EventType.Diagnostic, "Received Message"); return result; @@ -263,40 +265,44 @@ async Task ExpectServerIdentityAsync(CancellationToken cancellationToken) async Task ReadStreamsAsync(string messageId, IReadOnlyList deserializedStreams, CancellationToken cancellationToken) { + var context = new DataStreamTransferContext(messageId) + { + TotalSizeOfAllDataStreams = deserializedStreams.Select(d => d.Length).Sum() + }; var expected = deserializedStreams.Count; for (var i = 0; i < expected; i++) { - await ReadStreamAsync(messageId, deserializedStreams, cancellationToken); + await ReadStreamAsync(context, deserializedStreams, cancellationToken); } } - async Task ReadStreamAsync(string messageId, IReadOnlyList deserializedStreams, CancellationToken cancellationToken) + async Task ReadStreamAsync(DataStreamTransferContext context, IReadOnlyList deserializedStreams, CancellationToken cancellationToken) { - var id = new Guid(await stream.ReadBytesAsync(16, cancellationToken)); var length = await stream.ReadInt64Async(cancellationToken); var dataStream = FindStreamById(deserializedStreams, id); - long totalSizeOfAllDataStreams = deserializedStreams.Select(d => d.Length).Sum(); - - var tempFile = await CopyStreamToFileAsync(id, length, stream, messageId, totalSizeOfAllDataStreams, cancellationToken); - + + var tempFile = await CopyStreamToFileAsync(id, length, stream, context, cancellationToken); + var lengthAgain = await stream.ReadInt64Async(cancellationToken); if (lengthAgain != length) { log.Write(EventType.Error, "Data stream size mismatch detected. Message ID: {0}, Stream ID: {1}, " + "Expected length: {2}, Actual length claimed at end: {3}. " + - "Total length of all DataStreams to be sent is {4}", - messageId, id, length, lengthAgain, totalSizeOfAllDataStreams); - throw new ProtocolException($"Data stream size mismatch detected. Message Id: {messageId}, Stream ID: {id}, " + + "Total length of all DataStreams to be sent is {4}. " + + "Time elapsed downloading all streams: {5}ms", + context.MessageId, id, length, lengthAgain, context.TotalSizeOfAllDataStreams, context.Stopwatch.ElapsedMilliseconds); + throw new ProtocolException($"Data stream size mismatch detected. Message Id: {context.MessageId}, Stream ID: {id}, " + $"Expected length: {length}, Actual length claimed at end: {lengthAgain}. " + - $"Total length of all DataStreams to be sent is {totalSizeOfAllDataStreams}"); + $"Total length of all DataStreams to be sent is {context.TotalSizeOfAllDataStreams}. " + + $"Time elapsed downloading all streams: {context.Stopwatch.ElapsedMilliseconds}ms"); } ((IDataStreamInternal)dataStream).Received(tempFile); } - - async Task CopyStreamToFileAsync(Guid dataStreamId, long dataSteamLength, Stream networkStream, string messageId, long totalSizeOfAllDataStreams, CancellationToken cancellationToken) + + async Task CopyStreamToFileAsync(Guid dataStreamId, long dataSteamLength, Stream networkStream, DataStreamTransferContext context, CancellationToken cancellationToken) { var path = Path.Combine(Path.GetTempPath(), string.Format("{0}_{1}", dataStreamId.ToString(), Interlocked.Increment(ref streamCount))); long bytesLeftToRead = dataSteamLength; @@ -317,26 +323,29 @@ async Task CopyStreamToFileAsync(Guid dataStreamId, long da { log.WriteException(EventType.Error, "Data stream reading failed. Message ID: {0}, Stream ID: {1}, " + "Expected length: {2}, Actual bytes read: {3}. " + - "Total length of all DataStreams to be sent is {4}.", + "Total length of all DataStreams to be sent is {4}. " + + "Time elapsed downloading all streams: {5}ms.", ex, - messageId, dataStreamId, dataSteamLength, dataSteamLength - bytesLeftToRead, totalSizeOfAllDataStreams); + context.MessageId, dataStreamId, dataSteamLength, dataSteamLength - bytesLeftToRead, context.TotalSizeOfAllDataStreams, context.Stopwatch.ElapsedMilliseconds); throw; } - + if (read == 0) { var bytesRead = dataSteamLength - bytesLeftToRead; log.Write(EventType.Error, "Data stream reading failed, we read zero bytes from the stream which implies EOF." + "Message ID: {0}, Stream ID: {1}, " + "Expected length: {2}, Actual bytes read: {3}. " + - "Total length of all DataStreams to be sent is {4}.", - messageId, dataStreamId, dataSteamLength, bytesRead, totalSizeOfAllDataStreams); - throw new ProtocolException($"Data stream reading failed. Message Id: {messageId}, Stream ID: {dataStreamId}, " + + "Total length of all DataStreams to be sent is {4}. " + + "Time elapsed downloading all streams: {5}ms.", + context.MessageId, dataStreamId, dataSteamLength, bytesRead, context.TotalSizeOfAllDataStreams, context.Stopwatch.ElapsedMilliseconds); + throw new ProtocolException($"Data stream reading failed. Message Id: {context.MessageId}, Stream ID: {dataStreamId}, " + $"Expected length: {dataSteamLength}, Actual bytes read: {bytesRead}. " + - $"Total length of all DataStreams to be sent is {totalSizeOfAllDataStreams}. " + - $"Stream with length {dataSteamLength} was closed after only reading {bytesRead} bytes."); + $"Total length of all DataStreams to be sent is {context.TotalSizeOfAllDataStreams}. " + + $"Stream with length {dataSteamLength} was closed after only reading {bytesRead} bytes. " + + $"Time elapsed downloading all streams: {context.Stopwatch.ElapsedMilliseconds}ms."); } - + bytesLeftToRead -= read; await fileStream.WriteAsync(buffer, 0, read, cancellationToken); } @@ -344,6 +353,19 @@ async Task CopyStreamToFileAsync(Guid dataStreamId, long da return new TemporaryFileStream(path, log); } + class DataStreamTransferContext + { + public DataStreamTransferContext(string messageId) + { + MessageId = messageId; + Stopwatch = Stopwatch.StartNew(); + } + + public string MessageId { get; } + public Stopwatch Stopwatch { get; } + public long TotalSizeOfAllDataStreams { get; set; } + } + static DataStream FindStreamById(IReadOnlyList deserializedStreams, Guid id) { var dataStream = deserializedStreams.FirstOrDefault(d => d.Id == id); @@ -359,8 +381,11 @@ static DataStream FindStreamById(IReadOnlyList deserializedStreams, async Task WriteEachStreamAsync(string messageId, IEnumerable streams, CancellationToken cancellationToken) { var streamsList = streams.ToList(); - var totalDataStreamLength = streamsList.Select(d => d.Length).Sum(); - + var context = new DataStreamTransferContext(messageId) + { + TotalSizeOfAllDataStreams = streamsList.Select(d => d.Length).Sum() + }; + foreach (var dataStream in streamsList) { await stream.WriteByteArrayAsync(dataStream.Id.ToByteArray(), cancellationToken); @@ -376,13 +401,13 @@ async Task WriteEachStreamAsync(string messageId, IEnumerable stream log.Write(EventType.Error, "Data stream size mismatch detected during send. Message ID: {0}, Stream ID: {1}, " + "Declared length: {2}, Actual bytes written: {3}. " + "Total length of all DataStreams to be sent is {4}", - messageId, dataStream.Id, dataStream.Length, byteCountingStream.BytesWritten, totalDataStreamLength); - + context.MessageId, dataStream.Id, dataStream.Length, byteCountingStream.BytesWritten, context.TotalSizeOfAllDataStreams); + if (halibutTimeoutsAndLimits.ThrowOnDataStreamSizeMismatch) { - throw new ProtocolException($"Data stream size mismatch detected during send. Message Id: {messageId}, Stream ID: {dataStream.Id}, " + + throw new ProtocolException($"Data stream size mismatch detected during send. Message Id: {context.MessageId}, Stream ID: {dataStream.Id}, " + $"Declared length: {dataStream.Length}, Actual bytes written: {byteCountingStream.BytesWritten}. " + - $"Total length of all DataStreams to be sent is {totalDataStreamLength}."); + $"Total length of all DataStreams to be sent is {context.TotalSizeOfAllDataStreams}."); } }