diff --git a/.changeset/kind-foxes-wave.md b/.changeset/kind-foxes-wave.md new file mode 100644 index 00000000..96b7f17e --- /dev/null +++ b/.changeset/kind-foxes-wave.md @@ -0,0 +1,5 @@ +--- +'@livekit/rtc-node': patch +--- + +Add AbortSignal to waitFor() to clean up listeners on disconnect and send trailer on stream abort diff --git a/packages/livekit-rtc/src/ffi_client.ts b/packages/livekit-rtc/src/ffi_client.ts index 7a1e28b6..d637ad01 100644 --- a/packages/livekit-rtc/src/ffi_client.ts +++ b/packages/livekit-rtc/src/ffi_client.ts @@ -70,14 +70,37 @@ export class FfiClient extends (EventEmitter as new () => TypedEmitter(predicate: (ev: FfiEvent) => boolean): Promise { - return new Promise((resolve) => { + async waitFor( + predicate: (ev: FfiEvent) => boolean, + options?: { signal?: AbortSignal }, + ): Promise { + return new Promise((resolve, reject) => { const listener = (ev: FfiEvent) => { if (predicate(ev)) { - this.off(FfiClientEvent.FfiEvent, listener); + cleanup(); resolve(ev.message.value as T); } }; + + const cleanup = () => { + this.off(FfiClientEvent.FfiEvent, listener); + options?.signal?.removeEventListener('abort', onAbort); + }; + + // If an AbortSignal is provided, remove the listener when the signal + // fires so that pending waitFor() calls don't leak listeners after + // the room disconnects or the operation is cancelled. + const onAbort = () => { + cleanup(); + reject(new Error(options?.signal?.reason ?? 'waitFor aborted')); + }; + + if (options?.signal?.aborted) { + reject(new Error(options.signal.reason ?? 'waitFor aborted')); + return; + } + + options?.signal?.addEventListener('abort', onAbort); this.on(FfiClientEvent.FfiEvent, listener); }); } diff --git a/packages/livekit-rtc/src/participant.ts b/packages/livekit-rtc/src/participant.ts index 3c69ef04..5b7c2b69 100644 --- a/packages/livekit-rtc/src/participant.ts +++ b/packages/livekit-rtc/src/participant.ts @@ -157,11 +157,16 @@ export class LocalParticipant extends Participant { private ffiEventLock: Mutex; + // Signal that fires when the owning Room disconnects, used to cancel + // pending FfiClient.waitFor() listeners so they don't leak. + private disconnectSignal: AbortSignal; + trackPublications: Map = new Map(); - constructor(info: OwnedParticipant, ffiEventLock: Mutex) { + constructor(info: OwnedParticipant, ffiEventLock: Mutex, disconnectSignal: AbortSignal) { super(info); this.ffiEventLock = ffiEventLock; + this.disconnectSignal = disconnectSignal; } async publishData(data: Uint8Array, options: DataPublishOptions) { @@ -178,9 +183,10 @@ export class LocalParticipant extends Participant { message: { case: 'publishData', value: req }, }); - const cb = await FfiClient.instance.waitFor((ev) => { - return ev.message.case == 'publishData' && ev.message.value.asyncId == res.asyncId; - }); + const cb = await FfiClient.instance.waitFor( + (ev) => ev.message.case == 'publishData' && ev.message.value.asyncId == res.asyncId, + { signal: this.disconnectSignal }, + ); if (cb.error) { throw new Error(cb.error); @@ -198,9 +204,10 @@ export class LocalParticipant extends Participant { message: { case: 'publishSipDtmf', value: req }, }); - const cb = await FfiClient.instance.waitFor((ev) => { - return ev.message.case == 'publishSipDtmf' && ev.message.value.asyncId == res.asyncId; - }); + const cb = await FfiClient.instance.waitFor( + (ev) => ev.message.case == 'publishSipDtmf' && ev.message.value.asyncId == res.asyncId, + { signal: this.disconnectSignal }, + ); if (cb.error) { throw new Error(cb.error); @@ -229,9 +236,10 @@ export class LocalParticipant extends Participant { message: { case: 'publishTranscription', value: req }, }); - const cb = await FfiClient.instance.waitFor((ev) => { - return ev.message.case == 'publishTranscription' && ev.message.value.asyncId == res.asyncId; - }); + const cb = await FfiClient.instance.waitFor( + (ev) => ev.message.case == 'publishTranscription' && ev.message.value.asyncId == res.asyncId, + { signal: this.disconnectSignal }, + ); if (cb.error) { throw new Error(cb.error); @@ -248,9 +256,10 @@ export class LocalParticipant extends Participant { message: { case: 'setLocalMetadata', value: req }, }); - await FfiClient.instance.waitFor((ev) => { - return ev.message.case == 'setLocalMetadata' && ev.message.value.asyncId == res.asyncId; - }); + await FfiClient.instance.waitFor( + (ev) => ev.message.case == 'setLocalMetadata' && ev.message.value.asyncId == res.asyncId, + { signal: this.disconnectSignal }, + ); } /** @@ -335,8 +344,24 @@ export class LocalParticipant extends Participant { }); await sendTrailer(trailerReq); }, - abort(err) { + // Send a trailer with the error reason so the remote side's stream + // controller is closed instead of waiting for data that won't arrive. + async abort(err) { log.error('Sink error:', err); + try { + const trailerReq = new SendStreamTrailerRequest({ + senderIdentity, + localParticipantHandle: localHandle, + destinationIdentities, + trailer: new DataStream_Trailer({ + streamId, + reason: err instanceof Error ? err.message : String(err ?? ''), + }), + }); + await sendTrailer(trailerReq); + } catch { + // Best-effort: the connection may already be gone. + } }, }); @@ -450,8 +475,24 @@ export class LocalParticipant extends Participant { }); await sendTrailer(trailerReq); }, - abort(err) { + // Send a trailer with the error reason so the remote side's stream + // controller is closed instead of waiting for data that won't arrive. + async abort(err) { log.error('Sink error:', err); + try { + const trailerReq = new SendStreamTrailerRequest({ + senderIdentity, + localParticipantHandle: localHandle, + destinationIdentities, + trailer: new DataStream_Trailer({ + streamId, + reason: err instanceof Error ? err.message : String(err ?? ''), + }), + }); + await sendTrailer(trailerReq); + } catch { + // Best-effort: the connection may already be gone. + } }, }); @@ -494,44 +535,47 @@ export class LocalParticipant extends Participant { message: { case: type, value: req }, }); - const cb = await FfiClient.instance.waitFor((ev) => { - return ev.message.case == type && ev.message.value.asyncId == res.asyncId; - }); + const cb = await FfiClient.instance.waitFor( + (ev) => ev.message.case == type && ev.message.value.asyncId == res.asyncId, + { signal: this.disconnectSignal }, + ); if (cb.error) { throw new Error(cb.error); } } - private async sendStreamChunk(req: SendStreamChunkRequest) { + private sendStreamChunk = async (req: SendStreamChunkRequest) => { const type = 'sendStreamChunk'; const res = FfiClient.instance.request({ message: { case: type, value: req }, }); - const cb = await FfiClient.instance.waitFor((ev) => { - return ev.message.case == type && ev.message.value.asyncId == res.asyncId; - }); + const cb = await FfiClient.instance.waitFor( + (ev) => ev.message.case == type && ev.message.value.asyncId == res.asyncId, + { signal: this.disconnectSignal }, + ); if (cb.error) { throw new Error(cb.error); } - } + }; - private async sendStreamTrailer(req: SendStreamTrailerRequest) { + private sendStreamTrailer = async (req: SendStreamTrailerRequest) => { const type = 'sendStreamTrailer'; const res = FfiClient.instance.request({ message: { case: type, value: req }, }); - const cb = await FfiClient.instance.waitFor((ev) => { - return ev.message.case == type && ev.message.value.asyncId == res.asyncId; - }); + const cb = await FfiClient.instance.waitFor( + (ev) => ev.message.case == type && ev.message.value.asyncId == res.asyncId, + { signal: this.disconnectSignal }, + ); if (cb.error) { throw new Error(cb.error); } - } + }; /** * Sends a chat message to participants in the room @@ -557,9 +601,10 @@ export class LocalParticipant extends Participant { message: { case: 'sendChatMessage', value: req }, }); - const cb = await FfiClient.instance.waitFor((ev) => { - return ev.message.case == 'chatMessage' && ev.message.value.asyncId == res.asyncId; - }); + const cb = await FfiClient.instance.waitFor( + (ev) => ev.message.case == 'chatMessage' && ev.message.value.asyncId == res.asyncId, + { signal: this.disconnectSignal }, + ); switch (cb.message.case) { case 'chatMessage': @@ -603,9 +648,10 @@ export class LocalParticipant extends Participant { message: { case: 'editChatMessage', value: req }, }); - const cb = await FfiClient.instance.waitFor((ev) => { - return ev.message.case == 'chatMessage' && ev.message.value.asyncId == res.asyncId; - }); + const cb = await FfiClient.instance.waitFor( + (ev) => ev.message.case == 'chatMessage' && ev.message.value.asyncId == res.asyncId, + { signal: this.disconnectSignal }, + ); switch (cb.message.case) { case 'chatMessage': @@ -632,9 +678,10 @@ export class LocalParticipant extends Participant { message: { case: 'setLocalName', value: req }, }); - await FfiClient.instance.waitFor((ev) => { - return ev.message.case == 'setLocalName' && ev.message.value.asyncId == res.asyncId; - }); + await FfiClient.instance.waitFor( + (ev) => ev.message.case == 'setLocalName' && ev.message.value.asyncId == res.asyncId, + { signal: this.disconnectSignal }, + ); } async setAttributes(attributes: Record) { @@ -647,9 +694,10 @@ export class LocalParticipant extends Participant { message: { case: 'setLocalAttributes', value: req }, }); - await FfiClient.instance.waitFor((ev) => { - return ev.message.case == 'setLocalAttributes' && ev.message.value.asyncId == res.asyncId; - }); + await FfiClient.instance.waitFor( + (ev) => ev.message.case == 'setLocalAttributes' && ev.message.value.asyncId == res.asyncId, + { signal: this.disconnectSignal }, + ); } async publishTrack( @@ -669,9 +717,10 @@ export class LocalParticipant extends Participant { }); try { - const cb = await FfiClient.instance.waitFor((ev) => { - return ev.message.case == 'publishTrack' && ev.message.value.asyncId == res.asyncId; - }); + const cb = await FfiClient.instance.waitFor( + (ev) => ev.message.case == 'publishTrack' && ev.message.value.asyncId == res.asyncId, + { signal: this.disconnectSignal }, + ); switch (cb.message.case) { case 'publication': @@ -702,9 +751,10 @@ export class LocalParticipant extends Participant { message: { case: 'unpublishTrack', value: req }, }); - const cb = await FfiClient.instance.waitFor((ev) => { - return ev.message.case == 'unpublishTrack' && ev.message.value.asyncId == res.asyncId; - }); + const cb = await FfiClient.instance.waitFor( + (ev) => ev.message.case == 'unpublishTrack' && ev.message.value.asyncId == res.asyncId, + { signal: this.disconnectSignal }, + ); if (cb.error) { throw new Error(cb.error); @@ -744,9 +794,10 @@ export class LocalParticipant extends Participant { message: { case: 'performRpc', value: req }, }); - const cb = await FfiClient.instance.waitFor((ev) => { - return ev.message.case === 'performRpc' && ev.message.value.asyncId === res.asyncId; - }); + const cb = await FfiClient.instance.waitFor( + (ev) => ev.message.case === 'performRpc' && ev.message.value.asyncId === res.asyncId, + { signal: this.disconnectSignal }, + ); if (cb.error) { throw RpcError.fromProto(cb.error); diff --git a/packages/livekit-rtc/src/room.ts b/packages/livekit-rtc/src/room.ts index 6ef6bf63..355d95be 100644 --- a/packages/livekit-rtc/src/room.ts +++ b/packages/livekit-rtc/src/room.ts @@ -96,6 +96,10 @@ export class Room extends (EventEmitter as new () => TypedEmitter private preConnectEvents: FfiEvent[] = []; + // Aborted on disconnect to cancel any pending FfiClient.waitFor() listeners, + // preventing them from leaking when the room goes away. + private disconnectController = new AbortController(); + private _token?: string; private _serverUrl?: string; @@ -241,9 +245,13 @@ export class Room extends (EventEmitter as new () => TypedEmitter this._serverUrl = url; this.info = cb.message.value.room!.info; this.connectionState = ConnectionState.CONN_CONNECTED; + // Reset the abort controller for this connection session so that + // a previous disconnect doesn't immediately cancel new operations. + this.disconnectController = new AbortController(); this.localParticipant = new LocalParticipant( cb.message.value.localParticipant!, this.ffiEventLock, + this.disconnectController.signal, ); for (const pt of cb.message.value.participants) { @@ -283,6 +291,11 @@ export class Room extends (EventEmitter as new () => TypedEmitter return ev.message.case == 'disconnect' && ev.message.value.asyncId == res.asyncId; }); + // Abort all pending FfiClient.waitFor() listeners so they don't leak. + // This causes any in-flight operations (publishData, publishTrack, etc.) + // to reject and clean up their event listeners. + this.disconnectController.abort('Room disconnected'); + FfiClient.instance.removeListener(FfiClientEvent.FfiEvent, this.onFfiEvent); this.removeAllListeners(); } @@ -599,6 +612,9 @@ export class Room extends (EventEmitter as new () => TypedEmitter /*} else if (ev.case == 'connected') { this.emit(RoomEvent.Connected);*/ } else if (ev.case == 'disconnected') { + // Abort pending waitFor() listeners on server-initiated disconnect too, + // not just on explicit disconnect() calls. + this.disconnectController.abort('Room disconnected'); this.emit(RoomEvent.Disconnected, ev.value.reason!); } else if (ev.case == 'reconnecting') { this.emit(RoomEvent.Reconnecting);