Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Runtime/Scripts/Internal/FFIClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,7 @@ internal bool TrySkipDispatch(ulong requestAsyncId, FfiEvent ffiEvent)
{
FfiEvent.MessageOneofCase.Connect => ffiEvent.Connect?.AsyncId,
FfiEvent.MessageOneofCase.PublishTrack => ffiEvent.PublishTrack?.AsyncId,
FfiEvent.MessageOneofCase.PublishData => ffiEvent.PublishData?.AsyncId,
FfiEvent.MessageOneofCase.UnpublishTrack => ffiEvent.UnpublishTrack?.AsyncId,
FfiEvent.MessageOneofCase.SetLocalName => ffiEvent.SetLocalName?.AsyncId,
FfiEvent.MessageOneofCase.SetLocalMetadata => ffiEvent.SetLocalMetadata?.AsyncId,
Expand Down
79 changes: 73 additions & 6 deletions Runtime/Scripts/Participant.cs
Original file line number Diff line number Diff line change
Expand Up @@ -105,18 +105,34 @@ public UnpublishTrackInstruction UnpublishTrack(ILocalTrack localTrack, bool sto
return instruction;
}

public void PublishData(byte[] data, IReadOnlyCollection<string> destination_identities = null, bool reliable = true, string topic = null)
/// <summary>
/// Publishes arbitrary data to participants in the room.
/// </summary>
/// <returns>
/// A <see cref="PublishDataInstruction"/> that completes once the packet is sent or errors.
/// Check <see cref="YieldInstruction.IsError"/> and read <see cref="PublishDataInstruction.Error"/>
/// to handle the result (e.g. payloads exceeding the negotiated maximum message size).
/// </returns>
public PublishDataInstruction PublishData(byte[] data, IReadOnlyCollection<string> destination_identities = null, bool reliable = true, string topic = null)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instruction does not seem to be a common technology for such purpose ?
how about PublishDataTask ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use the instruction term since the base type is CustomYieldInstruction

Other examples in our code base:

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instructions use Coroutines underneath, Tasks use C# async

{
PublishData(new Span<byte>(data), destination_identities, reliable, topic);
return PublishData(new Span<byte>(data), destination_identities, reliable, topic);
}

public void PublishData(Span<byte> data, IReadOnlyCollection<string> destination_identities = null, bool reliable = true, string topic = null)
/// <summary>
/// Publishes arbitrary data to participants in the room.
/// </summary>
/// <returns>
/// A <see cref="PublishDataInstruction"/> that completes once the packet is sent or errors.
/// Check <see cref="YieldInstruction.IsError"/> and read <see cref="PublishDataInstruction.Error"/>
/// to handle the result (e.g. payloads exceeding the negotiated maximum message size).
/// </returns>
public PublishDataInstruction PublishData(Span<byte> data, IReadOnlyCollection<string> destination_identities = null, bool reliable = true, string topic = null)
{
unsafe
{
fixed (byte* pointer = data)
{
PublishData(pointer, data.Length, destination_identities, reliable, topic);
return PublishData(pointer, data.Length, destination_identities, reliable, topic);
}
}
}
Expand Down Expand Up @@ -333,7 +349,7 @@ private void SendRpcResponse(ulong invocationId, string responsePayload, RpcErro
var response = request.Send();
}

private unsafe void PublishData(byte* data, int len, IReadOnlyCollection<string> destination_identities = null, bool reliable = true, string topic = null)
private unsafe PublishDataInstruction PublishData(byte* data, int len, IReadOnlyCollection<string> destination_identities = null, bool reliable = true, string topic = null)
{
if (!Room.TryGetTarget(out var room))
throw new Exception("room is invalid");
Expand Down Expand Up @@ -365,7 +381,12 @@ private unsafe void PublishData(byte* data, int len, IReadOnlyCollection<string>
publish.DataPtr = (ulong)data;
}
Utils.Debug("Sending message: " + topic);
var response = request.Send();

// Register the completion handler before sending so we never miss the
// callback (Rust may emit it before Send() returns).
var instruction = new PublishDataInstruction(request.RequestAsyncId);
using var response = request.Send();
return instruction;
}

/// <summary>
Expand Down Expand Up @@ -661,6 +682,52 @@ internal UnpublishTrackInstruction(ulong asyncId)
: base(asyncId, static e => e.UnpublishTrack, static e => e.Error) { }
}

/// <summary>
/// YieldInstruction for publishing data packets. Returned by <see cref="LocalParticipant.PublishData(byte[], IReadOnlyCollection{string}, bool, string)"/>.
/// </summary>
/// <remarks>
/// Read <see cref="Error"/> after checking <see cref="YieldInstruction.IsError"/>. A packet that
/// exceeds the negotiated maximum message size completes with <see cref="YieldInstruction.IsError"/>
/// true and a descriptive <see cref="Error"/> message.
/// </remarks>
public sealed class PublishDataInstruction : YieldInstruction
{
private readonly ulong _asyncId;

/// <summary>
/// The error message if the publish failed, otherwise null.
/// </summary>
public string Error { get; private set; }

internal PublishDataInstruction(ulong asyncId)
{
_asyncId = asyncId;
FfiClient.Instance.RegisterPendingCallback(
asyncId, static e => e.PublishData, OnPublishData, OnCanceled,
dispatchToMainThread: false);
}

internal void OnPublishData(PublishDataCallback e)
{
if (e.AsyncId != _asyncId)
return;

if (!string.IsNullOrEmpty(e.Error))
{
Error = e.Error;
IsError = true;
}
IsDone = true;
}

void OnCanceled()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, should onCanceled() private ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is private by default, and we also use the implicit private style around in the other instruction classes:

{
Error = "Canceled";
IsError = true;
IsDone = true;
}
}

/// <summary>
/// YieldInstruction for RPC calls. Returned by <see cref="LocalParticipant.PerformRpc"/>.
/// </summary>
Expand Down
87 changes: 87 additions & 0 deletions Tests/PlayMode/PublishDataTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
using System.Collections;
using NUnit.Framework;
using UnityEngine;
using UnityEngine.TestTools;
using LiveKit.PlayModeTests.Utils;

namespace LiveKit.PlayModeTests
{
// Probes LocalParticipant.PublishData delivery and the result it returns.
//
// The *_Arrives / *_DoesNotArrive tests observe end-to-end delivery via the
// subscriber's DataReceived event. The SFU data path may not be ready
// immediately after connect, so we retry publishing on an interval.
//
// The Returns* tests instead inspect the PublishDataInstruction returned by
// PublishData, which reports whether the Rust side accepted the packet.
public class PublishDataTests
{
// The returned instruction completes without error for a payload within the
// size limit. Runs against any FFI binary.
[UnityTest, Category("E2E")]
public IEnumerator ReturnsSuccess_ForSmallPayload()
{
var publisher = TestRoomContext.ConnectionOptions.Default;
publisher.Identity = "publisher";
var subscriber = TestRoomContext.ConnectionOptions.Default;
subscriber.Identity = "subscriber";

using var context = new TestRoomContext(new[] { publisher, subscriber });
yield return context.ConnectAll();
Assert.IsNull(context.ConnectionError, context.ConnectionError);

var instruction = context.Rooms[0].LocalParticipant.PublishData(new byte[1024]);
yield return instruction;

Assert.IsTrue(instruction.IsDone, "PublishData instruction did not complete");
Assert.IsFalse(instruction.IsError, $"Unexpected publish error: {instruction.Error}");
Assert.IsNull(instruction.Error);
}

// A payload over the negotiated maximum message size completes with an error.
// Marked Explicit because the currently-shipped FFI plugins do not enforce the
// limit — only the client-sdk-rust `datamessage_size` binary does. Run locally
// against that binary with:
// Scripts~/run_unity.sh test -m PlayMode -f PublishDataTests.ReturnsError_ForOversizedPayload
[UnityTest, Category("E2E")]
public IEnumerator ReturnsError_ForOversizedPayload()
{
var publisher = TestRoomContext.ConnectionOptions.Default;
publisher.Identity = "publisher";
var subscriber = TestRoomContext.ConnectionOptions.Default;
subscriber.Identity = "subscriber";

using var context = new TestRoomContext(new[] { publisher, subscriber });
yield return context.ConnectAll();
Assert.IsNull(context.ConnectionError, context.ConnectionError);

// Establish the publisher data channel with an in-limit publish first. The
// size check only engages once SCTP has negotiated the max message size
// (which happens when the reliable data channel opens). Sending an oversized
// reliable packet *before* that negotiation can wedge the publisher
// transport (manifesting as a connection timeout), so we must warm up first.
var warmup = context.Rooms[0].LocalParticipant.PublishData(new byte[1024]);
yield return warmup;
Assert.IsFalse(warmup.IsError, $"Warmup publish failed: {warmup.Error}");

// Now probe with the oversized payload. Retry a few times in case the
// negotiated max size lands slightly after the channel opens.
var oversized = new byte[65 * 1024];
var retryDelay = new WaitForSeconds(0.2f);
PublishDataInstruction instruction = null;
for (int attempt = 0; attempt < 10; attempt++)
{
instruction = context.Rooms[0].LocalParticipant.PublishData(oversized);
yield return instruction;
if (instruction.IsError) break;
yield return retryDelay;
}

Assert.IsNotNull(instruction, "No publish was attempted");
Assert.IsTrue(instruction.IsDone, "PublishData instruction did not complete");
Assert.IsTrue(instruction.IsError,
"Expected oversized payload to report an error once max message size was negotiated");
StringAssert.Contains("maximum message size", instruction.Error);
}
}
}
11 changes: 11 additions & 0 deletions Tests/PlayMode/PublishDataTests.cs.meta

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading