diff --git a/Cargo.lock b/Cargo.lock index b0287a730..e21c9abd8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -112,7 +112,7 @@ version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] @@ -123,7 +123,7 @@ checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d" dependencies = [ "anstyle", "once_cell_polyfill", - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] @@ -365,7 +365,7 @@ dependencies = [ "sha1", "sync_wrapper", "tokio", - "tokio-tungstenite 0.29.0", + "tokio-tungstenite", "tower", "tower-layer", "tower-service", @@ -1357,7 +1357,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ccc2776f0c61eca1ca32528f85548abd1a4be8fb53d1b21c013e4f18da1e7090" dependencies = [ "data-encoding", - "syn 2.0.117", + "syn 1.0.109", ] [[package]] @@ -1737,7 +1737,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -3239,7 +3239,7 @@ checksum = "3640c1c38b8e4e43584d8df18be5fc6b0aa314ce6ebf51b53313d4306cca8e46" dependencies = [ "hermit-abi", "libc", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -4290,7 +4290,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -5178,9 +5178,9 @@ dependencies = [ [[package]] name = "qmux" -version = "0.0.7" +version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a625edac9a3021654a955444ca602b7b66b6764c7372196df08af53779ffbe7" +checksum = "bb8250ecd8e31dbf7935014a8784eea6e4b971a9d627600b920c09871f2bd768" dependencies = [ "bytes", "futures", @@ -5188,7 +5188,7 @@ dependencies = [ "thiserror 2.0.18", "tokio", "tokio-rustls", - "tokio-tungstenite 0.28.0", + "tokio-tungstenite", "tracing", "web-transport-proto", "web-transport-trait", @@ -5734,7 +5734,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys 0.12.1", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -5793,7 +5793,7 @@ dependencies = [ "security-framework", "security-framework-sys", "webpki-root-certs", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -5814,7 +5814,7 @@ dependencies = [ "security-framework", "security-framework-sys", "webpki-root-certs", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -6029,7 +6029,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b55fb86dfd3a2f5f76ea78310a88f96c4ea21a3031f8d212443d56123fd0521" dependencies = [ "libc", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -6437,7 +6437,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3a766e1110788c36f4fa1c2b71b387a7815aa65f88ce0229841826633d93723e" dependencies = [ "libc", - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] @@ -6667,7 +6667,7 @@ dependencies = [ "getrandom 0.4.2", "once_cell", "rustix 1.1.4", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -6676,7 +6676,7 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d8c27177b12a6399ffc08b98f76f7c9a1f4fe9fc967c784c5a071fa8d93cf7e1" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -6925,9 +6925,9 @@ dependencies = [ [[package]] name = "tokio-tungstenite" -version = "0.28.0" +version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d25a406cddcc431a75d3d9afc6a7c0f7428d4891dd973e4d54c56b46127bf857" +checksum = "8f72a05e828585856dacd553fba484c242c46e391fb0e58917c942ee9202915c" dependencies = [ "futures-util", "log", @@ -6936,19 +6936,7 @@ dependencies = [ "rustls-pki-types", "tokio", "tokio-rustls", - "tungstenite 0.28.0", -] - -[[package]] -name = "tokio-tungstenite" -version = "0.29.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f72a05e828585856dacd553fba484c242c46e391fb0e58917c942ee9202915c" -dependencies = [ - "futures-util", - "log", - "tokio", - "tungstenite 0.29.0", + "tungstenite", ] [[package]] @@ -7333,25 +7321,6 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" -[[package]] -name = "tungstenite" -version = "0.28.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8628dcc84e5a09eb3d8423d6cb682965dea9133204e8fb3efee74c2a0c259442" -dependencies = [ - "bytes", - "data-encoding", - "http", - "httparse", - "log", - "rand 0.9.4", - "rustls", - "rustls-pki-types", - "sha1", - "thiserror 2.0.18", - "utf-8", -] - [[package]] name = "tungstenite" version = "0.29.0" @@ -7364,6 +7333,8 @@ dependencies = [ "httparse", "log", "rand 0.9.4", + "rustls", + "rustls-pki-types", "sha1", "thiserror 2.0.18", ] @@ -7589,12 +7560,6 @@ dependencies = [ "serde_derive", ] -[[package]] -name = "utf-8" -version = "0.7.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" - [[package]] name = "utf8_iter" version = "1.0.4" @@ -8021,7 +7986,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index ebad98951..78a998101 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,7 +52,7 @@ moq-mux = { version = "0.5", path = "rs/moq-mux" } moq-native = { version = "0.15", path = "rs/moq-native", default-features = false } moq-net = { version = "0.1", path = "rs/moq-net" } moq-token = { version = "0.6", path = "rs/moq-token" } -qmux = { version = "0.0.7", default-features = false } +qmux = { version = "0.1.0", default-features = false } serde = { version = "1", features = ["derive"] } tokio = "1.48" diff --git a/bun.lock b/bun.lock index d9d08ee77..c774277ae 100644 --- a/bun.lock +++ b/bun.lock @@ -1,6 +1,5 @@ { "lockfileVersion": 1, - "configVersion": 0, "workspaces": { "": { "name": "moq", @@ -145,7 +144,7 @@ "name": "@moq/net", "version": "0.1.1", "dependencies": { - "@moq/qmux": "^0.0.6", + "@moq/qmux": "^0.1.0", "@moq/signals": "workspace:*", "async-mutex": "^0.5.0", }, @@ -509,7 +508,7 @@ "@moq/publish": ["@moq/publish@workspace:js/publish"], - "@moq/qmux": ["@moq/qmux@0.0.6", "", {}, "sha512-ISuGz05lUvf1hzHW3Aw3VnsGRJe1w9Qdog3LQ66KS+l+5mzQsPANvW8yOioEe1Z9dJO2G3sAHoGPnzwnsY9SIQ=="], + "@moq/qmux": ["@moq/qmux@0.1.0", "", {}, "sha512-9Iieb+iV4WmZew62KsOuVwvTwLNQV6by6DT76qDuU0i3yfOYrVs56LGEvxvAn+Da2rlLlA2z4Mjq9HdeuAkxfw=="], "@moq/signals": ["@moq/signals@workspace:js/signals"], diff --git a/js/net/package.json b/js/net/package.json index fd22632d5..9bad775e4 100644 --- a/js/net/package.json +++ b/js/net/package.json @@ -17,7 +17,7 @@ "release": "bun ../common/release.ts" }, "dependencies": { - "@moq/qmux": "^0.0.6", + "@moq/qmux": "^0.1.0", "@moq/signals": "workspace:*", "async-mutex": "^0.5.0" }, diff --git a/js/net/src/connection/connect.ts b/js/net/src/connection/connect.ts index 37dc2727e..81acb3f72 100644 --- a/js/net/src/connection/connect.ts +++ b/js/net/src/connection/connect.ts @@ -335,7 +335,15 @@ async function connectWebSocket(url: URL, delay: number, cancel: Promise): const active = await Promise.race([cancel, timer.then(() => true)]); if (!active) return undefined; - const quic = new Session(url); + // qmux 0.1.0 pins a single QMux version per Session. Pick qmux-01 (the + // latest) and offer the application protocols that the spec allows on it: + // moq-transport-18 requires qmux-01, and moq-lite is unconstrained so the + // modern versions all ride here. Older moq-transport drafts (15/16/17) + // need qmux-00 and aren't reachable via this fallback path. + const quic = new Session(url, { + version: "qmux-01", + protocols: [Lite.ALPN_04, Lite.ALPN_03, Lite.ALPN, Ietf.ALPN.DRAFT_18], + }); // Wait for the WebSocket to connect, or for the cancel promise to resolve. // Close the connection if we lost the race. diff --git a/rs/moq-native/src/client.rs b/rs/moq-native/src/client.rs index 09fcade0e..1ecc00679 100644 --- a/rs/moq-native/src/client.rs +++ b/rs/moq-native/src/client.rs @@ -211,7 +211,6 @@ impl Default for ClientConfig { #[derive(Clone)] pub struct Client { moq: moq_net::Client, - versions: moq_net::Versions, backoff: Backoff, #[cfg(feature = "websocket")] websocket: super::ClientWebSocket, @@ -277,10 +276,8 @@ impl Client { _ => None, }; - let versions = config.versions(); Ok(Self { - moq: moq_net::Client::new().with_versions(versions.clone()), - versions, + moq: moq_net::Client::new().with_versions(config.versions()), backoff: config.backoff, #[cfg(feature = "websocket")] websocket: config.websocket, @@ -392,8 +389,8 @@ impl Client { #[cfg(feature = "websocket")] { - let alpns = self.versions.alpns(); - let ws_handle = crate::websocket::race_handle(&self.websocket, &self.tls, url, &alpns); + let alpns = moq_net::QMUX_ALPNS; + let ws_handle = crate::websocket::race_handle(&self.websocket, &self.tls, url, alpns); return Ok(tokio::select! { Ok(quic) = quic_handle => self.moq.connect(quic).await?, @@ -423,8 +420,8 @@ impl Client { #[cfg(feature = "websocket")] { - let alpns = self.versions.alpns(); - let ws_handle = crate::websocket::race_handle(&self.websocket, &self.tls, url, &alpns); + let alpns = moq_net::QMUX_ALPNS; + let ws_handle = crate::websocket::race_handle(&self.websocket, &self.tls, url, alpns); return Ok(tokio::select! { Ok(quic) = quic_handle => self.moq.connect(quic).await?, @@ -453,8 +450,8 @@ impl Client { #[cfg(feature = "websocket")] { - let alpns = self.versions.alpns(); - let ws_handle = crate::websocket::race_handle(&self.websocket, &self.tls, url, &alpns); + let alpns = moq_net::QMUX_ALPNS; + let ws_handle = crate::websocket::race_handle(&self.websocket, &self.tls, url, alpns); return Ok(tokio::select! { Ok(quic) = quic_handle => self.moq.connect(quic).await?, @@ -472,8 +469,8 @@ impl Client { #[cfg(feature = "websocket")] { - let alpns = self.versions.alpns(); - let session = crate::websocket::connect(&self.websocket, &self.tls, url, &alpns).await?; + let alpns = moq_net::QMUX_ALPNS; + let session = crate::websocket::connect(&self.websocket, &self.tls, url, alpns).await?; return Ok(self.moq.connect(session).await?); } diff --git a/rs/moq-native/src/websocket.rs b/rs/moq-native/src/websocket.rs index b5ee23202..9d40260c2 100644 --- a/rs/moq-native/src/websocket.rs +++ b/rs/moq-native/src/websocket.rs @@ -1,5 +1,7 @@ use anyhow::Context; +use moq_net::QmuxVersion; use qmux::tokio_tungstenite; +use qmux::tungstenite; use std::collections::HashSet; use std::sync::{Arc, LazyLock, Mutex}; use std::{net, time}; @@ -45,11 +47,27 @@ impl Default for ClientWebSocket { } } +fn qmux_version(qv: QmuxVersion) -> qmux::Version { + // `QmuxVersion` is `#[non_exhaustive]`, so we need a fallthrough even + // though both current variants are listed. New variants will hit this + // arm at runtime, which we'd want to update to handle cleanly. + match qv { + QmuxVersion::QMux00 => qmux::Version::QMux00, + QmuxVersion::QMux01 => qmux::Version::QMux01, + _ => unreachable!("unknown QmuxVersion variant"), + } +} + +/// Format a `(QmuxVersion, app)` pair as the `qmux-XX.app` subprotocol string. +fn pair_to_alpn(qv: QmuxVersion, app: &str) -> String { + format!("{}.{}", qv.alpn(), app) +} + pub(crate) async fn race_handle( config: &ClientWebSocket, tls: &rustls::ClientConfig, url: Url, - alpns: &[&str], + alpns: &[(QmuxVersion, &str)], ) -> Option> { if !config.enabled { return None; @@ -73,9 +91,10 @@ pub(crate) async fn connect( config: &ClientWebSocket, tls: &rustls::ClientConfig, mut url: Url, - alpns: &[&str], + alpns: &[(QmuxVersion, &str)], ) -> anyhow::Result { anyhow::ensure!(config.enabled, "WebSocket support is disabled"); + anyhow::ensure!(!alpns.is_empty(), "no WebSocket subprotocols to offer"); let host = url.host_str().context("missing hostname")?.to_string(); let port = url.port().unwrap_or_else(|| match url.scheme() { @@ -121,14 +140,40 @@ pub(crate) async fn connect( tokio_tungstenite::Connector::Plain }; - let session = qmux::Client::new() - .with_protocols(alpns) - .with_connector(connector) - .connect(url.as_str()) + // Build the request ourselves so we can advertise the full `qmux-XX.app` + // pair list in a single connection. qmux is one-version-per-connection; + // moq-native owns the multi-version multiplexing. + use tungstenite::client::IntoClientRequest; + let mut request = url.as_str().into_client_request().context("invalid WebSocket URL")?; + let formatted: Vec = alpns.iter().map(|(qv, app)| pair_to_alpn(*qv, app)).collect(); + let protocol_value = formatted.join(", "); + request.headers_mut().insert( + tungstenite::http::header::SEC_WEBSOCKET_PROTOCOL, + tungstenite::http::HeaderValue::from_str(&protocol_value).context("invalid Sec-WebSocket-Protocol value")?, + ); + + let (ws, response) = tokio_tungstenite::connect_async_tls_with_config(request, None, false, Some(connector)) .await .context("failed to connect WebSocket")?; - tracing::warn!(%url, "using WebSocket fallback"); + let negotiated = response + .headers() + .get(tungstenite::http::header::SEC_WEBSOCKET_PROTOCOL) + .and_then(|h| h.to_str().ok()) + .context("server did not select a Sec-WebSocket-Protocol")?; + // The server can only pick something we offered, so we recover the qmux + // version by index in the pair list rather than re-parsing the prefix. + let idx = formatted + .iter() + .position(|s| s == negotiated) + .with_context(|| format!("server picked an alpn we did not offer: {negotiated}"))?; + let (qv, _) = alpns[idx]; + + let session = qmux::ws::Upgraded::new(ws, qmux_version(qv)) + .with_alpn(negotiated) + .connect(); + + tracing::warn!(%url, ?qv, %negotiated, "using WebSocket fallback"); WEBSOCKET_WON.lock().unwrap().insert(key); Ok(session) @@ -140,18 +185,29 @@ pub(crate) async fn connect( /// alongside QUIC connections on a separate port. pub struct WebSocketListener { listener: tokio::net::TcpListener, - server: qmux::Server, + pairs: &'static [(QmuxVersion, &'static str)], + // Pre-formatted `qmux-XX.app` strings, same order as `pairs`. The handshake + // callback matches against these and we look up the qmux version by index. + formatted: Arc>, } impl WebSocketListener { pub async fn bind(addr: net::SocketAddr) -> anyhow::Result { - Self::bind_with_alpns(addr, moq_net::ALPNS).await + Self::bind_with_alpns(addr, moq_net::QMUX_ALPNS).await } - pub async fn bind_with_alpns(addr: net::SocketAddr, alpns: &[&str]) -> anyhow::Result { + pub async fn bind_with_alpns( + addr: net::SocketAddr, + alpns: &'static [(QmuxVersion, &'static str)], + ) -> anyhow::Result { + anyhow::ensure!(!alpns.is_empty(), "no WebSocket subprotocols to accept"); let listener = tokio::net::TcpListener::bind(addr).await?; - let server = qmux::Server::new().with_protocols(alpns); - Ok(Self { listener, server }) + let formatted = alpns.iter().map(|(qv, app)| pair_to_alpn(*qv, app)).collect(); + Ok(Self { + listener, + pairs: alpns, + formatted: Arc::new(formatted), + }) } pub fn local_addr(&self) -> anyhow::Result { @@ -162,15 +218,74 @@ impl WebSocketListener { match self.listener.accept().await { Ok((stream, addr)) => { tracing::debug!(%addr, "accepted WebSocket TCP connection"); - let server = self.server.clone(); - Some( - server - .accept(stream) - .await - .map_err(|e| anyhow::anyhow!("WebSocket accept failed: {e}")), - ) + Some(accept_socket(stream, self.pairs, self.formatted.clone()).await) } Err(e) => Some(Err(e.into())), } } } + +async fn accept_socket( + stream: tokio::net::TcpStream, + pairs: &'static [(QmuxVersion, &'static str)], + formatted: Arc>, +) -> anyhow::Result { + use std::sync::Mutex; + use tungstenite::handshake::server; + use tungstenite::http; + + // Capture the negotiated string from inside the handshake callback. + let chosen_slot: Arc>> = Arc::new(Mutex::new(None)); + let slot = chosen_slot.clone(); + let supported = formatted.clone(); + + #[allow(clippy::result_large_err)] + let callback = move |req: &server::Request, + mut response: server::Response| + -> Result { + let header_protocols: Vec<&str> = req + .headers() + .get_all(http::header::SEC_WEBSOCKET_PROTOCOL) + .iter() + .filter_map(|v| v.to_str().ok()) + .flat_map(|h| h.split(',')) + .map(|p| p.trim()) + .filter(|p| !p.is_empty()) + .collect(); + + // Pick the first server-supported protocol that the client offered. + match supported.iter().find(|s| header_protocols.contains(&s.as_str())) { + Some(picked) => { + response.headers_mut().insert( + http::header::SEC_WEBSOCKET_PROTOCOL, + http::HeaderValue::from_str(picked).expect("alpn must be valid HTTP value"), + ); + *slot.lock().unwrap() = Some(picked.clone()); + Ok(response) + } + None => Err(http::Response::builder() + .status(http::StatusCode::BAD_REQUEST) + .body(Some("no supported Sec-WebSocket-Protocol".to_string())) + .unwrap()), + } + }; + + let ws = tokio_tungstenite::accept_hdr_async_with_config(stream, callback, None) + .await + .context("WebSocket handshake failed")?; + + let negotiated = chosen_slot + .lock() + .unwrap() + .take() + .context("handshake completed without setting negotiated protocol")?; + let idx = formatted + .iter() + .position(|s| *s == negotiated) + .expect("callback only writes strings drawn from `formatted`"); + let (qv, _) = pairs[idx]; + + Ok(qmux::ws::Upgraded::new(ws, qmux_version(qv)) + .with_alpn(&negotiated) + .accept()) +} diff --git a/rs/moq-net/src/version.rs b/rs/moq-net/src/version.rs index becd3c019..b06f23449 100644 --- a/rs/moq-net/src/version.rs +++ b/rs/moq-net/src/version.rs @@ -41,6 +41,76 @@ pub(crate) const ALPN_16: &str = "moqt-16"; pub(crate) const ALPN_17: &str = "moqt-17"; pub(crate) const ALPN_18: &str = "moqt-18"; +/// Default `(qmux_version, app_alpn)` pairs to advertise over WebSocket / TLS, +/// in preference order. +/// +/// Each pair encodes the spec mapping: moq-transport-18 rides on qmux-01, +/// moq-transport-14..17 on qmux-00, and moq-lite-01..04 are dual-advertised on +/// both for back-compat. Lite05Wip is intentionally absent (opt-in only). +/// +/// See also [`QMUX_ALPN_STRINGS`] for the same list formatted as +/// `Sec-WebSocket-Protocol` strings. +pub const QMUX_ALPNS: &[(QmuxVersion, &str)] = &[ + (QmuxVersion::QMux01, ALPN_LITE_04), + (QmuxVersion::QMux00, ALPN_LITE_04), + (QmuxVersion::QMux01, ALPN_LITE_03), + (QmuxVersion::QMux00, ALPN_LITE_03), + (QmuxVersion::QMux01, ALPN_LITE), + (QmuxVersion::QMux00, ALPN_LITE), + (QmuxVersion::QMux01, ALPN_18), + (QmuxVersion::QMux00, ALPN_17), + (QmuxVersion::QMux00, ALPN_16), + (QmuxVersion::QMux00, ALPN_15), + (QmuxVersion::QMux00, ALPN_14), +]; + +/// [`QMUX_ALPNS`] flattened to `"qmux-XX.app"` strings, ready to drop into a +/// `Sec-WebSocket-Protocol` header (or any TLS ALPN list). +pub const QMUX_ALPN_STRINGS: &[&str] = &[ + "qmux-01.moq-lite-04", + "qmux-00.moq-lite-04", + "qmux-01.moq-lite-03", + "qmux-00.moq-lite-03", + "qmux-01.moql", + "qmux-00.moql", + "qmux-01.moqt-18", + "qmux-00.moqt-17", + "qmux-00.moqt-16", + "qmux-00.moqt-15", + "qmux-00.moq-00", +]; + +/// The qmux draft version used to carry a MoQ ALPN over WebSocket / TLS. +/// +/// The MoQ WG decided that qmux's version is tied to the moq-transport draft +/// (moq-transport-18 requires qmux-01; moq-transport-14..17 use qmux-00). +/// moq-lite is unconstrained and may ride on either. +/// +/// Mirrors `qmux::Version` but kept local so `moq-net` stays independent of +/// the `qmux` crate; the `moq-native` layer converts at the boundary. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[non_exhaustive] +pub enum QmuxVersion { + QMux00, + QMux01, +} + +impl QmuxVersion { + /// The bare ALPN string for this qmux version. + pub fn alpn(&self) -> &'static str { + match self { + Self::QMux00 => "qmux-00", + Self::QMux01 => "qmux-01", + } + } +} + +impl fmt::Display for QmuxVersion { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(self.alpn()) + } +} + /// A MoQ protocol version. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] #[non_exhaustive] @@ -126,6 +196,31 @@ impl Version { ) } + /// The qmux versions this MoQ version may ride on, in preference order. + /// + /// moq-transport-18 requires qmux-01; moq-transport-14..17 require qmux-00. + /// Existing moq-lite versions (Lite01..Lite04) advertise both for back-compat. + /// Future moq-lite versions should pin to a single qmux version, like moq-transport. + /// Lite05Wip is opt-in only and pins to qmux-01. + pub fn qmux_versions(&self) -> &'static [QmuxVersion] { + use ietf::Version as I; + use lite::Version as L; + match self { + Self::Ietf(I::Draft18) => &[QmuxVersion::QMux01], + Self::Ietf(I::Draft14 | I::Draft15 | I::Draft16 | I::Draft17) => &[QmuxVersion::QMux00], + Self::Lite(L::Lite01 | L::Lite02 | L::Lite03 | L::Lite04) => &[QmuxVersion::QMux01, QmuxVersion::QMux00], + Self::Lite(L::Lite05Wip) => &[QmuxVersion::QMux01], + } + } + + /// Whether this MoQ version is permitted to ride on the given qmux version. + /// + /// Use server-side after the qmux/app pair has been negotiated to reject + /// pairings the moq-transport spec forbids (e.g. `qmux-00.moqt-18`). + pub fn accepts_qmux(&self, qv: QmuxVersion) -> bool { + self.qmux_versions().contains(&qv) + } + /// Whether this is a lite protocol version. pub fn is_lite(&self) -> bool { match self { @@ -306,3 +401,93 @@ impl From for coding::Versions { coding::Versions::from(inner) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn qmux_versions_for_each_moq_version() { + assert_eq!( + Version::Ietf(ietf::Version::Draft18).qmux_versions(), + &[QmuxVersion::QMux01] + ); + for v in [ + ietf::Version::Draft14, + ietf::Version::Draft15, + ietf::Version::Draft16, + ietf::Version::Draft17, + ] { + assert_eq!(Version::Ietf(v).qmux_versions(), &[QmuxVersion::QMux00], "{v}"); + } + for v in [ + lite::Version::Lite01, + lite::Version::Lite02, + lite::Version::Lite03, + lite::Version::Lite04, + ] { + assert_eq!( + Version::Lite(v).qmux_versions(), + &[QmuxVersion::QMux01, QmuxVersion::QMux00], + "{v}" + ); + } + assert_eq!( + Version::Lite(lite::Version::Lite05Wip).qmux_versions(), + &[QmuxVersion::QMux01] + ); + } + + #[test] + fn accepts_qmux_is_consistent() { + assert!(Version::Ietf(ietf::Version::Draft18).accepts_qmux(QmuxVersion::QMux01)); + assert!(!Version::Ietf(ietf::Version::Draft18).accepts_qmux(QmuxVersion::QMux00)); + assert!(Version::Ietf(ietf::Version::Draft17).accepts_qmux(QmuxVersion::QMux00)); + assert!(!Version::Ietf(ietf::Version::Draft17).accepts_qmux(QmuxVersion::QMux01)); + assert!(Version::Lite(lite::Version::Lite04).accepts_qmux(QmuxVersion::QMux01)); + assert!(Version::Lite(lite::Version::Lite04).accepts_qmux(QmuxVersion::QMux00)); + } + + #[test] + fn qmux_alpns_table() { + assert_eq!( + QMUX_ALPNS, + &[ + (QmuxVersion::QMux01, "moq-lite-04"), + (QmuxVersion::QMux00, "moq-lite-04"), + (QmuxVersion::QMux01, "moq-lite-03"), + (QmuxVersion::QMux00, "moq-lite-03"), + (QmuxVersion::QMux01, "moql"), + (QmuxVersion::QMux00, "moql"), + (QmuxVersion::QMux01, "moqt-18"), + (QmuxVersion::QMux00, "moqt-17"), + (QmuxVersion::QMux00, "moqt-16"), + (QmuxVersion::QMux00, "moqt-15"), + (QmuxVersion::QMux00, "moq-00"), + ] + ); + } + + #[test] + fn qmux_alpn_strings_match_pairs() { + // Hand-rolled string list must agree with the typed pair list: same + // length, same order, and each string is `{qv.alpn()}.{app}`. + assert_eq!(QMUX_ALPN_STRINGS.len(), QMUX_ALPNS.len()); + for (s, (qv, app)) in QMUX_ALPN_STRINGS.iter().zip(QMUX_ALPNS) { + assert_eq!(*s, format!("{}.{}", qv.alpn(), app)); + } + } + + #[test] + fn qmux_alpns_excludes_lite_05_wip() { + // Lite05Wip is opt-in only; it must not leak into the default list. + assert!(!QMUX_ALPNS.iter().any(|(_, app)| *app == "moq-lite-05-wip")); + assert!(!QMUX_ALPN_STRINGS.iter().any(|s| s.contains("moq-lite-05-wip"))); + } + + #[test] + fn qmux_version_alpn_strings() { + assert_eq!(QmuxVersion::QMux00.alpn(), "qmux-00"); + assert_eq!(QmuxVersion::QMux01.alpn(), "qmux-01"); + } +} diff --git a/rs/moq-relay/src/websocket.rs b/rs/moq-relay/src/websocket.rs index 9701dec2b..f4e82684f 100644 --- a/rs/moq-relay/src/websocket.rs +++ b/rs/moq-relay/src/websocket.rs @@ -15,7 +15,7 @@ use axum::{ http::StatusCode, response::Response, }; -use moq_net::{OriginConsumer, OriginProducer, StatsHandle, Tier}; +use moq_net::{OriginConsumer, OriginProducer, QmuxVersion, StatsHandle, Tier}; use crate::{AuthParams, AuthToken, WebState, web::AuthQuery, web::MtlsPeer, web::landing_response}; @@ -32,7 +32,7 @@ pub(crate) async fn serve_ws( return Ok(landing_response()); }; - let ws = ws.protocols(["webtransport"]); + let ws = ws.protocols(moq_net::QMUX_ALPN_STRINGS.iter().copied()); let params = AuthParams { path, jwt: query.jwt }; let token = if mtls.is_some() { @@ -57,6 +57,24 @@ pub(crate) async fn serve_ws( Ok(ws.on_upgrade(async move |socket| { let id = state.conn_id.fetch_add(1, Ordering::Relaxed); + // Pull the negotiated subprotocol off the WebSocket before we wrap it + // in adapters. Without it we can't tell which qmux draft the peer + // expects to speak. + let Some(negotiated) = socket.protocol().and_then(|h| h.to_str().ok()).map(str::to_owned) else { + tracing::warn!("client connected with no Sec-WebSocket-Protocol"); + return; + }; + // Axum filtered to QMUX_ALPN_STRINGS for us, so the negotiated value + // must be one of those entries; recover the qmux draft by index. + let Some(idx) = moq_net::QMUX_ALPN_STRINGS + .iter() + .position(|s| *s == negotiated.as_str()) + else { + tracing::warn!(%negotiated, "client negotiated an unrecognized Sec-WebSocket-Protocol"); + return; + }; + let (qv, _) = moq_net::QMUX_ALPNS[idx]; + // Unfortunately, we need to convert from Axum to Tungstenite. // Axum uses Tungstenite internally, but it's not exposed to avoid semvar issues. let socket = socket @@ -67,34 +85,60 @@ pub(crate) async fn serve_ws( tungstenite::Error::ConnectionClosed }) .with(tungstenite_to_axum); - let _ = handle_socket(id, socket, publish, subscribe, stats).await; + let handler = Handler { + id, + qv, + negotiated, + publish, + subscribe, + stats, + }; + let _ = handler.run(socket).await; })) } -#[tracing::instrument("ws", err, skip_all, fields(id = _id))] -async fn handle_socket( - _id: u64, - socket: T, +/// Owns the per-connection state for one upgraded WebSocket, ready to be wrapped +/// in a qmux session and handed off to `moq_net::Server`. +struct Handler { + id: u64, + qv: QmuxVersion, + negotiated: String, publish: Option, subscribe: Option, stats: StatsHandle, -) -> anyhow::Result<()> -where - T: futures::Stream> - + futures::Sink - + Send - + Unpin - + 'static, -{ - // Wrap the WebSocket in a WebTransport compatibility layer. - let ws = qmux::ws::Bare::new(socket).accept(); - let session = moq_net::Server::new() - .with_publish(subscribe) - .with_consume(publish) - .with_stats(stats) - .accept(ws) - .await?; - session.closed().await.map_err(Into::into) +} + +impl Handler { + #[tracing::instrument("ws", err, skip_all, fields(id = self.id, qmux = ?self.qv, alpn = %self.negotiated))] + async fn run(self, socket: T) -> anyhow::Result<()> + where + T: futures::Stream> + + futures::Sink + + Send + + Unpin + + 'static, + { + // Wrap the WebSocket in a qmux session pinned to the negotiated draft. + let ws = qmux::ws::Upgraded::new(socket, qmux_version(self.qv)) + .with_alpn(&self.negotiated) + .accept(); + let session = moq_net::Server::new() + .with_publish(self.subscribe) + .with_consume(self.publish) + .with_stats(self.stats) + .accept(ws) + .await?; + session.closed().await.map_err(Into::into) + } +} + +fn qmux_version(qv: QmuxVersion) -> qmux::Version { + // `QmuxVersion` is `#[non_exhaustive]`, hence the catch-all arm. + match qv { + QmuxVersion::QMux00 => qmux::Version::QMux00, + QmuxVersion::QMux01 => qmux::Version::QMux01, + _ => unreachable!("unknown QmuxVersion variant"), + } } // https://github.com/tokio-rs/axum/discussions/848#discussioncomment-11443587