diff --git a/Runtime/Scripts/AssemblyInfo.cs b/Runtime/Scripts/AssemblyInfo.cs index e667b32e..9e802523 100644 --- a/Runtime/Scripts/AssemblyInfo.cs +++ b/Runtime/Scripts/AssemblyInfo.cs @@ -2,3 +2,4 @@ [assembly: InternalsVisibleTo("EditModeTests")] [assembly: InternalsVisibleTo("PlayModeTests")] +[assembly: InternalsVisibleTo("PlayModeTests.UniTask")] diff --git a/Runtime/Scripts/DataStream.cs b/Runtime/Scripts/DataStream.cs index 68b18b5a..131ccf56 100644 --- a/Runtime/Scripts/DataStream.cs +++ b/Runtime/Scripts/DataStream.cs @@ -95,7 +95,13 @@ public abstract class ReadIncrementalInstructionBase : StreamYieldInst /// public bool IsError => Error != null; - protected TContent LatestChunk + /// + /// The chunk from the most recent completed read. Throws the captured + /// if the last read errored. Public so the optional + /// UniTask async-enumerable adapter can read it generically; the typed + /// Bytes/Text accessors on the concrete readers delegate here. + /// + public TContent LatestChunk { get { diff --git a/Runtime/Scripts/Internal/YieldInstruction.cs b/Runtime/Scripts/Internal/YieldInstruction.cs index d6520c47..80a29010 100644 --- a/Runtime/Scripts/Internal/YieldInstruction.cs +++ b/Runtime/Scripts/Internal/YieldInstruction.cs @@ -115,10 +115,16 @@ protected set } } - internal bool IsCurrentReadDone + /// + /// True once a chunk is ready for the current read (before is + /// called for the next one). Public getter mirrors the sibling + /// DataTrack.ReadFrameInstruction.IsCurrentReadDone; the setter stays internal + /// because only the SDK's stream readers advance this state. + /// + public bool IsCurrentReadDone { get => _isCurrentReadDone; - set + internal set { _isCurrentReadDone = value; if (value) InvokeContinuation(); diff --git a/Runtime/Scripts/UniTask/StreamReaderUniTaskExtensions.cs b/Runtime/Scripts/UniTask/StreamReaderUniTaskExtensions.cs new file mode 100644 index 00000000..f55a9165 --- /dev/null +++ b/Runtime/Scripts/UniTask/StreamReaderUniTaskExtensions.cs @@ -0,0 +1,79 @@ +#if LIVEKIT_UNITASK +using System.Threading; +using Cysharp.Threading.Tasks; +using Cysharp.Threading.Tasks.Linq; + +namespace LiveKit +{ + /// + /// Exposes the SDK's incremental stream readers as + /// so chunks can be consumed with await foreach. Available only when the + /// com.cysharp.unitask package is installed (gated by LIVEKIT_UNITASK). + /// + public static class StreamReaderUniTaskExtensions + { + /// + /// Adapts an incremental stream read into an async sequence of chunks. Works for both + /// (byte[]) and + /// (string). + /// + /// + /// Iteration ends when the stream reaches end-of-stream. If the stream ends with an + /// error, the enumerable throws that (idiomatic for + /// await foreach; this is the one place the UniTask surface throws rather than + /// exposing IsError). Cancellation (via the token or the enumerator) surfaces as + /// with abandon-awaiter semantics — the + /// underlying FFI read is not cancelled on the wire. + /// + /// Like the coroutine consumer, this delivers the current chunk on the iteration where + /// end-of-stream is also observed, then stops. Chunks buffered beyond the + /// current one when end-of-stream arrives are not drainable — a pre-existing limitation + /// of the reader (its Reset() is disallowed past end-of-stream), not specific to + /// this adapter. + /// + public static IUniTaskAsyncEnumerable AsAsyncEnumerable( + this ReadIncrementalInstructionBase instruction, + CancellationToken cancellationToken = default) + { + if (instruction == null) throw new System.ArgumentNullException(nameof(instruction)); + + return UniTaskAsyncEnumerable.Create(async (writer, token) => + { + // The enumerator hands us its own token; honor both it and the caller's. + using var linked = CancellationTokenSource.CreateLinkedTokenSource(token, cancellationToken); + var ct = linked.Token; + + while (true) + { + // Completes when a chunk is ready (IsCurrentReadDone) or the stream ends (IsEos). + await instruction.AsUniTask(ct); + + if (instruction.IsCurrentReadDone) + { + var chunk = instruction.LatestChunk; + await writer.YieldAsync(chunk); + + // Re-check IsEos AFTER yielding: end-of-stream may have arrived while + // the consumer was suspended. Reset() throws once IsEos is set, so this + // re-check (not a value captured before the yield) is what keeps the + // loop safe — mirroring the coroutine consumer's "if (IsEos) break; + // else Reset()" ordering. + if (instruction.IsEos) + { + if (instruction.IsError) throw instruction.Error; + return; + } + + instruction.Reset(); + continue; + } + + // Not IsCurrentReadDone => end-of-stream with nothing left to read. + if (instruction.IsError) throw instruction.Error; + return; + } + }); + } + } +} +#endif diff --git a/Runtime/Scripts/UniTask/StreamReaderUniTaskExtensions.cs.meta b/Runtime/Scripts/UniTask/StreamReaderUniTaskExtensions.cs.meta new file mode 100644 index 00000000..883d5c63 --- /dev/null +++ b/Runtime/Scripts/UniTask/StreamReaderUniTaskExtensions.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: ba02c0c61aa014db28635be5e1cf6e64 +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: diff --git a/Runtime/Scripts/UniTask/livekit.unity.Runtime.UniTask.asmdef b/Runtime/Scripts/UniTask/livekit.unity.Runtime.UniTask.asmdef index d00c9d6a..a1e3e218 100644 --- a/Runtime/Scripts/UniTask/livekit.unity.Runtime.UniTask.asmdef +++ b/Runtime/Scripts/UniTask/livekit.unity.Runtime.UniTask.asmdef @@ -3,7 +3,8 @@ "rootNamespace": "LiveKit.UniTaskExtensions", "references": [ "LiveKit", - "UniTask" + "UniTask", + "UniTask.Linq" ], "includePlatforms": [], "excludePlatforms": [], diff --git a/Tests/PlayMode/UniTask/LiveKit.PlayModeTests.UniTask.asmdef b/Tests/PlayMode/UniTask/LiveKit.PlayModeTests.UniTask.asmdef index 37e31869..db1f62ad 100644 --- a/Tests/PlayMode/UniTask/LiveKit.PlayModeTests.UniTask.asmdef +++ b/Tests/PlayMode/UniTask/LiveKit.PlayModeTests.UniTask.asmdef @@ -6,7 +6,8 @@ "UnityEditor.TestRunner", "LiveKit", "LiveKit.UniTask", - "UniTask" + "UniTask", + "UniTask.Linq" ], "includePlatforms": [], "excludePlatforms": [], diff --git a/Tests/PlayMode/UniTask/StreamUniTaskTests.cs b/Tests/PlayMode/UniTask/StreamUniTaskTests.cs new file mode 100644 index 00000000..bb07f4b6 --- /dev/null +++ b/Tests/PlayMode/UniTask/StreamUniTaskTests.cs @@ -0,0 +1,142 @@ +#if LIVEKIT_UNITASK +using System; +using System.Collections.Generic; +using System.Threading; +using Cysharp.Threading.Tasks; +using LiveKit.Internal; +using NUnit.Framework; +using UnityEngine.TestTools; + +namespace LiveKit.PlayModeTests.UniTaskBridge +{ + public class StreamUniTaskTests + { + // Synthetic incremental reader that drives the base chunk/EoS machinery directly, + // with no FFI — the same seam used by the EditMode DataStreamIncrementalReadTests. + // FfiHandle is public; new FfiHandle(IntPtr.Zero) is a valid dummy handle. + private sealed class TestIncrementalReader : ReadIncrementalInstructionBase + { + public TestIncrementalReader(FfiHandle h) : base(h) { } + public void PushChunk(string content) => OnChunk(content); + public void PushEos(LiveKit.Proto.StreamError error = null) => OnEos(error); + } + + // Chunks pushed and consumed one at a time arrive in order; the sequence ends when + // EoS is observed. Manual enumeration interleaves push/pull so EoS only follows a + // fully drained queue (matching how chunks arrive over time in production). + [UnityTest] + public System.Collections.IEnumerator AsAsyncEnumerable_DeliversChunksInOrder_ThenStops() => UniTask.ToCoroutine(async () => + { + using var handle = new FfiHandle(IntPtr.Zero); + var reader = new TestIncrementalReader(handle); + + var e = reader.AsAsyncEnumerable().GetAsyncEnumerator(); + try + { + reader.PushChunk("A"); + Assert.IsTrue(await e.MoveNextAsync(), "Expected chunk A"); + Assert.AreEqual("A", e.Current); + + reader.PushChunk("B"); + Assert.IsTrue(await e.MoveNextAsync(), "Expected chunk B"); + Assert.AreEqual("B", e.Current); + + reader.PushChunk("C"); + Assert.IsTrue(await e.MoveNextAsync(), "Expected chunk C"); + Assert.AreEqual("C", e.Current); + + reader.PushEos(); + Assert.IsFalse(await e.MoveNextAsync(), "Enumeration must end at EoS"); + } + finally + { + await e.DisposeAsync(); + } + }); + + // The current chunk is delivered even when EoS is already set at the time it is read, + // then the sequence ends. (Chunks buffered beyond the current one when EoS arrives are + // not drainable — a pre-existing reader limitation, asserted here for clarity.) + [UnityTest] + public System.Collections.IEnumerator AsAsyncEnumerable_DeliversFinalChunkThenEos() => UniTask.ToCoroutine(async () => + { + using var handle = new FfiHandle(IntPtr.Zero); + var reader = new TestIncrementalReader(handle); + + reader.PushChunk("only"); + reader.PushEos(); + + var observed = new List(); + await foreach (var chunk in reader.AsAsyncEnumerable()) + observed.Add(chunk); + + CollectionAssert.AreEqual(new[] { "only" }, observed); + }); + + // A chunk delivered before the stream errors is observed; the subsequent error EoS + // then surfaces as a thrown StreamError. Manual enumeration models the real timeline + // (chunk arrives, is consumed, THEN the error ends the stream) — note that once the + // error is set, LatestChunk itself throws, so the error must follow chunk delivery. + [UnityTest] + public System.Collections.IEnumerator AsAsyncEnumerable_ThrowsStreamError_AfterDeliveringChunk() => UniTask.ToCoroutine(async () => + { + using var handle = new FfiHandle(IntPtr.Zero); + var reader = new TestIncrementalReader(handle); + + var e = reader.AsAsyncEnumerable().GetAsyncEnumerator(); + try + { + reader.PushChunk("partial"); + Assert.IsTrue(await e.MoveNextAsync(), "Expected the pre-error chunk"); + Assert.AreEqual("partial", e.Current); + + reader.PushEos(new LiveKit.Proto.StreamError { Description = "boom" }); + + StreamError caught = null; + try + { + await e.MoveNextAsync(); + } + catch (StreamError ex) + { + caught = ex; + } + + Assert.IsNotNull(caught, "Expected the error EoS to throw a StreamError"); + Assert.AreEqual("boom", caught.Message); + } + finally + { + await e.DisposeAsync(); + } + }); + + // A cancelled token surfaces as OperationCanceledException with abandon-awaiter + // semantics: nothing is observed and the underlying reader is untouched. + [UnityTest] + public System.Collections.IEnumerator AsAsyncEnumerable_Cancellation_ThrowsOperationCanceled() => UniTask.ToCoroutine(async () => + { + using var handle = new FfiHandle(IntPtr.Zero); + var reader = new TestIncrementalReader(handle); + using var cts = new CancellationTokenSource(); + cts.Cancel(); + + var observed = new List(); + bool threw = false; + try + { + await foreach (var chunk in reader.AsAsyncEnumerable(cts.Token)) + observed.Add(chunk); + } + catch (OperationCanceledException) + { + threw = true; + } + + Assert.IsTrue(threw, "Expected OperationCanceledException for a cancelled token"); + CollectionAssert.IsEmpty(observed); + Assert.IsFalse(reader.IsEos, "Abandon-awaiter semantics: reader state is untouched"); + }); + } +} +#endif diff --git a/Tests/PlayMode/UniTask/StreamUniTaskTests.cs.meta b/Tests/PlayMode/UniTask/StreamUniTaskTests.cs.meta new file mode 100644 index 00000000..a10435c8 --- /dev/null +++ b/Tests/PlayMode/UniTask/StreamUniTaskTests.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: a2e6312b068f8432fa2b267f28d3e10b +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: