Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Runtime/Scripts/AssemblyInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@

[assembly: InternalsVisibleTo("EditModeTests")]
[assembly: InternalsVisibleTo("PlayModeTests")]
[assembly: InternalsVisibleTo("PlayModeTests.UniTask")]
8 changes: 7 additions & 1 deletion Runtime/Scripts/DataStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,13 @@ public abstract class ReadIncrementalInstructionBase<TContent> : StreamYieldInst
/// </summary>
public bool IsError => Error != null;

protected TContent LatestChunk
/// <summary>
/// The chunk from the most recent completed read. Throws the captured
/// <see cref="StreamError"/> if the last read errored. Public so the optional
/// UniTask async-enumerable adapter can read it generically; the typed
/// <c>Bytes</c>/<c>Text</c> accessors on the concrete readers delegate here.
/// </summary>
public TContent LatestChunk
{
get
{
Expand Down
10 changes: 8 additions & 2 deletions Runtime/Scripts/Internal/YieldInstruction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,16 @@ protected set
}
}

internal bool IsCurrentReadDone
/// <summary>
/// True once a chunk is ready for the current read (before <see cref="Reset"/> is
/// called for the next one). Public getter mirrors the sibling
/// <c>DataTrack.ReadFrameInstruction.IsCurrentReadDone</c>; the setter stays internal
/// because only the SDK's stream readers advance this state.
/// </summary>
public bool IsCurrentReadDone
{
get => _isCurrentReadDone;
set
internal set
{
_isCurrentReadDone = value;
if (value) InvokeContinuation();
Expand Down
79 changes: 79 additions & 0 deletions Runtime/Scripts/UniTask/StreamReaderUniTaskExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
#if LIVEKIT_UNITASK
using System.Threading;
using Cysharp.Threading.Tasks;
using Cysharp.Threading.Tasks.Linq;

namespace LiveKit
{
/// <summary>
/// Exposes the SDK's incremental stream readers as <see cref="IUniTaskAsyncEnumerable{T}"/>
/// so chunks can be consumed with <c>await foreach</c>. Available only when the
/// <c>com.cysharp.unitask</c> package is installed (gated by <c>LIVEKIT_UNITASK</c>).
/// </summary>
public static class StreamReaderUniTaskExtensions
{
/// <summary>
/// Adapts an incremental stream read into an async sequence of chunks. Works for both
/// <see cref="ByteStreamReader.ReadIncrementalInstruction"/> (<c>byte[]</c>) and
/// <see cref="TextStreamReader.ReadIncrementalInstruction"/> (<c>string</c>).
/// </summary>
/// <remarks>
/// Iteration ends when the stream reaches end-of-stream. If the stream ends with an
/// error, the enumerable throws that <see cref="StreamError"/> (idiomatic for
/// <c>await foreach</c>; this is the one place the UniTask surface throws rather than
/// exposing <c>IsError</c>). Cancellation (via the token or the enumerator) surfaces as
/// <see cref="System.OperationCanceledException"/> with abandon-awaiter semantics — the
/// underlying FFI read is not cancelled on the wire.
///
/// Like the coroutine consumer, this delivers the current chunk on the iteration where
/// end-of-stream is also observed, then stops. Chunks buffered <em>beyond</em> the
/// current one when end-of-stream arrives are not drainable — a pre-existing limitation
/// of the reader (its <c>Reset()</c> is disallowed past end-of-stream), not specific to
/// this adapter.
/// </remarks>
public static IUniTaskAsyncEnumerable<TChunk> AsAsyncEnumerable<TChunk>(
this ReadIncrementalInstructionBase<TChunk> instruction,
CancellationToken cancellationToken = default)
{
if (instruction == null) throw new System.ArgumentNullException(nameof(instruction));

return UniTaskAsyncEnumerable.Create<TChunk>(async (writer, token) =>
{
// The enumerator hands us its own token; honor both it and the caller's.
using var linked = CancellationTokenSource.CreateLinkedTokenSource(token, cancellationToken);
var ct = linked.Token;

while (true)
{
// Completes when a chunk is ready (IsCurrentReadDone) or the stream ends (IsEos).
await instruction.AsUniTask(ct);

if (instruction.IsCurrentReadDone)
{
var chunk = instruction.LatestChunk;
await writer.YieldAsync(chunk);

// Re-check IsEos AFTER yielding: end-of-stream may have arrived while
// the consumer was suspended. Reset() throws once IsEos is set, so this
// re-check (not a value captured before the yield) is what keeps the
// loop safe — mirroring the coroutine consumer's "if (IsEos) break;
// else Reset()" ordering.
if (instruction.IsEos)
{
if (instruction.IsError) throw instruction.Error;
return;
}

instruction.Reset();
continue;
}

// Not IsCurrentReadDone => end-of-stream with nothing left to read.
if (instruction.IsError) throw instruction.Error;
return;
}
});
}
}
}
#endif
11 changes: 11 additions & 0 deletions Runtime/Scripts/UniTask/StreamReaderUniTaskExtensions.cs.meta

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Runtime/Scripts/UniTask/livekit.unity.Runtime.UniTask.asmdef
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
"rootNamespace": "LiveKit.UniTaskExtensions",
"references": [
"LiveKit",
"UniTask"
"UniTask",
"UniTask.Linq"
],
"includePlatforms": [],
"excludePlatforms": [],
Expand Down
3 changes: 2 additions & 1 deletion Tests/PlayMode/UniTask/LiveKit.PlayModeTests.UniTask.asmdef
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
"UnityEditor.TestRunner",
"LiveKit",
"LiveKit.UniTask",
"UniTask"
"UniTask",
"UniTask.Linq"
],
"includePlatforms": [],
"excludePlatforms": [],
Expand Down
142 changes: 142 additions & 0 deletions Tests/PlayMode/UniTask/StreamUniTaskTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
#if LIVEKIT_UNITASK
using System;
using System.Collections.Generic;
using System.Threading;
using Cysharp.Threading.Tasks;
using LiveKit.Internal;
using NUnit.Framework;
using UnityEngine.TestTools;

namespace LiveKit.PlayModeTests.UniTaskBridge
{
public class StreamUniTaskTests
{
// Synthetic incremental reader that drives the base chunk/EoS machinery directly,
// with no FFI — the same seam used by the EditMode DataStreamIncrementalReadTests.
// FfiHandle is public; new FfiHandle(IntPtr.Zero) is a valid dummy handle.
private sealed class TestIncrementalReader : ReadIncrementalInstructionBase<string>
{
public TestIncrementalReader(FfiHandle h) : base(h) { }
public void PushChunk(string content) => OnChunk(content);
public void PushEos(LiveKit.Proto.StreamError error = null) => OnEos(error);
}

// Chunks pushed and consumed one at a time arrive in order; the sequence ends when
// EoS is observed. Manual enumeration interleaves push/pull so EoS only follows a
// fully drained queue (matching how chunks arrive over time in production).
[UnityTest]
public System.Collections.IEnumerator AsAsyncEnumerable_DeliversChunksInOrder_ThenStops() => UniTask.ToCoroutine(async () =>
{
using var handle = new FfiHandle(IntPtr.Zero);
var reader = new TestIncrementalReader(handle);

var e = reader.AsAsyncEnumerable().GetAsyncEnumerator();
try
{
reader.PushChunk("A");
Assert.IsTrue(await e.MoveNextAsync(), "Expected chunk A");
Assert.AreEqual("A", e.Current);

reader.PushChunk("B");
Assert.IsTrue(await e.MoveNextAsync(), "Expected chunk B");
Assert.AreEqual("B", e.Current);

reader.PushChunk("C");
Assert.IsTrue(await e.MoveNextAsync(), "Expected chunk C");
Assert.AreEqual("C", e.Current);

reader.PushEos();
Assert.IsFalse(await e.MoveNextAsync(), "Enumeration must end at EoS");
}
finally
{
await e.DisposeAsync();
}
});

// The current chunk is delivered even when EoS is already set at the time it is read,
// then the sequence ends. (Chunks buffered beyond the current one when EoS arrives are
// not drainable — a pre-existing reader limitation, asserted here for clarity.)
[UnityTest]
public System.Collections.IEnumerator AsAsyncEnumerable_DeliversFinalChunkThenEos() => UniTask.ToCoroutine(async () =>
{
using var handle = new FfiHandle(IntPtr.Zero);
var reader = new TestIncrementalReader(handle);

reader.PushChunk("only");
reader.PushEos();

var observed = new List<string>();
await foreach (var chunk in reader.AsAsyncEnumerable())
observed.Add(chunk);

CollectionAssert.AreEqual(new[] { "only" }, observed);
});

// A chunk delivered before the stream errors is observed; the subsequent error EoS
// then surfaces as a thrown StreamError. Manual enumeration models the real timeline
// (chunk arrives, is consumed, THEN the error ends the stream) — note that once the
// error is set, LatestChunk itself throws, so the error must follow chunk delivery.
[UnityTest]
public System.Collections.IEnumerator AsAsyncEnumerable_ThrowsStreamError_AfterDeliveringChunk() => UniTask.ToCoroutine(async () =>
{
using var handle = new FfiHandle(IntPtr.Zero);
var reader = new TestIncrementalReader(handle);

var e = reader.AsAsyncEnumerable().GetAsyncEnumerator();
try
{
reader.PushChunk("partial");
Assert.IsTrue(await e.MoveNextAsync(), "Expected the pre-error chunk");
Assert.AreEqual("partial", e.Current);

reader.PushEos(new LiveKit.Proto.StreamError { Description = "boom" });

StreamError caught = null;
try
{
await e.MoveNextAsync();
}
catch (StreamError ex)
{
caught = ex;
}

Assert.IsNotNull(caught, "Expected the error EoS to throw a StreamError");
Assert.AreEqual("boom", caught.Message);
}
finally
{
await e.DisposeAsync();
}
});

// A cancelled token surfaces as OperationCanceledException with abandon-awaiter
// semantics: nothing is observed and the underlying reader is untouched.
[UnityTest]
public System.Collections.IEnumerator AsAsyncEnumerable_Cancellation_ThrowsOperationCanceled() => UniTask.ToCoroutine(async () =>
{
using var handle = new FfiHandle(IntPtr.Zero);
var reader = new TestIncrementalReader(handle);
using var cts = new CancellationTokenSource();
cts.Cancel();

var observed = new List<string>();
bool threw = false;
try
{
await foreach (var chunk in reader.AsAsyncEnumerable(cts.Token))
observed.Add(chunk);
}
catch (OperationCanceledException)
{
threw = true;
}

Assert.IsTrue(threw, "Expected OperationCanceledException for a cancelled token");
CollectionAssert.IsEmpty(observed);
Assert.IsFalse(reader.IsEos, "Abandon-awaiter semantics: reader state is untouched");
});
}
}
#endif
11 changes: 11 additions & 0 deletions Tests/PlayMode/UniTask/StreamUniTaskTests.cs.meta

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading