From 73749ba5c6a9e4fa88e505d10a1908ef2ee346a6 Mon Sep 17 00:00:00 2001 From: cnderrauber Date: Tue, 7 Apr 2026 16:45:16 +0800 Subject: [PATCH 1/7] Send publisher offer with join request to accelerate connection align with js sdk: https://github.com/livekit/client-sdk-js/pull/1846 --- Cargo.lock | 1 + livekit-api/Cargo.toml | 7 +- livekit-api/src/signal_client/mod.rs | 88 +++++++++--- livekit/src/room/mod.rs | 3 +- livekit/src/rtc_engine/peer_transport.rs | 40 ++++++ livekit/src/rtc_engine/rtc_session.rs | 176 ++++++++++++++++------- 6 files changed, 244 insertions(+), 71 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dc50e81f0..28ac98b45 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4018,6 +4018,7 @@ version = "0.4.18" dependencies = [ "async-tungstenite", "base64 0.21.7", + "flate2", "futures-util", "http 1.4.0", "isahc", diff --git a/livekit-api/Cargo.toml b/livekit-api/Cargo.toml index 45eebf8e3..5b42a0266 100644 --- a/livekit-api/Cargo.toml +++ b/livekit-api/Cargo.toml @@ -17,7 +17,8 @@ signal-client-tokio = [ "dep:reqwest", "dep:livekit-runtime", "livekit-runtime/tokio", - "dep:base64" + "dep:base64", + "dep:flate2" ] signal-client-async = [ @@ -36,7 +37,8 @@ __signal-client-async-compatible = [ "dep:futures-util", "dep:isahc", "dep:livekit-runtime", - "dep:base64" + "dep:base64", + "dep:flate2" ] @@ -101,6 +103,7 @@ http = "1.1" reqwest = { version = "0.12", default-features = false, features = [ "json" ], optional = true } isahc = { version = "1.7.2", default-features = false, features = [ "json", "text-decoding" ], optional = true } +flate2 = { version = "1", optional = true } scopeguard = "1.2.0" rand = { workspace = true } os_info = "3.14.0" diff --git a/livekit-api/src/signal_client/mod.rs b/livekit-api/src/signal_client/mod.rs index 2a7b8bada..7943e1978 100644 --- a/livekit-api/src/signal_client/mod.rs +++ b/livekit-api/src/signal_client/mod.rs @@ -15,6 +15,7 @@ use std::{ borrow::Cow, fmt::Debug, + io::Write, sync::{ atomic::{AtomicBool, AtomicU32, Ordering}, Arc, @@ -22,7 +23,11 @@ use std::{ time::{Duration, SystemTime, UNIX_EPOCH}, }; -use base64::{engine::general_purpose::STANDARD as BASE64_STANDARD, Engine}; +use base64::{ + engine::general_purpose::URL_SAFE as BASE64_URL_SAFE, + Engine, +}; +use flate2::{write::GzEncoder, Compression}; use http::StatusCode; use livekit_protocol as proto; use livekit_runtime::{interval, sleep, Instant, JoinHandle}; @@ -157,6 +162,7 @@ impl SignalClient { url: &str, token: &str, options: SignalOptions, + publisher_offer: Option, ) -> SignalResult<(Self, proto::JoinResponse, SignalEvents)> { let handle_success = |inner: Arc, join_response, stream_events| { let (emitter, events) = mpsc::unbounded_channel(); @@ -166,7 +172,7 @@ impl SignalClient { (Self { inner, emitter, handle: Mutex::new(Some(signal_task)) }, join_response, events) }; - match SignalInner::connect(url, token, options.clone()).await { + match SignalInner::connect(url, token, options.clone(), publisher_offer.clone()).await { Ok((inner, join_response, stream_events)) => { Ok(handle_success(inner, join_response, stream_events)) } @@ -180,7 +186,9 @@ impl SignalClient { for url in urls.iter() { log::info!("fallback connection to: {}", url); - match SignalInner::connect(url, token, options.clone()).await { + match SignalInner::connect(url, token, options.clone(), publisher_offer.clone()) + .await + { Ok((inner, join_response, stream_events)) => { return Ok(handle_success(inner, join_response, stream_events)) } @@ -263,6 +271,7 @@ impl SignalInner { url: &str, token: &str, options: SignalOptions, + publisher_offer: Option, ) -> SignalResult<( Arc, proto::JoinResponse, @@ -271,7 +280,15 @@ impl SignalInner { // Try v1 path first if single_peer_connection is enabled let use_v1_path = options.single_peer_connection; // For initial connection: reconnect=false, reconnect_reason=None, participant_sid="" - let lk_url = get_livekit_url(url, &options, use_v1_path, false, None, "")?; + let lk_url = get_livekit_url( + url, + &options, + use_v1_path, + false, + None, + "", + publisher_offer.as_ref(), + )?; // Try to connect to the SignalClient let (stream, mut events, single_pc_mode_active) = match SignalStream::connect(lk_url.clone(), token, options.connect_timeout).await { @@ -301,7 +318,8 @@ impl SignalInner { matches!(&err, SignalError::WsError(WsError::Http(e)) if e.status() == 404); if use_v1_path && is_not_found { - let lk_url_v0 = get_livekit_url(url, &options, false, false, None, "")?; + let lk_url_v0 = + get_livekit_url(url, &options, false, false, None, "", None)?; log::warn!("v1 path not found (404), falling back to v0 path"); match SignalStream::connect( lk_url_v0.clone(), @@ -397,9 +415,17 @@ impl SignalInner { // For reconnects: reconnect=true, participant_sid=sid // For v1 path: reconnect and sid are encoded in the join_request protobuf // For v0 path: reconnect and sid are added as separate query parameters - let lk_url = - get_livekit_url(&self.url, &self.options, self.single_pc_mode_active, true, None, sid) - .unwrap(); + // No publisher offer for reconnections + let lk_url = get_livekit_url( + &self.url, + &self.options, + self.single_pc_mode_active, + true, + None, + sid, + None, + ) + .unwrap(); let (new_stream, mut events) = SignalStream::connect(lk_url, &token, self.options.connect_timeout).await?; @@ -556,6 +582,7 @@ fn create_join_request_param( participant_sid: &str, os: String, os_version: String, + publisher_offer: Option<&proto::SessionDescription>, ) -> String { let connection_settings = proto::ConnectionSettings { auto_subscribe: options.auto_subscribe, @@ -576,6 +603,7 @@ fn create_join_request_param( client_info: Some(client_info), connection_settings: Some(connection_settings), reconnect, + publisher_offer: publisher_offer.cloned(), ..Default::default() }; @@ -592,13 +620,31 @@ fn create_join_request_param( // Serialize JoinRequest to bytes let join_request_bytes = join_request.encode_to_vec(); - // Create WrappedJoinRequest (JS doesn't explicitly set compression, so default is NONE) - let wrapped_join_request = - proto::WrappedJoinRequest { join_request: join_request_bytes, ..Default::default() }; + // Use gzip compression when publisher offer is included (SDP makes payload large) + let (compressed_bytes, compression) = if publisher_offer.is_some() { + let mut encoder = GzEncoder::new(Vec::new(), Compression::default()); + if encoder.write_all(&join_request_bytes).is_ok() { + if let Ok(compressed) = encoder.finish() { + (compressed, proto::wrapped_join_request::Compression::Gzip as i32) + } else { + (join_request_bytes, proto::wrapped_join_request::Compression::None as i32) + } + } else { + (join_request_bytes, proto::wrapped_join_request::Compression::None as i32) + } + } else { + (join_request_bytes, proto::wrapped_join_request::Compression::None as i32) + }; + + let wrapped_join_request = proto::WrappedJoinRequest { + join_request: compressed_bytes, + compression, + }; - // Serialize WrappedJoinRequest to bytes and base64 encode + // Serialize WrappedJoinRequest to bytes and base64url encode + // (URL-safe base64 avoids percent-encoding issues in query parameters) let wrapped_bytes = wrapped_join_request.encode_to_vec(); - BASE64_STANDARD.encode(&wrapped_bytes) + BASE64_URL_SAFE.encode(&wrapped_bytes) } /// Build the LiveKit WebSocket URL for connection @@ -617,6 +663,7 @@ fn get_livekit_url( reconnect: bool, reconnect_reason: Option, participant_sid: &str, + publisher_offer: Option<&proto::SessionDescription>, ) -> SignalResult { let mut lk_url = url::Url::parse(url).map_err(|err| SignalError::UrlParse(err.to_string()))?; @@ -651,6 +698,7 @@ fn get_livekit_url( participant_sid, os_info.os_type().to_string(), os_info.version().to_string(), + publisher_offer, ); lk_url.query_pairs_mut().append_pair("join_request", &join_request_param); } else { @@ -754,32 +802,32 @@ mod tests { fn livekit_url_test() { let io = SignalOptions::default(); - assert!(get_livekit_url("localhost:7880", &io, false, false, None, "").is_err()); + assert!(get_livekit_url("localhost:7880", &io, false, false, None, "", None).is_err()); assert_eq!( - get_livekit_url("https://localhost:7880", &io, false, false, None, "") + get_livekit_url("https://localhost:7880", &io, false, false, None, "", None) .unwrap() .scheme(), "wss" ); assert_eq!( - get_livekit_url("http://localhost:7880", &io, false, false, None, "").unwrap().scheme(), + get_livekit_url("http://localhost:7880", &io, false, false, None, "", None).unwrap().scheme(), "ws" ); assert_eq!( - get_livekit_url("wss://localhost:7880", &io, false, false, None, "").unwrap().scheme(), + get_livekit_url("wss://localhost:7880", &io, false, false, None, "", None).unwrap().scheme(), "wss" ); assert_eq!( - get_livekit_url("ws://localhost:7880", &io, false, false, None, "").unwrap().scheme(), + get_livekit_url("ws://localhost:7880", &io, false, false, None, "", None).unwrap().scheme(), "ws" ); - assert!(get_livekit_url("ftp://localhost:7880", &io, false, false, None, "").is_err()); + assert!(get_livekit_url("ftp://localhost:7880", &io, false, false, None, "", None).is_err()); } #[test] fn validate_url_test() { let io = SignalOptions::default(); - let lk_url = get_livekit_url("wss://localhost:7880", &io, false, false, None, "").unwrap(); + let lk_url = get_livekit_url("wss://localhost:7880", &io, false, false, None, "", None).unwrap(); let validate_url = get_validate_url(lk_url); // Should be /rtc/validate, not /rtc/rtc/validate diff --git a/livekit/src/room/mod.rs b/livekit/src/room/mod.rs index 0035237b6..89ae9c205 100644 --- a/livekit/src/room/mod.rs +++ b/livekit/src/room/mod.rs @@ -1171,7 +1171,8 @@ impl RoomSession { let stream_id = stream.id(); let lk_stream_id = unpack_stream_id(&stream_id); if lk_stream_id.is_none() { - log::error!("received track with an invalid track_id: {:?}", &stream_id); + // server could require extra media sections to accelerate subscription. + log::debug!("received track with an invalid track_id: {:?}", &stream_id); return; } diff --git a/livekit/src/rtc_engine/peer_transport.rs b/livekit/src/rtc_engine/peer_transport.rs index da1ac2821..520bcbaa1 100644 --- a/livekit/src/rtc_engine/peer_transport.rs +++ b/livekit/src/rtc_engine/peer_transport.rs @@ -33,6 +33,7 @@ struct TransportInner { single_pc_mode: bool, // Publish-side target bitrate (bps) for offer munging max_send_bitrate_bps: Option, + pending_initial_offer: Option, } pub struct PeerTransport { @@ -64,6 +65,7 @@ impl PeerTransport { restarting_ice: false, single_pc_mode, max_send_bitrate_bps: None, + pending_initial_offer: None, })), } } @@ -108,6 +110,11 @@ impl PeerTransport { ) -> EngineResult<()> { let mut inner = self.inner.lock().await; + if let Some(pending_offer) = inner.pending_initial_offer.take() { + log::debug!("applying pending initial offer as local description before answer"); + self.peer_connection.set_local_description(pending_offer).await?; + } + self.peer_connection.set_remote_description(remote_description).await?; for ic in inner.pending_candidates.drain(..) { @@ -136,6 +143,34 @@ impl PeerTransport { Ok(answer) } + /// Create an initial offer without setting it as local description. + /// The offer is stored as pending and will be applied when the server's answer arrives. + pub async fn create_initial_offer(&self) -> EngineResult> { + let mut inner = self.inner.lock().await; + if !inner.single_pc_mode { + return Ok(None); + } + + let offer = self.peer_connection.create_offer(OfferOptions::default()).await?; + let sdp = offer.to_string(); + + let recvonly_munged = Self::munge_inactive_to_recvonly_for_media(&sdp); + if recvonly_munged != sdp { + if let Ok(parsed) = SessionDescription::parse(&recvonly_munged, offer.sdp_type()) { + inner.pending_initial_offer = Some(parsed.clone()); + return Ok(Some(parsed)); + } + } + + inner.pending_initial_offer = Some(offer.clone()); + Ok(Some(offer)) + } + + pub async fn clear_pending_initial_offer(&self) { + let mut inner = self.inner.lock().await; + inner.pending_initial_offer = None; + } + pub async fn set_max_send_bitrate_bps(&self, bps: Option) { let mut inner = self.inner.lock().await; inner.max_send_bitrate_bps = bps; @@ -333,6 +368,11 @@ impl PeerTransport { inner.restarting_ice = true; } + if inner.pending_initial_offer.is_some() { + inner.renegotiate = true; + return Ok(()); + } + if self.peer_connection.signaling_state() == SignalingState::HaveLocalOffer { let remote_sdp = self.peer_connection.current_remote_description(); if options.ice_restart && remote_sdp.is_some() { diff --git a/livekit/src/rtc_engine/rtc_session.rs b/livekit/src/rtc_engine/rtc_session.rs index 3b181fae5..127b44089 100644 --- a/livekit/src/rtc_engine/rtc_session.rs +++ b/livekit/src/rtc_engine/rtc_session.rs @@ -443,8 +443,50 @@ impl RtcSession { ) -> EngineResult<(Self, proto::JoinResponse, SessionEvents)> { let (emitter, session_events) = mpsc::unbounded_channel(); - let (signal_client, mut join_response, signal_events) = - SignalClient::connect(url, token, options.signal_options.clone()).await?; + let lk_runtime = LkRuntime::instance(); + let use_single_pc = options.signal_options.single_peer_connection; + + let mut publisher_offer = None; + let early_publisher_pc = if use_single_pc { + let publisher_pc = PeerTransport::new( + lk_runtime + .pc_factory() + .create_peer_connection(options.rtc_config.clone())?, + proto::SignalTarget::Publisher, + true, + ); + + let dcs = Self::create_data_channels(&publisher_pc, &emitter)?; + Self::add_recv_media_sections(&publisher_pc.peer_connection(), 3, 3); + + match publisher_pc.create_initial_offer().await { + Ok(Some(offer)) => { + log::debug!("created initial publisher offer for join request"); + publisher_offer = Some(proto::SessionDescription { + r#type: "offer".to_string(), + sdp: offer.to_string(), + id: 0, + mid_to_track_id: Default::default(), + }); + } + Ok(None) => {} + Err(err) => { + log::warn!("failed to create initial publisher offer: {:?}", err); + } + } + + Some((publisher_pc, dcs)) + } else { + None + }; + + let (signal_client, mut join_response, signal_events) = SignalClient::connect( + url, + token, + options.signal_options.clone(), + publisher_offer.clone(), + ) + .await?; let signal_client = Arc::new(signal_client); log::debug!("received JoinResponse: {:?}", join_response); let subscriber_primary = join_response.subscriber_primary; @@ -464,12 +506,28 @@ impl RtcSession { let (dc_emitter, dc_events) = mpsc::unbounded_channel(); - let lk_runtime = LkRuntime::instance(); - let mut publisher_pc = PeerTransport::new( - lk_runtime.pc_factory().create_peer_connection(rtc_config.clone())?, - proto::SignalTarget::Publisher, - single_pc_mode, - ); + let sent_publisher_offer; + let (mut publisher_pc, mut reliable_dc, mut lossy_dc, data_track_dc) = + if let Some((pub_pc, dcs)) = early_publisher_pc { + if single_pc_mode { + pub_pc.peer_connection().set_configuration(rtc_config.clone())?; + sent_publisher_offer = publisher_offer.is_some(); + } else { + pub_pc.clear_pending_initial_offer().await; + pub_pc.peer_connection().set_configuration(rtc_config.clone())?; + sent_publisher_offer = false; + } + (pub_pc, dcs.0, dcs.1, dcs.2) + } else { + sent_publisher_offer = false; + let publisher_pc = PeerTransport::new( + lk_runtime.pc_factory().create_peer_connection(rtc_config.clone())?, + proto::SignalTarget::Publisher, + single_pc_mode, + ); + let dcs = Self::create_data_channels(&publisher_pc, &emitter)?; + (publisher_pc, dcs.0, dcs.1, dcs.2) + }; // In single PC mode, subscriber_pc is None let mut subscriber_pc = if single_pc_mode { @@ -482,24 +540,6 @@ impl RtcSession { )) }; - let mut reliable_dc = publisher_pc.peer_connection().create_data_channel( - RELIABLE_DC_LABEL, - // Use ordered: true for reliable delivery with ordering guarantees. - DataChannelInit { ordered: true, ..Default::default() }, - )?; - - let lossy_options = - DataChannelInit { ordered: false, max_retransmits: Some(0), ..Default::default() }; - - let mut lossy_dc = publisher_pc - .peer_connection() - .create_data_channel(LOSSY_DC_LABEL, lossy_options.clone())?; - - let data_track_dc = publisher_pc - .peer_connection() - .create_data_channel(DATA_TRACK_DC_LABEL, lossy_options)?; - handle_remote_dt_packets(&data_track_dc, emitter.downgrade()); - // Forward events received inside the signaling thread to our rtc channel rtc_events::forward_pc_events(&mut publisher_pc, rtc_emitter.clone()); if let Some(ref mut sub_pc) = subscriber_pc { @@ -570,15 +610,69 @@ impl RtcSession { dt_sender_task, })); - // In single PC mode (or with fast_publish), trigger initial negotiation - // This matches JS SDK behavior: if (!this.subscriberPrimary || joinResponse.fastPublish) { this.negotiate(); } - if single_pc_mode || join_response.fast_publish || !subscriber_primary { + // If we already sent the publisher offer with the JoinRequest, skip initial + // negotiation - the server will respond with an answer via the signal channel. + // Otherwise, trigger negotiation as before. + if sent_publisher_offer { + inner.has_published.store(true, Ordering::Release); + } else if single_pc_mode || join_response.fast_publish || !subscriber_primary { inner.publisher_negotiation_needed(); } Ok((Self { inner, handle }, join_response, session_events)) } + fn create_data_channels( + publisher_pc: &PeerTransport, + emitter: &mpsc::UnboundedSender, + ) -> EngineResult<(DataChannel, DataChannel, DataChannel)> { + let reliable_dc = publisher_pc.peer_connection().create_data_channel( + RELIABLE_DC_LABEL, + DataChannelInit { ordered: true, ..Default::default() }, + )?; + + let lossy_options = + DataChannelInit { ordered: false, max_retransmits: Some(0), ..Default::default() }; + + let lossy_dc = publisher_pc + .peer_connection() + .create_data_channel(LOSSY_DC_LABEL, lossy_options.clone())?; + + let data_track_dc = publisher_pc + .peer_connection() + .create_data_channel(DATA_TRACK_DC_LABEL, lossy_options)?; + handle_remote_dt_packets(&data_track_dc, emitter.downgrade()); + + Ok((reliable_dc, lossy_dc, data_track_dc)) + } + + fn add_recv_media_sections( + pc: &PeerConnection, + audio_count: u32, + video_count: u32, + ) { + let recvonly_init = RtpTransceiverInit { + direction: RtpTransceiverDirection::RecvOnly, + stream_ids: Vec::new(), + send_encodings: Vec::new(), + }; + + for _ in 0..audio_count { + if let Err(err) = + pc.add_transceiver_for_media(MediaType::Audio, recvonly_init.clone()) + { + log::warn!("failed to add recvonly audio transceiver: {:?}", err); + } + } + for _ in 0..video_count { + if let Err(err) = + pc.add_transceiver_for_media(MediaType::Video, recvonly_init.clone()) + { + log::warn!("failed to add recvonly video transceiver: {:?}", err); + } + } + } + pub fn has_published(&self) -> bool { self.inner.has_published.load(Ordering::Acquire) } @@ -1227,25 +1321,11 @@ impl SessionInner { req.num_videos ); - let recvonly_init = RtpTransceiverInit { - direction: RtpTransceiverDirection::RecvOnly, - stream_ids: Vec::new(), - send_encodings: Vec::new(), - }; - - // Add audio transceivers - for _ in 0..req.num_audios { - self.publisher_pc - .peer_connection() - .add_transceiver_for_media(MediaType::Audio, recvonly_init.clone())?; - } - - // Add video transceivers - for _ in 0..req.num_videos { - self.publisher_pc - .peer_connection() - .add_transceiver_for_media(MediaType::Video, recvonly_init.clone())?; - } + RtcSession::add_recv_media_sections( + &self.publisher_pc.peer_connection(), + req.num_audios, + req.num_videos, + ); // Trigger renegotiation self.publisher_negotiation_needed(); From 5f597375ef58b3d2ba00dc632bb318547ca43791 Mon Sep 17 00:00:00 2001 From: cnderrauber Date: Tue, 7 Apr 2026 17:26:22 +0800 Subject: [PATCH 2/7] Send publisher offer with join request to accelerate connection This change sends a publisher offer along with a join request to improve connection speed. --- ...lisher_offer_with_join_request_to_accelerate_connect.md | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 .changeset/send_publisher_offer_with_join_request_to_accelerate_connect.md diff --git a/.changeset/send_publisher_offer_with_join_request_to_accelerate_connect.md b/.changeset/send_publisher_offer_with_join_request_to_accelerate_connect.md new file mode 100644 index 000000000..d44910b93 --- /dev/null +++ b/.changeset/send_publisher_offer_with_join_request_to_accelerate_connect.md @@ -0,0 +1,7 @@ +--- +livekit: patch +livekit-api: patch +livekit-ffi: patch +--- + +Send publisher offer with join request to accelerate connection - #996 (@cnderrauber) From fed8b898d841848c6cc7020379eba26ee424172f Mon Sep 17 00:00:00 2001 From: cnderrauber Date: Tue, 7 Apr 2026 17:54:52 +0800 Subject: [PATCH 3/7] clean code --- livekit/src/rtc_engine/peer_transport.rs | 1 - livekit/src/rtc_engine/rtc_session.rs | 20 ++++++-------------- 2 files changed, 6 insertions(+), 15 deletions(-) diff --git a/livekit/src/rtc_engine/peer_transport.rs b/livekit/src/rtc_engine/peer_transport.rs index 520bcbaa1..62c15f47f 100644 --- a/livekit/src/rtc_engine/peer_transport.rs +++ b/livekit/src/rtc_engine/peer_transport.rs @@ -111,7 +111,6 @@ impl PeerTransport { let mut inner = self.inner.lock().await; if let Some(pending_offer) = inner.pending_initial_offer.take() { - log::debug!("applying pending initial offer as local description before answer"); self.peer_connection.set_local_description(pending_offer).await?; } diff --git a/livekit/src/rtc_engine/rtc_session.rs b/livekit/src/rtc_engine/rtc_session.rs index 127b44089..0ca283aef 100644 --- a/livekit/src/rtc_engine/rtc_session.rs +++ b/livekit/src/rtc_engine/rtc_session.rs @@ -457,11 +457,10 @@ impl RtcSession { ); let dcs = Self::create_data_channels(&publisher_pc, &emitter)?; - Self::add_recv_media_sections(&publisher_pc.peer_connection(), 3, 3); + Self::add_recv_media_sections(&publisher_pc.peer_connection(), 3, 3)?; match publisher_pc.create_initial_offer().await { Ok(Some(offer)) => { - log::debug!("created initial publisher offer for join request"); publisher_offer = Some(proto::SessionDescription { r#type: "offer".to_string(), sdp: offer.to_string(), @@ -650,7 +649,7 @@ impl RtcSession { pc: &PeerConnection, audio_count: u32, video_count: u32, - ) { + ) -> EngineResult<()> { let recvonly_init = RtpTransceiverInit { direction: RtpTransceiverDirection::RecvOnly, stream_ids: Vec::new(), @@ -658,19 +657,12 @@ impl RtcSession { }; for _ in 0..audio_count { - if let Err(err) = - pc.add_transceiver_for_media(MediaType::Audio, recvonly_init.clone()) - { - log::warn!("failed to add recvonly audio transceiver: {:?}", err); - } + pc.add_transceiver_for_media(MediaType::Audio, recvonly_init.clone())?; } for _ in 0..video_count { - if let Err(err) = - pc.add_transceiver_for_media(MediaType::Video, recvonly_init.clone()) - { - log::warn!("failed to add recvonly video transceiver: {:?}", err); - } + pc.add_transceiver_for_media(MediaType::Video, recvonly_init.clone())?; } + Ok(()) } pub fn has_published(&self) -> bool { @@ -1325,7 +1317,7 @@ impl SessionInner { &self.publisher_pc.peer_connection(), req.num_audios, req.num_videos, - ); + )?; // Trigger renegotiation self.publisher_negotiation_needed(); From e6e7ee29b8696f06a79996142956ec8cb5423492 Mon Sep 17 00:00:00 2001 From: cnderrauber Date: Wed, 8 Apr 2026 15:16:22 +0800 Subject: [PATCH 4/7] Fix formatting --- livekit-api/src/signal_client/mod.rs | 37 ++++++++++++--------------- livekit/src/rtc_engine/rtc_session.rs | 4 +-- 2 files changed, 17 insertions(+), 24 deletions(-) diff --git a/livekit-api/src/signal_client/mod.rs b/livekit-api/src/signal_client/mod.rs index 7943e1978..9d0bd4034 100644 --- a/livekit-api/src/signal_client/mod.rs +++ b/livekit-api/src/signal_client/mod.rs @@ -23,10 +23,7 @@ use std::{ time::{Duration, SystemTime, UNIX_EPOCH}, }; -use base64::{ - engine::general_purpose::URL_SAFE as BASE64_URL_SAFE, - Engine, -}; +use base64::{engine::general_purpose::URL_SAFE as BASE64_URL_SAFE, Engine}; use flate2::{write::GzEncoder, Compression}; use http::StatusCode; use livekit_protocol as proto; @@ -280,15 +277,8 @@ impl SignalInner { // Try v1 path first if single_peer_connection is enabled let use_v1_path = options.single_peer_connection; // For initial connection: reconnect=false, reconnect_reason=None, participant_sid="" - let lk_url = get_livekit_url( - url, - &options, - use_v1_path, - false, - None, - "", - publisher_offer.as_ref(), - )?; + let lk_url = + get_livekit_url(url, &options, use_v1_path, false, None, "", publisher_offer.as_ref())?; // Try to connect to the SignalClient let (stream, mut events, single_pc_mode_active) = match SignalStream::connect(lk_url.clone(), token, options.connect_timeout).await { @@ -636,10 +626,8 @@ fn create_join_request_param( (join_request_bytes, proto::wrapped_join_request::Compression::None as i32) }; - let wrapped_join_request = proto::WrappedJoinRequest { - join_request: compressed_bytes, - compression, - }; + let wrapped_join_request = + proto::WrappedJoinRequest { join_request: compressed_bytes, compression }; // Serialize WrappedJoinRequest to bytes and base64url encode // (URL-safe base64 avoids percent-encoding issues in query parameters) @@ -810,15 +798,21 @@ mod tests { "wss" ); assert_eq!( - get_livekit_url("http://localhost:7880", &io, false, false, None, "", None).unwrap().scheme(), + get_livekit_url("http://localhost:7880", &io, false, false, None, "", None) + .unwrap() + .scheme(), "ws" ); assert_eq!( - get_livekit_url("wss://localhost:7880", &io, false, false, None, "", None).unwrap().scheme(), + get_livekit_url("wss://localhost:7880", &io, false, false, None, "", None) + .unwrap() + .scheme(), "wss" ); assert_eq!( - get_livekit_url("ws://localhost:7880", &io, false, false, None, "", None).unwrap().scheme(), + get_livekit_url("ws://localhost:7880", &io, false, false, None, "", None) + .unwrap() + .scheme(), "ws" ); assert!(get_livekit_url("ftp://localhost:7880", &io, false, false, None, "", None).is_err()); @@ -827,7 +821,8 @@ mod tests { #[test] fn validate_url_test() { let io = SignalOptions::default(); - let lk_url = get_livekit_url("wss://localhost:7880", &io, false, false, None, "", None).unwrap(); + let lk_url = + get_livekit_url("wss://localhost:7880", &io, false, false, None, "", None).unwrap(); let validate_url = get_validate_url(lk_url); // Should be /rtc/validate, not /rtc/rtc/validate diff --git a/livekit/src/rtc_engine/rtc_session.rs b/livekit/src/rtc_engine/rtc_session.rs index 0ca283aef..8f01b89d6 100644 --- a/livekit/src/rtc_engine/rtc_session.rs +++ b/livekit/src/rtc_engine/rtc_session.rs @@ -449,9 +449,7 @@ impl RtcSession { let mut publisher_offer = None; let early_publisher_pc = if use_single_pc { let publisher_pc = PeerTransport::new( - lk_runtime - .pc_factory() - .create_peer_connection(options.rtc_config.clone())?, + lk_runtime.pc_factory().create_peer_connection(options.rtc_config.clone())?, proto::SignalTarget::Publisher, true, ); From 5982481011ed9d4dab4a89f3db2fb7d7015d84b1 Mon Sep 17 00:00:00 2001 From: cnderrauber Date: Wed, 15 Apr 2026 15:35:17 +0800 Subject: [PATCH 5/7] Notify wait_pc_connection when pc state changed --- livekit/src/rtc_engine/rtc_session.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/livekit/src/rtc_engine/rtc_session.rs b/livekit/src/rtc_engine/rtc_session.rs index 8f01b89d6..4d3faab03 100644 --- a/livekit/src/rtc_engine/rtc_session.rs +++ b/livekit/src/rtc_engine/rtc_session.rs @@ -391,6 +391,7 @@ struct SessionInner { e2ee_manager: Option, subscriber_primary: bool, + pc_state_notify: Notify, } /// Information about the local participant needed for outgoing @@ -588,6 +589,7 @@ impl RtcSession { pending_requests: Default::default(), e2ee_manager, subscriber_primary, + pc_state_notify: Notify::new(), }); // Start session tasks @@ -1343,6 +1345,8 @@ impl SessionInner { RtcEvent::ConnectionChange { state, target } => { log::debug!("connection change, {:?} {:?}", state, target); + self.pc_state_notify.notify_waiters(); + if state == PeerConnectionState::Failed { log::error!("{:?} pc state failed", target); self.on_session_disconnected( @@ -1725,6 +1729,7 @@ impl SessionInner { async fn close(&self, reason: DisconnectReason) { self.closed.store(true, Ordering::Release); + self.pc_state_notify.notify_waiters(); self.signal_client .send(proto::signal_request::Message::Leave(proto::LeaveRequest { @@ -1939,6 +1944,8 @@ impl SessionInner { async fn wait_pc_connection(&self) -> EngineResult<()> { let wait_connected = async move { loop { + let notified = self.pc_state_notify.notified(); + if self.closed.load(Ordering::Acquire) { return Err(EngineError::Connection("closed".into())); } @@ -1956,7 +1963,7 @@ impl SessionInner { break; } - livekit_runtime::sleep(Duration::from_millis(50)).await; + let _ = tokio::time::timeout(Duration::from_millis(50), notified).await; } Ok(()) From 788a3c4149931c132804f7da2bd55bc1356b29e2 Mon Sep 17 00:00:00 2001 From: cnderrauber Date: Mon, 27 Apr 2026 10:39:44 +0800 Subject: [PATCH 6/7] clear pending offer when close pc --- livekit/src/rtc_engine/peer_transport.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/livekit/src/rtc_engine/peer_transport.rs b/livekit/src/rtc_engine/peer_transport.rs index 62c15f47f..78804d051 100644 --- a/livekit/src/rtc_engine/peer_transport.rs +++ b/livekit/src/rtc_engine/peer_transport.rs @@ -87,6 +87,9 @@ impl PeerTransport { } pub fn close(&self) { + if let Ok(mut inner) = self.inner.try_lock() { + inner.pending_initial_offer = None; + } self.peer_connection.close(); } From d17fa0202bd7f194cdfa3b7eaa04d5cd5d84d418 Mon Sep 17 00:00:00 2001 From: cnderrauber Date: Tue, 28 Apr 2026 11:54:29 +0800 Subject: [PATCH 7/7] Support publishing track(s) when join room Save 2 rtt (add track signaling & sdp negotiation) if client wants to publish track immediately after join room. --- livekit-api/src/signal_client/mod.rs | 78 ++++++++++++------ livekit/src/prelude.rs | 1 + livekit/src/room/mod.rs | 114 +++++++++++++++++++++++++- livekit/src/rtc_engine/mod.rs | 47 +++++++++-- livekit/src/rtc_engine/rtc_session.rs | 83 ++++++++++++++++--- 5 files changed, 278 insertions(+), 45 deletions(-) diff --git a/livekit-api/src/signal_client/mod.rs b/livekit-api/src/signal_client/mod.rs index 59f0c4494..e4d735841 100644 --- a/livekit-api/src/signal_client/mod.rs +++ b/livekit-api/src/signal_client/mod.rs @@ -160,6 +160,7 @@ impl SignalClient { token: &str, options: SignalOptions, publisher_offer: Option, + add_track_requests: Vec, ) -> SignalResult<(Self, proto::JoinResponse, SignalEvents)> { let handle_success = |inner: Arc, join_response, stream_events| { let (emitter, events) = mpsc::unbounded_channel(); @@ -169,7 +170,15 @@ impl SignalClient { (Self { inner, emitter, handle: Mutex::new(Some(signal_task)) }, join_response, events) }; - match SignalInner::connect(url, token, options.clone(), publisher_offer.clone()).await { + match SignalInner::connect( + url, + token, + options.clone(), + publisher_offer.clone(), + add_track_requests.clone(), + ) + .await + { Ok((inner, join_response, stream_events)) => { Ok(handle_success(inner, join_response, stream_events)) } @@ -183,8 +192,14 @@ impl SignalClient { for url in urls.iter() { log::info!("fallback connection to: {}", url); - match SignalInner::connect(url, token, options.clone(), publisher_offer.clone()) - .await + match SignalInner::connect( + url, + token, + options.clone(), + publisher_offer.clone(), + add_track_requests.clone(), + ) + .await { Ok((inner, join_response, stream_events)) => { return Ok(handle_success(inner, join_response, stream_events)) @@ -269,6 +284,7 @@ impl SignalInner { token: &str, options: SignalOptions, publisher_offer: Option, + add_track_requests: Vec, ) -> SignalResult<( Arc, proto::JoinResponse, @@ -277,8 +293,16 @@ impl SignalInner { // Try v1 path first if single_peer_connection is enabled let use_v1_path = options.single_peer_connection; // For initial connection: reconnect=false, reconnect_reason=None, participant_sid="" - let lk_url = - get_livekit_url(url, &options, use_v1_path, false, None, "", publisher_offer.as_ref())?; + let lk_url = get_livekit_url( + url, + &options, + use_v1_path, + false, + None, + "", + publisher_offer.as_ref(), + &add_track_requests, + )?; // Try to connect to the SignalClient let (stream, mut events, single_pc_mode_active) = match SignalStream::connect(lk_url.clone(), token, options.connect_timeout).await { @@ -309,7 +333,7 @@ impl SignalInner { if use_v1_path && is_not_found { let lk_url_v0 = - get_livekit_url(url, &options, false, false, None, "", None)?; + get_livekit_url(url, &options, false, false, None, "", None, &[])?; log::warn!("v1 path not found (404), falling back to v0 path"); match SignalStream::connect( lk_url_v0.clone(), @@ -414,6 +438,7 @@ impl SignalInner { None, sid, None, + &[], ) .unwrap(); @@ -574,6 +599,7 @@ fn create_join_request_param( os_version: String, device_model: String, publisher_offer: Option<&proto::SessionDescription>, + add_track_requests: &[proto::AddTrackRequest], ) -> String { let connection_settings = proto::ConnectionSettings { auto_subscribe: options.auto_subscribe, @@ -596,6 +622,7 @@ fn create_join_request_param( connection_settings: Some(connection_settings), reconnect, publisher_offer: publisher_offer.cloned(), + add_track_requests: add_track_requests.to_vec(), ..Default::default() }; @@ -612,21 +639,22 @@ fn create_join_request_param( // Serialize JoinRequest to bytes let join_request_bytes = join_request.encode_to_vec(); - // Use gzip compression when publisher offer is included (SDP makes payload large) - let (compressed_bytes, compression) = if publisher_offer.is_some() { - let mut encoder = GzEncoder::new(Vec::new(), Compression::default()); - if encoder.write_all(&join_request_bytes).is_ok() { - if let Ok(compressed) = encoder.finish() { - (compressed, proto::wrapped_join_request::Compression::Gzip as i32) + // Use gzip compression when publisher offer or add_track_requests are included + let (compressed_bytes, compression) = + if publisher_offer.is_some() || !add_track_requests.is_empty() { + let mut encoder = GzEncoder::new(Vec::new(), Compression::default()); + if encoder.write_all(&join_request_bytes).is_ok() { + if let Ok(compressed) = encoder.finish() { + (compressed, proto::wrapped_join_request::Compression::Gzip as i32) + } else { + (join_request_bytes, proto::wrapped_join_request::Compression::None as i32) + } } else { (join_request_bytes, proto::wrapped_join_request::Compression::None as i32) } } else { (join_request_bytes, proto::wrapped_join_request::Compression::None as i32) - } - } else { - (join_request_bytes, proto::wrapped_join_request::Compression::None as i32) - }; + }; let wrapped_join_request = proto::WrappedJoinRequest { join_request: compressed_bytes, compression }; @@ -654,6 +682,7 @@ fn get_livekit_url( reconnect_reason: Option, participant_sid: &str, publisher_offer: Option<&proto::SessionDescription>, + add_track_requests: &[proto::AddTrackRequest], ) -> SignalResult { let mut lk_url = url::Url::parse(url).map_err(|err| SignalError::UrlParse(err.to_string()))?; @@ -692,6 +721,7 @@ fn get_livekit_url( os_info.version().to_string(), device_model.to_string(), publisher_offer, + add_track_requests, ); lk_url.query_pairs_mut().append_pair("join_request", &join_request_param); } else { @@ -796,39 +826,41 @@ mod tests { fn livekit_url_test() { let io = SignalOptions::default(); - assert!(get_livekit_url("localhost:7880", &io, false, false, None, "", None).is_err()); + assert!(get_livekit_url("localhost:7880", &io, false, false, None, "", None, &[]).is_err()); assert_eq!( - get_livekit_url("https://localhost:7880", &io, false, false, None, "", None) + get_livekit_url("https://localhost:7880", &io, false, false, None, "", None, &[]) .unwrap() .scheme(), "wss" ); assert_eq!( - get_livekit_url("http://localhost:7880", &io, false, false, None, "", None) + get_livekit_url("http://localhost:7880", &io, false, false, None, "", None, &[]) .unwrap() .scheme(), "ws" ); assert_eq!( - get_livekit_url("wss://localhost:7880", &io, false, false, None, "", None) + get_livekit_url("wss://localhost:7880", &io, false, false, None, "", None, &[]) .unwrap() .scheme(), "wss" ); assert_eq!( - get_livekit_url("ws://localhost:7880", &io, false, false, None, "", None) + get_livekit_url("ws://localhost:7880", &io, false, false, None, "", None, &[]) .unwrap() .scheme(), "ws" ); - assert!(get_livekit_url("ftp://localhost:7880", &io, false, false, None, "", None).is_err()); + assert!(get_livekit_url("ftp://localhost:7880", &io, false, false, None, "", None, &[]) + .is_err()); } #[test] fn validate_url_test() { let io = SignalOptions::default(); let lk_url = - get_livekit_url("wss://localhost:7880", &io, false, false, None, "", None).unwrap(); + get_livekit_url("wss://localhost:7880", &io, false, false, None, "", None, &[]) + .unwrap(); let validate_url = get_validate_url(lk_url); // Should be /rtc/validate, not /rtc/rtc/validate diff --git a/livekit/src/prelude.rs b/livekit/src/prelude.rs index e1e71f1c7..02703edc6 100644 --- a/livekit/src/prelude.rs +++ b/livekit/src/prelude.rs @@ -21,6 +21,7 @@ pub use crate::{ PushFrameError, PushFrameErrorReason, RemoteDataTrack, }, id::*, + options::TrackPublishOptions, participant::{ ConnectionQuality, DisconnectReason, LocalParticipant, Participant, PerformRpcData, RemoteParticipant, RpcError, RpcErrorCode, RpcInvocationData, diff --git a/livekit/src/room/mod.rs b/livekit/src/room/mod.rs index 89ae9c205..2832349e6 100644 --- a/livekit/src/room/mod.rs +++ b/livekit/src/room/mod.rs @@ -12,8 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +use crate::e2ee::EncryptionType; use bmrng::unbounded::UnboundedRequestReceiver; use futures_util::{Stream, StreamExt}; +use libwebrtc::prelude::RtpEncodingParameters; use libwebrtc::{ native::frame_cryptor::EncryptionState, prelude::{ @@ -31,6 +33,7 @@ use livekit_datatrack::{ use livekit_protocol::observer::Dispatcher; use livekit_protocol::{self as proto, encryption}; use livekit_runtime::JoinHandle; +use options::TrackPublishOptions; use parking_lot::RwLock; pub use proto::DisconnectReason; use proto::{promise::Promise, SignalTarget}; @@ -41,6 +44,7 @@ use tokio::sync::{ mpsc::{self, UnboundedReceiver}, oneshot, Mutex as AsyncMutex, }; +use track::LocalTrack; pub use utils::take_cell::TakeCell; pub use self::{ @@ -55,8 +59,8 @@ use crate::{ prelude::*, registered_audio_filter_plugins, rtc_engine::{ - EngineError, EngineEvent, EngineEvents, EngineOptions, EngineResult, RtcEngine, - SessionStats, INITIAL_BUFFERED_AMOUNT_LOW_THRESHOLD, + EngineError, EngineEvent, EngineEvents, EngineOptions, EngineResult, PrePublishTrack, + RtcEngine, SessionStats, INITIAL_BUFFERED_AMOUNT_LOW_THRESHOLD, }, }; @@ -394,6 +398,8 @@ pub struct RoomOptions { pub single_peer_connection: bool, /// Timeout for each individual signal connection attempt pub connect_timeout: Duration, + /// Tracks to publish immediately upon joining. Only effective when `single_peer_connection` is true. + pub publish_tracks: Vec<(LocalTrack, TrackPublishOptions)>, } impl Default for RoomOptions { @@ -416,6 +422,7 @@ impl Default for RoomOptions { sdk_options: RoomSdkOptions::default(), single_peer_connection: false, connect_timeout: SIGNAL_CONNECT_TIMEOUT, + publish_tracks: Vec::new(), } } } @@ -519,7 +526,22 @@ impl Room { signal_options.adaptive_stream = options.adaptive_stream; signal_options.single_peer_connection = options.single_peer_connection; signal_options.connect_timeout = options.connect_timeout; - let (rtc_engine, join_response, engine_events) = RtcEngine::connect( + + if !options.publish_tracks.is_empty() && !options.single_peer_connection { + return Err(RoomError::Internal( + "publish_tracks requires single_peer_connection to be enabled".into(), + )); + } + let encryption_type = e2ee_manager.encryption_type(); + let pre_publish_tracks: Vec = options + .publish_tracks + .iter() + .map(|(track, opts)| { + Self::build_pre_publish_track(track.clone(), opts.clone(), encryption_type) + }) + .collect(); + + let (rtc_engine, join_response, engine_events, pre_publish_receivers) = RtcEngine::connect( url, token, EngineOptions { @@ -527,6 +549,7 @@ impl Room { signal_options, join_retries: options.join_retries, single_peer_connection: options.single_peer_connection, + publish_tracks: pre_publish_tracks.clone(), }, Some(e2ee_manager.clone()), ) @@ -788,9 +811,94 @@ impl Room { }; inner.handle.lock().await.replace(handle); + let mut receiver_map: HashMap> = + pre_publish_receivers.into_iter().collect(); + for pt in pre_publish_tracks { + let Some(rx) = receiver_map.remove(&pt.request.cid) else { + log::warn!("no receiver for pre-published track {}", pt.track.name()); + continue; + }; + match rtc_engine.wait_track_published_by_cid(pt.request.cid.clone(), rx).await { + Ok(track_info) => { + let publication = + LocalTrackPublication::new(track_info.clone(), pt.track.clone()); + pt.track.update_info(track_info); + publication.set_track(Some(pt.track.clone().into())); + publication.update_publish_options(pt.options); + inner.local_participant.add_publication(TrackPublication::Local(publication)); + pt.track.enable(); + log::debug!("pre-published track completed: {}", pt.track.name()); + } + Err(err) => { + log::warn!( + "failed to complete pre-published track {}: {:?}", + pt.track.name(), + err + ); + } + } + } + Ok((Self { inner }, events)) } + fn build_pre_publish_track( + track: LocalTrack, + opts: TrackPublishOptions, + encryption_type: EncryptionType, + ) -> PrePublishTrack { + let disable_red = encryption_type != EncryptionType::None || !opts.red; + + let mut req = proto::AddTrackRequest { + cid: track.rtc_track().id(), + name: track.name(), + r#type: proto::TrackType::from(track.kind()) as i32, + muted: track.is_muted(), + source: proto::TrackSource::from(opts.source) as i32, + disable_dtx: !opts.dtx, + disable_red, + encryption: proto::encryption::Type::from(encryption_type) as i32, + stream: opts.stream.clone(), + ..Default::default() + }; + + if opts.preconnect_buffer { + req.audio_features.push(proto::AudioTrackFeature::TfPreconnectBuffer as i32); + } + + let encodings = match &track { + LocalTrack::Video(video_track) => { + let resolution = video_track.rtc_source().video_resolution(); + req.width = resolution.width; + req.height = resolution.height; + + let encodings = options::compute_video_encodings(req.width, req.height, &opts); + req.layers = + options::video_layers_from_encodings(req.width, req.height, &encodings); + + if opts.simulcast && encodings.len() > 1 { + req.simulcast_codecs = vec![proto::SimulcastCodec { + codec: opts.video_codec.as_str().to_string(), + cid: track.rtc_track().id(), + layers: req.layers.clone(), + ..Default::default() + }]; + } + encodings + } + LocalTrack::Audio(_) => { + let audio_encoding = + opts.audio_encoding.as_ref().unwrap_or(&options::audio::MUSIC.encoding); + vec![RtpEncodingParameters { + max_bitrate: Some(audio_encoding.max_bitrate), + ..Default::default() + }] + } + }; + + PrePublishTrack { track, options: opts, encodings, request: req } + } + pub async fn close(&self) -> RoomResult<()> { self.inner.close(DisconnectReason::ClientInitiated).await } diff --git a/livekit/src/rtc_engine/mod.rs b/livekit/src/rtc_engine/mod.rs index 1ad21f7de..bb2837feb 100644 --- a/livekit/src/rtc_engine/mod.rs +++ b/livekit/src/rtc_engine/mod.rs @@ -76,6 +76,15 @@ pub enum EngineError { Internal(Cow<'static, str>), // Unexpected error, generally we can't recover } +/// A track to be published at join time, bundled with its options and encodings. +#[derive(Debug, Clone)] +pub struct PrePublishTrack { + pub track: LocalTrack, + pub options: TrackPublishOptions, + pub encodings: Vec, + pub request: proto::AddTrackRequest, +} + #[derive(Default, Debug, Clone)] pub struct EngineOptions { pub rtc_config: RtcConfiguration, @@ -83,6 +92,8 @@ pub struct EngineOptions { pub join_retries: u32, /// Enable single peer connection mode pub single_peer_connection: bool, + /// Tracks to publish at join time (pre-publish optimization) + pub publish_tracks: Vec, } #[derive(Debug)] @@ -245,10 +256,15 @@ impl RtcEngine { token: &str, options: EngineOptions, e2ee_manager: Option, - ) -> EngineResult<(Self, proto::JoinResponse, EngineEvents)> { - let (inner, join_response, engine_events) = + ) -> EngineResult<( + Self, + proto::JoinResponse, + EngineEvents, + Vec<(String, oneshot::Receiver)>, + )> { + let (inner, join_response, engine_events, pre_publish_receivers) = EngineInner::connect(url, token, options, e2ee_manager).await?; - Ok((Self { inner }, join_response, engine_events)) + Ok((Self { inner }, join_response, engine_events, pre_publish_receivers)) } pub async fn close(&self, reason: DisconnectReason) { @@ -308,6 +324,18 @@ impl RtcEngine { session.add_track(req).await } + pub async fn wait_track_published_by_cid( + &self, + cid: String, + rx: oneshot::Receiver, + ) -> EngineResult { + let (session, _r_lock) = { + let (handle, _r_lock) = self.inner.wait_reconnection().await?; + (handle.session.clone(), _r_lock) + }; + session.wait_track_published_by_cid(cid, rx).await + } + pub fn remove_track(&self, sender: RtpSender) -> EngineResult<()> { // We don't need to wait for the reconnection let session = self.inner.running_handle.read().session.clone(); @@ -378,7 +406,12 @@ impl EngineInner { token: &str, options: EngineOptions, e2ee_manager: Option, - ) -> EngineResult<(Arc, proto::JoinResponse, EngineEvents)> { + ) -> EngineResult<( + Arc, + proto::JoinResponse, + EngineEvents, + Vec<(String, oneshot::Receiver)>, + )> { let lk_runtime = LkRuntime::instance(); let max_retries = options.join_retries; @@ -388,7 +421,7 @@ impl EngineInner { let lk_runtime = lk_runtime.clone(); let e2ee_manager = e2ee_manager.clone(); async move { - let (session, join_response, session_events) = + let (session, join_response, session_events, pre_publish_receivers) = RtcSession::connect(url, token, options.clone(), e2ee_manager).await?; session.wait_pc_connection().await?; @@ -419,7 +452,7 @@ impl EngineInner { )); inner.running_handle.write().engine_task = Some((session_task, close_tx)); - Ok((inner, join_response, engine_rx)) + Ok((inner, join_response, engine_rx, pre_publish_receivers)) } } }; @@ -855,7 +888,7 @@ impl EngineInner { let _ = engine_task.await; } - let (new_session, join_response, session_events) = + let (new_session, join_response, session_events, _) = RtcSession::connect(url, token, options, e2ee_manager).await?; // On SignalRestarted, the room will try to unpublish the local tracks diff --git a/livekit/src/rtc_engine/rtc_session.rs b/livekit/src/rtc_engine/rtc_session.rs index 72abc36c0..b3ab74834 100644 --- a/livekit/src/rtc_engine/rtc_session.rs +++ b/livekit/src/rtc_engine/rtc_session.rs @@ -449,13 +449,19 @@ impl RtcSession { token: &str, options: EngineOptions, e2ee_manager: Option, - ) -> EngineResult<(Self, proto::JoinResponse, SessionEvents)> { + ) -> EngineResult<( + Self, + proto::JoinResponse, + SessionEvents, + Vec<(String, oneshot::Receiver)>, + )> { let (emitter, session_events) = mpsc::unbounded_channel(); let lk_runtime = LkRuntime::instance(); let use_single_pc = options.signal_options.single_peer_connection; let mut publisher_offer = None; + let mut add_track_requests = Vec::new(); let early_publisher_pc = if use_single_pc { let publisher_pc = PeerTransport::new( lk_runtime.pc_factory().create_peer_connection(options.rtc_config.clone())?, @@ -466,6 +472,18 @@ impl RtcSession { let dcs = Self::create_data_channels(&publisher_pc, &emitter)?; Self::add_recv_media_sections(&publisher_pc.peer_connection(), 3, 3)?; + // Add SendOnly transceivers for pre-publish tracks so the initial offer + // includes their media sections, and include AddTrackRequests in JoinRequest. + for pt in &options.publish_tracks { + let init = RtpTransceiverInit { + direction: RtpTransceiverDirection::SendOnly, + stream_ids: Default::default(), + send_encodings: pt.encodings.clone(), + }; + publisher_pc.peer_connection().add_transceiver(pt.track.rtc_track(), init)?; + add_track_requests.push(pt.request.clone()); + } + match publisher_pc.create_initial_offer().await { Ok(Some(offer)) => { publisher_offer = Some(proto::SessionDescription { @@ -491,6 +509,7 @@ impl RtcSession { token, options.signal_options.clone(), publisher_offer.clone(), + add_track_requests, ) .await?; let signal_client = Arc::new(signal_client); @@ -556,6 +575,12 @@ impl RtcSession { let (close_tx, close_rx) = watch::channel(false); + let pre_publish_cids: Vec = options + .publish_tracks + .iter() + .map(|pt| pt.request.cid.clone()) + .collect(); + let dt_sender_options = DataChannelSenderOptions { low_buffer_threshold: DATA_TRACK_BUFFERED_AMOUNT_LOW_THRESHOLD, dc: data_track_dc.clone(), @@ -600,6 +625,14 @@ impl RtcSession { pc_state_notify: Notify::new(), }); + let pre_publish_receivers: Vec<(String, oneshot::Receiver)> = + pre_publish_cids + .iter() + .filter_map(|cid| { + inner.register_pending_track(cid).ok().map(|rx| (cid.clone(), rx)) + }) + .collect(); + // Start session tasks let signal_task = livekit_runtime::spawn(inner.clone().signal_task(signal_events, close_rx.clone())); @@ -626,7 +659,7 @@ impl RtcSession { inner.publisher_negotiation_needed(); } - Ok((Self { inner, handle }, join_response, session_events)) + Ok((Self { inner, handle }, join_response, session_events, pre_publish_receivers)) } fn create_data_channels( @@ -699,6 +732,14 @@ impl RtcSession { self.inner.add_track(req).await } + pub async fn wait_track_published_by_cid( + &self, + cid: String, + rx: oneshot::Receiver, + ) -> EngineResult { + self.inner.wait_track_published_by_cid(cid, rx).await + } + pub async fn mute_track(&self, req: proto::MuteTrackRequest) -> EngineResult<()> { self.inner.mute_track(req).await } @@ -1620,20 +1661,38 @@ impl SessionInner { } async fn add_track(&self, req: proto::AddTrackRequest) -> EngineResult { - let (tx, rx) = oneshot::channel(); let cid = req.cid.clone(); - { - let mut pendings_tracks = self.pending_tracks.lock(); - if pendings_tracks.contains_key(&req.cid) { - Err(EngineError::Internal("track already published".into()))?; - } + let rx = self.register_pending_track(&cid)?; + self.signal_client.send(proto::signal_request::Message::AddTrack(req)).await; + self.wait_track_published(cid, rx).await + } - pendings_tracks.insert(cid.clone(), tx); - } + async fn wait_track_published_by_cid( + &self, + cid: String, + rx: oneshot::Receiver, + ) -> EngineResult { + self.wait_track_published(cid, rx).await + } - self.signal_client.send(proto::signal_request::Message::AddTrack(req)).await; + fn register_pending_track( + &self, + cid: &str, + ) -> EngineResult> { + let (tx, rx) = oneshot::channel(); + let mut pending_tracks = self.pending_tracks.lock(); + if pending_tracks.contains_key(cid) { + Err(EngineError::Internal("track already published".into()))?; + } + pending_tracks.insert(cid.to_string(), tx); + Ok(rx) + } - // Wait the result from the server (TrackInfo) + async fn wait_track_published( + &self, + cid: String, + rx: oneshot::Receiver, + ) -> EngineResult { tokio::select! { Ok(info) = rx => Ok(info), _ = sleep(TRACK_PUBLISH_TIMEOUT) => {