diff --git a/.changeset/warm-owls-deny.md b/.changeset/warm-owls-deny.md new file mode 100644 index 00000000..8f5e3964 --- /dev/null +++ b/.changeset/warm-owls-deny.md @@ -0,0 +1,5 @@ +--- +'@livekit/rtc-node': patch +--- + +Dispose track publication FfiHandles on participant disconnect to prevent FD leaks diff --git a/packages/livekit-rtc/src/room.ts b/packages/livekit-rtc/src/room.ts index 6ef6bf63..719eda6f 100644 --- a/packages/livekit-rtc/src/room.ts +++ b/packages/livekit-rtc/src/room.ts @@ -382,7 +382,15 @@ export class Room extends (EventEmitter as new () => TypedEmitter if (participant) { this.remoteParticipants.delete(participant.identity); participant.info.disconnectReason = ev.value.disconnectReason; + // Emit before disposing so listeners can still access trackPublications. this.emit(RoomEvent.ParticipantDisconnected, participant); + // Dispose each track publication's FfiHandle to prevent FD leaks. + // Without this, rapid participant disconnections accumulate undisposed + // native handles since nothing else triggers their cleanup. + for (const [, publication] of participant.trackPublications) { + publication.ffiHandle.dispose(); + } + participant.trackPublications.clear(); } else { log.warn(`RoomEvent.ParticipantDisconnected: Could not find participant`); } diff --git a/packages/livekit-rtc/src/tests/e2e.test.ts b/packages/livekit-rtc/src/tests/e2e.test.ts index ad1d5af2..f3146c64 100644 --- a/packages/livekit-rtc/src/tests/e2e.test.ts +++ b/packages/livekit-rtc/src/tests/e2e.test.ts @@ -514,4 +514,96 @@ describeE2E('livekit-rtc e2e', () => { }, testTimeoutMs * 2, ); + + it( + 'cleans up track publications when a remote participant disconnects', + async () => { + const { rooms } = await connectTestRooms(2); + const [stayingRoom, leavingRoom] = rooms; + + // Publish a track from the leaving participant so its track publication + // will need to be cleaned up on disconnect. + const source = new AudioSource(48_000, 1); + const track = LocalAudioTrack.createAudioTrack('cleanup-test', source); + const options = new TrackPublishOptions(); + options.source = TrackSource.SOURCE_MICROPHONE; + await leavingRoom!.localParticipant!.publishTrack(track, options); + + // Wait for the staying room to see the track subscription + await waitFor( + () => { + const remote = stayingRoom!.remoteParticipants.get( + leavingRoom!.localParticipant!.identity, + ); + return remote !== undefined && remote.trackPublications.size > 0; + }, + { timeoutMs: 5000, debugName: 'track publication visible' }, + ); + + // Capture a reference to the remote participant before disconnect + const remoteParticipant = stayingRoom!.remoteParticipants.get( + leavingRoom!.localParticipant!.identity, + )!; + expect(remoteParticipant.trackPublications.size).toBeGreaterThan(0); + + // Listen for the disconnect event + const disconnected = waitForRoomEvent( + stayingRoom!, + RoomEvent.ParticipantDisconnected, + testTimeoutMs, + (p: { identity: string }) => p.identity, + ); + + await leavingRoom!.disconnect(); + await disconnected; + + // After disconnect, the remote participant's track publications map + // should be cleared (handles disposed). + expect(remoteParticipant.trackPublications.size).toBe(0); + expect(stayingRoom!.remoteParticipants.has(remoteParticipant.identity)).toBe(false); + + await source.close(); + await stayingRoom!.disconnect(); + }, + testTimeoutMs, + ); + + it( + 'cleans up resources when multiple participants disconnect simultaneously', + async () => { + // Connect 4 participants to stress-test concurrent disconnection cleanup + const { rooms } = await connectTestRooms(4); + + // Publish a track from each participant to create track publications + const sources: AudioSource[] = []; + for (const room of rooms) { + const source = new AudioSource(48_000, 1); + sources.push(source); + const track = LocalAudioTrack.createAudioTrack('multi-cleanup', source); + const options = new TrackPublishOptions(); + options.source = TrackSource.SOURCE_MICROPHONE; + await room.localParticipant!.publishTrack(track, options); + } + + // Wait for all participants to see each other's tracks + await waitFor( + () => + rooms.every( + (r) => + r.remoteParticipants.size === 3 && + [...r.remoteParticipants.values()].every((p) => p.trackPublications.size > 0), + ), + { timeoutMs: 5000, debugName: 'all tracks visible' }, + ); + + // Disconnect all participants simultaneously + await Promise.all([...rooms.map((r) => r.disconnect()), ...sources.map((s) => s.close())]); + + // Verify all rooms are disconnected and remote participant maps are empty + for (const room of rooms) { + expect(room.isConnected).toBe(false); + } + }, + testTimeoutMs * 2, + ); });