    // Dispose the native handle so the FD is released on stream end,
    // not just when cancel() is called explicitly by the consumer.
    this.ffiHandle.dispose();
    this.frameProcessor?.close();
🔴 Double-dispose of native FfiHandle when EOS fires with queued frames and consumer cancels
The new EOS handler calls this.ffiHandle.dispose() at line 111, and the cancel() method also calls this.ffiHandle.dispose() at line 123. Per the WHATWG ReadableStream spec, when controller.close() is called (line 108), enqueued frames are still drained by the consumer. If there are queued frames and the consumer cancels before reading them all (e.g., by breaking out of a for await...of loop), the stream is still in the "readable" state (close pending), so the runtime invokes cancel() on the underlying source — leading to ffiHandle.dispose() being called a second time on an already-freed native handle. The same double-invocation applies to this.frameProcessor?.close() on lines 112 and 126.
Prompt for agents
In packages/livekit-rtc/src/audio_stream.ts, add a boolean flag (e.g. `private disposed = false`) to AudioStreamSource. In both the EOS handler (around line 107-113) and the cancel() method (around line 121-127), check and set the flag before calling ffiHandle.dispose() and frameProcessor?.close(). For example:
    private disposed = false;

    // In onEvent EOS case:
    case 'eos':
      FfiClient.instance.off(FfiClientEvent.FfiEvent, this.onEvent);
      this.controller.close();
      if (!this.disposed) {
        this.disposed = true;
        this.ffiHandle.dispose();
        this.frameProcessor?.close();
      }
      break;

    // In cancel():
    cancel() {
      FfiClient.instance.off(FfiClientEvent.FfiEvent, this.onEvent);
      if (!this.disposed) {
        this.disposed = true;
        this.ffiHandle.dispose();
        this.frameProcessor?.close();
      }
    }
    // Dispose the native handle so the FD is released on stream end,
    // not just when cancel() is called explicitly by the consumer.
    this.ffiHandle.dispose();
🔴 Double-dispose of native FfiHandle in VideoStreamSource when EOS fires with queued frames and consumer cancels
Same issue as in AudioStreamSource: the new EOS handler at line 70 calls this.ffiHandle.dispose(), and cancel() at line 81 also calls this.ffiHandle.dispose(). If EOS fires while there are still enqueued video frames and the consumer cancels the stream before draining them (e.g., breaking from for await...of), the ReadableStream invokes both the EOS cleanup and cancel(), disposing the native handle twice.
Prompt for agents
In packages/livekit-rtc/src/video_stream.ts, add a boolean flag (e.g. `private disposed = false`) to VideoStreamSource. Guard both the EOS handler (around line 65-71) and cancel() (around line 79-82) so that ffiHandle.dispose() is only called once:
    private disposed = false;

    // In onEvent EOS case:
    case 'eos':
      FfiClient.instance.off(FfiClientEvent.FfiEvent, this.onEvent);
      this.controller.close();
      if (!this.disposed) {
        this.disposed = true;
        this.ffiHandle.dispose();
      }
      break;

    // In cancel():
    cancel() {
      FfiClient.instance.off(FfiClientEvent.FfiEvent, this.onEvent);
      if (!this.disposed) {
        this.disposed = true;
        this.ffiHandle.dispose();
      }
    }
When multiple participants connect and disconnect rapidly from a room, native FFI handles, stream controllers, and event listeners accumulate without being released. Each leaked resource holds an open file descriptor. Under sustained churn (the typical pattern for applications where participants join and leave frequently), the process eventually exhausts its FD limit and crashes. Additionally, stream readers accumulate buffered data that is never released, causing gradual memory growth.
There are eleven independent leak vectors, each small on its own, but compounding under load:
1. In-progress stream controllers survive `disconnect()`. `Room.disconnect()` removed event listeners and called `removeAllListeners()`, but never closed the `ReadableStreamDefaultController` instances stored in `byteStreamControllers` and `textStreamControllers`. If a sender disconnects mid-transfer (so no trailer is ever received), the receiving side's stream stays open forever. Each open stream holds a controller reference and any buffered chunks, preventing GC.

2. Track publication handles are never disposed on remote participant disconnect. When the room processes a `participantDisconnected` event, it deletes the participant from the `remoteParticipants` map and emits an event, but never touches the participant's `trackPublications`. Each `TrackPublication` wraps an `FfiHandle` that maps to a native resource. Dropping the JS reference without calling `dispose()` means the native side never frees the handle. With N participants each publishing M tracks, each disconnect leaks M handles, for N×M in total once all of them have left.

3. Audio and video stream native handles leak on normal stream end. `AudioStreamSource` and `VideoStreamSource` register an event listener on `FfiClient` in their constructor and create an `FfiHandle` for the native stream. On `eos` (end-of-stream), they removed the listener and closed the controller but never called `ffiHandle.dispose()`. The handle was only disposed in the `cancel()` path. Streams that end normally (the common case) leaked the native handle. Additionally, `AudioStreamSource.cancel()` did not close the `frameProcessor`, so cancelling a noise-cancelled stream leaked the processor.

4. `AudioResampler` has no way to release its native handle. The class creates an `FfiHandle` in its constructor but exposes no `close()` or `dispose()` method. Every resampler instance leaks for the lifetime of the process.

5. `AudioSource.close()` leaves a dangling `setTimeout`. `captureFrame()` schedules a timeout that calls `this.release` after the current queue drains. `close()` disposed the native handle and set `closed = true`, but never cleared the timeout. The callback fires after disposal, referencing freed native state. While not a direct FD leak, it causes use-after-free on the native side that can prevent the handle from being fully released.

6. Concurrent `getSid()` calls each register independent listeners. Each call creates its own `RoomSidChanged` + `Disconnected` listener pair. If multiple calls race, only one resolves the SID and cleans up its listener; the rest stay attached until disconnect, at which point only the `Disconnected` listener is cleaned per call. The `RoomSidChanged` listeners from already-resolved calls persist.

7. `FfiClient.waitFor()` listeners leak when their predicate never matches. `waitFor()` registers a listener on the FfiClient `EventEmitter` that is only removed when the predicate returns true. If the room disconnects before the expected FFI callback arrives (e.g. a `publishData` call is in-flight during disconnect), the listener stays attached forever. Every `await waitFor()` call site (~28 across participant.ts, room.ts, and audio_source.ts) is affected. Under rapid connect/disconnect cycles, this is the primary source of unbounded listener accumulation on the singleton `FfiClient`.

8. `WritableStream.abort()` in `streamText`/`streamBytes` doesn't close the remote stream. The `abort()` handlers only logged the error. If a WritableStream is aborted due to a write failure, the remote side's `ReadableStreamDefaultController` stays open waiting for chunks that will never arrive. This is the same leak pattern as item 1, but triggered from the sender side.

9. Stream reader async iterator lock not released on done/error. Both `ByteStreamReader` and `TextStreamReader` async iterators only released the reader lock in the explicit `return()` method. If the stream completed normally (`done: true`) or an error was caught in `next()`, the lock stayed held. Consumers that don't call `return()` (e.g. `for-await` loops that break on a condition, or `readAll()` which runs to completion) would leave the underlying `ReadableStream` locked and unable to be garbage-collected.

10. `AudioMixer` leaks orphaned `setTimeout` timers on every mixing iteration. `getContribution()` uses `Promise.race([iterator.next(), this.timeout(ms)])` to race the stream against a deadline. When the iterator wins (the normal case), the losing timeout's `setTimeout` is never cleared. It fires harmlessly but creates an orphaned timer on every iteration. Under sustained mixing this creates a steady stream of wasted work, and in pathological cases can pressure the timer heap.

11. `TextStreamReader.receivedChunks` accumulates indefinitely (memory leak). The `receivedChunks` Map stores every chunk received during the stream's lifetime for version-based deduplication and progress tracking. It is never cleared: not when the stream is fully consumed, not on error, and not when the consumer breaks out of iteration. For large or frequent text streams, this keeps all chunk data in memory for the lifetime of the reader object, causing gradual memory growth.

How
1. Close stream controllers in `Room.disconnect()` (`room.ts`)

After the FFI disconnect callback completes but before removing listeners, iterate both `byteStreamControllers` and `textStreamControllers`, call `controller.close()` on each (wrapped in try/catch since a controller may already be closed), and clear the maps. This ensures streams that never received a trailer are properly terminated.

2. Dispose track publication handles on participant disconnect (`room.ts`)

In the `participantDisconnected` event handler, before deleting the participant from `remoteParticipants` and emitting the event, loop through `participant.trackPublications`, call `ffiHandle.dispose()` on each publication, and clear the map. The emit happens after cleanup so consumers see the participant in a clean state.

3. Dispose native handles on audio/video stream EOS (`audio_stream.ts`, `video_stream.ts`)

In the `eos` case of both `AudioStreamSource.onEvent` and `VideoStreamSource.onEvent`, add `this.ffiHandle.dispose()` after closing the controller. This makes the EOS path symmetric with the `cancel()` path. Also add `this.frameProcessor?.close()` to `AudioStreamSource.cancel()` so both termination paths release the processor.

4. Add `close()` to `AudioResampler` (`audio_resampler.ts`)

Add a public `close()` method that calls `this.#ffiHandle.dispose()`. This follows the same pattern as `AudioSource.close()` and `VideoSource.close()`, giving callers a way to release the native resampler when done.

5. Clear timeout in `AudioSource.close()` (`audio_source.ts`)

Before disposing the handle, check for a pending `this.timeout` and call `clearTimeout()` + set it to `undefined`. This prevents the scheduled `release` callback from firing after the native handle is freed.

6. Deduplicate `getSid()` listeners (`room.ts`)

Add a `private sidPromise?: Promise<string>` field. On the first `getSid()` call that needs to wait, create the promise and store it. Subsequent concurrent calls return the same promise. When the promise resolves or rejects, `sidPromise` is cleared so future calls after a reconnect work normally. This ensures exactly one `RoomSidChanged` + `Disconnected` listener pair exists regardless of how many callers are waiting.

7. Add `AbortSignal` support to `FfiClient.waitFor()` (`ffi_client.ts`, `room.ts`, `participant.ts`)

`waitFor()` now accepts an optional `{ signal?: AbortSignal }` parameter. When the signal fires, the listener is removed from the `EventEmitter` and the promise rejects. The `Room` class holds an `AbortController` that is aborted in `disconnect()` and reset in `connect()`. Its signal is passed to `LocalParticipant` on construction, which threads it into all 14 `waitFor()` call sites. This ensures every pending FFI listener is cleaned up when the room disconnects, regardless of whether the expected callback ever arrives.

8. Send trailer on `WritableStream` abort (`participant.ts`)

Both `streamText()` and `streamBytes()` abort handlers now send a `DataStream_Trailer` with the error reason (best-effort, wrapped in try/catch since the connection may already be gone). This closes the remote side's stream controller instead of leaving it open waiting for data that will never arrive.

9. Release reader lock on done/error in stream iterators (`stream_reader.ts`)

Both `ByteStreamReader` and `TextStreamReader` async iterators now call `reader.releaseLock()` in the `done` and `catch` paths of `next()`, not just in `return()`. This ensures the lock is released regardless of how the iteration ends: normal completion, error, or explicit `return()`.

10. Cancel losing timeout in `AudioMixer.timeoutRace()` (`audio_mixer.ts`)

Replace the old `timeout()` method (which returned an uncancellable promise) with `timeoutRace()`, which returns both the race promise and a `clearTimeout` handle. After the race resolves in `getContribution()`, `cancel()` is called immediately to clear the losing timer. This eliminates orphaned `setTimeout` callbacks on every mixing iteration.

11. Clear `receivedChunks` on stream completion (`stream_reader.ts`)

Add `this.receivedChunks.clear()` in all three termination paths of the `TextStreamReader` async iterator: the `done` path (stream fully consumed), the `catch` path (error during read), and the `return()` path (consumer breaks out of `for-await` early). This ensures the buffered chunk data can be garbage-collected as soon as the stream iteration ends, rather than being held for the lifetime of the `TextStreamReader` instance.
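
For reference, the promise-sharing idea in step 6 might look roughly like the sketch below. This is not the code from `room.ts`: `SidCache`, `waitForSid`, and `listenerPairs` are hypothetical names used only to illustrate how concurrent callers can share one pending promise, and the cached `sid` field is an assumption about how an already-known SID is returned.

```typescript
// Hypothetical stand-in for Room; only the promise-sharing logic is illustrated.
class SidCache {
  private sid = '';
  private sidPromise?: Promise<string>;
  // Counts how often the (assumed) listener pair would have been registered.
  listenerPairs = 0;

  // waitForSid is an assumed callback that registers listeners and
  // settles once the SID is known or the room disconnects.
  constructor(private waitForSid: () => Promise<string>) {}

  getSid(): Promise<string> {
    if (this.sid !== '') return Promise.resolve(this.sid);
    // Concurrent callers reuse the pending promise instead of adding listeners.
    if (this.sidPromise) return this.sidPromise;

    this.listenerPairs++;
    this.sidPromise = this.waitForSid()
      .then((sid) => {
        this.sid = sid;
        return sid;
      })
      .finally(() => {
        // Cleared on resolve or reject so a later call starts fresh.
        this.sidPromise = undefined;
      });
    return this.sidPromise;
  }
}
```

With this shape, any number of callers waiting at the same time cause a single registration, and a rejected wait does not poison later calls.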
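
The abortable wait described in step 7 can be sketched as follows. This is a minimal illustration under stated assumptions, not the actual `FfiClient.waitFor()`: the free function, its `emitter` and `event` parameters, and the error message are hypothetical. Only the `{ signal?: AbortSignal }` option and the remove-listener-then-reject behaviour come from the description above.

```typescript
import { EventEmitter } from 'node:events';

// Hypothetical helper; the real method lives on FfiClient and may differ.
function waitFor<T>(
  emitter: EventEmitter,
  event: string,
  predicate: (value: T) => boolean,
  options?: { signal?: AbortSignal },
): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const signal = options?.signal;
    if (signal?.aborted) {
      // Already aborted: never attach a listener at all.
      reject(new Error('waitFor aborted'));
      return;
    }

    const onAbort = () => {
      // Abort path: detach the event listener, then reject.
      emitter.off(event, onEvent);
      reject(new Error('waitFor aborted'));
    };

    const onEvent = (value: T) => {
      if (!predicate(value)) return;
      // Match path: detach both listeners, then resolve.
      emitter.off(event, onEvent);
      signal?.removeEventListener('abort', onAbort);
      resolve(value);
    };

    emitter.on(event, onEvent);
    signal?.addEventListener('abort', onAbort, { once: true });
  });
}
```

A caller that owns an `AbortController` can then abort it on disconnect, which rejects every pending wait and leaves no listener behind on the emitter.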
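
The cancellable race from step 10 could be sketched like this. The return shape (`result` plus `cancel`), the rejection on timeout, and the `nextWithDeadline` wrapper are assumptions made for illustration. The real `AudioMixer.timeoutRace()` may signal a timeout differently.

```typescript
// Hypothetical shape; the real method's signature and timeout behaviour may differ.
function timeoutRace<T>(
  promise: Promise<T>,
  ms: number,
): { result: Promise<T>; cancel: () => void } {
  let timer: ReturnType<typeof setTimeout> | undefined;

  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error(`timed out after ${ms} ms`)), ms);
  });

  const cancel = () => {
    // Safe to call more than once, and after the timer has already fired.
    if (timer !== undefined) {
      clearTimeout(timer);
      timer = undefined;
    }
  };

  return { result: Promise.race([promise, timeout]), cancel };
}

// Assumed call pattern: clear the losing timer as soon as the race settles.
async function nextWithDeadline<T>(
  iterator: AsyncIterator<T>,
  ms: number,
): Promise<IteratorResult<T>> {
  const { result, cancel } = timeoutRace(iterator.next(), ms);
  try {
    return await result;
  } finally {
    cancel();
  }
}
```

Calling `cancel()` in a `finally` block means the timer is cleared whether the iterator wins, the deadline wins, or the iterator itself throws.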