Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
livekit: patch
livekit-api: patch
livekit-ffi: patch
---

Send publisher offer with join request to accelerate connection - #996 (@cnderrauber)
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions livekit-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ signal-client-tokio = [
"dep:reqwest",
"dep:livekit-runtime",
"livekit-runtime/tokio",
"dep:base64"
"dep:base64",
"dep:flate2"
]

signal-client-async = [
Expand All @@ -36,7 +37,8 @@ __signal-client-async-compatible = [
"dep:futures-util",
"dep:isahc",
"dep:livekit-runtime",
"dep:base64"
"dep:base64",
"dep:flate2"
]


Expand Down Expand Up @@ -101,6 +103,7 @@ http = "1.1"
reqwest = { version = "0.12", default-features = false, features = [ "json" ], optional = true }
isahc = { version = "1.7.2", default-features = false, features = [ "json", "text-decoding" ], optional = true }

flate2 = { version = "1", optional = true }
scopeguard = "1.2.0"
rand = { workspace = true }
os_info = "3.14.0"
Expand Down
113 changes: 94 additions & 19 deletions livekit-api/src/signal_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@
use std::{
borrow::Cow,
fmt::Debug,
io::Write,
sync::{
atomic::{AtomicBool, AtomicU32, Ordering},
Arc,
},
time::{Duration, SystemTime, UNIX_EPOCH},
};

use base64::{engine::general_purpose::STANDARD as BASE64_STANDARD, Engine};
use base64::{engine::general_purpose::URL_SAFE as BASE64_URL_SAFE, Engine};
use flate2::{write::GzEncoder, Compression};
use http::StatusCode;
use livekit_protocol as proto;
use livekit_runtime::{interval, sleep, Instant, JoinHandle};
Expand Down Expand Up @@ -157,6 +159,8 @@ impl SignalClient {
url: &str,
token: &str,
options: SignalOptions,
publisher_offer: Option<proto::SessionDescription>,
add_track_requests: Vec<proto::AddTrackRequest>,
) -> SignalResult<(Self, proto::JoinResponse, SignalEvents)> {
let handle_success = |inner: Arc<SignalInner>, join_response, stream_events| {
let (emitter, events) = mpsc::unbounded_channel();
Expand All @@ -166,7 +170,15 @@ impl SignalClient {
(Self { inner, emitter, handle: Mutex::new(Some(signal_task)) }, join_response, events)
};

match SignalInner::connect(url, token, options.clone()).await {
match SignalInner::connect(
url,
token,
options.clone(),
publisher_offer.clone(),
add_track_requests.clone(),
)
.await
{
Ok((inner, join_response, stream_events)) => {
Ok(handle_success(inner, join_response, stream_events))
}
Expand All @@ -180,7 +192,15 @@ impl SignalClient {

for url in urls.iter() {
log::info!("fallback connection to: {}", url);
match SignalInner::connect(url, token, options.clone()).await {
match SignalInner::connect(
url,
token,
options.clone(),
publisher_offer.clone(),
add_track_requests.clone(),
)
.await
{
Ok((inner, join_response, stream_events)) => {
return Ok(handle_success(inner, join_response, stream_events))
}
Expand Down Expand Up @@ -263,6 +283,8 @@ impl SignalInner {
url: &str,
token: &str,
options: SignalOptions,
publisher_offer: Option<proto::SessionDescription>,
add_track_requests: Vec<proto::AddTrackRequest>,
) -> SignalResult<(
Arc<Self>,
proto::JoinResponse,
Expand All @@ -271,7 +293,16 @@ impl SignalInner {
// Try v1 path first if single_peer_connection is enabled
let use_v1_path = options.single_peer_connection;
// For initial connection: reconnect=false, reconnect_reason=None, participant_sid=""
let lk_url = get_livekit_url(url, &options, use_v1_path, false, None, "")?;
let lk_url = get_livekit_url(
url,
&options,
use_v1_path,
false,
None,
"",
publisher_offer.as_ref(),
&add_track_requests,
)?;
// Try to connect to the SignalClient
let (stream, mut events, single_pc_mode_active) =
match SignalStream::connect(lk_url.clone(), token, options.connect_timeout).await {
Expand Down Expand Up @@ -301,7 +332,8 @@ impl SignalInner {
matches!(&err, SignalError::WsError(WsError::Http(e)) if e.status() == 404);

if use_v1_path && is_not_found {
let lk_url_v0 = get_livekit_url(url, &options, false, false, None, "")?;
let lk_url_v0 =
get_livekit_url(url, &options, false, false, None, "", None, &[])?;
log::warn!("v1 path not found (404), falling back to v0 path");
match SignalStream::connect(
lk_url_v0.clone(),
Expand Down Expand Up @@ -397,9 +429,18 @@ impl SignalInner {
// For reconnects: reconnect=true, participant_sid=sid
// For v1 path: reconnect and sid are encoded in the join_request protobuf
// For v0 path: reconnect and sid are added as separate query parameters
let lk_url =
get_livekit_url(&self.url, &self.options, self.single_pc_mode_active, true, None, sid)
.unwrap();
// No publisher offer for reconnections
let lk_url = get_livekit_url(
&self.url,
&self.options,
self.single_pc_mode_active,
true,
None,
sid,
None,
&[],
)
.unwrap();

let (new_stream, mut events) =
SignalStream::connect(lk_url, &token, self.options.connect_timeout).await?;
Expand Down Expand Up @@ -557,6 +598,8 @@ fn create_join_request_param(
os: String,
os_version: String,
device_model: String,
publisher_offer: Option<&proto::SessionDescription>,
add_track_requests: &[proto::AddTrackRequest],
) -> String {
let connection_settings = proto::ConnectionSettings {
auto_subscribe: options.auto_subscribe,
Expand All @@ -578,6 +621,8 @@ fn create_join_request_param(
client_info: Some(client_info),
connection_settings: Some(connection_settings),
reconnect,
publisher_offer: publisher_offer.cloned(),
add_track_requests: add_track_requests.to_vec(),
..Default::default()
};

Expand All @@ -594,13 +639,30 @@ fn create_join_request_param(
// Serialize JoinRequest to bytes
let join_request_bytes = join_request.encode_to_vec();

// Create WrappedJoinRequest (JS doesn't explicitly set compression, so default is NONE)
// Use gzip compression when publisher offer or add_track_requests are included
let (compressed_bytes, compression) =
if publisher_offer.is_some() || !add_track_requests.is_empty() {
let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
if encoder.write_all(&join_request_bytes).is_ok() {
if let Ok(compressed) = encoder.finish() {
(compressed, proto::wrapped_join_request::Compression::Gzip as i32)
} else {
(join_request_bytes, proto::wrapped_join_request::Compression::None as i32)
}
} else {
(join_request_bytes, proto::wrapped_join_request::Compression::None as i32)
}
} else {
(join_request_bytes, proto::wrapped_join_request::Compression::None as i32)
};

let wrapped_join_request =
proto::WrappedJoinRequest { join_request: join_request_bytes, ..Default::default() };
proto::WrappedJoinRequest { join_request: compressed_bytes, compression };

// Serialize WrappedJoinRequest to bytes and base64 encode
// Serialize WrappedJoinRequest to bytes and base64url encode
// (URL-safe base64 avoids percent-encoding issues in query parameters)
let wrapped_bytes = wrapped_join_request.encode_to_vec();
BASE64_STANDARD.encode(&wrapped_bytes)
BASE64_URL_SAFE.encode(&wrapped_bytes)
}

/// Build the LiveKit WebSocket URL for connection
Expand All @@ -619,6 +681,8 @@ fn get_livekit_url(
reconnect: bool,
reconnect_reason: Option<i32>,
participant_sid: &str,
publisher_offer: Option<&proto::SessionDescription>,
add_track_requests: &[proto::AddTrackRequest],
) -> SignalResult<url::Url> {
let mut lk_url = url::Url::parse(url).map_err(|err| SignalError::UrlParse(err.to_string()))?;

Expand Down Expand Up @@ -656,6 +720,8 @@ fn get_livekit_url(
os_info.os_type().to_string(),
os_info.version().to_string(),
device_model.to_string(),
publisher_offer,
add_track_requests,
);
lk_url.query_pairs_mut().append_pair("join_request", &join_request_param);
} else {
Expand Down Expand Up @@ -760,32 +826,41 @@ mod tests {
fn livekit_url_test() {
let io = SignalOptions::default();

assert!(get_livekit_url("localhost:7880", &io, false, false, None, "").is_err());
assert!(get_livekit_url("localhost:7880", &io, false, false, None, "", None, &[]).is_err());
assert_eq!(
get_livekit_url("https://localhost:7880", &io, false, false, None, "")
get_livekit_url("https://localhost:7880", &io, false, false, None, "", None, &[])
.unwrap()
.scheme(),
"wss"
);
assert_eq!(
get_livekit_url("http://localhost:7880", &io, false, false, None, "").unwrap().scheme(),
get_livekit_url("http://localhost:7880", &io, false, false, None, "", None, &[])
.unwrap()
.scheme(),
"ws"
);
assert_eq!(
get_livekit_url("wss://localhost:7880", &io, false, false, None, "").unwrap().scheme(),
get_livekit_url("wss://localhost:7880", &io, false, false, None, "", None, &[])
.unwrap()
.scheme(),
"wss"
);
assert_eq!(
get_livekit_url("ws://localhost:7880", &io, false, false, None, "").unwrap().scheme(),
get_livekit_url("ws://localhost:7880", &io, false, false, None, "", None, &[])
.unwrap()
.scheme(),
"ws"
);
assert!(get_livekit_url("ftp://localhost:7880", &io, false, false, None, "").is_err());
assert!(get_livekit_url("ftp://localhost:7880", &io, false, false, None, "", None, &[])
.is_err());
}

#[test]
fn validate_url_test() {
let io = SignalOptions::default();
let lk_url = get_livekit_url("wss://localhost:7880", &io, false, false, None, "").unwrap();
let lk_url =
get_livekit_url("wss://localhost:7880", &io, false, false, None, "", None, &[])
.unwrap();
let validate_url = get_validate_url(lk_url);

// Should be /rtc/validate, not /rtc/rtc/validate
Expand Down
1 change: 1 addition & 0 deletions livekit/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub use crate::{
PushFrameError, PushFrameErrorReason, RemoteDataTrack,
},
id::*,
options::TrackPublishOptions,
participant::{
ConnectionQuality, DisconnectReason, LocalParticipant, Participant, PerformRpcData,
RemoteParticipant, RpcError, RpcErrorCode, RpcInvocationData,
Expand Down
Loading