From 3f5a9a92858edc500fcaac9a50bdfbdcabdae33d Mon Sep 17 00:00:00 2001 From: Max Heimbrock <43608204+MaxHeimbrock@users.noreply.github.com> Date: Thu, 21 May 2026 17:20:48 +0200 Subject: [PATCH 1/2] Add GetAwaiter to YieldInstruction and StreamYieldInstruction Stage 1 of the UniTask migration: enable `await room.Connect(...)` and similar without taking on a UniTask dependency. The awaiter's continuation is invoked from the existing IsDone / IsCurrentReadDone / IsEos property setters, so all nine concrete instructions (Connect, PublishTrack, RPC, SendText/File, stream open/write/close, etc.) become awaitable with no change to their completion code paths. Race between FFI-thread completion and main-thread await registration is resolved with a sentinel-value Interlocked.CompareExchange on a single continuation slot. GetResult() is intentionally a no-op so the await surface keeps strict parity with `yield return` (callers still inspect IsError); a throwing variant can be layered on later. Co-Authored-By: Claude Opus 4.7 (1M context) --- Runtime/Scripts/Internal/YieldInstruction.cs | 144 ++++++++++++++++++- Tests/PlayMode/RoomTests.cs | 30 ++++ 2 files changed, 171 insertions(+), 3 deletions(-) diff --git a/Runtime/Scripts/Internal/YieldInstruction.cs b/Runtime/Scripts/Internal/YieldInstruction.cs index 748e884d..d6520c47 100644 --- a/Runtime/Scripts/Internal/YieldInstruction.cs +++ b/Runtime/Scripts/Internal/YieldInstruction.cs @@ -1,4 +1,6 @@ using System; +using System.Runtime.CompilerServices; +using System.Threading; using UnityEngine; namespace LiveKit @@ -13,10 +15,79 @@ public class YieldInstruction : CustomYieldInstruction private volatile bool _isDone; private volatile bool _isError; - public bool IsDone { get => _isDone; protected set => _isDone = value; } + // Sentinel published once completion has fired so any continuation registered + // afterwards runs inline instead of being silently dropped. + private static readonly Action s_completedSentinel = () => { }; + private Action? _continuation; + + public bool IsDone + { + get => _isDone; + protected set + { + _isDone = value; + if (value) InvokeContinuation(); + } + } public bool IsError { get => _isError; protected set => _isError = value; } public override bool keepWaiting => !_isDone; + + /// + /// Returns an awaiter so callers can await this instruction directly. + /// + /// + /// The awaiter completes when becomes true. As with the + /// coroutine path, success vs. failure is inspected on the instruction itself + /// ( and any subclass-specific result fields); GetResult + /// does not throw. + /// + public YieldInstructionAwaiter GetAwaiter() => new YieldInstructionAwaiter(this); + + internal void RegisterContinuation(Action continuation) + { + // Race between completion-side (FFI thread writes sentinel) and await-side + // (registers continuation): CompareExchange decides who wrote first. + // null -> we won, completion will invoke our continuation later + // sentinel -> completion already fired; invoke inline + // other -> a second awaiter beat us here, which we don't support + var prev = Interlocked.CompareExchange(ref _continuation, continuation, null); + if (prev == null) return; + if (ReferenceEquals(prev, s_completedSentinel)) + { + continuation(); + return; + } + throw new InvalidOperationException( + "YieldInstruction does not support multiple awaiters; await it only once."); + } + + private void InvokeContinuation() + { + var prev = Interlocked.Exchange(ref _continuation, s_completedSentinel); + if (prev != null && !ReferenceEquals(prev, s_completedSentinel)) + { + prev(); + } + } + } + + public readonly struct YieldInstructionAwaiter : INotifyCompletion + { + private readonly YieldInstruction _instruction; + + internal YieldInstructionAwaiter(YieldInstruction instruction) + { + _instruction = instruction; + } + + public bool IsCompleted => _instruction.IsDone; + + public void OnCompleted(Action continuation) => _instruction.RegisterContinuation(continuation); + + // Intentionally a no-op. Parity with the coroutine path: callers inspect IsError + // and subclass-specific result fields on the instruction itself. + public void GetResult() { } } public class StreamYieldInstruction : CustomYieldInstruction @@ -28,12 +99,31 @@ public class StreamYieldInstruction : CustomYieldInstruction private volatile bool _isEos; private volatile bool _isCurrentReadDone; + private static readonly Action s_completedSentinel = () => { }; + private Action? _continuation; + /// /// True if the stream has reached the end. /// - public bool IsEos { get => _isEos; protected set => _isEos = value; } + public bool IsEos + { + get => _isEos; + protected set + { + _isEos = value; + if (value) InvokeContinuation(); + } + } - internal bool IsCurrentReadDone { get => _isCurrentReadDone; set => _isCurrentReadDone = value; } + internal bool IsCurrentReadDone + { + get => _isCurrentReadDone; + set + { + _isCurrentReadDone = value; + if (value) InvokeContinuation(); + } + } public override bool keepWaiting => !_isCurrentReadDone && !_isEos; @@ -50,6 +140,54 @@ public override void Reset() throw new InvalidOperationException("Cannot reset after end of stream"); } _isCurrentReadDone = false; + // Drop the sentinel published by the previous completion so the next awaiter + // can install a fresh continuation. Safe because Reset is only called after the + // previous read's await has already resumed. + Volatile.Write(ref _continuation, null); + } + + /// + /// Returns an awaiter that completes when the next chunk is ready or the stream ends. + /// Call between iterations to await the following chunk. + /// + public StreamYieldInstructionAwaiter GetAwaiter() => new StreamYieldInstructionAwaiter(this); + + internal void RegisterContinuation(Action continuation) + { + var prev = Interlocked.CompareExchange(ref _continuation, continuation, null); + if (prev == null) return; + if (ReferenceEquals(prev, s_completedSentinel)) + { + continuation(); + return; + } + throw new InvalidOperationException( + "StreamYieldInstruction does not support multiple concurrent awaiters; await it once per chunk."); + } + + private void InvokeContinuation() + { + var prev = Interlocked.Exchange(ref _continuation, s_completedSentinel); + if (prev != null && !ReferenceEquals(prev, s_completedSentinel)) + { + prev(); + } } } + + public readonly struct StreamYieldInstructionAwaiter : INotifyCompletion + { + private readonly StreamYieldInstruction _instruction; + + internal StreamYieldInstructionAwaiter(StreamYieldInstruction instruction) + { + _instruction = instruction; + } + + public bool IsCompleted => _instruction.IsCurrentReadDone || _instruction.IsEos; + + public void OnCompleted(Action continuation) => _instruction.RegisterContinuation(continuation); + + public void GetResult() { } + } } diff --git a/Tests/PlayMode/RoomTests.cs b/Tests/PlayMode/RoomTests.cs index 1a871e5c..ee27f1da 100644 --- a/Tests/PlayMode/RoomTests.cs +++ b/Tests/PlayMode/RoomTests.cs @@ -1,5 +1,7 @@ using System.Collections; +using System.Threading.Tasks; using NUnit.Framework; +using UnityEngine; using UnityEngine.TestTools; using LiveKit.Proto; using LiveKit.PlayModeTests.Utils; @@ -26,6 +28,34 @@ public IEnumerator Connect_FailsWithInvalidUrl() Assert.IsNotNull(context.ConnectionError, "Expected connection to fail"); } + // Parity check for the awaitable surface added in Stage 1 of the UniTask migration: + // awaiting a ConnectInstruction must observe the same IsError signal that + // yield return does. The outer driver stays IEnumerator because Unity's PlayMode + // runner does not accept [Test] async Task — the await itself is what we're + // validating, wrapped in a Task that the coroutine polls. + [UnityTest, Category("E2E")] + public IEnumerator Connect_FailsWithInvalidUrl_Awaitable() + { + LogAssert.ignoreFailingMessages = true; + + using var room = new Room(); + var connect = room.Connect("invalid-url", "token", new RoomOptions()); + var awaitTask = AwaitInstruction(connect); + + yield return new WaitUntil(() => awaitTask.IsCompleted); + + LogAssert.ignoreFailingMessages = false; + + Assert.IsNull(awaitTask.Exception, awaitTask.Exception?.ToString()); + Assert.IsTrue(connect.IsDone, "Awaiter should not resume before IsDone"); + Assert.IsTrue(connect.IsError, "Expected connection to fail"); + } + + private static async Task AwaitInstruction(YieldInstruction instruction) + { + await instruction; + } + [UnityTest, Category("E2E")] public IEnumerator RoomName_MatchesProvided() { From cebe2d72da385e694c6c819f8c9d75d18ce14ca2 Mon Sep 17 00:00:00 2001 From: Max Heimbrock <43608204+MaxHeimbrock@users.noreply.github.com> Date: Tue, 9 Jun 2026 11:48:16 +0200 Subject: [PATCH 2/2] Make Stage 1 awaiter test deterministic instead of FFI-flaky MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Connect_FailsWithInvalidUrl_Awaitable failed intermittently in the full PlayMode suite: awaiting the ConnectInstruction resumes the instant IsDone is set, but the FFI emits its "error while connecting" log batch a frame or two later — after the test had already reset LogAssert.ignoreFailingMessages, so the late error surfaced as an unhandled message and failed the test. It only passed in isolation because the timing happened to line up. Replace it with two deterministic tests driven by a synthetic YieldInstruction subclass: one for the OnCompleted path (await registered while pending, then completed) and one for the IsCompleted fast path (already done before await). These exercise the GetAwaiter logic directly with no FFI, no dev server, and no LogAssert race. The real connect-fail path stays covered by the existing Connect_FailsWithInvalidUrl coroutine test. Co-Authored-By: Claude Opus 4.8 (1M context) --- Tests/PlayMode/RoomTests.cs | 50 ++++++++++++++++++++++++++----------- 1 file changed, 36 insertions(+), 14 deletions(-) diff --git a/Tests/PlayMode/RoomTests.cs b/Tests/PlayMode/RoomTests.cs index ee27f1da..9cdc3031 100644 --- a/Tests/PlayMode/RoomTests.cs +++ b/Tests/PlayMode/RoomTests.cs @@ -28,27 +28,49 @@ public IEnumerator Connect_FailsWithInvalidUrl() Assert.IsNotNull(context.ConnectionError, "Expected connection to fail"); } - // Parity check for the awaitable surface added in Stage 1 of the UniTask migration: - // awaiting a ConnectInstruction must observe the same IsError signal that - // yield return does. The outer driver stays IEnumerator because Unity's PlayMode - // runner does not accept [Test] async Task — the await itself is what we're - // validating, wrapped in a Task that the coroutine polls. - [UnityTest, Category("E2E")] - public IEnumerator Connect_FailsWithInvalidUrl_Awaitable() + // Deterministic coverage of the GetAwaiter surface added in Stage 1, using a + // synthetic instruction so the awaiter logic is exercised without the FFI. These + // are intentionally NOT [Category("E2E")] — they need no dev server. The real + // connect-fail path stays covered by Connect_FailsWithInvalidUrl above; an earlier + // E2E variant of these was flaky because the FFI emits its error log asynchronously, + // which races LogAssert in the frame after the await has already resumed. + private sealed class TestYieldInstruction : YieldInstruction { - LogAssert.ignoreFailingMessages = true; + public void Complete() => IsDone = true; + public void CompleteWithError() { IsError = true; IsDone = true; } + } - using var room = new Room(); - var connect = room.Connect("invalid-url", "token", new RoomOptions()); - var awaitTask = AwaitInstruction(connect); + // OnCompleted path: await registers a continuation while the instruction is still + // pending, then completion fires it and IsError is visible on resume. + [UnityTest] + public IEnumerator GetAwaiter_ResumesOnCompletion_AndSurfacesIsError() + { + var instruction = new TestYieldInstruction(); + var awaitTask = AwaitInstruction(instruction); + Assert.IsFalse(awaitTask.IsCompleted, "Awaiter must not resume before IsDone"); + instruction.CompleteWithError(); yield return new WaitUntil(() => awaitTask.IsCompleted); - LogAssert.ignoreFailingMessages = false; + Assert.IsNull(awaitTask.Exception, awaitTask.Exception?.ToString()); + Assert.IsTrue(instruction.IsDone, "Awaiter resumed, so IsDone must be observable"); + Assert.IsTrue(instruction.IsError, "IsError must be visible on resume"); + } + + // IsCompleted fast path: instruction is already done before it is awaited, so the + // awaiter completes without ever registering a continuation. + [UnityTest] + public IEnumerator GetAwaiter_CompletesImmediately_WhenAlreadyDone() + { + var instruction = new TestYieldInstruction(); + instruction.Complete(); + + var awaitTask = AwaitInstruction(instruction); + yield return new WaitUntil(() => awaitTask.IsCompleted); Assert.IsNull(awaitTask.Exception, awaitTask.Exception?.ToString()); - Assert.IsTrue(connect.IsDone, "Awaiter should not resume before IsDone"); - Assert.IsTrue(connect.IsError, "Expected connection to fail"); + Assert.IsTrue(instruction.IsDone); + Assert.IsFalse(instruction.IsError); } private static async Task AwaitInstruction(YieldInstruction instruction)