diff --git a/.changeset/blue-dots-jump.md b/.changeset/blue-dots-jump.md new file mode 100644 index 00000000..fc5107f1 --- /dev/null +++ b/.changeset/blue-dots-jump.md @@ -0,0 +1,5 @@ +--- +'@livekit/rtc-node': patch +--- + +Release reader lock on stream completion and clear receivedChunks to prevent memory leak diff --git a/packages/livekit-rtc/src/data_streams/stream_reader.ts b/packages/livekit-rtc/src/data_streams/stream_reader.ts index 5bfff94f..7067d69d 100644 --- a/packages/livekit-rtc/src/data_streams/stream_reader.ts +++ b/packages/livekit-rtc/src/data_streams/stream_reader.ts @@ -53,12 +53,18 @@ export class ByteStreamReader extends BaseStreamReader { try { const { done, value } = await reader.read(); if (done) { + // Release the lock when the stream is exhausted so the + // underlying ReadableStream can be garbage-collected. + reader.releaseLock(); return { done: true, value: undefined as unknown }; } else { this.handleChunkReceived(value); return { done: false, value: value.content! }; } } catch (error: unknown) { + // Release the lock on error so it doesn't stay held when the + // consumer never calls return() (e.g. breaking out of for-await). + reader.releaseLock(); log.error('error processing stream update: %s', error); return { done: true, value: undefined as unknown }; } @@ -121,12 +127,18 @@ export class TextStreamReader extends BaseStreamReader { [Symbol.asyncIterator]() { const reader = this.reader.getReader(); const decoder = new TextDecoder(); + const receivedChunks = this.receivedChunks; return { next: async (): Promise> => { try { const { done, value } = await reader.read(); if (done) { + // Release the lock when the stream is exhausted so the + // underlying ReadableStream can be garbage-collected. + reader.releaseLock(); + // Clear received chunks so the buffered data can be GC'd. + receivedChunks.clear(); return { done: true, value: undefined }; } else { this.handleChunkReceived(value); @@ -136,6 +148,10 @@ export class TextStreamReader extends BaseStreamReader { }; } } catch (error: unknown) { + // Release the lock on error so it doesn't stay held when the + // consumer never calls return() (e.g. breaking out of for-await). + reader.releaseLock(); + receivedChunks.clear(); log.error('error processing stream update: %s', error); return { done: true, value: undefined }; } @@ -143,6 +159,8 @@ export class TextStreamReader extends BaseStreamReader { return(): IteratorResult { reader.releaseLock(); + // Clear received chunks so the buffered data can be GC'd. + receivedChunks.clear(); return { done: true, value: undefined }; }, };