Add guard rails for DataStreams that differ in-size from their reported size.#704
Add guard rails for DataStreams that differ in-size from their reported size.#704LukeButters merged 13 commits intomainfrom
Conversation
rhysparry
left a comment
There was a problem hiding this comment.
There are a few things that are a little trickier to understand than the could be. Assuming the ByteCountingStream works properly this looks to at least do all the same things, just clearer errors and a new exception if we haven't written the expected number of bytes.
Let me know when you've addressed the comments and I'll ✅
| { | ||
| var result = await ReadMessage<ResponseMessage>(sut, stream); | ||
| result.Error.Should().NotBeNull(); | ||
| result!.Error.Should().NotBeNull(); |
There was a problem hiding this comment.
I'd prefer an assertion rather than the null-forgiving operator. It "should" give a clearer error too.
i.e.
result.Should().NotBeNull();I'm pretty sure this properly marks result as non-null for the analyzers (at least, I've seen it do that).
There was a problem hiding this comment.
It seems I still need the bang so changing to:
result.Should().NotBeNull();
result!.Error.Should().NotBeNull();
| var result = await ReadMessage<ResponseMessage>(sut, stream); | ||
| result.Error.Should().NotBeNull(); | ||
| result!.Error.Should().NotBeNull(); | ||
| result.Error!.Message = "foo"; |
There was a problem hiding this comment.
In theory you might even be able to drop this null-forgiving operator too.
| } | ||
|
|
||
| async Task<TemporaryFileStream> CopyStreamToFileAsync(Guid id, long length, Stream stream, CancellationToken cancellationToken) | ||
| async Task<TemporaryFileStream> CopyStreamToFileAsync(string messageId, Guid id, long length, Stream stream, IReadOnlyList<DataStream> deserializedStreams, CancellationToken cancellationToken) |
There was a problem hiding this comment.
Including the deserializedStreams here was a little confusing. We seem to use this to determine the total data stream length so that it can be included in the error message.
Some options:
- Provide the total length instead of the data streams (we can then use it in
ReadStreamAsyncwhen we are logging/throwing exceptions rather than recalculating it) - Catch and provide the extra details in
ReadStreamAsync.
There was a problem hiding this comment.
Passing in the total size, and re-orderd the params, and renamed some prams
| "Expected length: {2}, Actual bytes read: {3}. " + | ||
| "Total length of all DataStreams to be sent is {4}. Exception: {5}", | ||
| ex, | ||
| messageId, id, length, length - bytesLeftToRead, totalDataStreamLength, ex.Message); |
There was a problem hiding this comment.
nit: extract a variable for length - bytesLeftToRead to improve clarity.
There was a problem hiding this comment.
nit: Do implementations of WriteException actually not write the exception message? If not, does the duplication make things less clear?
There was a problem hiding this comment.
I forced it to throw and it seems we can ditch printing the exception message.
| await stream.FlushAsync(cancellationToken); | ||
|
|
||
| await ((IDataStreamInternal)dataStream).TransmitAsync(stream, cancellationToken); | ||
| var byteCountingStream = new ByteCountingStream(stream, OnDispose.LeaveInputStreamOpen); |
There was a problem hiding this comment.
Should we dispose the byteCountingStream anyway (via await using) so we get reliable behaviour rather than it being run by the garbage collector?
(I know we won't dispose the wrapped stream)
There was a problem hiding this comment.
Disposing anyway.
| { | ||
| Task<IReadOnlyList<DataStream>> WriteMessageAsync<T>(Stream stream, T message, CancellationToken cancellationToken); | ||
| Task<(T Message, IReadOnlyList<DataStream> DataStreams)> ReadMessageAsync<T>(RewindableBufferStream stream, CancellationToken cancellationToken); | ||
| Task<(T? Message, IReadOnlyList<DataStream> DataStreams)> ReadMessageAsync<T>(RewindableBufferStream stream, CancellationToken cancellationToken); |
There was a problem hiding this comment.
I think this is only used internally (within Halibut), so this change should be safe. I couldn't find usages in Octopus or Tentacle.
There was a problem hiding this comment.
It should be, lol.
Specifically, it should be used internally only.
Background
ref CLOUDPT-11160
ref EFT-3141
DataStreams are sent using the following format:
In the code it is possible to declare the length of the DataStream to be different from the number of bytes sent in the DataStream. 😨
This is especially bad when the DataSent in the DataStream is less then the reported size, e.g. if the DataStream claims to be 100bytes but only 10 bytes is sent THEN we end up in a situation in which the BOTH ends sit waiting for each other to send data.
To ensure this is indeed NOT happening we will now:
Throwing an exception is useful, because that will result in the TCP stream being killed which will result in both end reconnecting in a short amount of time. This will allow for another RPC to be made, useful if RPCs are retried.
Additionally:
Results
How to review this PR
Quality ✔️
Pre-requisites