diff --git a/source/Halibut.Tests/Support/Streams/FlushBufferedStream.cs b/source/Halibut.Tests/Support/Streams/FlushBufferedStream.cs new file mode 100644 index 000000000..0aa5b9be1 --- /dev/null +++ b/source/Halibut.Tests/Support/Streams/FlushBufferedStream.cs @@ -0,0 +1,81 @@ +#nullable enable +using System; +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +namespace Halibut.Tests.Support.Streams +{ + /// + /// A stream that buffers all writes in memory and only forwards them to the underlying + /// stream when Flush or FlushAsync is called. + /// + public class TestOnlySendDataWhenFlushedStream : DelegateStreamBase + { + readonly Stream inner; + MemoryStream writeBuffer = new MemoryStream(); + + public TestOnlySendDataWhenFlushedStream(Stream inner) + { + this.inner = inner; + } + + public override Stream Inner => inner; + + public override void Write(byte[] buffer, int offset, int count) + => writeBuffer.Write(buffer, offset, count); + + public override void WriteByte(byte value) + => writeBuffer.WriteByte(value); + + public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + => writeBuffer.WriteAsync(buffer, offset, count, cancellationToken); + + public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) + => writeBuffer.BeginWrite(buffer, offset, count, callback, state); + + public override void EndWrite(IAsyncResult asyncResult) + => writeBuffer.EndWrite(asyncResult); + +#if !NETFRAMEWORK + public override async ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) + => await writeBuffer.WriteAsync(buffer, cancellationToken); + + public override void Write(ReadOnlySpan buffer) + => writeBuffer.Write(buffer); +#endif + + public override void Flush() + { + writeBuffer.Position = 0; + writeBuffer.CopyTo(inner); + writeBuffer = new MemoryStream(); + inner.Flush(); + } + + public override async Task FlushAsync(CancellationToken cancellationToken) + { + writeBuffer.Position = 0; + await writeBuffer.CopyToAsync(inner, 8192, cancellationToken); + writeBuffer = new MemoryStream(); + await inner.FlushAsync(cancellationToken); + } + + protected override void Dispose(bool disposing) + { + if (disposing) + { + writeBuffer.Dispose(); + } + base.Dispose(disposing); + } + +#if !NETFRAMEWORK + public override async ValueTask DisposeAsync() + { + await writeBuffer.DisposeAsync(); + await inner.DisposeAsync(); + } +#endif + } +} diff --git a/source/Halibut.Tests/Transport/WhenTransportUsesFlushBufferedStreamFixture.cs b/source/Halibut.Tests/Transport/WhenTransportUsesFlushBufferedStreamFixture.cs new file mode 100644 index 000000000..d22f7c1c0 --- /dev/null +++ b/source/Halibut.Tests/Transport/WhenTransportUsesFlushBufferedStreamFixture.cs @@ -0,0 +1,52 @@ +using System.Threading.Tasks; +using FluentAssertions; +using Halibut.Tests.Support; +using Halibut.Tests.Support.Streams; +using Halibut.Tests.Support.TestAttributes; +using Halibut.Tests.Support.TestCases; +using Halibut.Tests.TestServices.Async; +using Halibut.TestUtils.Contracts; +using NUnit.Framework; + +namespace Halibut.Tests.Transport +{ + public class WhenTransportUsesFlushBufferedStreamFixture : BaseTest + { +#if !NETFRAMEWORK + // On net48, SslStream uses APM (BeginWrite/EndWrite) internally during the TLS handshake rather than + // WriteAsync/FlushAsync. TestOnlySendDataWhenFlushedStream.BeginWrite only writes to an in-memory buffer + // and never flushes, so handshake data from both sides sits buffered and neither end receives anything — + // the client times out waiting for the ServerHello. On modern .NET, SslStream calls FlushAsync after each + // TLS record, so the handshake works correctly. There is no hook to inject a flush between SslStream's + // internal BeginWrite/EndWrite calls on net48, so this test cannot run there. + // + // Why net48's SslStream never calls Flush: it was designed assuming the underlying stream sends data + // immediately on Write/BeginWrite — a valid assumption for NetworkStream over TCP, where bytes go out + // without needing an explicit Flush. The net48 implementation treated Flush as a no-op concern and never + // added the call. When SslStream was rewritten for modern .NET it was made transport-agnostic, explicitly + // calling FlushAsync after each TLS record so it works correctly with any stream implementation. + // TestOnlySendDataWhenFlushedStream violates the net48 assumption by holding writes in a MemoryStream + // until Flush is called, which net48's SslStream never does. + + [Test] + [LatestClientAndLatestServiceTestCases(testNetworkConditions: false)] + public async Task RequestsSucceed_WhenStreamsOnlyForwardDataOnFlush(ClientAndServiceTestCase clientAndServiceTestCase) + { + await using var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder() + .AsLatestClientAndLatestServiceBuilder() + .WithClientStreamFactory(new StreamWrappingStreamFactory { WrapStreamWith = s => new TestOnlySendDataWhenFlushedStream(s) }) + .WithServiceStreamFactory(new StreamWrappingStreamFactory { WrapStreamWith = s => new TestOnlySendDataWhenFlushedStream(s) }) + .WithEchoService() + .Build(CancellationToken); + + var echo = clientAndService.CreateAsyncClient(); + + for (var i = 0; i < clientAndServiceTestCase.RecommendedIterations; i++) + { + var result = await echo.SayHelloAsync("hello"); + result.Should().Be("hello..."); + } + } +#endif + } +} diff --git a/source/Halibut/Transport/Protocol/MessageExchangeStream.cs b/source/Halibut/Transport/Protocol/MessageExchangeStream.cs index 334462a4c..1f7050104 100644 --- a/source/Halibut/Transport/Protocol/MessageExchangeStream.cs +++ b/source/Halibut/Transport/Protocol/MessageExchangeStream.cs @@ -191,6 +191,7 @@ public async Task SendAsync(T message, CancellationToken cancellationToken) var serializedStreams = await serializer.WriteMessageAsync(stream, message, cancellationToken); await WriteEachStreamAsync(serializedStreams, cancellationToken); + await stream.FlushAsync(cancellationToken); log.Write(EventType.Diagnostic, "Sent message"); }