Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/warm-owls-deny.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@livekit/rtc-node': patch
---

Dispose track publication FfiHandles on participant disconnect to prevent FD leaks
8 changes: 8 additions & 0 deletions packages/livekit-rtc/src/room.ts
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,15 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
if (participant) {
this.remoteParticipants.delete(participant.identity);
participant.info.disconnectReason = ev.value.disconnectReason;
// Emit before disposing so listeners can still access trackPublications.
this.emit(RoomEvent.ParticipantDisconnected, participant);
// Dispose each track publication's FfiHandle to prevent FD leaks.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for disconnecting participants we first receive a trackUnpublished event, which would arguably be the more obvious place to perform this cleanup.
Agree though, that the cleanup needs to happen at some point.

We could wrap it in queueMicrotask in the trackUnpublished handler.

// Without this, rapid participant disconnections accumulate undisposed
// native handles since nothing else triggers their cleanup.
for (const [, publication] of participant.trackPublications) {
publication.ffiHandle.dispose();
}
participant.trackPublications.clear();
} else {
log.warn(`RoomEvent.ParticipantDisconnected: Could not find participant`);
}
Expand Down
92 changes: 92 additions & 0 deletions packages/livekit-rtc/src/tests/e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -514,4 +514,96 @@ describeE2E('livekit-rtc e2e', () => {
},
testTimeoutMs * 2,
);

it(
'cleans up track publications when a remote participant disconnects',
async () => {
const { rooms } = await connectTestRooms(2);
const [stayingRoom, leavingRoom] = rooms;

// Publish a track from the leaving participant so its track publication
// will need to be cleaned up on disconnect.
const source = new AudioSource(48_000, 1);
const track = LocalAudioTrack.createAudioTrack('cleanup-test', source);
const options = new TrackPublishOptions();
options.source = TrackSource.SOURCE_MICROPHONE;
await leavingRoom!.localParticipant!.publishTrack(track, options);

// Wait for the staying room to see the track subscription
await waitFor(
() => {
const remote = stayingRoom!.remoteParticipants.get(
leavingRoom!.localParticipant!.identity,
);
return remote !== undefined && remote.trackPublications.size > 0;
},
{ timeoutMs: 5000, debugName: 'track publication visible' },
);

// Capture a reference to the remote participant before disconnect
const remoteParticipant = stayingRoom!.remoteParticipants.get(
leavingRoom!.localParticipant!.identity,
)!;
expect(remoteParticipant.trackPublications.size).toBeGreaterThan(0);

// Listen for the disconnect event
const disconnected = waitForRoomEvent(
stayingRoom!,
RoomEvent.ParticipantDisconnected,
testTimeoutMs,
(p: { identity: string }) => p.identity,
);

await leavingRoom!.disconnect();
await disconnected;

// After disconnect, the remote participant's track publications map
// should be cleared (handles disposed).
expect(remoteParticipant.trackPublications.size).toBe(0);
expect(stayingRoom!.remoteParticipants.has(remoteParticipant.identity)).toBe(false);

await source.close();
await stayingRoom!.disconnect();
},
testTimeoutMs,
);

it(
'cleans up resources when multiple participants disconnect simultaneously',
async () => {
// Connect 4 participants to stress-test concurrent disconnection cleanup
const { rooms } = await connectTestRooms(4);

// Publish a track from each participant to create track publications
const sources: AudioSource[] = [];
for (const room of rooms) {
const source = new AudioSource(48_000, 1);
sources.push(source);
const track = LocalAudioTrack.createAudioTrack('multi-cleanup', source);
const options = new TrackPublishOptions();
options.source = TrackSource.SOURCE_MICROPHONE;
await room.localParticipant!.publishTrack(track, options);
}

// Wait for all participants to see each other's tracks
await waitFor(
() =>
rooms.every(
(r) =>
r.remoteParticipants.size === 3 &&
[...r.remoteParticipants.values()].every((p) => p.trackPublications.size > 0),
),
{ timeoutMs: 5000, debugName: 'all tracks visible' },
);

// Disconnect all participants simultaneously
await Promise.all([...rooms.map((r) => r.disconnect()), ...sources.map((s) => s.close())]);

// Verify all rooms are disconnected and remote participant maps are empty
for (const room of rooms) {
expect(room.isConnected).toBe(false);
}
},
testTimeoutMs * 2,
);
});
Loading