From 4c6d5e3e31aac8c4d9a2e87277b9522591acf22a Mon Sep 17 00:00:00 2001 From: LautaroPetaccio Date: Wed, 1 Apr 2026 11:27:24 -0300 Subject: [PATCH 1/3] fix: cancel losing timeout in AudioMixer race to prevent timer leak --- packages/livekit-rtc/src/audio_mixer.test.ts | 102 +++++++++++++++++++ packages/livekit-rtc/src/audio_mixer.ts | 32 ++++-- 2 files changed, 127 insertions(+), 7 deletions(-) diff --git a/packages/livekit-rtc/src/audio_mixer.test.ts b/packages/livekit-rtc/src/audio_mixer.test.ts index 45cc9ffd..d4dfbfaa 100644 --- a/packages/livekit-rtc/src/audio_mixer.test.ts +++ b/packages/livekit-rtc/src/audio_mixer.test.ts @@ -164,4 +164,106 @@ describe('AudioMixer', () => { // Should get at least 2 frames (stream exhausts after 2) expect(frames.length).toBeGreaterThanOrEqual(2); }); + + it('completes mixing without lingering timers when iterator is fast', async () => { + const sampleRate = 48000; + const numChannels = 1; + const samplesPerChannel = 480; + const mixer = new AudioMixer(sampleRate, numChannels, { + blocksize: samplesPerChannel, + // Long timeout so the iterator always wins the race. + // Before the fix, each iteration leaked a 5s timer; with the fix, + // cancel() clears it immediately so the mixer shuts down without delay. + streamTimeoutMs: 5000, + }); + + const stream = createMockAudioStream(3, sampleRate, numChannels, samplesPerChannel, 42); + mixer.addStream(stream); + + const frames: AudioFrame[] = []; + for await (const frame of mixer) { + frames.push(frame); + if (frames.length >= 2) break; + } + + await mixer.aclose(); + + expect(frames.length).toBe(2); + // Verify the frames contain the expected mixed value + for (const frame of frames) { + expect(frame.data[0]).toBe(42); + } + }); + + it('produces frames even with many race iterations', async () => { + const sampleRate = 48000; + const numChannels = 1; + const samplesPerChannel = 480; + const mixer = new AudioMixer(sampleRate, numChannels, { + blocksize: samplesPerChannel, + streamTimeoutMs: 5000, + }); + + // Use more frames to stress multiple race iterations + const stream = createMockAudioStream(6, sampleRate, numChannels, samplesPerChannel, 10); + mixer.addStream(stream); + + const frames: AudioFrame[] = []; + for await (const frame of mixer) { + frames.push(frame); + if (frames.length >= 4) break; + } + + await mixer.aclose(); + + expect(frames.length).toBe(4); + // All frames should contain the expected value + for (const frame of frames) { + expect(frame.data[0]).toBe(10); + } + }); + + it('handles slow streams via timeout path', async () => { + const sampleRate = 48000; + const numChannels = 1; + const samplesPerChannel = 480; + const mixer = new AudioMixer(sampleRate, numChannels, { + blocksize: samplesPerChannel, + // Very short timeout to trigger the timeout path + streamTimeoutMs: 1, + }); + + // Create a stream that is slower than the timeout + async function* slowStream(): AsyncGenerator { + await new Promise((resolve) => setTimeout(resolve, 200)); + const data = new Int16Array(numChannels * samplesPerChannel).fill(500); + yield new AudioFrame(data, sampleRate, numChannels, samplesPerChannel); + } + + // Suppress the expected console.warn from the timeout path + const originalWarn = console.warn; + const warnings: string[] = []; + console.warn = (...args: unknown[]) => { + warnings.push(args.map(String).join(' ')); + }; + + try { + mixer.addStream(slowStream()); + + // The mixer should produce a frame (zero-padded due to timeout) + // and auto-close when the stream exhausts. + const frames: AudioFrame[] = []; + for await (const frame of mixer) { + frames.push(frame); + if (frames.length >= 1) break; + } + + await mixer.aclose(); + + // The timeout warning should have been logged + expect(warnings.some((w) => w.includes('stream timeout after'))).toBe(true); + } finally { + console.warn = originalWarn; + } + }); }); diff --git a/packages/livekit-rtc/src/audio_mixer.ts b/packages/livekit-rtc/src/audio_mixer.ts index 1f75e16c..744bbf6e 100644 --- a/packages/livekit-rtc/src/audio_mixer.ts +++ b/packages/livekit-rtc/src/audio_mixer.ts @@ -311,19 +311,25 @@ export class AudioMixer { // Accumulate data until we have at least chunkSize samples while (buf.length < this.chunkSize * this.numChannels && !exhausted && !this.closed) { try { - const result = await Promise.race([iterator.next(), this.timeout(this.streamTimeoutMs)]); - - if (result === 'timeout') { + const { result, clearTimeout: cancel } = this.timeoutRace( + iterator.next(), + this.streamTimeoutMs, + ); + const value = await result; + cancel(); + + if (value === 'timeout') { console.warn(`AudioMixer: stream timeout after ${this.streamTimeoutMs}ms`); break; } + const iterResult = value; - if (result.done) { + if (iterResult.done) { exhausted = true; break; } - const frame = result.value; + const frame = iterResult.value; const newData = frame.data; // Mark that we received data in this call @@ -412,7 +418,19 @@ export class AudioMixer { return new Promise((resolve) => setTimeout(resolve, ms)); } - private timeout(ms: number): Promise<'timeout'> { - return new Promise((resolve) => setTimeout(() => resolve('timeout'), ms)); + /** Race a promise against a timeout, returning a handle to clear the timer + * so the losing setTimeout doesn't linger after the winner resolves. */ + private timeoutRace( + promise: Promise, + ms: number, + ): { result: Promise; clearTimeout: () => void } { + let timer: ReturnType; + const timeoutPromise = new Promise<'timeout'>((resolve) => { + timer = setTimeout(() => resolve('timeout'), ms); + }); + return { + result: Promise.race([promise, timeoutPromise]), + clearTimeout: () => clearTimeout(timer), + }; } } From 39438b400fabd9f393bef9ddd211463d3fb00602 Mon Sep 17 00:00:00 2001 From: LautaroPetaccio Date: Wed, 1 Apr 2026 11:37:09 -0300 Subject: [PATCH 2/3] chore: add changeset --- .changeset/long-keys-roll.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/long-keys-roll.md diff --git a/.changeset/long-keys-roll.md b/.changeset/long-keys-roll.md new file mode 100644 index 00000000..d2b3841e --- /dev/null +++ b/.changeset/long-keys-roll.md @@ -0,0 +1,5 @@ +--- +'@livekit/rtc-node': patch +--- + +Cancel losing timeout in AudioMixer race to prevent orphaned timers From 54d5d36da1b226b60b4bc950b33ac2bd83195f4b Mon Sep 17 00:00:00 2001 From: LautaroPetaccio Date: Wed, 1 Apr 2026 11:46:30 -0300 Subject: [PATCH 3/3] fix: clear timeout on error path when iterator.next() rejects --- packages/livekit-rtc/src/audio_mixer.ts | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/packages/livekit-rtc/src/audio_mixer.ts b/packages/livekit-rtc/src/audio_mixer.ts index 744bbf6e..7d77f5af 100644 --- a/packages/livekit-rtc/src/audio_mixer.ts +++ b/packages/livekit-rtc/src/audio_mixer.ts @@ -310,11 +310,11 @@ export class AudioMixer { // Accumulate data until we have at least chunkSize samples while (buf.length < this.chunkSize * this.numChannels && !exhausted && !this.closed) { + const { result, clearTimeout: cancel } = this.timeoutRace( + iterator.next(), + this.streamTimeoutMs, + ); try { - const { result, clearTimeout: cancel } = this.timeoutRace( - iterator.next(), - this.streamTimeoutMs, - ); const value = await result; cancel(); @@ -345,6 +345,9 @@ export class AudioMixer { buf = combined; } } catch (error) { + // Clear the timeout on the error path too, so it doesn't linger + // when iterator.next() rejects. + cancel(); console.error(`AudioMixer: Error reading from stream:`, error); exhausted = true; break;