-
Notifications
You must be signed in to change notification settings - Fork 48
Fix a bug where we would not flush messages before waiting for a response #705
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
81 changes: 81 additions & 0 deletions
81
source/Halibut.Tests/Support/Streams/FlushBufferedStream.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,81 @@ | ||
| #nullable enable | ||
| using System; | ||
| using System.IO; | ||
| using System.Threading; | ||
| using System.Threading.Tasks; | ||
|
|
||
| namespace Halibut.Tests.Support.Streams | ||
| { | ||
| /// <summary> | ||
| /// A stream that buffers all writes in memory and only forwards them to the underlying | ||
| /// stream when Flush or FlushAsync is called. | ||
| /// </summary> | ||
| public class TestOnlySendDataWhenFlushedStream : DelegateStreamBase | ||
| { | ||
| readonly Stream inner; | ||
| MemoryStream writeBuffer = new MemoryStream(); | ||
|
|
||
| public TestOnlySendDataWhenFlushedStream(Stream inner) | ||
| { | ||
| this.inner = inner; | ||
| } | ||
|
|
||
| public override Stream Inner => inner; | ||
|
|
||
| public override void Write(byte[] buffer, int offset, int count) | ||
| => writeBuffer.Write(buffer, offset, count); | ||
|
|
||
| public override void WriteByte(byte value) | ||
| => writeBuffer.WriteByte(value); | ||
|
|
||
| public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) | ||
| => writeBuffer.WriteAsync(buffer, offset, count, cancellationToken); | ||
|
|
||
| public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) | ||
| => writeBuffer.BeginWrite(buffer, offset, count, callback, state); | ||
|
|
||
| public override void EndWrite(IAsyncResult asyncResult) | ||
| => writeBuffer.EndWrite(asyncResult); | ||
|
|
||
| #if !NETFRAMEWORK | ||
| public override async ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default) | ||
| => await writeBuffer.WriteAsync(buffer, cancellationToken); | ||
|
|
||
| public override void Write(ReadOnlySpan<byte> buffer) | ||
| => writeBuffer.Write(buffer); | ||
| #endif | ||
|
|
||
| public override void Flush() | ||
| { | ||
| writeBuffer.Position = 0; | ||
| writeBuffer.CopyTo(inner); | ||
| writeBuffer = new MemoryStream(); | ||
| inner.Flush(); | ||
| } | ||
|
|
||
| public override async Task FlushAsync(CancellationToken cancellationToken) | ||
| { | ||
| writeBuffer.Position = 0; | ||
| await writeBuffer.CopyToAsync(inner, 8192, cancellationToken); | ||
| writeBuffer = new MemoryStream(); | ||
| await inner.FlushAsync(cancellationToken); | ||
| } | ||
|
|
||
| protected override void Dispose(bool disposing) | ||
| { | ||
| if (disposing) | ||
| { | ||
| writeBuffer.Dispose(); | ||
| } | ||
| base.Dispose(disposing); | ||
| } | ||
|
|
||
| #if !NETFRAMEWORK | ||
| public override async ValueTask DisposeAsync() | ||
| { | ||
| await writeBuffer.DisposeAsync(); | ||
| await inner.DisposeAsync(); | ||
| } | ||
| #endif | ||
| } | ||
| } |
52 changes: 52 additions & 0 deletions
52
source/Halibut.Tests/Transport/WhenTransportUsesFlushBufferedStreamFixture.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,52 @@ | ||
| using System.Threading.Tasks; | ||
| using FluentAssertions; | ||
| using Halibut.Tests.Support; | ||
| using Halibut.Tests.Support.Streams; | ||
| using Halibut.Tests.Support.TestAttributes; | ||
| using Halibut.Tests.Support.TestCases; | ||
| using Halibut.Tests.TestServices.Async; | ||
| using Halibut.TestUtils.Contracts; | ||
| using NUnit.Framework; | ||
|
|
||
| namespace Halibut.Tests.Transport | ||
| { | ||
| public class WhenTransportUsesFlushBufferedStreamFixture : BaseTest | ||
| { | ||
| #if !NETFRAMEWORK | ||
| // On net48, SslStream uses APM (BeginWrite/EndWrite) internally during the TLS handshake rather than | ||
| // WriteAsync/FlushAsync. TestOnlySendDataWhenFlushedStream.BeginWrite only writes to an in-memory buffer | ||
| // and never flushes, so handshake data from both sides sits buffered and neither end receives anything — | ||
| // the client times out waiting for the ServerHello. On modern .NET, SslStream calls FlushAsync after each | ||
| // TLS record, so the handshake works correctly. There is no hook to inject a flush between SslStream's | ||
| // internal BeginWrite/EndWrite calls on net48, so this test cannot run there. | ||
| // | ||
| // Why net48's SslStream never calls Flush: it was designed assuming the underlying stream sends data | ||
| // immediately on Write/BeginWrite — a valid assumption for NetworkStream over TCP, where bytes go out | ||
| // without needing an explicit Flush. The net48 implementation treated Flush as a no-op concern and never | ||
| // added the call. When SslStream was rewritten for modern .NET it was made transport-agnostic, explicitly | ||
| // calling FlushAsync after each TLS record so it works correctly with any stream implementation. | ||
| // TestOnlySendDataWhenFlushedStream violates the net48 assumption by holding writes in a MemoryStream | ||
| // until Flush is called, which net48's SslStream never does. | ||
|
|
||
| [Test] | ||
| [LatestClientAndLatestServiceTestCases(testNetworkConditions: false)] | ||
| public async Task RequestsSucceed_WhenStreamsOnlyForwardDataOnFlush(ClientAndServiceTestCase clientAndServiceTestCase) | ||
| { | ||
| await using var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder() | ||
| .AsLatestClientAndLatestServiceBuilder() | ||
| .WithClientStreamFactory(new StreamWrappingStreamFactory { WrapStreamWith = s => new TestOnlySendDataWhenFlushedStream(s) }) | ||
| .WithServiceStreamFactory(new StreamWrappingStreamFactory { WrapStreamWith = s => new TestOnlySendDataWhenFlushedStream(s) }) | ||
| .WithEchoService() | ||
| .Build(CancellationToken); | ||
|
|
||
| var echo = clientAndService.CreateAsyncClient<IEchoService, IAsyncClientEchoService>(); | ||
|
|
||
| for (var i = 0; i < clientAndServiceTestCase.RecommendedIterations; i++) | ||
| { | ||
| var result = await echo.SayHelloAsync("hello"); | ||
| result.Should().Be("hello..."); | ||
| } | ||
| } | ||
| #endif | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -191,6 +191,7 @@ public async Task SendAsync<T>(T message, CancellationToken cancellationToken) | |
| var serializedStreams = await serializer.WriteMessageAsync(stream, message, cancellationToken); | ||
| await WriteEachStreamAsync(serializedStreams, cancellationToken); | ||
|
|
||
| await stream.FlushAsync(cancellationToken); | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The fix |
||
| log.Write(EventType.Diagnostic, "Sent message"); | ||
| } | ||
|
|
||
|
|
||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With the fix omitted, this fails with "Attempted to read past the end of the stream" (and takes 30-45 seconds to do so for each of the three combinations on my machine).
Not sure if it's worth improving the failure behaviour, but if something obvious can be improved here that could help in the future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably not, its not something we can obviously improve.