-
Notifications
You must be signed in to change notification settings - Fork 61
Return a PublishDataInstruction from PublishData #301
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||
|---|---|---|---|---|
|
|
@@ -105,18 +105,34 @@ public UnpublishTrackInstruction UnpublishTrack(ILocalTrack localTrack, bool sto | |||
| return instruction; | ||||
| } | ||||
|
|
||||
| public void PublishData(byte[] data, IReadOnlyCollection<string> destination_identities = null, bool reliable = true, string topic = null) | ||||
| /// <summary> | ||||
| /// Publishes arbitrary data to participants in the room. | ||||
| /// </summary> | ||||
| /// <returns> | ||||
| /// A <see cref="PublishDataInstruction"/> that completes once the packet is sent or errors. | ||||
| /// Check <see cref="YieldInstruction.IsError"/> and read <see cref="PublishDataInstruction.Error"/> | ||||
| /// to handle the result (e.g. payloads exceeding the negotiated maximum message size). | ||||
| /// </returns> | ||||
| public PublishDataInstruction PublishData(byte[] data, IReadOnlyCollection<string> destination_identities = null, bool reliable = true, string topic = null) | ||||
| { | ||||
| PublishData(new Span<byte>(data), destination_identities, reliable, topic); | ||||
| return PublishData(new Span<byte>(data), destination_identities, reliable, topic); | ||||
| } | ||||
|
|
||||
| public void PublishData(Span<byte> data, IReadOnlyCollection<string> destination_identities = null, bool reliable = true, string topic = null) | ||||
| /// <summary> | ||||
| /// Publishes arbitrary data to participants in the room. | ||||
| /// </summary> | ||||
| /// <returns> | ||||
| /// A <see cref="PublishDataInstruction"/> that completes once the packet is sent or errors. | ||||
| /// Check <see cref="YieldInstruction.IsError"/> and read <see cref="PublishDataInstruction.Error"/> | ||||
| /// to handle the result (e.g. payloads exceeding the negotiated maximum message size). | ||||
| /// </returns> | ||||
| public PublishDataInstruction PublishData(Span<byte> data, IReadOnlyCollection<string> destination_identities = null, bool reliable = true, string topic = null) | ||||
| { | ||||
| unsafe | ||||
| { | ||||
| fixed (byte* pointer = data) | ||||
| { | ||||
| PublishData(pointer, data.Length, destination_identities, reliable, topic); | ||||
| return PublishData(pointer, data.Length, destination_identities, reliable, topic); | ||||
| } | ||||
| } | ||||
| } | ||||
|
|
@@ -333,7 +349,7 @@ private void SendRpcResponse(ulong invocationId, string responsePayload, RpcErro | |||
| var response = request.Send(); | ||||
| } | ||||
|
|
||||
| private unsafe void PublishData(byte* data, int len, IReadOnlyCollection<string> destination_identities = null, bool reliable = true, string topic = null) | ||||
| private unsafe PublishDataInstruction PublishData(byte* data, int len, IReadOnlyCollection<string> destination_identities = null, bool reliable = true, string topic = null) | ||||
| { | ||||
| if (!Room.TryGetTarget(out var room)) | ||||
| throw new Exception("room is invalid"); | ||||
|
|
@@ -365,7 +381,12 @@ private unsafe void PublishData(byte* data, int len, IReadOnlyCollection<string> | |||
| publish.DataPtr = (ulong)data; | ||||
| } | ||||
| Utils.Debug("Sending message: " + topic); | ||||
| var response = request.Send(); | ||||
|
|
||||
| // Register the completion handler before sending so we never miss the | ||||
| // callback (Rust may emit it before Send() returns). | ||||
| var instruction = new PublishDataInstruction(request.RequestAsyncId); | ||||
| using var response = request.Send(); | ||||
| return instruction; | ||||
| } | ||||
|
|
||||
| /// <summary> | ||||
|
|
@@ -661,6 +682,52 @@ internal UnpublishTrackInstruction(ulong asyncId) | |||
| : base(asyncId, static e => e.UnpublishTrack, static e => e.Error) { } | ||||
| } | ||||
|
|
||||
| /// <summary> | ||||
| /// YieldInstruction for publishing data packets. Returned by <see cref="LocalParticipant.PublishData(byte[], IReadOnlyCollection{string}, bool, string)"/>. | ||||
| /// </summary> | ||||
| /// <remarks> | ||||
| /// Read <see cref="Error"/> after checking <see cref="YieldInstruction.IsError"/>. A packet that | ||||
| /// exceeds the negotiated maximum message size completes with <see cref="YieldInstruction.IsError"/> | ||||
| /// true and a descriptive <see cref="Error"/> message. | ||||
| /// </remarks> | ||||
| public sealed class PublishDataInstruction : YieldInstruction | ||||
| { | ||||
| private readonly ulong _asyncId; | ||||
|
|
||||
| /// <summary> | ||||
| /// The error message if the publish failed, otherwise null. | ||||
| /// </summary> | ||||
| public string Error { get; private set; } | ||||
|
|
||||
| internal PublishDataInstruction(ulong asyncId) | ||||
| { | ||||
| _asyncId = asyncId; | ||||
| FfiClient.Instance.RegisterPendingCallback( | ||||
| asyncId, static e => e.PublishData, OnPublishData, OnCanceled, | ||||
| dispatchToMainThread: false); | ||||
| } | ||||
|
|
||||
| internal void OnPublishData(PublishDataCallback e) | ||||
| { | ||||
| if (e.AsyncId != _asyncId) | ||||
| return; | ||||
|
|
||||
| if (!string.IsNullOrEmpty(e.Error)) | ||||
| { | ||||
| Error = e.Error; | ||||
| IsError = true; | ||||
| } | ||||
| IsDone = true; | ||||
| } | ||||
|
|
||||
| void OnCanceled() | ||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit, should onCanceled() private ?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is private by default, and we also use the implicit private style around in the other instruction classes:
|
||||
| { | ||||
| Error = "Canceled"; | ||||
| IsError = true; | ||||
| IsDone = true; | ||||
| } | ||||
| } | ||||
|
|
||||
| /// <summary> | ||||
| /// YieldInstruction for RPC calls. Returned by <see cref="LocalParticipant.PerformRpc"/>. | ||||
| /// </summary> | ||||
|
|
||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,87 @@ | ||
| using System.Collections; | ||
| using NUnit.Framework; | ||
| using UnityEngine; | ||
| using UnityEngine.TestTools; | ||
| using LiveKit.PlayModeTests.Utils; | ||
|
|
||
| namespace LiveKit.PlayModeTests | ||
| { | ||
| // Probes LocalParticipant.PublishData delivery and the result it returns. | ||
| // | ||
| // The *_Arrives / *_DoesNotArrive tests observe end-to-end delivery via the | ||
| // subscriber's DataReceived event. The SFU data path may not be ready | ||
| // immediately after connect, so we retry publishing on an interval. | ||
| // | ||
| // The Returns* tests instead inspect the PublishDataInstruction returned by | ||
| // PublishData, which reports whether the Rust side accepted the packet. | ||
| public class PublishDataTests | ||
| { | ||
| // The returned instruction completes without error for a payload within the | ||
| // size limit. Runs against any FFI binary. | ||
| [UnityTest, Category("E2E")] | ||
| public IEnumerator ReturnsSuccess_ForSmallPayload() | ||
| { | ||
| var publisher = TestRoomContext.ConnectionOptions.Default; | ||
| publisher.Identity = "publisher"; | ||
| var subscriber = TestRoomContext.ConnectionOptions.Default; | ||
| subscriber.Identity = "subscriber"; | ||
|
|
||
| using var context = new TestRoomContext(new[] { publisher, subscriber }); | ||
| yield return context.ConnectAll(); | ||
| Assert.IsNull(context.ConnectionError, context.ConnectionError); | ||
|
|
||
| var instruction = context.Rooms[0].LocalParticipant.PublishData(new byte[1024]); | ||
| yield return instruction; | ||
|
|
||
| Assert.IsTrue(instruction.IsDone, "PublishData instruction did not complete"); | ||
| Assert.IsFalse(instruction.IsError, $"Unexpected publish error: {instruction.Error}"); | ||
| Assert.IsNull(instruction.Error); | ||
| } | ||
|
|
||
| // A payload over the negotiated maximum message size completes with an error. | ||
| // Marked Explicit because the currently-shipped FFI plugins do not enforce the | ||
| // limit — only the client-sdk-rust `datamessage_size` binary does. Run locally | ||
| // against that binary with: | ||
| // Scripts~/run_unity.sh test -m PlayMode -f PublishDataTests.ReturnsError_ForOversizedPayload | ||
| [UnityTest, Category("E2E")] | ||
| public IEnumerator ReturnsError_ForOversizedPayload() | ||
| { | ||
| var publisher = TestRoomContext.ConnectionOptions.Default; | ||
| publisher.Identity = "publisher"; | ||
| var subscriber = TestRoomContext.ConnectionOptions.Default; | ||
| subscriber.Identity = "subscriber"; | ||
|
|
||
| using var context = new TestRoomContext(new[] { publisher, subscriber }); | ||
| yield return context.ConnectAll(); | ||
| Assert.IsNull(context.ConnectionError, context.ConnectionError); | ||
|
|
||
| // Establish the publisher data channel with an in-limit publish first. The | ||
| // size check only engages once SCTP has negotiated the max message size | ||
| // (which happens when the reliable data channel opens). Sending an oversized | ||
| // reliable packet *before* that negotiation can wedge the publisher | ||
| // transport (manifesting as a connection timeout), so we must warm up first. | ||
| var warmup = context.Rooms[0].LocalParticipant.PublishData(new byte[1024]); | ||
| yield return warmup; | ||
| Assert.IsFalse(warmup.IsError, $"Warmup publish failed: {warmup.Error}"); | ||
|
|
||
| // Now probe with the oversized payload. Retry a few times in case the | ||
| // negotiated max size lands slightly after the channel opens. | ||
| var oversized = new byte[65 * 1024]; | ||
| var retryDelay = new WaitForSeconds(0.2f); | ||
| PublishDataInstruction instruction = null; | ||
| for (int attempt = 0; attempt < 10; attempt++) | ||
| { | ||
| instruction = context.Rooms[0].LocalParticipant.PublishData(oversized); | ||
| yield return instruction; | ||
| if (instruction.IsError) break; | ||
| yield return retryDelay; | ||
| } | ||
|
|
||
| Assert.IsNotNull(instruction, "No publish was attempted"); | ||
| Assert.IsTrue(instruction.IsDone, "PublishData instruction did not complete"); | ||
| Assert.IsTrue(instruction.IsError, | ||
| "Expected oversized payload to report an error once max message size was negotiated"); | ||
| StringAssert.Contains("maximum message size", instruction.Error); | ||
| } | ||
| } | ||
| } |
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instruction does not seem to be a common technology for such purpose ?
how about PublishDataTask ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We use the
instructionterm since the base type is CustomYieldInstructionOther examples in our code base:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instructions use Coroutines underneath, Tasks use C# async