diff --git a/packages/livekit-rtc/src/audio_mixer.test.ts b/packages/livekit-rtc/src/audio_mixer.test.ts index 45cc9ffd..d4dfbfaa 100644 --- a/packages/livekit-rtc/src/audio_mixer.test.ts +++ b/packages/livekit-rtc/src/audio_mixer.test.ts @@ -164,4 +164,106 @@ describe('AudioMixer', () => { // Should get at least 2 frames (stream exhausts after 2) expect(frames.length).toBeGreaterThanOrEqual(2); }); + + it('completes mixing without lingering timers when iterator is fast', async () => { + const sampleRate = 48000; + const numChannels = 1; + const samplesPerChannel = 480; + const mixer = new AudioMixer(sampleRate, numChannels, { + blocksize: samplesPerChannel, + // Long timeout so the iterator always wins the race. + // Before the fix, each iteration leaked a 5s timer; with the fix, + // cancel() clears it immediately so the mixer shuts down without delay. + streamTimeoutMs: 5000, + }); + + const stream = createMockAudioStream(3, sampleRate, numChannels, samplesPerChannel, 42); + mixer.addStream(stream); + + const frames: AudioFrame[] = []; + for await (const frame of mixer) { + frames.push(frame); + if (frames.length >= 2) break; + } + + await mixer.aclose(); + + expect(frames.length).toBe(2); + // Verify the frames contain the expected mixed value + for (const frame of frames) { + expect(frame.data[0]).toBe(42); + } + }); + + it('produces frames even with many race iterations', async () => { + const sampleRate = 48000; + const numChannels = 1; + const samplesPerChannel = 480; + const mixer = new AudioMixer(sampleRate, numChannels, { + blocksize: samplesPerChannel, + streamTimeoutMs: 5000, + }); + + // Use more frames to stress multiple race iterations + const stream = createMockAudioStream(6, sampleRate, numChannels, samplesPerChannel, 10); + mixer.addStream(stream); + + const frames: AudioFrame[] = []; + for await (const frame of mixer) { + frames.push(frame); + if (frames.length >= 4) break; + } + + await mixer.aclose(); + + expect(frames.length).toBe(4); + // All frames should contain the expected value + for (const frame of frames) { + expect(frame.data[0]).toBe(10); + } + }); + + it('handles slow streams via timeout path', async () => { + const sampleRate = 48000; + const numChannels = 1; + const samplesPerChannel = 480; + const mixer = new AudioMixer(sampleRate, numChannels, { + blocksize: samplesPerChannel, + // Very short timeout to trigger the timeout path + streamTimeoutMs: 1, + }); + + // Create a stream that is slower than the timeout + async function* slowStream(): AsyncGenerator { + await new Promise((resolve) => setTimeout(resolve, 200)); + const data = new Int16Array(numChannels * samplesPerChannel).fill(500); + yield new AudioFrame(data, sampleRate, numChannels, samplesPerChannel); + } + + // Suppress the expected console.warn from the timeout path + const originalWarn = console.warn; + const warnings: string[] = []; + console.warn = (...args: unknown[]) => { + warnings.push(args.map(String).join(' ')); + }; + + try { + mixer.addStream(slowStream()); + + // The mixer should produce a frame (zero-padded due to timeout) + // and auto-close when the stream exhausts. + const frames: AudioFrame[] = []; + for await (const frame of mixer) { + frames.push(frame); + if (frames.length >= 1) break; + } + + await mixer.aclose(); + + // The timeout warning should have been logged + expect(warnings.some((w) => w.includes('stream timeout after'))).toBe(true); + } finally { + console.warn = originalWarn; + } + }); }); diff --git a/packages/livekit-rtc/src/audio_mixer.ts b/packages/livekit-rtc/src/audio_mixer.ts index 1f75e16c..744bbf6e 100644 --- a/packages/livekit-rtc/src/audio_mixer.ts +++ b/packages/livekit-rtc/src/audio_mixer.ts @@ -311,19 +311,25 @@ export class AudioMixer { // Accumulate data until we have at least chunkSize samples while (buf.length < this.chunkSize * this.numChannels && !exhausted && !this.closed) { try { - const result = await Promise.race([iterator.next(), this.timeout(this.streamTimeoutMs)]); - - if (result === 'timeout') { + const { result, clearTimeout: cancel } = this.timeoutRace( + iterator.next(), + this.streamTimeoutMs, + ); + const value = await result; + cancel(); + + if (value === 'timeout') { console.warn(`AudioMixer: stream timeout after ${this.streamTimeoutMs}ms`); break; } + const iterResult = value; - if (result.done) { + if (iterResult.done) { exhausted = true; break; } - const frame = result.value; + const frame = iterResult.value; const newData = frame.data; // Mark that we received data in this call @@ -412,7 +418,19 @@ export class AudioMixer { return new Promise((resolve) => setTimeout(resolve, ms)); } - private timeout(ms: number): Promise<'timeout'> { - return new Promise((resolve) => setTimeout(() => resolve('timeout'), ms)); + /** Race a promise against a timeout, returning a handle to clear the timer + * so the losing setTimeout doesn't linger after the winner resolves. */ + private timeoutRace( + promise: Promise, + ms: number, + ): { result: Promise; clearTimeout: () => void } { + let timer: ReturnType; + const timeoutPromise = new Promise<'timeout'>((resolve) => { + timer = setTimeout(() => resolve('timeout'), ms); + }); + return { + result: Promise.race([promise, timeoutPromise]), + clearTimeout: () => clearTimeout(timer), + }; } } diff --git a/packages/livekit-rtc/src/audio_resampler.ts b/packages/livekit-rtc/src/audio_resampler.ts index 524cd94d..24bb0ec1 100644 --- a/packages/livekit-rtc/src/audio_resampler.ts +++ b/packages/livekit-rtc/src/audio_resampler.ts @@ -97,6 +97,14 @@ export class AudioResampler { return this.#channels; } + /** + * Releases the underlying native resampler handle. Must be called when + * the resampler is no longer needed to avoid leaking the FD. + */ + close() { + this.#ffiHandle.dispose(); + } + /** * Push audio data into the resampler and retrieve any available resampled data. * diff --git a/packages/livekit-rtc/src/audio_source.ts b/packages/livekit-rtc/src/audio_source.ts index 16096b87..059adce1 100644 --- a/packages/livekit-rtc/src/audio_source.ts +++ b/packages/livekit-rtc/src/audio_source.ts @@ -145,6 +145,12 @@ export class AudioSource { } async close() { + // Clear any pending playout timeout so its callback doesn't fire after + // the handle is disposed, which would reference freed native state. + if (this.timeout) { + clearTimeout(this.timeout); + this.timeout = undefined; + } this.ffiHandle.dispose(); this.closed = true; } diff --git a/packages/livekit-rtc/src/audio_stream.ts b/packages/livekit-rtc/src/audio_stream.ts index 12d78232..f2d9dda4 100644 --- a/packages/livekit-rtc/src/audio_stream.ts +++ b/packages/livekit-rtc/src/audio_stream.ts @@ -106,6 +106,9 @@ class AudioStreamSource implements UnderlyingSource { case 'eos': FfiClient.instance.off(FfiClientEvent.FfiEvent, this.onEvent); this.controller.close(); + // Dispose the native handle so the FD is released on stream end, + // not just when cancel() is called explicitly by the consumer. + this.ffiHandle.dispose(); this.frameProcessor?.close(); break; } @@ -118,6 +121,9 @@ class AudioStreamSource implements UnderlyingSource { cancel() { FfiClient.instance.off(FfiClientEvent.FfiEvent, this.onEvent); this.ffiHandle.dispose(); + // Also close the frame processor on cancel for symmetry with the EOS path, + // so resources are released regardless of how the stream ends. + this.frameProcessor?.close(); } } diff --git a/packages/livekit-rtc/src/data_streams/stream_reader.ts b/packages/livekit-rtc/src/data_streams/stream_reader.ts index 5bfff94f..7067d69d 100644 --- a/packages/livekit-rtc/src/data_streams/stream_reader.ts +++ b/packages/livekit-rtc/src/data_streams/stream_reader.ts @@ -53,12 +53,18 @@ export class ByteStreamReader extends BaseStreamReader { try { const { done, value } = await reader.read(); if (done) { + // Release the lock when the stream is exhausted so the + // underlying ReadableStream can be garbage-collected. + reader.releaseLock(); return { done: true, value: undefined as unknown }; } else { this.handleChunkReceived(value); return { done: false, value: value.content! }; } } catch (error: unknown) { + // Release the lock on error so it doesn't stay held when the + // consumer never calls return() (e.g. breaking out of for-await). + reader.releaseLock(); log.error('error processing stream update: %s', error); return { done: true, value: undefined as unknown }; } @@ -121,12 +127,18 @@ export class TextStreamReader extends BaseStreamReader { [Symbol.asyncIterator]() { const reader = this.reader.getReader(); const decoder = new TextDecoder(); + const receivedChunks = this.receivedChunks; return { next: async (): Promise> => { try { const { done, value } = await reader.read(); if (done) { + // Release the lock when the stream is exhausted so the + // underlying ReadableStream can be garbage-collected. + reader.releaseLock(); + // Clear received chunks so the buffered data can be GC'd. + receivedChunks.clear(); return { done: true, value: undefined }; } else { this.handleChunkReceived(value); @@ -136,6 +148,10 @@ export class TextStreamReader extends BaseStreamReader { }; } } catch (error: unknown) { + // Release the lock on error so it doesn't stay held when the + // consumer never calls return() (e.g. breaking out of for-await). + reader.releaseLock(); + receivedChunks.clear(); log.error('error processing stream update: %s', error); return { done: true, value: undefined }; } @@ -143,6 +159,8 @@ export class TextStreamReader extends BaseStreamReader { return(): IteratorResult { reader.releaseLock(); + // Clear received chunks so the buffered data can be GC'd. + receivedChunks.clear(); return { done: true, value: undefined }; }, }; diff --git a/packages/livekit-rtc/src/ffi_client.ts b/packages/livekit-rtc/src/ffi_client.ts index 7a1e28b6..d637ad01 100644 --- a/packages/livekit-rtc/src/ffi_client.ts +++ b/packages/livekit-rtc/src/ffi_client.ts @@ -70,14 +70,37 @@ export class FfiClient extends (EventEmitter as new () => TypedEmitter(predicate: (ev: FfiEvent) => boolean): Promise { - return new Promise((resolve) => { + async waitFor( + predicate: (ev: FfiEvent) => boolean, + options?: { signal?: AbortSignal }, + ): Promise { + return new Promise((resolve, reject) => { const listener = (ev: FfiEvent) => { if (predicate(ev)) { - this.off(FfiClientEvent.FfiEvent, listener); + cleanup(); resolve(ev.message.value as T); } }; + + const cleanup = () => { + this.off(FfiClientEvent.FfiEvent, listener); + options?.signal?.removeEventListener('abort', onAbort); + }; + + // If an AbortSignal is provided, remove the listener when the signal + // fires so that pending waitFor() calls don't leak listeners after + // the room disconnects or the operation is cancelled. + const onAbort = () => { + cleanup(); + reject(new Error(options?.signal?.reason ?? 'waitFor aborted')); + }; + + if (options?.signal?.aborted) { + reject(new Error(options.signal.reason ?? 'waitFor aborted')); + return; + } + + options?.signal?.addEventListener('abort', onAbort); this.on(FfiClientEvent.FfiEvent, listener); }); } diff --git a/packages/livekit-rtc/src/participant.ts b/packages/livekit-rtc/src/participant.ts index 3c69ef04..5b7c2b69 100644 --- a/packages/livekit-rtc/src/participant.ts +++ b/packages/livekit-rtc/src/participant.ts @@ -157,11 +157,16 @@ export class LocalParticipant extends Participant { private ffiEventLock: Mutex; + // Signal that fires when the owning Room disconnects, used to cancel + // pending FfiClient.waitFor() listeners so they don't leak. + private disconnectSignal: AbortSignal; + trackPublications: Map = new Map(); - constructor(info: OwnedParticipant, ffiEventLock: Mutex) { + constructor(info: OwnedParticipant, ffiEventLock: Mutex, disconnectSignal: AbortSignal) { super(info); this.ffiEventLock = ffiEventLock; + this.disconnectSignal = disconnectSignal; } async publishData(data: Uint8Array, options: DataPublishOptions) { @@ -178,9 +183,10 @@ export class LocalParticipant extends Participant { message: { case: 'publishData', value: req }, }); - const cb = await FfiClient.instance.waitFor((ev) => { - return ev.message.case == 'publishData' && ev.message.value.asyncId == res.asyncId; - }); + const cb = await FfiClient.instance.waitFor( + (ev) => ev.message.case == 'publishData' && ev.message.value.asyncId == res.asyncId, + { signal: this.disconnectSignal }, + ); if (cb.error) { throw new Error(cb.error); @@ -198,9 +204,10 @@ export class LocalParticipant extends Participant { message: { case: 'publishSipDtmf', value: req }, }); - const cb = await FfiClient.instance.waitFor((ev) => { - return ev.message.case == 'publishSipDtmf' && ev.message.value.asyncId == res.asyncId; - }); + const cb = await FfiClient.instance.waitFor( + (ev) => ev.message.case == 'publishSipDtmf' && ev.message.value.asyncId == res.asyncId, + { signal: this.disconnectSignal }, + ); if (cb.error) { throw new Error(cb.error); @@ -229,9 +236,10 @@ export class LocalParticipant extends Participant { message: { case: 'publishTranscription', value: req }, }); - const cb = await FfiClient.instance.waitFor((ev) => { - return ev.message.case == 'publishTranscription' && ev.message.value.asyncId == res.asyncId; - }); + const cb = await FfiClient.instance.waitFor( + (ev) => ev.message.case == 'publishTranscription' && ev.message.value.asyncId == res.asyncId, + { signal: this.disconnectSignal }, + ); if (cb.error) { throw new Error(cb.error); @@ -248,9 +256,10 @@ export class LocalParticipant extends Participant { message: { case: 'setLocalMetadata', value: req }, }); - await FfiClient.instance.waitFor((ev) => { - return ev.message.case == 'setLocalMetadata' && ev.message.value.asyncId == res.asyncId; - }); + await FfiClient.instance.waitFor( + (ev) => ev.message.case == 'setLocalMetadata' && ev.message.value.asyncId == res.asyncId, + { signal: this.disconnectSignal }, + ); } /** @@ -335,8 +344,24 @@ export class LocalParticipant extends Participant { }); await sendTrailer(trailerReq); }, - abort(err) { + // Send a trailer with the error reason so the remote side's stream + // controller is closed instead of waiting for data that won't arrive. + async abort(err) { log.error('Sink error:', err); + try { + const trailerReq = new SendStreamTrailerRequest({ + senderIdentity, + localParticipantHandle: localHandle, + destinationIdentities, + trailer: new DataStream_Trailer({ + streamId, + reason: err instanceof Error ? err.message : String(err ?? ''), + }), + }); + await sendTrailer(trailerReq); + } catch { + // Best-effort: the connection may already be gone. + } }, }); @@ -450,8 +475,24 @@ export class LocalParticipant extends Participant { }); await sendTrailer(trailerReq); }, - abort(err) { + // Send a trailer with the error reason so the remote side's stream + // controller is closed instead of waiting for data that won't arrive. + async abort(err) { log.error('Sink error:', err); + try { + const trailerReq = new SendStreamTrailerRequest({ + senderIdentity, + localParticipantHandle: localHandle, + destinationIdentities, + trailer: new DataStream_Trailer({ + streamId, + reason: err instanceof Error ? err.message : String(err ?? ''), + }), + }); + await sendTrailer(trailerReq); + } catch { + // Best-effort: the connection may already be gone. + } }, }); @@ -494,44 +535,47 @@ export class LocalParticipant extends Participant { message: { case: type, value: req }, }); - const cb = await FfiClient.instance.waitFor((ev) => { - return ev.message.case == type && ev.message.value.asyncId == res.asyncId; - }); + const cb = await FfiClient.instance.waitFor( + (ev) => ev.message.case == type && ev.message.value.asyncId == res.asyncId, + { signal: this.disconnectSignal }, + ); if (cb.error) { throw new Error(cb.error); } } - private async sendStreamChunk(req: SendStreamChunkRequest) { + private sendStreamChunk = async (req: SendStreamChunkRequest) => { const type = 'sendStreamChunk'; const res = FfiClient.instance.request({ message: { case: type, value: req }, }); - const cb = await FfiClient.instance.waitFor((ev) => { - return ev.message.case == type && ev.message.value.asyncId == res.asyncId; - }); + const cb = await FfiClient.instance.waitFor( + (ev) => ev.message.case == type && ev.message.value.asyncId == res.asyncId, + { signal: this.disconnectSignal }, + ); if (cb.error) { throw new Error(cb.error); } - } + }; - private async sendStreamTrailer(req: SendStreamTrailerRequest) { + private sendStreamTrailer = async (req: SendStreamTrailerRequest) => { const type = 'sendStreamTrailer'; const res = FfiClient.instance.request({ message: { case: type, value: req }, }); - const cb = await FfiClient.instance.waitFor((ev) => { - return ev.message.case == type && ev.message.value.asyncId == res.asyncId; - }); + const cb = await FfiClient.instance.waitFor( + (ev) => ev.message.case == type && ev.message.value.asyncId == res.asyncId, + { signal: this.disconnectSignal }, + ); if (cb.error) { throw new Error(cb.error); } - } + }; /** * Sends a chat message to participants in the room @@ -557,9 +601,10 @@ export class LocalParticipant extends Participant { message: { case: 'sendChatMessage', value: req }, }); - const cb = await FfiClient.instance.waitFor((ev) => { - return ev.message.case == 'chatMessage' && ev.message.value.asyncId == res.asyncId; - }); + const cb = await FfiClient.instance.waitFor( + (ev) => ev.message.case == 'chatMessage' && ev.message.value.asyncId == res.asyncId, + { signal: this.disconnectSignal }, + ); switch (cb.message.case) { case 'chatMessage': @@ -603,9 +648,10 @@ export class LocalParticipant extends Participant { message: { case: 'editChatMessage', value: req }, }); - const cb = await FfiClient.instance.waitFor((ev) => { - return ev.message.case == 'chatMessage' && ev.message.value.asyncId == res.asyncId; - }); + const cb = await FfiClient.instance.waitFor( + (ev) => ev.message.case == 'chatMessage' && ev.message.value.asyncId == res.asyncId, + { signal: this.disconnectSignal }, + ); switch (cb.message.case) { case 'chatMessage': @@ -632,9 +678,10 @@ export class LocalParticipant extends Participant { message: { case: 'setLocalName', value: req }, }); - await FfiClient.instance.waitFor((ev) => { - return ev.message.case == 'setLocalName' && ev.message.value.asyncId == res.asyncId; - }); + await FfiClient.instance.waitFor( + (ev) => ev.message.case == 'setLocalName' && ev.message.value.asyncId == res.asyncId, + { signal: this.disconnectSignal }, + ); } async setAttributes(attributes: Record) { @@ -647,9 +694,10 @@ export class LocalParticipant extends Participant { message: { case: 'setLocalAttributes', value: req }, }); - await FfiClient.instance.waitFor((ev) => { - return ev.message.case == 'setLocalAttributes' && ev.message.value.asyncId == res.asyncId; - }); + await FfiClient.instance.waitFor( + (ev) => ev.message.case == 'setLocalAttributes' && ev.message.value.asyncId == res.asyncId, + { signal: this.disconnectSignal }, + ); } async publishTrack( @@ -669,9 +717,10 @@ export class LocalParticipant extends Participant { }); try { - const cb = await FfiClient.instance.waitFor((ev) => { - return ev.message.case == 'publishTrack' && ev.message.value.asyncId == res.asyncId; - }); + const cb = await FfiClient.instance.waitFor( + (ev) => ev.message.case == 'publishTrack' && ev.message.value.asyncId == res.asyncId, + { signal: this.disconnectSignal }, + ); switch (cb.message.case) { case 'publication': @@ -702,9 +751,10 @@ export class LocalParticipant extends Participant { message: { case: 'unpublishTrack', value: req }, }); - const cb = await FfiClient.instance.waitFor((ev) => { - return ev.message.case == 'unpublishTrack' && ev.message.value.asyncId == res.asyncId; - }); + const cb = await FfiClient.instance.waitFor( + (ev) => ev.message.case == 'unpublishTrack' && ev.message.value.asyncId == res.asyncId, + { signal: this.disconnectSignal }, + ); if (cb.error) { throw new Error(cb.error); @@ -744,9 +794,10 @@ export class LocalParticipant extends Participant { message: { case: 'performRpc', value: req }, }); - const cb = await FfiClient.instance.waitFor((ev) => { - return ev.message.case === 'performRpc' && ev.message.value.asyncId === res.asyncId; - }); + const cb = await FfiClient.instance.waitFor( + (ev) => ev.message.case === 'performRpc' && ev.message.value.asyncId === res.asyncId, + { signal: this.disconnectSignal }, + ); if (cb.error) { throw RpcError.fromProto(cb.error); diff --git a/packages/livekit-rtc/src/room.ts b/packages/livekit-rtc/src/room.ts index 6ef6bf63..3db9cdba 100644 --- a/packages/livekit-rtc/src/room.ts +++ b/packages/livekit-rtc/src/room.ts @@ -96,6 +96,10 @@ export class Room extends (EventEmitter as new () => TypedEmitter private preConnectEvents: FfiEvent[] = []; + // Aborted on disconnect to cancel any pending FfiClient.waitFor() listeners, + // preventing them from leaking when the room goes away. + private disconnectController = new AbortController(); + private _token?: string; private _serverUrl?: string; @@ -131,6 +135,11 @@ export class Room extends (EventEmitter as new () => TypedEmitter return this._serverUrl; } + // Shared promise for concurrent getSid() callers. Without this, each call + // registers its own RoomSidChanged + Disconnected listeners, and if many + // calls race only one of each pair is cleaned up — leaking the rest. + private sidPromise?: Promise; + /** * Gets the room's server ID. This ID is assigned by the LiveKit server * and is unique for each room session. @@ -144,19 +153,26 @@ export class Room extends (EventEmitter as new () => TypedEmitter if (this.info?.sid && this.info.sid !== '') { return this.info.sid; } - return new Promise((resolve, reject) => { - const handleRoomUpdate = (sid: string) => { - if (sid !== '') { + if (!this.sidPromise) { + this.sidPromise = new Promise((resolve, reject) => { + const handleDisconnect = () => { this.off(RoomEvent.RoomSidChanged, handleRoomUpdate); - resolve(sid); - } - }; - this.on(RoomEvent.RoomSidChanged, handleRoomUpdate); - this.once(RoomEvent.Disconnected, () => { - this.off(RoomEvent.RoomSidChanged, handleRoomUpdate); - reject('Room disconnected before room server id was available'); + this.sidPromise = undefined; + reject('Room disconnected before room server id was available'); + }; + const handleRoomUpdate = (sid: string) => { + if (sid !== '') { + this.off(RoomEvent.RoomSidChanged, handleRoomUpdate); + this.off(RoomEvent.Disconnected as any, handleDisconnect); + this.sidPromise = undefined; + resolve(sid); + } + }; + this.on(RoomEvent.RoomSidChanged, handleRoomUpdate); + this.once(RoomEvent.Disconnected, handleDisconnect); }); - }); + } + return this.sidPromise; } get numParticipants(): number { @@ -241,9 +257,13 @@ export class Room extends (EventEmitter as new () => TypedEmitter this._serverUrl = url; this.info = cb.message.value.room!.info; this.connectionState = ConnectionState.CONN_CONNECTED; + // Reset the abort controller for this connection session so that + // a previous disconnect doesn't immediately cancel new operations. + this.disconnectController = new AbortController(); this.localParticipant = new LocalParticipant( cb.message.value.localParticipant!, this.ffiEventLock, + this.disconnectController.signal, ); for (const pt of cb.message.value.participants) { @@ -283,6 +303,33 @@ export class Room extends (EventEmitter as new () => TypedEmitter return ev.message.case == 'disconnect' && ev.message.value.asyncId == res.asyncId; }); + // Close all in-progress stream controllers to prevent FD leaks. + // Streams that were receiving data but never got a trailer (e.g. the sender + // disconnected mid-transfer) would otherwise keep their ReadableStream open + // indefinitely, leaking the underlying controller and any buffered chunks. + for (const [, streamController] of this.byteStreamControllers) { + try { + streamController.controller.close(); + } catch { + // controller may already be closed + } + } + this.byteStreamControllers.clear(); + + for (const [, streamController] of this.textStreamControllers) { + try { + streamController.controller.close(); + } catch { + // controller may already be closed + } + } + this.textStreamControllers.clear(); + + // Abort all pending FfiClient.waitFor() listeners so they don't leak. + // This causes any in-flight operations (publishData, publishTrack, etc.) + // to reject and clean up their event listeners. + this.disconnectController.abort('Room disconnected'); + FfiClient.instance.removeListener(FfiClientEvent.FfiEvent, this.onFfiEvent); this.removeAllListeners(); } @@ -382,6 +429,13 @@ export class Room extends (EventEmitter as new () => TypedEmitter if (participant) { this.remoteParticipants.delete(participant.identity); participant.info.disconnectReason = ev.value.disconnectReason; + // Dispose each track publication's FfiHandle to prevent FD leaks. + // Without this, rapid participant disconnections accumulate undisposed + // native handles since nothing else triggers their cleanup. + for (const [, publication] of participant.trackPublications) { + publication.ffiHandle.dispose(); + } + participant.trackPublications.clear(); this.emit(RoomEvent.ParticipantDisconnected, participant); } else { log.warn(`RoomEvent.ParticipantDisconnected: Could not find participant`); diff --git a/packages/livekit-rtc/src/tests/e2e.test.ts b/packages/livekit-rtc/src/tests/e2e.test.ts index ad1d5af2..2fbb9e52 100644 --- a/packages/livekit-rtc/src/tests/e2e.test.ts +++ b/packages/livekit-rtc/src/tests/e2e.test.ts @@ -7,6 +7,7 @@ import { setTimeout as delay } from 'node:timers/promises'; import { afterAll, describe, expect, it as itRaw } from 'vitest'; import { AudioFrame, + AudioResampler, AudioSource, AudioStream, ConnectionState, @@ -514,4 +515,198 @@ describeE2E('livekit-rtc e2e', () => { }, testTimeoutMs * 2, ); + + // --- Resource cleanup tests --- + // These tests verify the fixes for FD / handle leaks that occur when + // participants connect and disconnect rapidly, streams are abandoned + // mid-transfer, or native handles are not disposed. + + it( + 'cleans up track publications when a remote participant disconnects', + async () => { + const { rooms } = await connectTestRooms(2); + const [stayingRoom, leavingRoom] = rooms; + + // Publish a track from the leaving participant so its track publication + // will need to be cleaned up on disconnect. + const source = new AudioSource(48_000, 1); + const track = LocalAudioTrack.createAudioTrack('cleanup-test', source); + const options = new TrackPublishOptions(); + options.source = TrackSource.SOURCE_MICROPHONE; + await leavingRoom!.localParticipant!.publishTrack(track, options); + + // Wait for the staying room to see the track subscription + await waitFor( + () => { + const remote = stayingRoom!.remoteParticipants.get( + leavingRoom!.localParticipant!.identity, + ); + return remote !== undefined && remote.trackPublications.size > 0; + }, + { timeoutMs: 5000, debugName: 'track publication visible' }, + ); + + // Capture a reference to the remote participant before disconnect + const remoteParticipant = stayingRoom!.remoteParticipants.get( + leavingRoom!.localParticipant!.identity, + )!; + expect(remoteParticipant.trackPublications.size).toBeGreaterThan(0); + + // Listen for the disconnect event + const disconnected = waitForRoomEvent( + stayingRoom!, + RoomEvent.ParticipantDisconnected, + testTimeoutMs, + (p: { identity: string }) => p.identity, + ); + + await leavingRoom!.disconnect(); + await disconnected; + + // After disconnect, the remote participant's track publications map + // should be cleared (handles disposed). + expect(remoteParticipant.trackPublications.size).toBe(0); + expect(stayingRoom!.remoteParticipants.has(remoteParticipant.identity)).toBe(false); + + await source.close(); + await stayingRoom!.disconnect(); + }, + testTimeoutMs, + ); + + it( + 'cleans up stream controllers when disconnecting during an active stream', + async () => { + const { rooms } = await connectTestRooms(2); + const [receivingRoom, sendingRoom] = rooms; + const topic = 'cleanup-stream-topic'; + + // Register a handler on the receiving side that will intentionally + // NOT fully consume the stream — simulating an abandoned transfer. + let readerReceived = false; + // eslint-disable-next-line @typescript-eslint/no-unused-vars + receivingRoom!.registerTextStreamHandler(topic, async (_reader, _sender) => { + readerReceived = true; + // Deliberately do not call reader.readAll() so the stream stays open + }); + + // Start sending a text stream but don't close it + const writer = await sendingRoom!.localParticipant!.streamText({ topic }); + await writer.write('partial data'); + + // Wait for the receiving side to get the stream header + await waitFor(() => readerReceived, { + timeoutMs: 5000, + debugName: 'text stream header received', + }); + + // Disconnect the receiving room while the stream is still open. + // This should close the stream controller without throwing. + await receivingRoom!.disconnect(); + + // Also close the writer and disconnect the sender + await writer.close(); + await sendingRoom!.disconnect(); + + // If we got here without hanging or throwing, the stream controller + // was properly cleaned up on disconnect. + }, + testTimeoutMs, + ); + + it( + 'cleans up resources when multiple participants disconnect simultaneously', + async () => { + // Connect 4 participants to stress-test concurrent disconnection cleanup + const { rooms } = await connectTestRooms(4); + + // Publish a track from each participant to create track publications + const sources: AudioSource[] = []; + for (const room of rooms) { + const source = new AudioSource(48_000, 1); + sources.push(source); + const track = LocalAudioTrack.createAudioTrack('multi-cleanup', source); + const options = new TrackPublishOptions(); + options.source = TrackSource.SOURCE_MICROPHONE; + await room.localParticipant!.publishTrack(track, options); + } + + // Wait for all participants to see each other's tracks + await waitFor( + () => + rooms.every( + (r) => + r.remoteParticipants.size === 3 && + [...r.remoteParticipants.values()].every((p) => p.trackPublications.size > 0), + ), + { timeoutMs: 5000, debugName: 'all tracks visible' }, + ); + + // Disconnect all participants simultaneously + await Promise.all([...rooms.map((r) => r.disconnect()), ...sources.map((s) => s.close())]); + + // Verify all rooms are disconnected and remote participant maps are empty + for (const room of rooms) { + expect(room.isConnected).toBe(false); + } + }, + testTimeoutMs * 2, + ); + + it( + 'AudioSource.close() clears pending timeout', + async () => { + const source = new AudioSource(48_000, 1); + + // Capture a frame to create a pending timeout + const frame = AudioFrame.create(48_000, 1, 480); + for (let i = 0; i < 480; i++) frame.data[i] = 0; + await source.captureFrame(frame); + + // Close while the timeout is pending — should not throw or + // leave a dangling timer that references disposed state. + await source.close(); + + // Verify the source is marked as closed + await expect(source.captureFrame(frame)).rejects.toThrow('AudioSource is closed'); + }, + testTimeoutMs, + ); + + it( + 'AudioResampler.close() releases the native handle', + async () => { + const resampler = new AudioResampler(48_000, 24_000, 1); + + // Push some data to ensure the handle is actively used + const frame = AudioFrame.create(48_000, 1, 480); + for (let i = 0; i < 480; i++) frame.data[i] = i % 32767; + const output = resampler.push(frame); + expect(output.length).toBeGreaterThanOrEqual(0); + + // close() should dispose the native handle without throwing + resampler.close(); + }, + testTimeoutMs, + ); + + it( + 'concurrent getSid() calls share a single listener and resolve consistently', + async () => { + const { rooms } = await connectTestRooms(1); + const room = rooms[0]!; + + // Fire multiple concurrent getSid() calls — they should all resolve + // to the same SID without leaking event listeners. + const results = await Promise.all([room.getSid(), room.getSid(), room.getSid()]); + + // All calls should return the same non-empty SID + expect(results[0]).toBeTruthy(); + expect(results[1]).toBe(results[0]); + expect(results[2]).toBe(results[0]); + + await room.disconnect(); + }, + testTimeoutMs, + ); }); diff --git a/packages/livekit-rtc/src/video_stream.ts b/packages/livekit-rtc/src/video_stream.ts index 241433e7..b13c6160 100644 --- a/packages/livekit-rtc/src/video_stream.ts +++ b/packages/livekit-rtc/src/video_stream.ts @@ -65,6 +65,9 @@ class VideoStreamSource implements UnderlyingSource { case 'eos': FfiClient.instance.off(FfiClientEvent.FfiEvent, this.onEvent); this.controller.close(); + // Dispose the native handle so the FD is released on stream end, + // not just when cancel() is called explicitly by the consumer. + this.ffiHandle.dispose(); break; } };