Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/blue-dots-jump.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@livekit/rtc-node': patch
---

Release reader lock on stream completion and clear receivedChunks to prevent memory leak
18 changes: 18 additions & 0 deletions packages/livekit-rtc/src/data_streams/stream_reader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,18 @@ export class ByteStreamReader extends BaseStreamReader<ByteStreamInfo> {
try {
const { done, value } = await reader.read();
if (done) {
// Release the lock when the stream is exhausted so the
// underlying ReadableStream can be garbage-collected.
reader.releaseLock();
return { done: true, value: undefined as unknown };
} else {
this.handleChunkReceived(value);
return { done: false, value: value.content! };
}
} catch (error: unknown) {
// Release the lock on error so it doesn't stay held when the
// consumer never calls return() (e.g. breaking out of for-await).
reader.releaseLock();
log.error('error processing stream update: %s', error);
return { done: true, value: undefined as unknown };
}
Expand Down Expand Up @@ -121,12 +127,18 @@ export class TextStreamReader extends BaseStreamReader<TextStreamInfo> {
[Symbol.asyncIterator]() {
const reader = this.reader.getReader();
const decoder = new TextDecoder();
const receivedChunks = this.receivedChunks;

return {
next: async (): Promise<IteratorResult<string>> => {
try {
const { done, value } = await reader.read();
if (done) {
// Release the lock when the stream is exhausted so the
// underlying ReadableStream can be garbage-collected.
reader.releaseLock();
// Clear received chunks so the buffered data can be GC'd.
receivedChunks.clear();
return { done: true, value: undefined };
} else {
this.handleChunkReceived(value);
Expand All @@ -136,13 +148,19 @@ export class TextStreamReader extends BaseStreamReader<TextStreamInfo> {
};
}
} catch (error: unknown) {
// Release the lock on error so it doesn't stay held when the
// consumer never calls return() (e.g. breaking out of for-await).
reader.releaseLock();
receivedChunks.clear();
log.error('error processing stream update: %s', error);
return { done: true, value: undefined };
}
},

return(): IteratorResult<string> {
reader.releaseLock();
// Clear received chunks so the buffered data can be GC'd.
receivedChunks.clear();
return { done: true, value: undefined };
},
};
Expand Down
Loading