diff --git a/Runtime/Scripts/Internal/FFIClient.cs b/Runtime/Scripts/Internal/FFIClient.cs index 55c140c0..a3f66084 100644 --- a/Runtime/Scripts/Internal/FFIClient.cs +++ b/Runtime/Scripts/Internal/FFIClient.cs @@ -460,6 +460,7 @@ internal bool TrySkipDispatch(ulong requestAsyncId, FfiEvent ffiEvent) { FfiEvent.MessageOneofCase.Connect => ffiEvent.Connect?.AsyncId, FfiEvent.MessageOneofCase.PublishTrack => ffiEvent.PublishTrack?.AsyncId, + FfiEvent.MessageOneofCase.PublishData => ffiEvent.PublishData?.AsyncId, FfiEvent.MessageOneofCase.UnpublishTrack => ffiEvent.UnpublishTrack?.AsyncId, FfiEvent.MessageOneofCase.SetLocalName => ffiEvent.SetLocalName?.AsyncId, FfiEvent.MessageOneofCase.SetLocalMetadata => ffiEvent.SetLocalMetadata?.AsyncId, diff --git a/Runtime/Scripts/Participant.cs b/Runtime/Scripts/Participant.cs index 4da028c0..33728144 100644 --- a/Runtime/Scripts/Participant.cs +++ b/Runtime/Scripts/Participant.cs @@ -105,18 +105,34 @@ public UnpublishTrackInstruction UnpublishTrack(ILocalTrack localTrack, bool sto return instruction; } - public void PublishData(byte[] data, IReadOnlyCollection destination_identities = null, bool reliable = true, string topic = null) + /// + /// Publishes arbitrary data to participants in the room. + /// + /// + /// A that completes once the packet is sent or errors. + /// Check and read + /// to handle the result (e.g. payloads exceeding the negotiated maximum message size). + /// + public PublishDataInstruction PublishData(byte[] data, IReadOnlyCollection destination_identities = null, bool reliable = true, string topic = null) { - PublishData(new Span(data), destination_identities, reliable, topic); + return PublishData(new Span(data), destination_identities, reliable, topic); } - public void PublishData(Span data, IReadOnlyCollection destination_identities = null, bool reliable = true, string topic = null) + /// + /// Publishes arbitrary data to participants in the room. + /// + /// + /// A that completes once the packet is sent or errors. + /// Check and read + /// to handle the result (e.g. payloads exceeding the negotiated maximum message size). + /// + public PublishDataInstruction PublishData(Span data, IReadOnlyCollection destination_identities = null, bool reliable = true, string topic = null) { unsafe { fixed (byte* pointer = data) { - PublishData(pointer, data.Length, destination_identities, reliable, topic); + return PublishData(pointer, data.Length, destination_identities, reliable, topic); } } } @@ -333,7 +349,7 @@ private void SendRpcResponse(ulong invocationId, string responsePayload, RpcErro var response = request.Send(); } - private unsafe void PublishData(byte* data, int len, IReadOnlyCollection destination_identities = null, bool reliable = true, string topic = null) + private unsafe PublishDataInstruction PublishData(byte* data, int len, IReadOnlyCollection destination_identities = null, bool reliable = true, string topic = null) { if (!Room.TryGetTarget(out var room)) throw new Exception("room is invalid"); @@ -365,7 +381,12 @@ private unsafe void PublishData(byte* data, int len, IReadOnlyCollection publish.DataPtr = (ulong)data; } Utils.Debug("Sending message: " + topic); - var response = request.Send(); + + // Register the completion handler before sending so we never miss the + // callback (Rust may emit it before Send() returns). + var instruction = new PublishDataInstruction(request.RequestAsyncId); + using var response = request.Send(); + return instruction; } /// @@ -661,6 +682,52 @@ internal UnpublishTrackInstruction(ulong asyncId) : base(asyncId, static e => e.UnpublishTrack, static e => e.Error) { } } + /// + /// YieldInstruction for publishing data packets. Returned by . + /// + /// + /// Read after checking . A packet that + /// exceeds the negotiated maximum message size completes with + /// true and a descriptive message. + /// + public sealed class PublishDataInstruction : YieldInstruction + { + private readonly ulong _asyncId; + + /// + /// The error message if the publish failed, otherwise null. + /// + public string Error { get; private set; } + + internal PublishDataInstruction(ulong asyncId) + { + _asyncId = asyncId; + FfiClient.Instance.RegisterPendingCallback( + asyncId, static e => e.PublishData, OnPublishData, OnCanceled, + dispatchToMainThread: false); + } + + internal void OnPublishData(PublishDataCallback e) + { + if (e.AsyncId != _asyncId) + return; + + if (!string.IsNullOrEmpty(e.Error)) + { + Error = e.Error; + IsError = true; + } + IsDone = true; + } + + void OnCanceled() + { + Error = "Canceled"; + IsError = true; + IsDone = true; + } + } + /// /// YieldInstruction for RPC calls. Returned by . /// diff --git a/Tests/PlayMode/PublishDataTests.cs b/Tests/PlayMode/PublishDataTests.cs new file mode 100644 index 00000000..a20001b1 --- /dev/null +++ b/Tests/PlayMode/PublishDataTests.cs @@ -0,0 +1,87 @@ +using System.Collections; +using NUnit.Framework; +using UnityEngine; +using UnityEngine.TestTools; +using LiveKit.PlayModeTests.Utils; + +namespace LiveKit.PlayModeTests +{ + // Probes LocalParticipant.PublishData delivery and the result it returns. + // + // The *_Arrives / *_DoesNotArrive tests observe end-to-end delivery via the + // subscriber's DataReceived event. The SFU data path may not be ready + // immediately after connect, so we retry publishing on an interval. + // + // The Returns* tests instead inspect the PublishDataInstruction returned by + // PublishData, which reports whether the Rust side accepted the packet. + public class PublishDataTests + { + // The returned instruction completes without error for a payload within the + // size limit. Runs against any FFI binary. + [UnityTest, Category("E2E")] + public IEnumerator ReturnsSuccess_ForSmallPayload() + { + var publisher = TestRoomContext.ConnectionOptions.Default; + publisher.Identity = "publisher"; + var subscriber = TestRoomContext.ConnectionOptions.Default; + subscriber.Identity = "subscriber"; + + using var context = new TestRoomContext(new[] { publisher, subscriber }); + yield return context.ConnectAll(); + Assert.IsNull(context.ConnectionError, context.ConnectionError); + + var instruction = context.Rooms[0].LocalParticipant.PublishData(new byte[1024]); + yield return instruction; + + Assert.IsTrue(instruction.IsDone, "PublishData instruction did not complete"); + Assert.IsFalse(instruction.IsError, $"Unexpected publish error: {instruction.Error}"); + Assert.IsNull(instruction.Error); + } + + // A payload over the negotiated maximum message size completes with an error. + // Marked Explicit because the currently-shipped FFI plugins do not enforce the + // limit — only the client-sdk-rust `datamessage_size` binary does. Run locally + // against that binary with: + // Scripts~/run_unity.sh test -m PlayMode -f PublishDataTests.ReturnsError_ForOversizedPayload + [UnityTest, Category("E2E")] + public IEnumerator ReturnsError_ForOversizedPayload() + { + var publisher = TestRoomContext.ConnectionOptions.Default; + publisher.Identity = "publisher"; + var subscriber = TestRoomContext.ConnectionOptions.Default; + subscriber.Identity = "subscriber"; + + using var context = new TestRoomContext(new[] { publisher, subscriber }); + yield return context.ConnectAll(); + Assert.IsNull(context.ConnectionError, context.ConnectionError); + + // Establish the publisher data channel with an in-limit publish first. The + // size check only engages once SCTP has negotiated the max message size + // (which happens when the reliable data channel opens). Sending an oversized + // reliable packet *before* that negotiation can wedge the publisher + // transport (manifesting as a connection timeout), so we must warm up first. + var warmup = context.Rooms[0].LocalParticipant.PublishData(new byte[1024]); + yield return warmup; + Assert.IsFalse(warmup.IsError, $"Warmup publish failed: {warmup.Error}"); + + // Now probe with the oversized payload. Retry a few times in case the + // negotiated max size lands slightly after the channel opens. + var oversized = new byte[65 * 1024]; + var retryDelay = new WaitForSeconds(0.2f); + PublishDataInstruction instruction = null; + for (int attempt = 0; attempt < 10; attempt++) + { + instruction = context.Rooms[0].LocalParticipant.PublishData(oversized); + yield return instruction; + if (instruction.IsError) break; + yield return retryDelay; + } + + Assert.IsNotNull(instruction, "No publish was attempted"); + Assert.IsTrue(instruction.IsDone, "PublishData instruction did not complete"); + Assert.IsTrue(instruction.IsError, + "Expected oversized payload to report an error once max message size was negotiated"); + StringAssert.Contains("maximum message size", instruction.Error); + } + } +} diff --git a/Tests/PlayMode/PublishDataTests.cs.meta b/Tests/PlayMode/PublishDataTests.cs.meta new file mode 100644 index 00000000..a688879a --- /dev/null +++ b/Tests/PlayMode/PublishDataTests.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: 0ae5903661a1f4b5fbb7f64e0ff47db2 +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: