From 0376479bbb7833a407d2d304e54119cd0e72bf9e Mon Sep 17 00:00:00 2001 From: Max Heimbrock <43608204+MaxHeimbrock@users.noreply.github.com> Date: Tue, 9 Jun 2026 11:14:00 +0200 Subject: [PATCH 1/4] Add PlayMode tests for PublishData size limits MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit LocalParticipant.PublishData is fire-and-forget — the C# API has no callback, so server-side drops of oversized packets are silent. These tests probe the boundary empirically by publishing payloads of varying sizes between two participants and asserting delivery via the subscriber's DataReceived event. Retries on a 200 ms interval to avoid SFU warm-up flakiness. Verified against livekit-server 1.12.0: 1 KiB and 15 KiB arrive intact (length + bytes); 65 KiB is dropped. Co-Authored-By: Claude Opus 4.7 (1M context) --- Tests/PlayMode/PublishDataTests.cs | 84 +++++++++++++++++++++++++ Tests/PlayMode/PublishDataTests.cs.meta | 11 ++++ 2 files changed, 95 insertions(+) create mode 100644 Tests/PlayMode/PublishDataTests.cs create mode 100644 Tests/PlayMode/PublishDataTests.cs.meta diff --git a/Tests/PlayMode/PublishDataTests.cs b/Tests/PlayMode/PublishDataTests.cs new file mode 100644 index 00000000..d87bae06 --- /dev/null +++ b/Tests/PlayMode/PublishDataTests.cs @@ -0,0 +1,84 @@ +using System.Collections; +using NUnit.Framework; +using UnityEngine; +using UnityEngine.TestTools; +using LiveKit.PlayModeTests.Utils; + +namespace LiveKit.PlayModeTests +{ + // Probes the server-side size limit on LocalParticipant.PublishData. The C# API is + // fire-and-forget (no callback), so the only observable signal is whether the + // subscriber's DataReceived event fires within a timeout. The SFU data path may + // not be ready immediately after connect, so we retry publishing on an interval. + public class PublishDataTests + { + [UnityTest, Category("E2E")] + public IEnumerator Small_1KiB_Arrives() + { + yield return RunSizeProbe(1024, shouldArrive: true); + } + + [UnityTest, Category("E2E")] + public IEnumerator At_15KiB_Arrives() + { + yield return RunSizeProbe(15 * 1024, shouldArrive: true); + } + + [UnityTest, Category("E2E")] + public IEnumerator Above_64KiB_DoesNotArrive() + { + yield return RunSizeProbe(65 * 1024, shouldArrive: false); + } + + private static IEnumerator RunSizeProbe(int payloadBytes, bool shouldArrive) + { + var publisher = TestRoomContext.ConnectionOptions.Default; + publisher.Identity = "publisher"; + var subscriber = TestRoomContext.ConnectionOptions.Default; + subscriber.Identity = "subscriber"; + + using var context = new TestRoomContext(new[] { publisher, subscriber }); + yield return context.ConnectAll(); + Assert.IsNull(context.ConnectionError); + + var publisherRoom = context.Rooms[0]; + var subscriberRoom = context.Rooms[1]; + + var payload = new byte[payloadBytes]; + for (int i = 0; i < payloadBytes; i++) payload[i] = (byte)(i & 0xFF); + + byte[] received = null; + subscriberRoom.DataReceived += (data, participant, kind, topic) => + { + if (received == null) received = data; + }; + + float timeout = shouldArrive ? 5f : 3f; + float start = Time.realtimeSinceStartup; + float lastPublish = -1f; + const float interval = 0.2f; + while (received == null && Time.realtimeSinceStartup - start < timeout) + { + if (Time.realtimeSinceStartup - lastPublish >= interval) + { + publisherRoom.LocalParticipant.PublishData(payload); + lastPublish = Time.realtimeSinceStartup; + } + yield return null; + } + + if (shouldArrive) + { + Assert.IsNotNull(received, + $"Expected {payloadBytes}-byte payload to arrive within {timeout}s"); + Assert.AreEqual(payloadBytes, received.Length, "Received payload length mismatch"); + CollectionAssert.AreEqual(payload, received, "Received payload contents mismatch"); + } + else + { + Assert.IsNull(received, + $"Expected {payloadBytes}-byte payload to be dropped, but it arrived ({received?.Length} bytes)"); + } + } + } +} diff --git a/Tests/PlayMode/PublishDataTests.cs.meta b/Tests/PlayMode/PublishDataTests.cs.meta new file mode 100644 index 00000000..a688879a --- /dev/null +++ b/Tests/PlayMode/PublishDataTests.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: 0ae5903661a1f4b5fbb7f64e0ff47db2 +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: From ed651a4b22aebc77e06d84d7d5d9744d0a2b5885 Mon Sep 17 00:00:00 2001 From: Max Heimbrock <43608204+MaxHeimbrock@users.noreply.github.com> Date: Wed, 10 Jun 2026 14:04:50 +0200 Subject: [PATCH 2/4] Return a PublishDataInstruction from PublishData MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PublishData was fire-and-forget (void), so callers could not tell whether the Rust side accepted the packet. The FFI transport already carried the result end-to-end — PublishDataRequest has a request_async_id, Rust emits a PublishDataCallback { async_id, error }, and the data_task forwards the publish result into it — but the C# side discarded the event. Wire it through: - New PublishDataInstruction (YieldInstruction) exposing IsError + Error. - The three PublishData overloads now return it instead of void (source-compatible: callers ignoring the result still compile). - Route the callback by adding PublishData to FFIClient.ExtractRequestAsyncId. Packets over the negotiated maximum message size now surface the error ("data packet size (N bytes) exceeds the negotiated maximum message size (64000 bytes)") via instruction.Error. Tests: ReturnsSuccess_ForSmallPayload verifies the success path on any binary. ReturnsError_ForOversizedPayload (Explicit) verifies the size-limit error and requires the client-sdk-rust datamessage_size FFI binary. Co-Authored-By: Claude Opus 4.8 (1M context) --- Runtime/Scripts/Internal/FFIClient.cs | 1 + Runtime/Scripts/Participant.cs | 79 ++++++++++++++++++++++-- Tests/PlayMode/PublishDataTests.cs | 86 +++++++++++++++++++++++++-- 3 files changed, 156 insertions(+), 10 deletions(-) diff --git a/Runtime/Scripts/Internal/FFIClient.cs b/Runtime/Scripts/Internal/FFIClient.cs index 55c140c0..a3f66084 100644 --- a/Runtime/Scripts/Internal/FFIClient.cs +++ b/Runtime/Scripts/Internal/FFIClient.cs @@ -460,6 +460,7 @@ internal bool TrySkipDispatch(ulong requestAsyncId, FfiEvent ffiEvent) { FfiEvent.MessageOneofCase.Connect => ffiEvent.Connect?.AsyncId, FfiEvent.MessageOneofCase.PublishTrack => ffiEvent.PublishTrack?.AsyncId, + FfiEvent.MessageOneofCase.PublishData => ffiEvent.PublishData?.AsyncId, FfiEvent.MessageOneofCase.UnpublishTrack => ffiEvent.UnpublishTrack?.AsyncId, FfiEvent.MessageOneofCase.SetLocalName => ffiEvent.SetLocalName?.AsyncId, FfiEvent.MessageOneofCase.SetLocalMetadata => ffiEvent.SetLocalMetadata?.AsyncId, diff --git a/Runtime/Scripts/Participant.cs b/Runtime/Scripts/Participant.cs index 4da028c0..33728144 100644 --- a/Runtime/Scripts/Participant.cs +++ b/Runtime/Scripts/Participant.cs @@ -105,18 +105,34 @@ public UnpublishTrackInstruction UnpublishTrack(ILocalTrack localTrack, bool sto return instruction; } - public void PublishData(byte[] data, IReadOnlyCollection destination_identities = null, bool reliable = true, string topic = null) + /// + /// Publishes arbitrary data to participants in the room. + /// + /// + /// A that completes once the packet is sent or errors. + /// Check and read + /// to handle the result (e.g. payloads exceeding the negotiated maximum message size). + /// + public PublishDataInstruction PublishData(byte[] data, IReadOnlyCollection destination_identities = null, bool reliable = true, string topic = null) { - PublishData(new Span(data), destination_identities, reliable, topic); + return PublishData(new Span(data), destination_identities, reliable, topic); } - public void PublishData(Span data, IReadOnlyCollection destination_identities = null, bool reliable = true, string topic = null) + /// + /// Publishes arbitrary data to participants in the room. + /// + /// + /// A that completes once the packet is sent or errors. + /// Check and read + /// to handle the result (e.g. payloads exceeding the negotiated maximum message size). + /// + public PublishDataInstruction PublishData(Span data, IReadOnlyCollection destination_identities = null, bool reliable = true, string topic = null) { unsafe { fixed (byte* pointer = data) { - PublishData(pointer, data.Length, destination_identities, reliable, topic); + return PublishData(pointer, data.Length, destination_identities, reliable, topic); } } } @@ -333,7 +349,7 @@ private void SendRpcResponse(ulong invocationId, string responsePayload, RpcErro var response = request.Send(); } - private unsafe void PublishData(byte* data, int len, IReadOnlyCollection destination_identities = null, bool reliable = true, string topic = null) + private unsafe PublishDataInstruction PublishData(byte* data, int len, IReadOnlyCollection destination_identities = null, bool reliable = true, string topic = null) { if (!Room.TryGetTarget(out var room)) throw new Exception("room is invalid"); @@ -365,7 +381,12 @@ private unsafe void PublishData(byte* data, int len, IReadOnlyCollection publish.DataPtr = (ulong)data; } Utils.Debug("Sending message: " + topic); - var response = request.Send(); + + // Register the completion handler before sending so we never miss the + // callback (Rust may emit it before Send() returns). + var instruction = new PublishDataInstruction(request.RequestAsyncId); + using var response = request.Send(); + return instruction; } /// @@ -661,6 +682,52 @@ internal UnpublishTrackInstruction(ulong asyncId) : base(asyncId, static e => e.UnpublishTrack, static e => e.Error) { } } + /// + /// YieldInstruction for publishing data packets. Returned by . + /// + /// + /// Read after checking . A packet that + /// exceeds the negotiated maximum message size completes with + /// true and a descriptive message. + /// + public sealed class PublishDataInstruction : YieldInstruction + { + private readonly ulong _asyncId; + + /// + /// The error message if the publish failed, otherwise null. + /// + public string Error { get; private set; } + + internal PublishDataInstruction(ulong asyncId) + { + _asyncId = asyncId; + FfiClient.Instance.RegisterPendingCallback( + asyncId, static e => e.PublishData, OnPublishData, OnCanceled, + dispatchToMainThread: false); + } + + internal void OnPublishData(PublishDataCallback e) + { + if (e.AsyncId != _asyncId) + return; + + if (!string.IsNullOrEmpty(e.Error)) + { + Error = e.Error; + IsError = true; + } + IsDone = true; + } + + void OnCanceled() + { + Error = "Canceled"; + IsError = true; + IsDone = true; + } + } + /// /// YieldInstruction for RPC calls. Returned by . /// diff --git a/Tests/PlayMode/PublishDataTests.cs b/Tests/PlayMode/PublishDataTests.cs index d87bae06..723a3b93 100644 --- a/Tests/PlayMode/PublishDataTests.cs +++ b/Tests/PlayMode/PublishDataTests.cs @@ -6,10 +6,14 @@ namespace LiveKit.PlayModeTests { - // Probes the server-side size limit on LocalParticipant.PublishData. The C# API is - // fire-and-forget (no callback), so the only observable signal is whether the - // subscriber's DataReceived event fires within a timeout. The SFU data path may - // not be ready immediately after connect, so we retry publishing on an interval. + // Probes LocalParticipant.PublishData delivery and the result it returns. + // + // The *_Arrives / *_DoesNotArrive tests observe end-to-end delivery via the + // subscriber's DataReceived event. The SFU data path may not be ready + // immediately after connect, so we retry publishing on an interval. + // + // The Returns* tests instead inspect the PublishDataInstruction returned by + // PublishData, which reports whether the Rust side accepted the packet. public class PublishDataTests { [UnityTest, Category("E2E")] @@ -30,6 +34,80 @@ public IEnumerator Above_64KiB_DoesNotArrive() yield return RunSizeProbe(65 * 1024, shouldArrive: false); } + // The returned instruction completes without error for a payload within the + // size limit. Runs against any FFI binary. + [UnityTest, Category("E2E")] + public IEnumerator ReturnsSuccess_ForSmallPayload() + { + var publisher = TestRoomContext.ConnectionOptions.Default; + publisher.Identity = "publisher"; + var subscriber = TestRoomContext.ConnectionOptions.Default; + subscriber.Identity = "subscriber"; + + using var context = new TestRoomContext(new[] { publisher, subscriber }); + yield return context.ConnectAll(); + Assert.IsNull(context.ConnectionError, context.ConnectionError); + + var instruction = context.Rooms[0].LocalParticipant.PublishData(new byte[1024]); + yield return instruction; + + Assert.IsTrue(instruction.IsDone, "PublishData instruction did not complete"); + Assert.IsFalse(instruction.IsError, $"Unexpected publish error: {instruction.Error}"); + Assert.IsNull(instruction.Error); + } + + // A payload over the negotiated maximum message size completes with an error. + // Marked Explicit because the currently-shipped FFI plugins do not enforce the + // limit — only the client-sdk-rust `datamessage_size` binary does. Run locally + // against that binary with: + // Scripts~/run_unity.sh test -m PlayMode -f PublishDataTests.ReturnsError_ForOversizedPayload + [UnityTest, Category("E2E"), Explicit( + "Requires the client-sdk-rust datamessage_size FFI binary that enforces the 64000-byte limit; shipped plugins do not yet.")] + public IEnumerator ReturnsError_ForOversizedPayload() + { + var publisher = TestRoomContext.ConnectionOptions.Default; + publisher.Identity = "publisher"; + var subscriber = TestRoomContext.ConnectionOptions.Default; + subscriber.Identity = "subscriber"; + + using var context = new TestRoomContext(new[] { publisher, subscriber }); + yield return context.ConnectAll(); + Assert.IsNull(context.ConnectionError, context.ConnectionError); + + // With LK_VERBOSE defined, the native layer forwards the size-limit log to + // Unity, which would otherwise fail the test. Ignore failing log messages + // for the rest of the test (same approach as RoomTests). + LogAssert.ignoreFailingMessages = true; + + // Establish the publisher data channel with an in-limit publish first. The + // size check only engages once SCTP has negotiated the max message size + // (which happens when the reliable data channel opens). Sending an oversized + // reliable packet *before* that negotiation can wedge the publisher + // transport (manifesting as a connection timeout), so we must warm up first. + var warmup = context.Rooms[0].LocalParticipant.PublishData(new byte[1024]); + yield return warmup; + Assert.IsFalse(warmup.IsError, $"Warmup publish failed: {warmup.Error}"); + + // Now probe with the oversized payload. Retry a few times in case the + // negotiated max size lands slightly after the channel opens. + var oversized = new byte[65 * 1024]; + var retryDelay = new WaitForSeconds(0.2f); + PublishDataInstruction instruction = null; + for (int attempt = 0; attempt < 10; attempt++) + { + instruction = context.Rooms[0].LocalParticipant.PublishData(oversized); + yield return instruction; + if (instruction.IsError) break; + yield return retryDelay; + } + + Assert.IsNotNull(instruction, "No publish was attempted"); + Assert.IsTrue(instruction.IsDone, "PublishData instruction did not complete"); + Assert.IsTrue(instruction.IsError, + "Expected oversized payload to report an error once max message size was negotiated"); + StringAssert.Contains("maximum message size", instruction.Error); + } + private static IEnumerator RunSizeProbe(int payloadBytes, bool shouldArrive) { var publisher = TestRoomContext.ConnectionOptions.Default; From 565c354d19d5f4ed8c4da1a93bad974578278ba1 Mon Sep 17 00:00:00 2001 From: Max Heimbrock <43608204+MaxHeimbrock@users.noreply.github.com> Date: Wed, 10 Jun 2026 14:10:14 +0200 Subject: [PATCH 3/4] Clean up old simple tests --- Tests/PlayMode/PublishDataTests.cs | 72 +----------------------------- 1 file changed, 1 insertion(+), 71 deletions(-) diff --git a/Tests/PlayMode/PublishDataTests.cs b/Tests/PlayMode/PublishDataTests.cs index 723a3b93..40827017 100644 --- a/Tests/PlayMode/PublishDataTests.cs +++ b/Tests/PlayMode/PublishDataTests.cs @@ -16,24 +16,6 @@ namespace LiveKit.PlayModeTests // PublishData, which reports whether the Rust side accepted the packet. public class PublishDataTests { - [UnityTest, Category("E2E")] - public IEnumerator Small_1KiB_Arrives() - { - yield return RunSizeProbe(1024, shouldArrive: true); - } - - [UnityTest, Category("E2E")] - public IEnumerator At_15KiB_Arrives() - { - yield return RunSizeProbe(15 * 1024, shouldArrive: true); - } - - [UnityTest, Category("E2E")] - public IEnumerator Above_64KiB_DoesNotArrive() - { - yield return RunSizeProbe(65 * 1024, shouldArrive: false); - } - // The returned instruction completes without error for a payload within the // size limit. Runs against any FFI binary. [UnityTest, Category("E2E")] @@ -61,8 +43,7 @@ public IEnumerator ReturnsSuccess_ForSmallPayload() // limit — only the client-sdk-rust `datamessage_size` binary does. Run locally // against that binary with: // Scripts~/run_unity.sh test -m PlayMode -f PublishDataTests.ReturnsError_ForOversizedPayload - [UnityTest, Category("E2E"), Explicit( - "Requires the client-sdk-rust datamessage_size FFI binary that enforces the 64000-byte limit; shipped plugins do not yet.")] + [UnityTest, Category("E2E")] public IEnumerator ReturnsError_ForOversizedPayload() { var publisher = TestRoomContext.ConnectionOptions.Default; @@ -107,56 +88,5 @@ public IEnumerator ReturnsError_ForOversizedPayload() "Expected oversized payload to report an error once max message size was negotiated"); StringAssert.Contains("maximum message size", instruction.Error); } - - private static IEnumerator RunSizeProbe(int payloadBytes, bool shouldArrive) - { - var publisher = TestRoomContext.ConnectionOptions.Default; - publisher.Identity = "publisher"; - var subscriber = TestRoomContext.ConnectionOptions.Default; - subscriber.Identity = "subscriber"; - - using var context = new TestRoomContext(new[] { publisher, subscriber }); - yield return context.ConnectAll(); - Assert.IsNull(context.ConnectionError); - - var publisherRoom = context.Rooms[0]; - var subscriberRoom = context.Rooms[1]; - - var payload = new byte[payloadBytes]; - for (int i = 0; i < payloadBytes; i++) payload[i] = (byte)(i & 0xFF); - - byte[] received = null; - subscriberRoom.DataReceived += (data, participant, kind, topic) => - { - if (received == null) received = data; - }; - - float timeout = shouldArrive ? 5f : 3f; - float start = Time.realtimeSinceStartup; - float lastPublish = -1f; - const float interval = 0.2f; - while (received == null && Time.realtimeSinceStartup - start < timeout) - { - if (Time.realtimeSinceStartup - lastPublish >= interval) - { - publisherRoom.LocalParticipant.PublishData(payload); - lastPublish = Time.realtimeSinceStartup; - } - yield return null; - } - - if (shouldArrive) - { - Assert.IsNotNull(received, - $"Expected {payloadBytes}-byte payload to arrive within {timeout}s"); - Assert.AreEqual(payloadBytes, received.Length, "Received payload length mismatch"); - CollectionAssert.AreEqual(payload, received, "Received payload contents mismatch"); - } - else - { - Assert.IsNull(received, - $"Expected {payloadBytes}-byte payload to be dropped, but it arrived ({received?.Length} bytes)"); - } - } } } From 2f0c61891eac23e2ee70d125b658eeca961f38fe Mon Sep 17 00:00:00 2001 From: Max Heimbrock <43608204+MaxHeimbrock@users.noreply.github.com> Date: Wed, 10 Jun 2026 14:20:22 +0200 Subject: [PATCH 4/4] No ignoring messages since it is only warn --- Tests/PlayMode/PublishDataTests.cs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/Tests/PlayMode/PublishDataTests.cs b/Tests/PlayMode/PublishDataTests.cs index 40827017..a20001b1 100644 --- a/Tests/PlayMode/PublishDataTests.cs +++ b/Tests/PlayMode/PublishDataTests.cs @@ -55,11 +55,6 @@ public IEnumerator ReturnsError_ForOversizedPayload() yield return context.ConnectAll(); Assert.IsNull(context.ConnectionError, context.ConnectionError); - // With LK_VERBOSE defined, the native layer forwards the size-limit log to - // Unity, which would otherwise fail the test. Ignore failing log messages - // for the rest of the test (same approach as RoomTests). - LogAssert.ignoreFailingMessages = true; - // Establish the publisher data channel with an in-limit publish first. The // size check only engages once SCTP has negotiated the max message size // (which happens when the reliable data channel opens). Sending an oversized