Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/kind-foxes-wave.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@livekit/rtc-node': patch
---

Add AbortSignal to waitFor() to clean up listeners on disconnect and send trailer on stream abort
29 changes: 26 additions & 3 deletions packages/livekit-rtc/src/ffi_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,37 @@ export class FfiClient extends (EventEmitter as new () => TypedEmitter<FfiClient
return livekitRetrievePtr(data);
}

async waitFor<T>(predicate: (ev: FfiEvent) => boolean): Promise<T> {
return new Promise<T>((resolve) => {
async waitFor<T>(
predicate: (ev: FfiEvent) => boolean,
options?: { signal?: AbortSignal },
): Promise<T> {
return new Promise<T>((resolve, reject) => {
const listener = (ev: FfiEvent) => {
if (predicate(ev)) {
this.off(FfiClientEvent.FfiEvent, listener);
cleanup();
resolve(ev.message.value as T);
}
};

const cleanup = () => {
this.off(FfiClientEvent.FfiEvent, listener);
options?.signal?.removeEventListener('abort', onAbort);
};

// If an AbortSignal is provided, remove the listener when the signal
// fires so that pending waitFor() calls don't leak listeners after
// the room disconnects or the operation is cancelled.
const onAbort = () => {
cleanup();
reject(new Error(options?.signal?.reason ?? 'waitFor aborted'));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
reject(new Error(options?.signal?.reason ?? 'waitFor aborted'));
reject(options?.signal?.reason ?? new Error('waitFor aborted'));

the expectation would be that reason is thrown directly and not further wrapped in an Error

};

if (options?.signal?.aborted) {
reject(new Error(options.signal.reason ?? 'waitFor aborted'));
return;
}

options?.signal?.addEventListener('abort', onAbort);
this.on(FfiClientEvent.FfiEvent, listener);
});
}
Expand Down
149 changes: 100 additions & 49 deletions packages/livekit-rtc/src/participant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,16 @@ export class LocalParticipant extends Participant {

private ffiEventLock: Mutex;

// Signal that fires when the owning Room disconnects, used to cancel
// pending FfiClient.waitFor() listeners so they don't leak.
private disconnectSignal: AbortSignal;

trackPublications: Map<string, LocalTrackPublication> = new Map();

constructor(info: OwnedParticipant, ffiEventLock: Mutex) {
constructor(info: OwnedParticipant, ffiEventLock: Mutex, disconnectSignal: AbortSignal) {
super(info);
this.ffiEventLock = ffiEventLock;
this.disconnectSignal = disconnectSignal;
}

async publishData(data: Uint8Array, options: DataPublishOptions) {
Expand All @@ -178,9 +183,10 @@ export class LocalParticipant extends Participant {
message: { case: 'publishData', value: req },
});

const cb = await FfiClient.instance.waitFor<PublishDataCallback>((ev) => {
return ev.message.case == 'publishData' && ev.message.value.asyncId == res.asyncId;
});
const cb = await FfiClient.instance.waitFor<PublishDataCallback>(
(ev) => ev.message.case == 'publishData' && ev.message.value.asyncId == res.asyncId,
{ signal: this.disconnectSignal },
);

if (cb.error) {
throw new Error(cb.error);
Expand All @@ -198,9 +204,10 @@ export class LocalParticipant extends Participant {
message: { case: 'publishSipDtmf', value: req },
});

const cb = await FfiClient.instance.waitFor<PublishSipDtmfCallback>((ev) => {
return ev.message.case == 'publishSipDtmf' && ev.message.value.asyncId == res.asyncId;
});
const cb = await FfiClient.instance.waitFor<PublishSipDtmfCallback>(
(ev) => ev.message.case == 'publishSipDtmf' && ev.message.value.asyncId == res.asyncId,
{ signal: this.disconnectSignal },
);

if (cb.error) {
throw new Error(cb.error);
Expand Down Expand Up @@ -229,9 +236,10 @@ export class LocalParticipant extends Participant {
message: { case: 'publishTranscription', value: req },
});

const cb = await FfiClient.instance.waitFor<PublishTranscriptionCallback>((ev) => {
return ev.message.case == 'publishTranscription' && ev.message.value.asyncId == res.asyncId;
});
const cb = await FfiClient.instance.waitFor<PublishTranscriptionCallback>(
(ev) => ev.message.case == 'publishTranscription' && ev.message.value.asyncId == res.asyncId,
{ signal: this.disconnectSignal },
);

if (cb.error) {
throw new Error(cb.error);
Expand All @@ -248,9 +256,10 @@ export class LocalParticipant extends Participant {
message: { case: 'setLocalMetadata', value: req },
});

await FfiClient.instance.waitFor<SetLocalMetadataCallback>((ev) => {
return ev.message.case == 'setLocalMetadata' && ev.message.value.asyncId == res.asyncId;
});
await FfiClient.instance.waitFor<SetLocalMetadataCallback>(
(ev) => ev.message.case == 'setLocalMetadata' && ev.message.value.asyncId == res.asyncId,
{ signal: this.disconnectSignal },
);
}

/**
Expand Down Expand Up @@ -335,8 +344,24 @@ export class LocalParticipant extends Participant {
});
await sendTrailer(trailerReq);
},
abort(err) {
// Send a trailer with the error reason so the remote side's stream
// controller is closed instead of waiting for data that won't arrive.
async abort(err) {
log.error('Sink error:', err);
try {
const trailerReq = new SendStreamTrailerRequest({
senderIdentity,
localParticipantHandle: localHandle,
destinationIdentities,
trailer: new DataStream_Trailer({
streamId,
reason: err instanceof Error ? err.message : String(err ?? ''),
}),
});
await sendTrailer(trailerReq);
} catch {
// Best-effort: the connection may already be gone.
}
},
});

Expand Down Expand Up @@ -450,8 +475,24 @@ export class LocalParticipant extends Participant {
});
await sendTrailer(trailerReq);
},
abort(err) {
// Send a trailer with the error reason so the remote side's stream
// controller is closed instead of waiting for data that won't arrive.
async abort(err) {
log.error('Sink error:', err);
try {
const trailerReq = new SendStreamTrailerRequest({
senderIdentity,
localParticipantHandle: localHandle,
destinationIdentities,
trailer: new DataStream_Trailer({
streamId,
reason: err instanceof Error ? err.message : String(err ?? ''),
}),
});
await sendTrailer(trailerReq);
} catch {
// Best-effort: the connection may already be gone.
}
},
});

Expand Down Expand Up @@ -494,44 +535,47 @@ export class LocalParticipant extends Participant {
message: { case: type, value: req },
});

const cb = await FfiClient.instance.waitFor<SendStreamHeaderCallback>((ev) => {
return ev.message.case == type && ev.message.value.asyncId == res.asyncId;
});
const cb = await FfiClient.instance.waitFor<SendStreamHeaderCallback>(
(ev) => ev.message.case == type && ev.message.value.asyncId == res.asyncId,
{ signal: this.disconnectSignal },
);

if (cb.error) {
throw new Error(cb.error);
}
}

private async sendStreamChunk(req: SendStreamChunkRequest) {
private sendStreamChunk = async (req: SendStreamChunkRequest) => {
const type = 'sendStreamChunk';
const res = FfiClient.instance.request<SendStreamChunkResponse>({
message: { case: type, value: req },
});

const cb = await FfiClient.instance.waitFor<SendStreamChunkCallback>((ev) => {
return ev.message.case == type && ev.message.value.asyncId == res.asyncId;
});
const cb = await FfiClient.instance.waitFor<SendStreamChunkCallback>(
(ev) => ev.message.case == type && ev.message.value.asyncId == res.asyncId,
{ signal: this.disconnectSignal },
);

if (cb.error) {
throw new Error(cb.error);
}
}
};

private async sendStreamTrailer(req: SendStreamTrailerRequest) {
private sendStreamTrailer = async (req: SendStreamTrailerRequest) => {
const type = 'sendStreamTrailer';
const res = FfiClient.instance.request<SendStreamTrailerResponse>({
message: { case: type, value: req },
});

const cb = await FfiClient.instance.waitFor<SendStreamTrailerCallback>((ev) => {
return ev.message.case == type && ev.message.value.asyncId == res.asyncId;
});
const cb = await FfiClient.instance.waitFor<SendStreamTrailerCallback>(
(ev) => ev.message.case == type && ev.message.value.asyncId == res.asyncId,
{ signal: this.disconnectSignal },
);

if (cb.error) {
throw new Error(cb.error);
}
}
};

/**
* Sends a chat message to participants in the room
Expand All @@ -557,9 +601,10 @@ export class LocalParticipant extends Participant {
message: { case: 'sendChatMessage', value: req },
});

const cb = await FfiClient.instance.waitFor<SendChatMessageCallback>((ev) => {
return ev.message.case == 'chatMessage' && ev.message.value.asyncId == res.asyncId;
});
const cb = await FfiClient.instance.waitFor<SendChatMessageCallback>(
(ev) => ev.message.case == 'chatMessage' && ev.message.value.asyncId == res.asyncId,
{ signal: this.disconnectSignal },
);

switch (cb.message.case) {
case 'chatMessage':
Expand Down Expand Up @@ -603,9 +648,10 @@ export class LocalParticipant extends Participant {
message: { case: 'editChatMessage', value: req },
});

const cb = await FfiClient.instance.waitFor<SendChatMessageCallback>((ev) => {
return ev.message.case == 'chatMessage' && ev.message.value.asyncId == res.asyncId;
});
const cb = await FfiClient.instance.waitFor<SendChatMessageCallback>(
(ev) => ev.message.case == 'chatMessage' && ev.message.value.asyncId == res.asyncId,
{ signal: this.disconnectSignal },
);

switch (cb.message.case) {
case 'chatMessage':
Expand All @@ -632,9 +678,10 @@ export class LocalParticipant extends Participant {
message: { case: 'setLocalName', value: req },
});

await FfiClient.instance.waitFor<SetLocalNameCallback>((ev) => {
return ev.message.case == 'setLocalName' && ev.message.value.asyncId == res.asyncId;
});
await FfiClient.instance.waitFor<SetLocalNameCallback>(
(ev) => ev.message.case == 'setLocalName' && ev.message.value.asyncId == res.asyncId,
{ signal: this.disconnectSignal },
);
}

async setAttributes(attributes: Record<string, string>) {
Expand All @@ -647,9 +694,10 @@ export class LocalParticipant extends Participant {
message: { case: 'setLocalAttributes', value: req },
});

await FfiClient.instance.waitFor<SetLocalAttributesCallback>((ev) => {
return ev.message.case == 'setLocalAttributes' && ev.message.value.asyncId == res.asyncId;
});
await FfiClient.instance.waitFor<SetLocalAttributesCallback>(
(ev) => ev.message.case == 'setLocalAttributes' && ev.message.value.asyncId == res.asyncId,
{ signal: this.disconnectSignal },
);
}

async publishTrack(
Expand All @@ -669,9 +717,10 @@ export class LocalParticipant extends Participant {
});

try {
const cb = await FfiClient.instance.waitFor<PublishTrackCallback>((ev) => {
return ev.message.case == 'publishTrack' && ev.message.value.asyncId == res.asyncId;
});
const cb = await FfiClient.instance.waitFor<PublishTrackCallback>(
(ev) => ev.message.case == 'publishTrack' && ev.message.value.asyncId == res.asyncId,
{ signal: this.disconnectSignal },
);

switch (cb.message.case) {
case 'publication':
Expand Down Expand Up @@ -702,9 +751,10 @@ export class LocalParticipant extends Participant {
message: { case: 'unpublishTrack', value: req },
});

const cb = await FfiClient.instance.waitFor<UnpublishTrackCallback>((ev) => {
return ev.message.case == 'unpublishTrack' && ev.message.value.asyncId == res.asyncId;
});
const cb = await FfiClient.instance.waitFor<UnpublishTrackCallback>(
(ev) => ev.message.case == 'unpublishTrack' && ev.message.value.asyncId == res.asyncId,
{ signal: this.disconnectSignal },
);

if (cb.error) {
throw new Error(cb.error);
Expand Down Expand Up @@ -744,9 +794,10 @@ export class LocalParticipant extends Participant {
message: { case: 'performRpc', value: req },
});

const cb = await FfiClient.instance.waitFor<PerformRpcCallback>((ev) => {
return ev.message.case === 'performRpc' && ev.message.value.asyncId === res.asyncId;
});
const cb = await FfiClient.instance.waitFor<PerformRpcCallback>(
(ev) => ev.message.case === 'performRpc' && ev.message.value.asyncId === res.asyncId,
{ signal: this.disconnectSignal },
);

if (cb.error) {
throw RpcError.fromProto(cb.error);
Expand Down
16 changes: 16 additions & 0 deletions packages/livekit-rtc/src/room.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>

private preConnectEvents: FfiEvent[] = [];

// Aborted on disconnect to cancel any pending FfiClient.waitFor() listeners,
// preventing them from leaking when the room goes away.
private disconnectController = new AbortController();

private _token?: string;
private _serverUrl?: string;

Expand Down Expand Up @@ -241,9 +245,13 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
this._serverUrl = url;
this.info = cb.message.value.room!.info;
this.connectionState = ConnectionState.CONN_CONNECTED;
// Reset the abort controller for this connection session so that
// a previous disconnect doesn't immediately cancel new operations.
this.disconnectController = new AbortController();
this.localParticipant = new LocalParticipant(
cb.message.value.localParticipant!,
this.ffiEventLock,
this.disconnectController.signal,
);

for (const pt of cb.message.value.participants) {
Expand Down Expand Up @@ -283,6 +291,11 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
return ev.message.case == 'disconnect' && ev.message.value.asyncId == res.asyncId;
});

// Abort all pending FfiClient.waitFor() listeners so they don't leak.
// This causes any in-flight operations (publishData, publishTrack, etc.)
// to reject and clean up their event listeners.
this.disconnectController.abort('Room disconnected');
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
this.disconnectController.abort('Room disconnected');
this.disconnectController.abort();

as long as there is only one path to trigger this (and the naming is quite obvious) we can just stick to the default abort error being thrown


FfiClient.instance.removeListener(FfiClientEvent.FfiEvent, this.onFfiEvent);
this.removeAllListeners();
}
Expand Down Expand Up @@ -599,6 +612,9 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
/*} else if (ev.case == 'connected') {
this.emit(RoomEvent.Connected);*/
} else if (ev.case == 'disconnected') {
// Abort pending waitFor() listeners on server-initiated disconnect too,
// not just on explicit disconnect() calls.
this.disconnectController.abort('Room disconnected');
this.emit(RoomEvent.Disconnected, ev.value.reason!);
} else if (ev.case == 'reconnecting') {
this.emit(RoomEvent.Reconnecting);
Expand Down
Loading