Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 81 additions & 0 deletions source/Halibut.Tests/Support/Streams/FlushBufferedStream.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
#nullable enable
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

namespace Halibut.Tests.Support.Streams
{
/// <summary>
/// A stream that buffers all writes in memory and only forwards them to the underlying
/// stream when Flush or FlushAsync is called.
/// </summary>
public class TestOnlySendDataWhenFlushedStream : DelegateStreamBase
{
readonly Stream inner;
MemoryStream writeBuffer = new MemoryStream();

public TestOnlySendDataWhenFlushedStream(Stream inner)
{
this.inner = inner;
}

public override Stream Inner => inner;

public override void Write(byte[] buffer, int offset, int count)
=> writeBuffer.Write(buffer, offset, count);

public override void WriteByte(byte value)
=> writeBuffer.WriteByte(value);

public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
=> writeBuffer.WriteAsync(buffer, offset, count, cancellationToken);

public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
=> writeBuffer.BeginWrite(buffer, offset, count, callback, state);

public override void EndWrite(IAsyncResult asyncResult)
=> writeBuffer.EndWrite(asyncResult);

#if !NETFRAMEWORK
public override async ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
=> await writeBuffer.WriteAsync(buffer, cancellationToken);

public override void Write(ReadOnlySpan<byte> buffer)
=> writeBuffer.Write(buffer);
#endif

public override void Flush()
{
writeBuffer.Position = 0;
writeBuffer.CopyTo(inner);
writeBuffer = new MemoryStream();
inner.Flush();
}

public override async Task FlushAsync(CancellationToken cancellationToken)
{
writeBuffer.Position = 0;
await writeBuffer.CopyToAsync(inner, 8192, cancellationToken);
writeBuffer = new MemoryStream();
await inner.FlushAsync(cancellationToken);
}

protected override void Dispose(bool disposing)
{
if (disposing)
{
writeBuffer.Dispose();
}
base.Dispose(disposing);
}

#if !NETFRAMEWORK
public override async ValueTask DisposeAsync()
{
await writeBuffer.DisposeAsync();
await inner.DisposeAsync();
}
#endif
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
using System.Threading.Tasks;
using FluentAssertions;
using Halibut.Tests.Support;
using Halibut.Tests.Support.Streams;
using Halibut.Tests.Support.TestAttributes;
using Halibut.Tests.Support.TestCases;
using Halibut.Tests.TestServices.Async;
using Halibut.TestUtils.Contracts;
using NUnit.Framework;

namespace Halibut.Tests.Transport
{
public class WhenTransportUsesFlushBufferedStreamFixture : BaseTest
{
#if !NETFRAMEWORK
// On net48, SslStream uses APM (BeginWrite/EndWrite) internally during the TLS handshake rather than
// WriteAsync/FlushAsync. TestOnlySendDataWhenFlushedStream.BeginWrite only writes to an in-memory buffer
// and never flushes, so handshake data from both sides sits buffered and neither end receives anything —
// the client times out waiting for the ServerHello. On modern .NET, SslStream calls FlushAsync after each
// TLS record, so the handshake works correctly. There is no hook to inject a flush between SslStream's
// internal BeginWrite/EndWrite calls on net48, so this test cannot run there.
//
// Why net48's SslStream never calls Flush: it was designed assuming the underlying stream sends data
// immediately on Write/BeginWrite — a valid assumption for NetworkStream over TCP, where bytes go out
// without needing an explicit Flush. The net48 implementation treated Flush as a no-op concern and never
// added the call. When SslStream was rewritten for modern .NET it was made transport-agnostic, explicitly
// calling FlushAsync after each TLS record so it works correctly with any stream implementation.
// TestOnlySendDataWhenFlushedStream violates the net48 assumption by holding writes in a MemoryStream
// until Flush is called, which net48's SslStream never does.

[Test]
[LatestClientAndLatestServiceTestCases(testNetworkConditions: false)]
public async Task RequestsSucceed_WhenStreamsOnlyForwardDataOnFlush(ClientAndServiceTestCase clientAndServiceTestCase)
{
await using var clientAndService = await clientAndServiceTestCase.CreateTestCaseBuilder()
.AsLatestClientAndLatestServiceBuilder()
.WithClientStreamFactory(new StreamWrappingStreamFactory { WrapStreamWith = s => new TestOnlySendDataWhenFlushedStream(s) })
.WithServiceStreamFactory(new StreamWrappingStreamFactory { WrapStreamWith = s => new TestOnlySendDataWhenFlushedStream(s) })
.WithEchoService()
.Build(CancellationToken);

var echo = clientAndService.CreateAsyncClient<IEchoService, IAsyncClientEchoService>();

for (var i = 0; i < clientAndServiceTestCase.RecommendedIterations; i++)
{
var result = await echo.SayHelloAsync("hello");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the fix omitted, this fails with "Attempted to read past the end of the stream" (and takes 30-45 seconds to do so for each of the three combinations on my machine).

Not sure if it's worth improving the failure behaviour, but if something obvious can be improved here that could help in the future.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably not, its not something we can obviously improve.

result.Should().Be("hello...");
}
}
#endif
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ public async Task SendAsync<T>(T message, CancellationToken cancellationToken)
var serializedStreams = await serializer.WriteMessageAsync(stream, message, cancellationToken);
await WriteEachStreamAsync(serializedStreams, cancellationToken);

await stream.FlushAsync(cancellationToken);
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fix

log.Write(EventType.Diagnostic, "Sent message");
}

Expand Down