diff --git a/.changeset/shy-rivers-punch.md b/.changeset/shy-rivers-punch.md new file mode 100644 index 00000000..affdcb6a --- /dev/null +++ b/.changeset/shy-rivers-punch.md @@ -0,0 +1,5 @@ +--- +'@livekit/rtc-node': patch +--- + +Close in-progress stream controllers on room disconnect to prevent FD leaks diff --git a/packages/livekit-rtc/src/room.ts b/packages/livekit-rtc/src/room.ts index 6ef6bf63..d311debf 100644 --- a/packages/livekit-rtc/src/room.ts +++ b/packages/livekit-rtc/src/room.ts @@ -283,6 +283,28 @@ export class Room extends (EventEmitter as new () => TypedEmitter return ev.message.case == 'disconnect' && ev.message.value.asyncId == res.asyncId; }); + // Close all in-progress stream controllers to prevent FD leaks. + // Streams that were receiving data but never got a trailer (e.g. the sender + // disconnected mid-transfer) would otherwise keep their ReadableStream open + // indefinitely, leaking the underlying controller and any buffered chunks. + for (const [, streamController] of this.byteStreamControllers) { + try { + streamController.controller.close(); + } catch { + // controller may already be closed + } + } + this.byteStreamControllers.clear(); + + for (const [, streamController] of this.textStreamControllers) { + try { + streamController.controller.close(); + } catch { + // controller may already be closed + } + } + this.textStreamControllers.clear(); + FfiClient.instance.removeListener(FfiClientEvent.FfiEvent, this.onFfiEvent); this.removeAllListeners(); } diff --git a/packages/livekit-rtc/src/tests/e2e.test.ts b/packages/livekit-rtc/src/tests/e2e.test.ts index ad1d5af2..48cc5361 100644 --- a/packages/livekit-rtc/src/tests/e2e.test.ts +++ b/packages/livekit-rtc/src/tests/e2e.test.ts @@ -514,4 +514,44 @@ describeE2E('livekit-rtc e2e', () => { }, testTimeoutMs * 2, ); + + it( + 'cleans up stream controllers when disconnecting during an active stream', + async () => { + const { rooms } = await connectTestRooms(2); + const [receivingRoom, sendingRoom] = rooms; + const topic = 'cleanup-stream-topic'; + + // Register a handler on the receiving side that will intentionally + // NOT fully consume the stream — simulating an abandoned transfer. + let readerReceived = false; + // eslint-disable-next-line @typescript-eslint/no-unused-vars + receivingRoom!.registerTextStreamHandler(topic, async (_reader, _sender) => { + readerReceived = true; + // Deliberately do not call reader.readAll() so the stream stays open + }); + + // Start sending a text stream but don't close it + const writer = await sendingRoom!.localParticipant!.streamText({ topic }); + await writer.write('partial data'); + + // Wait for the receiving side to get the stream header + await waitFor(() => readerReceived, { + timeoutMs: 5000, + debugName: 'text stream header received', + }); + + // Disconnect the receiving room while the stream is still open. + // This should close the stream controller without throwing. + await receivingRoom!.disconnect(); + + // Also close the writer and disconnect the sender + await writer.close(); + await sendingRoom!.disconnect(); + + // If we got here without hanging or throwing, the stream controller + // was properly cleaned up on disconnect. + }, + testTimeoutMs, + ); });