Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/shy-rivers-punch.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@livekit/rtc-node': patch
---

Close in-progress stream controllers on room disconnect to prevent FD leaks
22 changes: 22 additions & 0 deletions packages/livekit-rtc/src/room.ts
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,28 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
return ev.message.case == 'disconnect' && ev.message.value.asyncId == res.asyncId;
});

// Close all in-progress stream controllers to prevent FD leaks.
// Streams that were receiving data but never got a trailer (e.g. the sender
// disconnected mid-transfer) would otherwise keep their ReadableStream open
// indefinitely, leaking the underlying controller and any buffered chunks.
for (const [, streamController] of this.byteStreamControllers) {
try {
streamController.controller.close();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be more appropriate to trigger call an controller.error(new Error("Disconnected while receiving")

} catch {
// controller may already be closed
}
}
this.byteStreamControllers.clear();

for (const [, streamController] of this.textStreamControllers) {
try {
streamController.controller.close();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here

} catch {
// controller may already be closed
}
}
this.textStreamControllers.clear();

FfiClient.instance.removeListener(FfiClientEvent.FfiEvent, this.onFfiEvent);
this.removeAllListeners();
}
Expand Down
40 changes: 40 additions & 0 deletions packages/livekit-rtc/src/tests/e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -514,4 +514,44 @@ describeE2E('livekit-rtc e2e', () => {
},
testTimeoutMs * 2,
);

it(
'cleans up stream controllers when disconnecting during an active stream',
async () => {
const { rooms } = await connectTestRooms(2);
const [receivingRoom, sendingRoom] = rooms;
const topic = 'cleanup-stream-topic';

// Register a handler on the receiving side that will intentionally
// NOT fully consume the stream — simulating an abandoned transfer.
let readerReceived = false;
// eslint-disable-next-line @typescript-eslint/no-unused-vars
receivingRoom!.registerTextStreamHandler(topic, async (_reader, _sender) => {
readerReceived = true;
// Deliberately do not call reader.readAll() so the stream stays open
});

// Start sending a text stream but don't close it
const writer = await sendingRoom!.localParticipant!.streamText({ topic });
await writer.write('partial data');

// Wait for the receiving side to get the stream header
await waitFor(() => readerReceived, {
timeoutMs: 5000,
debugName: 'text stream header received',
});

// Disconnect the receiving room while the stream is still open.
// This should close the stream controller without throwing.
await receivingRoom!.disconnect();

// Also close the writer and disconnect the sender
await writer.close();
await sendingRoom!.disconnect();

// If we got here without hanging or throwing, the stream controller
// was properly cleaned up on disconnect.
},
testTimeoutMs,
);
});
Loading