Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 102 additions & 0 deletions packages/livekit-rtc/src/audio_mixer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -164,4 +164,106 @@ describe('AudioMixer', () => {
// Should get at least 2 frames (stream exhausts after 2)
expect(frames.length).toBeGreaterThanOrEqual(2);
});

it('completes mixing without lingering timers when iterator is fast', async () => {
  const rate = 48000;
  const channels = 1;
  const blockSize = 480;

  // Long timeout so the iterator always wins the race. Before the fix each
  // iteration leaked a 5s timer; with the fix cancel() clears it immediately,
  // so the mixer shuts down without delay.
  const mixer = new AudioMixer(rate, channels, {
    blocksize: blockSize,
    streamTimeoutMs: 5000,
  });

  mixer.addStream(createMockAudioStream(3, rate, channels, blockSize, 42));

  const collected: AudioFrame[] = [];
  for await (const frame of mixer) {
    // Array.prototype.push returns the new length.
    if (collected.push(frame) >= 2) break;
  }

  await mixer.aclose();

  expect(collected.length).toBe(2);
  // Each mixed frame should carry the mock stream's fill value.
  collected.forEach((frame) => expect(frame.data[0]).toBe(42));
});

it('produces frames even with many race iterations', async () => {
  const rate = 48000;
  const channels = 1;
  const blockSize = 480;

  const mixer = new AudioMixer(rate, channels, {
    blocksize: blockSize,
    streamTimeoutMs: 5000,
  });

  // Six input frames stress several consecutive race iterations.
  const source = createMockAudioStream(6, rate, channels, blockSize, 10);
  mixer.addStream(source);

  const received: AudioFrame[] = [];
  for await (const frame of mixer) {
    received.push(frame);
    if (received.length === 4) break;
  }

  await mixer.aclose();

  expect(received.length).toBe(4);
  // Every mixed frame carries the mock stream's fill value.
  for (const frame of received) {
    expect(frame.data[0]).toBe(10);
  }
});

it('handles slow streams via timeout path', async () => {
  const rate = 48000;
  const channels = 1;
  const blockSize = 480;

  // Very short timeout so the mixer takes the timeout branch.
  const mixer = new AudioMixer(rate, channels, {
    blocksize: blockSize,
    streamTimeoutMs: 1,
  });

  // A stream that takes far longer than the timeout to yield its first frame.
  async function* slowStream(): AsyncGenerator<AudioFrame> {
    await new Promise((resolve) => setTimeout(resolve, 200));
    const samples = new Int16Array(channels * blockSize).fill(500);
    yield new AudioFrame(samples, rate, channels, blockSize);
  }

  // Capture (and thereby suppress) the expected timeout warning.
  const realWarn = console.warn;
  const warnings: string[] = [];
  console.warn = (...args: unknown[]) => {
    warnings.push(args.map(String).join(' '));
  };

  try {
    mixer.addStream(slowStream());

    // The mixer should produce a frame (zero-padded due to timeout) and
    // auto-close when the stream exhausts; we stop after the first frame.
    const received: AudioFrame[] = [];
    for await (const frame of mixer) {
      received.push(frame);
      break;
    }

    await mixer.aclose();

    // The timeout path must have logged its warning.
    expect(warnings.some((w) => w.includes('stream timeout after'))).toBe(true);
  } finally {
    console.warn = realWarn;
  }
});
});
32 changes: 25 additions & 7 deletions packages/livekit-rtc/src/audio_mixer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -311,19 +311,25 @@ export class AudioMixer {
// Accumulate data until we have at least chunkSize samples
while (buf.length < this.chunkSize * this.numChannels && !exhausted && !this.closed) {
try {
const result = await Promise.race([iterator.next(), this.timeout(this.streamTimeoutMs)]);

if (result === 'timeout') {
const { result, clearTimeout: cancel } = this.timeoutRace(
iterator.next(),
this.streamTimeoutMs,
);
const value = await result;
cancel();

if (value === 'timeout') {
console.warn(`AudioMixer: stream timeout after ${this.streamTimeoutMs}ms`);
break;
}
const iterResult = value;

if (result.done) {
if (iterResult.done) {
exhausted = true;
break;
}

const frame = result.value;
const frame = iterResult.value;
const newData = frame.data;

// Mark that we received data in this call
Expand Down Expand Up @@ -412,7 +418,19 @@ export class AudioMixer {
return new Promise((resolve) => setTimeout(resolve, ms));
}

private timeout(ms: number): Promise<'timeout'> {
return new Promise((resolve) => setTimeout(() => resolve('timeout'), ms));
/**
 * Races `promise` against an `ms`-millisecond timeout.
 *
 * @param promise - the promise to race (typically `iterator.next()`).
 * @param ms - timeout in milliseconds; on expiry the race resolves to 'timeout'.
 * @returns the racing promise plus a `clearTimeout` handle so the losing
 *   setTimeout doesn't linger after the winner resolves.
 *
 * The timer is also cleared automatically when the race settles (resolves OR
 * rejects). The caller awaits `result` before invoking `clearTimeout`, so
 * without the `.finally()` below a rejected `promise` would skip the caller's
 * cleanup and leak the timer — the exact leak this helper exists to prevent.
 */
private timeoutRace<T>(
  promise: Promise<T>,
  ms: number,
): { result: Promise<T | 'timeout'>; clearTimeout: () => void } {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeoutPromise = new Promise<'timeout'>((resolve) => {
    timer = setTimeout(() => resolve('timeout'), ms);
  });
  // Idempotent: safe to call from both .finally() and the caller.
  const cancel = () => {
    if (timer !== undefined) {
      clearTimeout(timer);
      timer = undefined;
    }
  };
  return {
    result: Promise.race([promise, timeoutPromise]).finally(cancel),
    clearTimeout: cancel,
  };
}
}
8 changes: 8 additions & 0 deletions packages/livekit-rtc/src/audio_resampler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,14 @@ export class AudioResampler {
return this.#channels;
}

/**
 * Releases the underlying native resampler handle. Must be called when
 * the resampler is no longer needed to avoid leaking the FD.
 *
 * NOTE(review): dispose() is called unconditionally, so calling close()
 * twice disposes the handle twice — confirm FfiHandle.dispose() is
 * idempotent, or guard with a `closed` flag.
 */
close() {
this.#ffiHandle.dispose();
}

/**
* Push audio data into the resampler and retrieve any available resampled data.
*
Expand Down
6 changes: 6 additions & 0 deletions packages/livekit-rtc/src/audio_source.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,12 @@ export class AudioSource {
}

/**
 * Closes the audio source: cancels any pending playout timer and releases
 * the native handle. Idempotent — a second call is a no-op, so the native
 * handle is never disposed twice.
 */
async close() {
  if (this.closed) return;
  // Mark closed first so concurrent callers observe the terminal state.
  this.closed = true;
  // Clear any pending playout timeout so its callback doesn't fire after
  // the handle is disposed, which would reference freed native state.
  if (this.timeout) {
    clearTimeout(this.timeout);
    this.timeout = undefined;
  }
  this.ffiHandle.dispose();
}
Expand Down
6 changes: 6 additions & 0 deletions packages/livekit-rtc/src/audio_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ class AudioStreamSource implements UnderlyingSource<AudioFrame> {
case 'eos':
FfiClient.instance.off(FfiClientEvent.FfiEvent, this.onEvent);
this.controller.close();
// Dispose the native handle so the FD is released on stream end,
// not just when cancel() is called explicitly by the consumer.
this.ffiHandle.dispose();
this.frameProcessor?.close();
Comment on lines +109 to 112
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Double-dispose of native FfiHandle when EOS fires with queued frames and consumer cancels

The new EOS handler calls this.ffiHandle.dispose() at line 111, and the cancel() method also calls this.ffiHandle.dispose() at line 123. Per the WHATWG ReadableStream spec, when controller.close() is called (line 108), enqueued frames are still drained by the consumer. If there are queued frames and the consumer cancels before reading them all (e.g., by breaking out of a for await...of loop), the stream is still in the "readable" state (close pending), so the runtime invokes cancel() on the underlying source — leading to ffiHandle.dispose() being called a second time on an already-freed native handle. The same double-invocation applies to this.frameProcessor?.close() on lines 112 and 126.

Prompt for agents
In packages/livekit-rtc/src/audio_stream.ts, add a boolean flag (e.g. `private disposed = false`) to AudioStreamSource. In both the EOS handler (around line 107-113) and the cancel() method (around line 121-127), check and set the flag before calling ffiHandle.dispose() and frameProcessor?.close(). For example:

  private disposed = false;

  // In onEvent EOS case:
  case 'eos':
    FfiClient.instance.off(FfiClientEvent.FfiEvent, this.onEvent);
    this.controller.close();
    if (!this.disposed) {
      this.disposed = true;
      this.ffiHandle.dispose();
      this.frameProcessor?.close();
    }
    break;

  // In cancel():
  cancel() {
    FfiClient.instance.off(FfiClientEvent.FfiEvent, this.onEvent);
    if (!this.disposed) {
      this.disposed = true;
      this.ffiHandle.dispose();
      this.frameProcessor?.close();
    }
  }
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

break;
}
Expand All @@ -118,6 +121,9 @@ class AudioStreamSource implements UnderlyingSource<AudioFrame> {
/**
 * Underlying-source cancel hook: runs when the consumer cancels the
 * ReadableStream (e.g. by breaking out of a for-await loop).
 */
cancel() {
FfiClient.instance.off(FfiClientEvent.FfiEvent, this.onEvent);
// NOTE(review): the 'eos' handler also disposes this handle and closes the
// frame processor. If EOS fires while frames are still queued and the
// consumer then cancels before draining them, both paths run and dispose()
// executes twice on the same native handle — guard both sites with a shared
// `disposed` flag; confirm whether FfiHandle.dispose() is idempotent.
this.ffiHandle.dispose();
// Also close the frame processor on cancel for symmetry with the EOS path,
// so resources are released regardless of how the stream ends.
this.frameProcessor?.close();
}
}

Expand Down
18 changes: 18 additions & 0 deletions packages/livekit-rtc/src/data_streams/stream_reader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,18 @@ export class ByteStreamReader extends BaseStreamReader<ByteStreamInfo> {
try {
const { done, value } = await reader.read();
if (done) {
// Release the lock when the stream is exhausted so the
// underlying ReadableStream can be garbage-collected.
reader.releaseLock();
return { done: true, value: undefined as unknown };
} else {
this.handleChunkReceived(value);
return { done: false, value: value.content! };
}
} catch (error: unknown) {
// Release the lock on error so it doesn't stay held when the
// consumer never calls return() (e.g. breaking out of for-await).
reader.releaseLock();
log.error('error processing stream update: %s', error);
return { done: true, value: undefined as unknown };
}
Expand Down Expand Up @@ -121,12 +127,18 @@ export class TextStreamReader extends BaseStreamReader<TextStreamInfo> {
[Symbol.asyncIterator]() {
const reader = this.reader.getReader();
const decoder = new TextDecoder();
const receivedChunks = this.receivedChunks;

return {
next: async (): Promise<IteratorResult<string>> => {
try {
const { done, value } = await reader.read();
if (done) {
// Release the lock when the stream is exhausted so the
// underlying ReadableStream can be garbage-collected.
reader.releaseLock();
// Clear received chunks so the buffered data can be GC'd.
receivedChunks.clear();
return { done: true, value: undefined };
} else {
this.handleChunkReceived(value);
Expand All @@ -136,13 +148,19 @@ export class TextStreamReader extends BaseStreamReader<TextStreamInfo> {
};
}
} catch (error: unknown) {
// Release the lock on error so it doesn't stay held when the
// consumer never calls return() (e.g. breaking out of for-await).
reader.releaseLock();
receivedChunks.clear();
log.error('error processing stream update: %s', error);
return { done: true, value: undefined };
}
},

return(): IteratorResult<string> {
reader.releaseLock();
// Clear received chunks so the buffered data can be GC'd.
receivedChunks.clear();
return { done: true, value: undefined };
},
};
Expand Down
29 changes: 26 additions & 3 deletions packages/livekit-rtc/src/ffi_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,37 @@ export class FfiClient extends (EventEmitter as new () => TypedEmitter<FfiClient
return livekitRetrievePtr(data);
}

async waitFor<T>(predicate: (ev: FfiEvent) => boolean): Promise<T> {
return new Promise<T>((resolve) => {
/**
 * Resolves with the payload of the first FFI event matching `predicate`.
 *
 * @param predicate - matcher invoked for every incoming FfiEvent.
 * @param options.signal - optional AbortSignal; when it fires, the event
 *   listener is removed (so pending waitFor() calls don't leak listeners
 *   after disconnect/cancellation) and the promise rejects.
 * @returns the matching event's `message.value`, cast to T.
 */
async waitFor<T>(
  predicate: (ev: FfiEvent) => boolean,
  options?: { signal?: AbortSignal },
): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const listener = (ev: FfiEvent) => {
      if (predicate(ev)) {
        // cleanup() already detaches this listener — no separate off() needed.
        cleanup();
        resolve(ev.message.value as T);
      }
    };

    const cleanup = () => {
      this.off(FfiClientEvent.FfiEvent, listener);
      options?.signal?.removeEventListener('abort', onAbort);
    };

    const onAbort = () => {
      cleanup();
      // signal.reason is typed `any`; stringify so Error always gets a message.
      reject(new Error(String(options?.signal?.reason ?? 'waitFor aborted')));
    };

    // Already-aborted signal: fail fast without registering anything.
    if (options?.signal?.aborted) {
      reject(new Error(String(options.signal.reason ?? 'waitFor aborted')));
      return;
    }

    options?.signal?.addEventListener('abort', onAbort);
    this.on(FfiClientEvent.FfiEvent, listener);
  });
}
Expand Down
Loading
Loading