Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ members = [
"examples/basic_room",
"examples/basic_text_stream",
"examples/encrypted_text_stream",
"examples/agent_audio_latency",
"examples/local_audio",
"examples/local_video",
"examples/mobile",
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ match event {
![](https://github.com/livekit/rust-sdks/blob/main/examples/images/simple-room-demo.gif)

- [basic room](https://github.com/livekit/rust-sdks/tree/main/examples/basic_room): simple example connecting to a room.
- [agent_audio_latency](https://github.com/livekit/rust-sdks/tree/main/examples/agent_audio_latency): audio-only mic/speaker example for talking to an agent and estimating response latency.
- [wgpu_room](https://github.com/livekit/rust-sdks/tree/main/examples/wgpu_room): complete example app with video rendering using wgpu and egui.
- [mobile](https://github.com/livekit/rust-sdks/tree/main/examples/mobile): mobile app targeting iOS and Android
- [play_from_disk](https://github.com/livekit/rust-sdks/tree/main/examples/play_from_disk): publish audio from a wav file
Expand Down
16 changes: 16 additions & 0 deletions examples/agent_audio_latency/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[package]
name = "agent_audio_latency"
version = "0.1.0"
edition.workspace = true
publish = false

[dependencies]
anyhow = { workspace = true }
clap = { workspace = true, features = ["derive"] }
cpal = "0.15"
env_logger = { workspace = true }
futures-util = { workspace = true }
livekit = { workspace = true, features = ["rustls-tls-native-roots"] }
livekit-api = { workspace = true, features = ["rustls-tls-native-roots"] }
log = { workspace = true }
tokio = { workspace = true, features = ["full"] }
55 changes: 55 additions & 0 deletions examples/agent_audio_latency/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Agent Audio Latency Example

This example connects to a LiveKit room, publishes microphone audio with `cpal`, plays remote audio back to the default speaker, and optionally injects a short probe tone to estimate agent response latency.

It is audio-only. No video tracks are created.

## What the latency metric means

When `--benchmark` is enabled, the app waits for remote audio to be quiet, injects a short tone burst into the outgoing audio, and measures how long it takes before remote audio becomes active again.

That metric works well for:

- echo / loopback agents
- agents that immediately answer with speech or audio

It is not a codec-level mouth-to-ear measurement. It is an application-level "probe sent to first remote audio response" estimate.

## Usage

With a pre-minted participant token:

```bash
cargo run -p agent_audio_latency -- \
--url "$LIVEKIT_URL" \
--token "$LIVEKIT_TOKEN"
```

Or mint a token locally:

```bash
cargo run -p agent_audio_latency -- \
--url "$LIVEKIT_URL" \
--api-key "$LIVEKIT_API_KEY" \
--api-secret "$LIVEKIT_API_SECRET" \
--room-name my-room \
--identity rust-agent-client
```

Enable the latency benchmark:

```bash
cargo run -p agent_audio_latency -- \
--url "$LIVEKIT_URL" \
--token "$LIVEKIT_TOKEN" \
--benchmark
```

If you only want to listen to a specific agent participant:

```bash
cargo run -p agent_audio_latency -- \
--url "$LIVEKIT_URL" \
--token "$LIVEKIT_TOKEN" \
--agent-identity my-agent
```
137 changes: 137 additions & 0 deletions examples/agent_audio_latency/src/audio_capture.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
use crate::latency::TurnLatencyBench;
use anyhow::{anyhow, Result};
use cpal::traits::{DeviceTrait, StreamTrait};
use cpal::{Device, SampleFormat, SizedSample, Stream, StreamConfig};
use log::{error, info, warn};
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
};
use tokio::sync::mpsc;

/// Owns the cpal microphone input stream and the flag that gates its callback.
///
/// Capture stops either explicitly via [`AudioCapture::stop`] or implicitly on
/// drop, when the flag is cleared and the underlying `Stream` is torn down.
pub struct AudioCapture {
    // Held only to keep the cpal stream alive; never accessed after creation.
    _stream: Stream,
    // Shared with the input callback; when false the callback returns early
    // and no further samples are forwarded.
    is_running: Arc<AtomicBool>,
}

impl AudioCapture {
    /// Builds an input stream on `device` for the given `config`, starts it,
    /// and returns a handle that keeps it running.
    ///
    /// * `sample_format` — native format of the device; F32, I16 and U16 are
    ///   supported, anything else yields an error.
    /// * `audio_tx` — channel receiving mono i16 frames from the callback.
    /// * `channel_index` / `num_input_channels` — which channel to extract
    ///   from the device's interleaved sample data.
    /// * `benchmark` — optional latency bench fed directly from the callback.
    ///
    /// # Errors
    /// Fails if the sample format is unsupported, or if cpal cannot build or
    /// start the stream.
    pub fn new(
        device: Device,
        config: StreamConfig,
        sample_format: SampleFormat,
        audio_tx: mpsc::UnboundedSender<Vec<i16>>,
        channel_index: u32,
        num_input_channels: u32,
        benchmark: Option<Arc<Mutex<TurnLatencyBench>>>,
    ) -> Result<Self> {
        let is_running = Arc::new(AtomicBool::new(true));
        // Monomorphize the callback on the device's native sample type so no
        // per-sample format branching happens inside the real-time callback.
        let stream = match sample_format {
            SampleFormat::F32 => Self::create_input_stream::<f32>(
                device,
                config,
                audio_tx,
                is_running.clone(),
                channel_index,
                num_input_channels,
                benchmark.clone(),
            )?,
            SampleFormat::I16 => Self::create_input_stream::<i16>(
                device,
                config,
                audio_tx,
                is_running.clone(),
                channel_index,
                num_input_channels,
                benchmark.clone(),
            )?,
            SampleFormat::U16 => Self::create_input_stream::<u16>(
                device,
                config,
                audio_tx,
                is_running.clone(),
                channel_index,
                num_input_channels,
                benchmark,
            )?,
            other => return Err(anyhow!("unsupported input sample format: {other:?}")),
        };

        stream.play()?;
        info!("audio capture stream started");

        Ok(Self { _stream: stream, is_running })
    }

    /// Creates (but does not start) the typed cpal input stream whose callback
    /// deinterleaves one channel, converts it to i16, optionally feeds the
    /// latency bench, and forwards the frame over `audio_tx`.
    fn create_input_stream<T>(
        device: Device,
        config: StreamConfig,
        audio_tx: mpsc::UnboundedSender<Vec<i16>>,
        is_running: Arc<AtomicBool>,
        channel_index: u32,
        num_input_channels: u32,
        benchmark: Option<Arc<Mutex<TurnLatencyBench>>>,
    ) -> Result<Stream>
    where
        T: SizedSample + Send + 'static,
    {
        // cpal runs the microphone callback on the platform's real-time audio thread.
        // Keep this callback short and non-blocking: push samples into a channel and let
        // the dedicated uplink runtime handle framing and SDK calls.
        let stream = device.build_input_stream(
            &config,
            move |data: &[T], _: &cpal::InputCallbackInfo| {
                // Cheap early-out once stop() has been called; the stream may
                // still fire until it is actually dropped.
                if !is_running.load(Ordering::Relaxed) {
                    return;
                }

                // `data` is interleaved: skip to the wanted channel and step by
                // the channel count to pull out a single mono channel.
                let converted: Vec<i16> = data
                    .iter()
                    .skip(channel_index as usize)
                    .step_by(num_input_channels as usize)
                    .map(|&sample| convert_sample_to_i16(sample))
                    .collect();

                if let Some(benchmark) = &benchmark {
                    // Detect turn-end directly on the capture callback thread. For this
                    // benchmark, the audio callback timing is more meaningful than a later
                    // async task wakeup in the networking pipeline.
                    benchmark.lock().unwrap().observe_user_audio(&converted, config.sample_rate.0);
                }

                // An unbounded send never blocks; it only fails once the
                // receiver side has been dropped (shutdown), so log and move on.
                if let Err(err) = audio_tx.send(converted) {
                    warn!("failed to forward captured audio: {err}");
                }
            },
            move |err| {
                error!("audio input stream error: {err}");
            },
            None,
        )?;

        Ok(stream)
    }

    /// Mutes the capture callback. Idempotent; also invoked by `Drop`.
    pub fn stop(&self) {
        self.is_running.store(false, Ordering::Relaxed);
    }
}

impl Drop for AudioCapture {
fn drop(&mut self) {
self.stop();
}
}

fn convert_sample_to_i16<T: SizedSample>(sample: T) -> i16 {
if std::mem::size_of::<T>() == std::mem::size_of::<f32>() {
let sample_f32 = unsafe { std::mem::transmute_copy::<T, f32>(&sample) };
(sample_f32.clamp(-1.0, 1.0) * i16::MAX as f32) as i16
} else if std::mem::size_of::<T>() == std::mem::size_of::<i16>() {
unsafe { std::mem::transmute_copy::<T, i16>(&sample) }
} else if std::mem::size_of::<T>() == std::mem::size_of::<u16>() {
let sample_u16 = unsafe { std::mem::transmute_copy::<T, u16>(&sample) };
((sample_u16 as i32) - (u16::MAX as i32 / 2)) as i16
} else {
0
}
}
42 changes: 42 additions & 0 deletions examples/agent_audio_latency/src/audio_mixer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

/// Bounded FIFO of i16 samples shared between the producer (room-event task
/// pushing decoded remote audio) and the consumer (cpal output callback).
///
/// Cloning is cheap and shares the same underlying buffer. Capacity is one
/// second of audio (`sample_rate * channels` samples); when full, the oldest
/// samples are discarded.
#[derive(Clone)]
pub struct AudioMixer {
    buffer: Arc<Mutex<VecDeque<i16>>>,
    volume: f32,
    max_buffer_size: usize,
}

impl AudioMixer {
    /// Creates an empty mixer; `volume` is clamped to `[0.0, 1.0]` and applied
    /// to samples as they are added.
    pub fn new(sample_rate: u32, channels: u32, volume: f32) -> Self {
        let capacity = sample_rate as usize * channels as usize;
        Self {
            buffer: Arc::new(Mutex::new(VecDeque::with_capacity(capacity))),
            volume: volume.clamp(0.0, 1.0),
            max_buffer_size: capacity,
        }
    }

    /// Appends volume-scaled samples, evicting the oldest ones once the
    /// buffer exceeds its one-second capacity.
    pub fn add_audio_data(&self, data: &[i16]) {
        let gain = self.volume;
        let mut queue = self.buffer.lock().unwrap();
        queue.extend(data.iter().map(|&sample| (sample as f32 * gain) as i16));
        while queue.len() > self.max_buffer_size {
            queue.pop_front();
        }
    }

    /// Pops exactly `requested_samples` samples from the front of the buffer,
    /// padding with silence (zeros) when the buffer runs dry.
    pub fn get_samples(&self, requested_samples: usize) -> Vec<i16> {
        let mut queue = self.buffer.lock().unwrap();
        (0..requested_samples).map(|_| queue.pop_front().unwrap_or(0)).collect()
    }
}
122 changes: 122 additions & 0 deletions examples/agent_audio_latency/src/audio_playback.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
use crate::audio_mixer::AudioMixer;
use crate::latency::TurnLatencyBench;
use anyhow::{anyhow, Result};
use cpal::traits::{DeviceTrait, StreamTrait};
use cpal::{Device, FromSample, Sample, SampleFormat, SizedSample, Stream, StreamConfig};
use log::{error, info};
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
};

/// Owns the cpal speaker output stream and the flag that gates its callback.
///
/// Playback stops either explicitly via [`AudioPlayback::stop`] or implicitly
/// on drop; once stopped the callback renders silence.
pub struct AudioPlayback {
    // Held only to keep the cpal stream alive; never accessed after creation.
    _stream: Stream,
    // Shared with the output callback; when false the callback writes silence.
    is_running: Arc<AtomicBool>,
}

impl AudioPlayback {
    /// Builds an output stream on `device` for the given `config`, starts it,
    /// and returns a handle that keeps it running.
    ///
    /// * `sample_format` — native format of the device; F32, I16 and U16 are
    ///   supported, anything else yields an error.
    /// * `mixer` — shared buffer the callback pulls already-mixed i16 samples
    ///   from on every render quantum.
    /// * `benchmark` — optional latency bench fed directly from the callback.
    ///
    /// # Errors
    /// Fails if the sample format is unsupported, or if cpal cannot build or
    /// start the stream.
    pub fn new(
        device: Device,
        config: StreamConfig,
        sample_format: SampleFormat,
        mixer: AudioMixer,
        benchmark: Option<Arc<Mutex<TurnLatencyBench>>>,
    ) -> Result<Self> {
        let is_running = Arc::new(AtomicBool::new(true));
        // Monomorphize the callback on the device's native sample type so no
        // per-sample format branching happens inside the real-time callback.
        let stream = match sample_format {
            SampleFormat::F32 => Self::create_output_stream::<f32>(
                device,
                config,
                mixer,
                is_running.clone(),
                benchmark.clone(),
            )?,
            SampleFormat::I16 => Self::create_output_stream::<i16>(
                device,
                config,
                mixer,
                is_running.clone(),
                benchmark.clone(),
            )?,
            SampleFormat::U16 => Self::create_output_stream::<u16>(
                device,
                config,
                mixer,
                is_running.clone(),
                benchmark,
            )?,
            other => return Err(anyhow!("unsupported output sample format: {other:?}")),
        };

        stream.play()?;
        info!("audio playback stream started");

        Ok(Self { _stream: stream, is_running })
    }

    /// Creates (but does not start) the typed cpal output stream whose
    /// callback drains the mixer, optionally feeds the latency bench, and
    /// converts i16 samples to the device's native format.
    fn create_output_stream<T>(
        device: Device,
        config: StreamConfig,
        mixer: AudioMixer,
        is_running: Arc<AtomicBool>,
        benchmark: Option<Arc<Mutex<TurnLatencyBench>>>,
    ) -> Result<Stream>
    where
        T: SizedSample + Sample + Send + 'static + FromSample<f32>,
    {
        // Speaker rendering also runs on a separate real-time audio thread managed by cpal.
        // It must not be blocked by network or room-event work, which is why playback pulls
        // already-mixed samples from shared state instead of awaiting Tokio work here.
        let stream = device.build_output_stream(
            &config,
            move |data: &mut [T], _: &cpal::OutputCallbackInfo| {
                // After stop(), keep filling the device buffer with silence so
                // no stale samples are replayed while the stream winds down.
                if !is_running.load(Ordering::Relaxed) {
                    for sample in data.iter_mut() {
                        *sample = Sample::from_sample(0.0f32);
                    }
                    return;
                }

                // The mixer pads with zeros on underrun, so `samples` always
                // has exactly `data.len()` entries.
                let samples = mixer.get_samples(data.len());
                if let Some(benchmark) = &benchmark {
                    // Detect speaker response on the render callback thread so the benchmark
                    // measures when audio is actually handed to the output device path.
                    benchmark.lock().unwrap().observe_speaker_audio(&samples, config.sample_rate.0);
                }
                for (slot, sample) in data.iter_mut().zip(samples.into_iter()) {
                    *slot = convert_i16_to_sample::<T>(sample);
                }
            },
            move |err| {
                error!("audio output stream error: {err}");
            },
            None,
        )?;

        Ok(stream)
    }

    /// Mutes playback (callback renders silence). Idempotent; also invoked by `Drop`.
    pub fn stop(&self) {
        self.is_running.store(false, Ordering::Relaxed);
    }
}

impl Drop for AudioPlayback {
fn drop(&mut self) {
self.stop();
}
}

fn convert_i16_to_sample<T: SizedSample + Sample + FromSample<f32>>(sample: i16) -> T {
if std::mem::size_of::<T>() == std::mem::size_of::<f32>() {
let sample_f32 = sample as f32 / i16::MAX as f32;
unsafe { std::mem::transmute_copy::<f32, T>(&sample_f32) }
} else if std::mem::size_of::<T>() == std::mem::size_of::<i16>() {
unsafe { std::mem::transmute_copy::<i16, T>(&sample) }
} else if std::mem::size_of::<T>() == std::mem::size_of::<u16>() {
let sample_u16 = ((sample as i32) + (u16::MAX as i32 / 2)) as u16;
unsafe { std::mem::transmute_copy::<u16, T>(&sample_u16) }
} else {
Sample::from_sample(0.0f32)
}
}
Loading