Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 54 additions & 29 deletions source/Halibut/Transport/Protocol/MessageExchangeStream.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Security.Authentication;
Expand Down Expand Up @@ -220,7 +221,8 @@ await stream.WithReadTimeout(
var (result, dataStreams) = await serializer.ReadMessageAsync<T>(stream, cancellationToken);
if (dataStreams.Count > 0)
{
await ReadStreamsAsync(result?.Id ?? "", dataStreams, cancellationToken);
var messageId = result?.Id ?? "Unknown";
await ReadStreamsAsync(messageId, dataStreams, cancellationToken);
}
log.Write(EventType.Diagnostic, "Received Message");
return result;
Expand Down Expand Up @@ -263,40 +265,44 @@ async Task ExpectServerIdentityAsync(CancellationToken cancellationToken)

async Task ReadStreamsAsync(string messageId, IReadOnlyList<DataStream> deserializedStreams, CancellationToken cancellationToken)
{
var context = new DataStreamTransferContext(messageId)
{
TotalSizeOfAllDataStreams = deserializedStreams.Select(d => d.Length).Sum()
};
var expected = deserializedStreams.Count;

for (var i = 0; i < expected; i++)
{
await ReadStreamAsync(messageId, deserializedStreams, cancellationToken);
await ReadStreamAsync(context, deserializedStreams, cancellationToken);
}
}

async Task ReadStreamAsync(string messageId, IReadOnlyList<DataStream> deserializedStreams, CancellationToken cancellationToken)
async Task ReadStreamAsync(DataStreamTransferContext context, IReadOnlyList<DataStream> deserializedStreams, CancellationToken cancellationToken)
{

var id = new Guid(await stream.ReadBytesAsync(16, cancellationToken));
var length = await stream.ReadInt64Async(cancellationToken);
var dataStream = FindStreamById(deserializedStreams, id);
long totalSizeOfAllDataStreams = deserializedStreams.Select(d => d.Length).Sum();

var tempFile = await CopyStreamToFileAsync(id, length, stream, messageId, totalSizeOfAllDataStreams, cancellationToken);


var tempFile = await CopyStreamToFileAsync(id, length, stream, context, cancellationToken);

var lengthAgain = await stream.ReadInt64Async(cancellationToken);
if (lengthAgain != length)
{
log.Write(EventType.Error, "Data stream size mismatch detected. Message ID: {0}, Stream ID: {1}, " +
"Expected length: {2}, Actual length claimed at end: {3}. " +
"Total length of all DataStreams to be sent is {4}",
messageId, id, length, lengthAgain, totalSizeOfAllDataStreams);
throw new ProtocolException($"Data stream size mismatch detected. Message Id: {messageId}, Stream ID: {id}, " +
"Total length of all DataStreams to be sent is {4}. " +
"Time elapsed downloading all streams: {5}ms",
context.MessageId, id, length, lengthAgain, context.TotalSizeOfAllDataStreams, context.Stopwatch.ElapsedMilliseconds);
throw new ProtocolException($"Data stream size mismatch detected. Message Id: {context.MessageId}, Stream ID: {id}, " +
$"Expected length: {length}, Actual length claimed at end: {lengthAgain}. " +
$"Total length of all DataStreams to be sent is {totalSizeOfAllDataStreams}");
$"Total length of all DataStreams to be sent is {context.TotalSizeOfAllDataStreams}. " +
$"Time elapsed downloading all streams: {context.Stopwatch.ElapsedMilliseconds}ms");
}

((IDataStreamInternal)dataStream).Received(tempFile);
}
async Task<TemporaryFileStream> CopyStreamToFileAsync(Guid dataStreamId, long dataSteamLength, Stream networkStream, string messageId, long totalSizeOfAllDataStreams, CancellationToken cancellationToken)

async Task<TemporaryFileStream> CopyStreamToFileAsync(Guid dataStreamId, long dataSteamLength, Stream networkStream, DataStreamTransferContext context, CancellationToken cancellationToken)
{
var path = Path.Combine(Path.GetTempPath(), string.Format("{0}_{1}", dataStreamId.ToString(), Interlocked.Increment(ref streamCount)));
long bytesLeftToRead = dataSteamLength;
Expand All @@ -317,33 +323,49 @@ async Task<TemporaryFileStream> CopyStreamToFileAsync(Guid dataStreamId, long da
{
log.WriteException(EventType.Error, "Data stream reading failed. Message ID: {0}, Stream ID: {1}, " +
"Expected length: {2}, Actual bytes read: {3}. " +
"Total length of all DataStreams to be sent is {4}.",
"Total length of all DataStreams to be sent is {4}. " +
"Time elapsed downloading all streams: {5}ms.",
ex,
messageId, dataStreamId, dataSteamLength, dataSteamLength - bytesLeftToRead, totalSizeOfAllDataStreams);
context.MessageId, dataStreamId, dataSteamLength, dataSteamLength - bytesLeftToRead, context.TotalSizeOfAllDataStreams, context.Stopwatch.ElapsedMilliseconds);
throw;
}

if (read == 0)
{
var bytesRead = dataSteamLength - bytesLeftToRead;
log.Write(EventType.Error, "Data stream reading failed, we read zero bytes from the stream which implies EOF." +
"Message ID: {0}, Stream ID: {1}, " +
"Expected length: {2}, Actual bytes read: {3}. " +
"Total length of all DataStreams to be sent is {4}.",
messageId, dataStreamId, dataSteamLength, bytesRead, totalSizeOfAllDataStreams);
throw new ProtocolException($"Data stream reading failed. Message Id: {messageId}, Stream ID: {dataStreamId}, " +
"Total length of all DataStreams to be sent is {4}. " +
"Time elapsed downloading all streams: {5}ms.",
context.MessageId, dataStreamId, dataSteamLength, bytesRead, context.TotalSizeOfAllDataStreams, context.Stopwatch.ElapsedMilliseconds);
throw new ProtocolException($"Data stream reading failed. Message Id: {context.MessageId}, Stream ID: {dataStreamId}, " +
$"Expected length: {dataSteamLength}, Actual bytes read: {bytesRead}. " +
$"Total length of all DataStreams to be sent is {totalSizeOfAllDataStreams}. " +
$"Stream with length {dataSteamLength} was closed after only reading {bytesRead} bytes.");
$"Total length of all DataStreams to be sent is {context.TotalSizeOfAllDataStreams}. " +
$"Stream with length {dataSteamLength} was closed after only reading {bytesRead} bytes. " +
$"Time elapsed downloading all streams: {context.Stopwatch.ElapsedMilliseconds}ms.");
}

bytesLeftToRead -= read;
await fileStream.WriteAsync(buffer, 0, read, cancellationToken);
}
}
return new TemporaryFileStream(path, log);
}

class DataStreamTransferContext
{
public DataStreamTransferContext(string messageId)
{
MessageId = messageId;
Stopwatch = Stopwatch.StartNew();
}

public string MessageId { get; }
public Stopwatch Stopwatch { get; }
public long TotalSizeOfAllDataStreams { get; set; }
}

static DataStream FindStreamById(IReadOnlyList<DataStream> deserializedStreams, Guid id)
{
var dataStream = deserializedStreams.FirstOrDefault(d => d.Id == id);
Expand All @@ -359,8 +381,11 @@ static DataStream FindStreamById(IReadOnlyList<DataStream> deserializedStreams,
async Task WriteEachStreamAsync(string messageId, IEnumerable<DataStream> streams, CancellationToken cancellationToken)
{
var streamsList = streams.ToList();
var totalDataStreamLength = streamsList.Select(d => d.Length).Sum();

var context = new DataStreamTransferContext(messageId)
{
TotalSizeOfAllDataStreams = streamsList.Select(d => d.Length).Sum()
};

foreach (var dataStream in streamsList)
{
await stream.WriteByteArrayAsync(dataStream.Id.ToByteArray(), cancellationToken);
Expand All @@ -376,13 +401,13 @@ async Task WriteEachStreamAsync(string messageId, IEnumerable<DataStream> stream
log.Write(EventType.Error, "Data stream size mismatch detected during send. Message ID: {0}, Stream ID: {1}, " +
"Declared length: {2}, Actual bytes written: {3}. " +
"Total length of all DataStreams to be sent is {4}",
messageId, dataStream.Id, dataStream.Length, byteCountingStream.BytesWritten, totalDataStreamLength);
context.MessageId, dataStream.Id, dataStream.Length, byteCountingStream.BytesWritten, context.TotalSizeOfAllDataStreams);

if (halibutTimeoutsAndLimits.ThrowOnDataStreamSizeMismatch)
{
throw new ProtocolException($"Data stream size mismatch detected during send. Message Id: {messageId}, Stream ID: {dataStream.Id}, " +
throw new ProtocolException($"Data stream size mismatch detected during send. Message Id: {context.MessageId}, Stream ID: {dataStream.Id}, " +
$"Declared length: {dataStream.Length}, Actual bytes written: {byteCountingStream.BytesWritten}. " +
$"Total length of all DataStreams to be sent is {totalDataStreamLength}.");
$"Total length of all DataStreams to be sent is {context.TotalSizeOfAllDataStreams}.");
}
}

Expand Down