diff --git a/.changeset/long-keys-roll.md b/.changeset/long-keys-roll.md new file mode 100644 index 00000000..d2b3841e --- /dev/null +++ b/.changeset/long-keys-roll.md @@ -0,0 +1,5 @@ +--- +'@livekit/rtc-node': patch +--- + +Cancel losing timeout in AudioMixer race to prevent orphaned timers diff --git a/packages/livekit-rtc/src/audio_mixer.test.ts b/packages/livekit-rtc/src/audio_mixer.test.ts index 45cc9ffd..d4dfbfaa 100644 --- a/packages/livekit-rtc/src/audio_mixer.test.ts +++ b/packages/livekit-rtc/src/audio_mixer.test.ts @@ -164,4 +164,106 @@ describe('AudioMixer', () => { // Should get at least 2 frames (stream exhausts after 2) expect(frames.length).toBeGreaterThanOrEqual(2); }); + + it('completes mixing without lingering timers when iterator is fast', async () => { + const sampleRate = 48000; + const numChannels = 1; + const samplesPerChannel = 480; + const mixer = new AudioMixer(sampleRate, numChannels, { + blocksize: samplesPerChannel, + // Long timeout so the iterator always wins the race. + // Before the fix, each iteration leaked a 5s timer; with the fix, + // cancel() clears it immediately so the mixer shuts down without delay. + streamTimeoutMs: 5000, + }); + + const stream = createMockAudioStream(3, sampleRate, numChannels, samplesPerChannel, 42); + mixer.addStream(stream); + + const frames: AudioFrame[] = []; + for await (const frame of mixer) { + frames.push(frame); + if (frames.length >= 2) break; + } + + await mixer.aclose(); + + expect(frames.length).toBe(2); + // Verify the frames contain the expected mixed value + for (const frame of frames) { + expect(frame.data[0]).toBe(42); + } + }); + + it('produces frames even with many race iterations', async () => { + const sampleRate = 48000; + const numChannels = 1; + const samplesPerChannel = 480; + const mixer = new AudioMixer(sampleRate, numChannels, { + blocksize: samplesPerChannel, + streamTimeoutMs: 5000, + }); + + // Use more frames to stress multiple race iterations + const stream = createMockAudioStream(6, sampleRate, numChannels, samplesPerChannel, 10); + mixer.addStream(stream); + + const frames: AudioFrame[] = []; + for await (const frame of mixer) { + frames.push(frame); + if (frames.length >= 4) break; + } + + await mixer.aclose(); + + expect(frames.length).toBe(4); + // All frames should contain the expected value + for (const frame of frames) { + expect(frame.data[0]).toBe(10); + } + }); + + it('handles slow streams via timeout path', async () => { + const sampleRate = 48000; + const numChannels = 1; + const samplesPerChannel = 480; + const mixer = new AudioMixer(sampleRate, numChannels, { + blocksize: samplesPerChannel, + // Very short timeout to trigger the timeout path + streamTimeoutMs: 1, + }); + + // Create a stream that is slower than the timeout + async function* slowStream(): AsyncGenerator { + await new Promise((resolve) => setTimeout(resolve, 200)); + const data = new Int16Array(numChannels * samplesPerChannel).fill(500); + yield new AudioFrame(data, sampleRate, numChannels, samplesPerChannel); + } + + // Suppress the expected console.warn from the timeout path + const originalWarn = console.warn; + const warnings: string[] = []; + console.warn = (...args: unknown[]) => { + warnings.push(args.map(String).join(' ')); + }; + + try { + mixer.addStream(slowStream()); + + // The mixer should produce a frame (zero-padded due to timeout) + // and auto-close when the stream exhausts. + const frames: AudioFrame[] = []; + for await (const frame of mixer) { + frames.push(frame); + if (frames.length >= 1) break; + } + + await mixer.aclose(); + + // The timeout warning should have been logged + expect(warnings.some((w) => w.includes('stream timeout after'))).toBe(true); + } finally { + console.warn = originalWarn; + } + }); }); diff --git a/packages/livekit-rtc/src/audio_mixer.ts b/packages/livekit-rtc/src/audio_mixer.ts index 1f75e16c..7d77f5af 100644 --- a/packages/livekit-rtc/src/audio_mixer.ts +++ b/packages/livekit-rtc/src/audio_mixer.ts @@ -310,20 +310,26 @@ export class AudioMixer { // Accumulate data until we have at least chunkSize samples while (buf.length < this.chunkSize * this.numChannels && !exhausted && !this.closed) { + const { result, clearTimeout: cancel } = this.timeoutRace( + iterator.next(), + this.streamTimeoutMs, + ); try { - const result = await Promise.race([iterator.next(), this.timeout(this.streamTimeoutMs)]); + const value = await result; + cancel(); - if (result === 'timeout') { + if (value === 'timeout') { console.warn(`AudioMixer: stream timeout after ${this.streamTimeoutMs}ms`); break; } + const iterResult = value; - if (result.done) { + if (iterResult.done) { exhausted = true; break; } - const frame = result.value; + const frame = iterResult.value; const newData = frame.data; // Mark that we received data in this call @@ -339,6 +345,9 @@ export class AudioMixer { buf = combined; } } catch (error) { + // Clear the timeout on the error path too, so it doesn't linger + // when iterator.next() rejects. + cancel(); console.error(`AudioMixer: Error reading from stream:`, error); exhausted = true; break; @@ -412,7 +421,19 @@ export class AudioMixer { return new Promise((resolve) => setTimeout(resolve, ms)); } - private timeout(ms: number): Promise<'timeout'> { - return new Promise((resolve) => setTimeout(() => resolve('timeout'), ms)); + /** Race a promise against a timeout, returning a handle to clear the timer + * so the losing setTimeout doesn't linger after the winner resolves. */ + private timeoutRace( + promise: Promise, + ms: number, + ): { result: Promise; clearTimeout: () => void } { + let timer: ReturnType; + const timeoutPromise = new Promise<'timeout'>((resolve) => { + timer = setTimeout(() => resolve('timeout'), ms); + }); + return { + result: Promise.race([promise, timeoutPromise]), + clearTimeout: () => clearTimeout(timer), + }; } }