From a43e33425c67c45c9f299ebfeb32730c1daa7c80 Mon Sep 17 00:00:00 2001 From: LautaroPetaccio Date: Wed, 1 Apr 2026 11:24:53 -0300 Subject: [PATCH 1/2] fix: release reader lock on done/error in stream iterators and clear receivedChunks --- .../src/data_streams/stream_reader.ts | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/packages/livekit-rtc/src/data_streams/stream_reader.ts b/packages/livekit-rtc/src/data_streams/stream_reader.ts index 5bfff94f..7067d69d 100644 --- a/packages/livekit-rtc/src/data_streams/stream_reader.ts +++ b/packages/livekit-rtc/src/data_streams/stream_reader.ts @@ -53,12 +53,18 @@ export class ByteStreamReader extends BaseStreamReader { try { const { done, value } = await reader.read(); if (done) { + // Release the lock when the stream is exhausted so the + // underlying ReadableStream can be garbage-collected. + reader.releaseLock(); return { done: true, value: undefined as unknown }; } else { this.handleChunkReceived(value); return { done: false, value: value.content! }; } } catch (error: unknown) { + // Release the lock on error so it doesn't stay held when the + // consumer never calls return() (e.g. breaking out of for-await). + reader.releaseLock(); log.error('error processing stream update: %s', error); return { done: true, value: undefined as unknown }; } @@ -121,12 +127,18 @@ export class TextStreamReader extends BaseStreamReader { [Symbol.asyncIterator]() { const reader = this.reader.getReader(); const decoder = new TextDecoder(); + const receivedChunks = this.receivedChunks; return { next: async (): Promise> => { try { const { done, value } = await reader.read(); if (done) { + // Release the lock when the stream is exhausted so the + // underlying ReadableStream can be garbage-collected. + reader.releaseLock(); + // Clear received chunks so the buffered data can be GC'd. + receivedChunks.clear(); return { done: true, value: undefined }; } else { this.handleChunkReceived(value); @@ -136,6 +148,10 @@ export class TextStreamReader extends BaseStreamReader { }; } } catch (error: unknown) { + // Release the lock on error so it doesn't stay held when the + // consumer never calls return() (e.g. breaking out of for-await). + reader.releaseLock(); + receivedChunks.clear(); log.error('error processing stream update: %s', error); return { done: true, value: undefined }; } @@ -143,6 +159,8 @@ export class TextStreamReader extends BaseStreamReader { return(): IteratorResult { reader.releaseLock(); + // Clear received chunks so the buffered data can be GC'd. + receivedChunks.clear(); return { done: true, value: undefined }; }, }; From 96c4cbf42b08f110c410e7a1bcb207f3f171dab6 Mon Sep 17 00:00:00 2001 From: LautaroPetaccio Date: Wed, 1 Apr 2026 11:37:07 -0300 Subject: [PATCH 2/2] chore: add changeset --- .changeset/blue-dots-jump.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/blue-dots-jump.md diff --git a/.changeset/blue-dots-jump.md b/.changeset/blue-dots-jump.md new file mode 100644 index 00000000..fc5107f1 --- /dev/null +++ b/.changeset/blue-dots-jump.md @@ -0,0 +1,5 @@ +--- +'@livekit/rtc-node': patch +--- + +Release reader lock on stream completion and clear receivedChunks to prevent memory leak