From 971623c6601ff07215a39758bb4b2ede9993f0a5 Mon Sep 17 00:00:00 2001 From: Luke Butters Date: Thu, 26 Mar 2026 13:01:06 +1100 Subject: [PATCH 1/4] Fix a bug where we would not flush messages before waiting for a response --- .../Support/Streams/FlushBufferedStream.cs | 81 +++++++++++++++++++ ...TransportUsesFlushBufferedStreamFixture.cs | 35 ++++++++ .../Protocol/MessageExchangeStream.cs | 1 + 3 files changed, 117 insertions(+) create mode 100644 source/Halibut.Tests/Support/Streams/FlushBufferedStream.cs create mode 100644 source/Halibut.Tests/Transport/WhenTransportUsesFlushBufferedStreamFixture.cs diff --git a/source/Halibut.Tests/Support/Streams/FlushBufferedStream.cs b/source/Halibut.Tests/Support/Streams/FlushBufferedStream.cs new file mode 100644 index 000000000..3921b2bde --- /dev/null +++ b/source/Halibut.Tests/Support/Streams/FlushBufferedStream.cs @@ -0,0 +1,81 @@ +#nullable enable +using System; +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +namespace Halibut.Tests.Support.Streams +{ + /// + /// A stream that buffers all writes in memory and only forwards them to the underlying + /// stream when Flush or FlushAsync is called. + /// + public class FlushBufferedStream : DelegateStreamBase + { + readonly Stream inner; + MemoryStream writeBuffer = new MemoryStream(); + + public FlushBufferedStream(Stream inner) + { + this.inner = inner; + } + + public override Stream Inner => inner; + + public override void Write(byte[] buffer, int offset, int count) + => writeBuffer.Write(buffer, offset, count); + + public override void WriteByte(byte value) + => writeBuffer.WriteByte(value); + + public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + => writeBuffer.WriteAsync(buffer, offset, count, cancellationToken); + + public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) + => writeBuffer.BeginWrite(buffer, offset, count, callback, state); + + public override void EndWrite(IAsyncResult asyncResult) + => writeBuffer.EndWrite(asyncResult); + +#if !NETFRAMEWORK + public override async ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) + => await writeBuffer.WriteAsync(buffer, cancellationToken); + + public override void Write(ReadOnlySpan buffer) + => writeBuffer.Write(buffer); +#endif + + public override void Flush() + { + writeBuffer.Position = 0; + writeBuffer.CopyTo(inner); + writeBuffer = new MemoryStream(); + inner.Flush(); + } + + public override async Task FlushAsync(CancellationToken cancellationToken) + { + writeBuffer.Position = 0; + await writeBuffer.CopyToAsync(inner, 8192, cancellationToken); + writeBuffer = new MemoryStream(); + await inner.FlushAsync(cancellationToken); + } + + protected override void Dispose(bool disposing) + { + if (disposing) + { + writeBuffer.Dispose(); + } + base.Dispose(disposing); + } + +#if !NETFRAMEWORK + public override async ValueTask DisposeAsync() + { + await writeBuffer.DisposeAsync(); + await inner.DisposeAsync(); + } +#endif + } +} diff --git a/source/Halibut.Tests/Transport/WhenTransportUsesFlushBufferedStreamFixture.cs b/source/Halibut.Tests/Transport/WhenTransportUsesFlushBufferedStreamFixture.cs new file mode 100644 index 000000000..28027702e --- /dev/null +++ b/source/Halibut.Tests/Transport/WhenTransportUsesFlushBufferedStreamFixture.cs @@ -0,0 +1,35 @@ +using System.Threading.Tasks; +using FluentAssertions; +using Halibut.Tests.Support; +using Halibut.Tests.Support.Streams; +using Halibut.Tests.Support.TestAttributes; +using Halibut.Tests.Support.TestCases; +using Halibut.Tests.TestServices.Async; +using Halibut.TestUtils.Contracts; +using NUnit.Framework; + +namespace Halibut.Tests.Transport +{ + public class WhenTransportUsesFlushBufferedStreamFixture : BaseTest + { + [Test] + [LatestClientAndLatestServiceTestCases(testNetworkConditions: false)] + public async Task RequestsSucceed_WhenStreamsOnlyForwardDataOnFlush(ClientAndServiceTestCase clientAndServiceTestCase) + { + await using var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder() + .AsLatestClientAndLatestServiceBuilder() + .WithClientStreamFactory(new StreamWrappingStreamFactory { WrapStreamWith = s => new FlushBufferedStream(s) }) + .WithServiceStreamFactory(new StreamWrappingStreamFactory { WrapStreamWith = s => new FlushBufferedStream(s) }) + .WithEchoService() + .Build(CancellationToken); + + var echo = clientAndService.CreateAsyncClient(); + + for (var i = 0; i < clientAndServiceTestCase.RecommendedIterations; i++) + { + var result = await echo.SayHelloAsync("hello"); + result.Should().Be("hello..."); + } + } + } +} diff --git a/source/Halibut/Transport/Protocol/MessageExchangeStream.cs b/source/Halibut/Transport/Protocol/MessageExchangeStream.cs index 334462a4c..1f7050104 100644 --- a/source/Halibut/Transport/Protocol/MessageExchangeStream.cs +++ b/source/Halibut/Transport/Protocol/MessageExchangeStream.cs @@ -191,6 +191,7 @@ public async Task SendAsync(T message, CancellationToken cancellationToken) var serializedStreams = await serializer.WriteMessageAsync(stream, message, cancellationToken); await WriteEachStreamAsync(serializedStreams, cancellationToken); + await stream.FlushAsync(cancellationToken); log.Write(EventType.Diagnostic, "Sent message"); } From 3c1244c5e2b3df9e8688426e7b7bfd2fc4b9f460 Mon Sep 17 00:00:00 2001 From: Luke Butters Date: Thu, 26 Mar 2026 17:04:09 +1100 Subject: [PATCH 2/4] . --- source/Halibut.Tests/Support/Streams/FlushBufferedStream.cs | 4 ++-- .../Transport/WhenTransportUsesFlushBufferedStreamFixture.cs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/source/Halibut.Tests/Support/Streams/FlushBufferedStream.cs b/source/Halibut.Tests/Support/Streams/FlushBufferedStream.cs index 3921b2bde..0aa5b9be1 100644 --- a/source/Halibut.Tests/Support/Streams/FlushBufferedStream.cs +++ b/source/Halibut.Tests/Support/Streams/FlushBufferedStream.cs @@ -10,12 +10,12 @@ namespace Halibut.Tests.Support.Streams /// A stream that buffers all writes in memory and only forwards them to the underlying /// stream when Flush or FlushAsync is called. /// - public class FlushBufferedStream : DelegateStreamBase + public class TestOnlySendDataWhenFlushedStream : DelegateStreamBase { readonly Stream inner; MemoryStream writeBuffer = new MemoryStream(); - public FlushBufferedStream(Stream inner) + public TestOnlySendDataWhenFlushedStream(Stream inner) { this.inner = inner; } diff --git a/source/Halibut.Tests/Transport/WhenTransportUsesFlushBufferedStreamFixture.cs b/source/Halibut.Tests/Transport/WhenTransportUsesFlushBufferedStreamFixture.cs index 28027702e..af24aef62 100644 --- a/source/Halibut.Tests/Transport/WhenTransportUsesFlushBufferedStreamFixture.cs +++ b/source/Halibut.Tests/Transport/WhenTransportUsesFlushBufferedStreamFixture.cs @@ -18,8 +18,8 @@ public async Task RequestsSucceed_WhenStreamsOnlyForwardDataOnFlush(ClientAndSer { await using var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder() .AsLatestClientAndLatestServiceBuilder() - .WithClientStreamFactory(new StreamWrappingStreamFactory { WrapStreamWith = s => new FlushBufferedStream(s) }) - .WithServiceStreamFactory(new StreamWrappingStreamFactory { WrapStreamWith = s => new FlushBufferedStream(s) }) + .WithClientStreamFactory(new StreamWrappingStreamFactory { WrapStreamWith = s => new TestOnlySendDataWhenFlushedStream(s) }) + .WithServiceStreamFactory(new StreamWrappingStreamFactory { WrapStreamWith = s => new TestOnlySendDataWhenFlushedStream(s) }) .WithEchoService() .Build(CancellationToken); From 02110f2093a0908a15a1ab81b1d9b786acf824c4 Mon Sep 17 00:00:00 2001 From: Luke Butters Date: Fri, 27 Mar 2026 08:11:51 +1100 Subject: [PATCH 3/4] . --- ...enTransportUsesFlushBufferedStreamFixture.cs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/source/Halibut.Tests/Transport/WhenTransportUsesFlushBufferedStreamFixture.cs b/source/Halibut.Tests/Transport/WhenTransportUsesFlushBufferedStreamFixture.cs index af24aef62..6becdd9fa 100644 --- a/source/Halibut.Tests/Transport/WhenTransportUsesFlushBufferedStreamFixture.cs +++ b/source/Halibut.Tests/Transport/WhenTransportUsesFlushBufferedStreamFixture.cs @@ -12,6 +12,22 @@ namespace Halibut.Tests.Transport { public class WhenTransportUsesFlushBufferedStreamFixture : BaseTest { +#if !NETFRAMEWORK + // On net48, SslStream uses APM (BeginWrite/EndWrite) internally during the TLS handshake rather than + // WriteAsync/FlushAsync. TestOnlySendDataWhenFlushedStream.BeginWrite only writes to an in-memory buffer + // and never flushes, so handshake data from both sides sits buffered and neither end receives anything — + // the client times out waiting for the ServerHello. On modern .NET, SslStream calls FlushAsync after each + // TLS record, so the handshake works correctly. There is no hook to inject a flush between SslStream's + // internal BeginWrite/EndWrite calls on net48, so this test cannot run there. + // + // Why net48's SslStream never calls Flush: it was designed assuming the underlying stream sends data + // immediately on Write/BeginWrite — a valid assumption for NetworkStream over TCP, where bytes go out + // without needing an explicit Flush. The net48 implementation treated Flush as a no-op concern and never + // added the call. When SslStream was rewritten for modern .NET it was made transport-agnostic, explicitly + // calling FlushAsync after each TLS record so it works correctly with any stream implementation. + // TestOnlySendDataWhenFlushedStream violates the net48 assumption by holding writes in a MemoryStream + // until Flush is called, which net48's SslStream never does. + [Test] [LatestClientAndLatestServiceTestCases(testNetworkConditions: false)] public async Task RequestsSucceed_WhenStreamsOnlyForwardDataOnFlush(ClientAndServiceTestCase clientAndServiceTestCase) @@ -32,4 +48,5 @@ public async Task RequestsSucceed_WhenStreamsOnlyForwardDataOnFlush(ClientAndSer } } } +#endif } From cd619d982b0cd4ffab61a06c21e190c8b2f6dbca Mon Sep 17 00:00:00 2001 From: Luke Butters Date: Fri, 27 Mar 2026 10:01:34 +1100 Subject: [PATCH 4/4] Fix compilation --- .../Transport/WhenTransportUsesFlushBufferedStreamFixture.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/Halibut.Tests/Transport/WhenTransportUsesFlushBufferedStreamFixture.cs b/source/Halibut.Tests/Transport/WhenTransportUsesFlushBufferedStreamFixture.cs index 6becdd9fa..d22f7c1c0 100644 --- a/source/Halibut.Tests/Transport/WhenTransportUsesFlushBufferedStreamFixture.cs +++ b/source/Halibut.Tests/Transport/WhenTransportUsesFlushBufferedStreamFixture.cs @@ -47,6 +47,6 @@ public async Task RequestsSucceed_WhenStreamsOnlyForwardDataOnFlush(ClientAndSer result.Should().Be("hello..."); } } - } #endif + } }