diff --git a/crates/client-api/src/routes/subscribe.rs b/crates/client-api/src/routes/subscribe.rs index ce7c127be98..26cf761d111 100644 --- a/crates/client-api/src/routes/subscribe.rs +++ b/crates/client-api/src/routes/subscribe.rs @@ -23,7 +23,7 @@ use prometheus::{Histogram, IntGauge}; use scopeguard::{defer, ScopeGuard}; use serde::Deserialize; use spacetimedb::client::messages::{ - serialize, serialize_v2, IdentityTokenMessage, InUseSerializeBuffer, SerializeBuffer, SwitchedServerMessage, + serialize, serialize_v3, IdentityTokenMessage, InUseSerializeBuffer, SerializeBuffer, SwitchedServerMessage, ToProtocol, }; use spacetimedb::client::{ @@ -39,7 +39,7 @@ use spacetimedb::Identity; use spacetimedb_client_api_messages::websocket::v1 as ws_v1; use spacetimedb_client_api_messages::websocket::v2 as ws_v2; use spacetimedb_client_api_messages::websocket::v3 as ws_v3; -use spacetimedb_datastore::execution_context::WorkloadType; +use spacetimedb_lib::bsatn; use spacetimedb_lib::connection_id::{ConnectionId, ConnectionIdForUrl}; use tokio::sync::{mpsc, watch}; use tokio::task::JoinHandle; @@ -1290,13 +1290,151 @@ enum OutboundWsMessage { Message(OutboundMessage), } -/// Task that reads [`OutboundWsMessage`]s from `messages`, encodes them via -/// [`ws_encode_message`], and sends the resuling [`Frame`]s to `outgoing_frames`. +/// Controls how many binary protocol messages may be packed into a single +/// websocket payload. +/// +/// Protocol v2 requires one [`ws_v2::ServerMessage`] per websocket message. +/// Protocol v3 keeps the v2 message schema but permits multiple consecutive +/// v2 messages in a single websocket message. +#[derive(Clone, Copy, PartialEq, Eq)] +enum BinaryPayloadMode { + /// Flush after each binary server message. + Single, + /// Flush once after all available binary server messages are collected. + Coalesced, +} + +/// A binary websocket message plus the logical row count it contributes to +/// payload-level send metrics. +struct V2OutboundMessage { + message: ws_v2::ServerMessage, + num_rows: Option, +} + +/// Convert an outbound message into the binary websocket schema. +/// +/// Binary websocket connections should only receive v2 server messages. Error +/// messages are still represented by the v1 type in the internal send path, so +/// they are dropped here instead of being sent to a binary client. +fn v2_outbound_message(message: OutboundWsMessage) -> Option { + let message = match message { + OutboundWsMessage::Error(message) => { + log::error!( + "dropping v1 error message sent to a binary websocket client: {:?}", + message + ); + return None; + } + OutboundWsMessage::Message(message) => message, + }; + + let num_rows = message.num_rows(); + match message { + OutboundMessage::V2(message) => Some(V2OutboundMessage { message, num_rows }), + OutboundMessage::V1(message) => { + log::error!("dropping v1 message for a binary websocket connection: {:?}", message); + None + } + } +} + +/// Return the uncompressed v3 payload body bytes contributed by `message`. +/// +/// v3 payloads are formed by concatenating BSATN-encoded v2 server messages, so +/// this measures exactly the bytes that count against the coalescing limit. +fn v3_server_message_body_len(message: &ws_v2::ServerMessage) -> usize { + bsatn::to_len(message).expect("should be able to measure bsatn-encoded v2 server message") +} + +/// Return whether appending the next message would cross the v3 coalescing cap. +/// +/// An empty payload is always allowed to accept one message, even when that +/// message alone is larger than the cap. +fn v3_payload_would_exceed_limit(current_body_len: usize, next_message_body_len: usize) -> bool { + current_body_len != 0 && current_body_len.saturating_add(next_message_body_len) > V3_MAX_UNCOMPRESSED_PAYLOAD_SIZE +} + +/// Return whether a binary websocket payload is large enough to encode on Rayon. +/// +/// The row threshold preserves the historical behavior. The byte threshold makes +/// large v3 payloads without row counts take the same off-thread path. +fn is_large_binary_payload(num_rows: Option, uncompressed_body_len: usize) -> bool { + num_rows.is_some_and(|n| n > LARGE_MESSAGE_ROW_THRESHOLD) + || uncompressed_body_len >= V3_MAX_UNCOMPRESSED_PAYLOAD_SIZE +} + +/// Encoding receive batch size. +/// +/// This is deliberately tied to the client connection receive limit so the +/// websocket encoder can consume the batches produced by +/// [`ClientConnectionReceiver::recv_many`] without immediately re-batching +/// them to a different size. +const ENCODE_BATCH_SIZE: usize = ClientConnectionReceiver::DEFAULT_RECV_MANY_LIMIT; + +/// Row count above which websocket payload serialization is offloaded to Rayon. +const LARGE_MESSAGE_ROW_THRESHOLD: usize = 1024; + +/// Target maximum uncompressed v3 payload body size. +/// +/// The v3 binary body is a sequence of BSATN-encoded v2 server messages. The +/// one-byte compression tag is not counted here. This is a target, not a hard +/// rejection limit. One logical server message may exceed it, in which case the +/// message is sent by itself. +const V3_MAX_UNCOMPRESSED_PAYLOAD_SIZE: usize = 512 * 1024; + +/// Tracks serialize buffers that may be reusable once their frames have been +/// copied to the wire. +struct SerializeBufferPool { + config: ClientConfig, + available: ArrayQueue, + in_use: Vec, +} + +impl SerializeBufferPool { + const CAPACITY: usize = 16; + + fn new(config: ClientConfig) -> Self { + Self { + config, + available: ArrayQueue::new(Self::CAPACITY), + in_use: Vec::with_capacity(Self::CAPACITY), + } + } + + fn get(&mut self) -> SerializeBuffer { + self.reclaim(); + self.available + .pop() + .unwrap_or_else(|| SerializeBuffer::new(self.config)) + } + + fn hold(&mut self, in_use: InUseSerializeBuffer) { + if self.in_use.len() < Self::CAPACITY { + self.in_use.push(in_use); + } + } + + fn reclaim(&mut self) { + let mut i = 0; + while i < self.in_use.len() { + if self.in_use[i].is_unique() { + let in_use = self.in_use.swap_remove(i); + let buf = in_use.try_reclaim().expect("buffer should be unique"); + let _ = self.available.push(buf); + } else { + i += 1; + } + } + } +} + +/// Task that reads [`OutboundWsMessage`]s from `messages`, encodes them, and +/// sends the resulting [`Frame`]s to `outgoing_frames`. /// /// Meant to be [`tokio::spawn`]ed. /// /// The function also takes care of reusing serialization buffers and reporting -/// metrics via [`SendMetrics`].. +/// metrics via [`SendMetrics`]. async fn ws_encode_task( metrics: SendMetrics, config: ClientConfig, @@ -1304,91 +1442,255 @@ async fn ws_encode_task( outgoing_frames: mpsc::UnboundedSender, bsatn_rlb_pool: BsatnRowListBuilderPool, ) { - // Serialize buffers can be reclaimed once all frames of a message are - // copied to the wire. Since we don't know when that will happen, we prepare - // for a few messages to be in-flight, i.e. encoded but not yet sent. - const BUF_POOL_CAPACITY: usize = 16; - let buf_pool = ArrayQueue::new(BUF_POOL_CAPACITY); - let mut in_use_bufs: Vec> = Vec::with_capacity(BUF_POOL_CAPACITY); - - 'send: while let Some(message) = messages.recv().await { - // Drop serialize buffers with no external referent, - // returning them to the pool. - in_use_bufs.retain(|in_use| !in_use.is_unique()); - // Get a serialize buffer from the pool, - // or create a fresh one. - let buf = buf_pool.pop().unwrap_or_else(|| SerializeBuffer::new(config)); - - let in_use_buf = match message { - OutboundWsMessage::Error(message) => { - if config.version != WsVersion::V1 { - log::error!( - "dropping v1 error message sent to a binary websocket client: {:?}", - message - ); - continue; + let mut encoder = WsEncoder { + config, + buffers: SerializeBufferPool::new(config), + metrics: &metrics, + outgoing_frames: &outgoing_frames, + bsatn_rlb_pool: &bsatn_rlb_pool, + binary_server_messages: Vec::new(), + }; + let mut message_batch = Vec::new(); + while messages.recv_many(&mut message_batch, ENCODE_BATCH_SIZE).await != 0 { + log::trace!("encoding batch of {} websocket messages", message_batch.len()); + // `encode_batch` drains `message_batch` on success. If forwarding to + // the websocket send loop fails, the receiver is gone, so the encode + // task can terminate. + if encoder.encode_batch(&mut message_batch).await.is_err() { + break; + } + } +} + +/// Stateful websocket encoder for one client connection. +/// +/// The encoder owns reusable scratch storage: +/// +/// - [`SerializeBufferPool`] reuses byte buffers once encoded frames have been +/// copied to the socket task. +/// - `binary_server_messages` reuses the vector allocation used to assemble +/// v2/v3 binary websocket payloads. +struct WsEncoder<'a> { + config: ClientConfig, + buffers: SerializeBufferPool, + metrics: &'a SendMetrics, + outgoing_frames: &'a mpsc::UnboundedSender, + bsatn_rlb_pool: &'a BsatnRowListBuilderPool, + binary_server_messages: Vec, +} + +impl WsEncoder<'_> { + /// Encode a drained batch according to the websocket version negotiated by + /// the client. + async fn encode_batch( + &mut self, + message_batch: &mut Vec, + ) -> Result<(), mpsc::error::SendError> { + match self.config.version { + WsVersion::V1 => self.encode_v1_batch(message_batch).await, + WsVersion::V2 => self.encode_v2_batch(message_batch).await, + WsVersion::V3 => self.encode_v3_batch(message_batch).await, + } + } + + /// Encode a batch for the original v1 websocket protocols. + /// + /// v1 text/binary messages are encoded one logical message at a time. This + /// path also handles reducer errors, which still use the v1 message schema. + async fn encode_v1_batch( + &mut self, + message_batch: &mut Vec, + ) -> Result<(), mpsc::error::SendError> { + for message in message_batch.drain(..) { + match message { + OutboundWsMessage::Error(message) => { + self.encode_and_forward_v1_message(None, message, false).await?; } - let Ok(in_use) = ws_forward_frames( - &metrics, - &outgoing_frames, - None, - None, - ws_encode_message(config, buf, message, false, &bsatn_rlb_pool).await, - ) else { - break 'send; - }; - in_use - } - OutboundWsMessage::Message(message) => { - let workload = message.workload(); - let num_rows = message.num_rows(); - match message { - OutboundMessage::V2(server_message) => { - if config.version == WsVersion::V1 { + OutboundWsMessage::Message(message) => { + let num_rows = message.num_rows(); + match message { + OutboundMessage::V2(_) => { log::error!("dropping v2 message on v1 connection"); continue; } - - let Ok(in_use) = ws_forward_frames( - &metrics, - &outgoing_frames, - workload, - num_rows, - ws_encode_binary_message(config, buf, server_message, false, &bsatn_rlb_pool).await, - ) else { - break 'send; - }; - in_use - } - OutboundMessage::V1(message) => { - if config.version != WsVersion::V1 { - log::error!("dropping v1 message for a binary websocket connection: {:?}", message); - continue; + OutboundMessage::V1(message) => { + let is_large = num_rows.is_some_and(|n| n > LARGE_MESSAGE_ROW_THRESHOLD); + self.encode_and_forward_v1_message(num_rows, message, is_large).await?; } - - let is_large = num_rows.is_some_and(|n| n > 1024); - - let Ok(in_use) = ws_forward_frames( - &metrics, - &outgoing_frames, - workload, - num_rows, - ws_encode_message(config, buf, message, is_large, &bsatn_rlb_pool).await, - ) else { - break 'send; - }; - in_use } } } - }; + } + Ok(()) + } - if in_use_bufs.len() < BUF_POOL_CAPACITY { - in_use_bufs.push(scopeguard::guard(in_use_buf, |in_use| { - let buf = in_use.try_reclaim().expect("buffer should be unique"); - let _ = buf_pool.push(buf); - })); + /// Encode a batch for protocol v2. + /// + /// v2 uses the binary server-message schema, but each logical server + /// message must still be sent as its own websocket message. + async fn encode_v2_batch( + &mut self, + message_batch: &mut Vec, + ) -> Result<(), mpsc::error::SendError> { + self.encode_binary_batch(message_batch, BinaryPayloadMode::Single).await + } + + /// Encode a batch for protocol v3. + /// + /// v3 uses the same binary server-message schema as v2, but coalesces all + /// messages currently available from the encoder input into one websocket + /// payload. + async fn encode_v3_batch( + &mut self, + message_batch: &mut Vec, + ) -> Result<(), mpsc::error::SendError> { + self.encode_binary_batch(message_batch, BinaryPayloadMode::Coalesced) + .await + } + + /// Encode binary websocket payloads from a batch of outbound messages. + /// + /// `mode` is the only protocol-specific choice here: + /// + /// - [`BinaryPayloadMode::Single`] preserves the v2 wire format by + /// flushing after each message. + /// - [`BinaryPayloadMode::Coalesced`] uses the v3 wire format by flushing + /// after the whole batch has been accumulated. + async fn encode_binary_batch( + &mut self, + message_batch: &mut Vec, + mode: BinaryPayloadMode, + ) -> Result<(), mpsc::error::SendError> { + self.binary_server_messages.clear(); + self.binary_server_messages.reserve(match mode { + BinaryPayloadMode::Single => 1, + BinaryPayloadMode::Coalesced => message_batch.len(), + }); + let mut num_rows = None; + let mut uncompressed_body_len = 0; + + for message in message_batch.drain(..) { + // Drop messages that are not valid for a binary websocket + // connection. The conversion logs the protocol mismatch. + let Some(V2OutboundMessage { + message, + num_rows: message_num_rows, + }) = v2_outbound_message(message) + else { + continue; + }; + + let message_body_len = if mode == BinaryPayloadMode::Coalesced { + let message_body_len = v3_server_message_body_len(&message); + if v3_payload_would_exceed_limit(uncompressed_body_len, message_body_len) { + // v3 payload boundary: adding this message would cross the + // target byte limit, so flush the payload accumulated so far. + let is_large = is_large_binary_payload(num_rows, uncompressed_body_len); + self.encode_and_forward_binary_messages(num_rows, is_large).await?; + num_rows = None; + uncompressed_body_len = 0; + } + message_body_len + } else { + 0 + }; + + if let Some(message_num_rows) = message_num_rows { + // Payload metrics are emitted at websocket-payload granularity. + // In v3, one payload can contain several logical messages, so + // row counts are accumulated across the coalesced payload. + *num_rows.get_or_insert(0) += message_num_rows; + } + self.binary_server_messages.push(message); + uncompressed_body_len += message_body_len; + + if mode == BinaryPayloadMode::Single { + // v2 payload boundary: exactly one binary server message per + // websocket message. + let is_large = is_large_binary_payload(num_rows, uncompressed_body_len); + self.encode_and_forward_binary_messages(num_rows, is_large).await?; + num_rows = None; + uncompressed_body_len = 0; + } + } + + if self.binary_server_messages.is_empty() { + return Ok(()); } + + // v3 payload boundary: the remaining accumulated messages form a + // single coalesced websocket message. + let is_large = is_large_binary_payload(num_rows, uncompressed_body_len); + self.encode_and_forward_binary_messages(num_rows, is_large).await + } + + /// Encode and forward one v1 websocket message. + /// + /// v1 can produce either text or binary payloads depending on the client's + /// requested protocol, so it uses [`ws_encode_message`] rather than the + /// binary-only v2/v3 path. + async fn encode_and_forward_v1_message( + &mut self, + num_rows: Option, + message: impl ToProtocol + Send + 'static, + is_large: bool, + ) -> Result<(), mpsc::error::SendError> { + let config = self.config; + let bsatn_rlb_pool = self.bsatn_rlb_pool; + self.encode_and_forward_message(|buf| { + ws_encode_message(config, buf, message, is_large, bsatn_rlb_pool, num_rows) + }) + .await + } + + /// Encode and forward the currently accumulated binary server messages. + /// + /// This method is shared by v2 and v3. v2 calls it with exactly one + /// `binary_server_messages` entry; v3 calls it with the whole coalesced + /// batch. The actual bytes are produced by [`serialize_v3`], whose core + /// implementation also backs `serialize_v2`. + async fn encode_and_forward_binary_messages( + &mut self, + num_rows: Option, + is_large: bool, + ) -> Result<(), mpsc::error::SendError> { + let buf = self.buffers.get(); + // `spawn_rayon` requires a `'static` closure, so the message Vec cannot + // be borrowed from `self`. Move it into the closure and return the + // drained Vec afterward so its allocation is reused by the next batch. + let messages = std::mem::take(&mut self.binary_server_messages); + let compression = self.config.compression; + let bsatn_rlb_pool = self.bsatn_rlb_pool.clone(); + let (messages, timing, in_use, data) = maybe_spawn_encode(is_large, move || { + let mut messages = messages; + let (timing, in_use, data) = + time_encode(|| serialize_v3(&bsatn_rlb_pool, buf, messages.drain(..), compression)); + (messages, timing, in_use, data) + }) + .await; + self.binary_server_messages = messages; + let encoded = ws_encode_binary_frames(timing, in_use, data, num_rows); + let in_use = ws_forward_frames(self.metrics, self.outgoing_frames, encoded); + let in_use = in_use?; + self.buffers.hold(in_use); + Ok(()) + } + + /// Encode one websocket payload using a reusable serialization buffer, + /// forward its frames, then retain the buffer for later reuse. + async fn encode_and_forward_message( + &mut self, + encode: Encode, + ) -> Result<(), mpsc::error::SendError> + where + Encode: FnOnce(SerializeBuffer) -> Fut, + Fut: Future, + Frames: IntoIterator, + { + let buf = self.buffers.get(); + let in_use = ws_forward_frames(self.metrics, self.outgoing_frames, encode(buf).await)?; + self.buffers.hold(in_use); + Ok(()) } } @@ -1397,25 +1699,27 @@ async fn ws_encode_task( fn ws_forward_frames( metrics: &SendMetrics, outgoing_frames: &mpsc::UnboundedSender, - workload: Option, - num_rows: Option, - encoded: (EncodeMetrics, InUseSerializeBuffer, impl IntoIterator), + encoded: ( + EncodedPayloadMetrics, + InUseSerializeBuffer, + impl IntoIterator, + ), ) -> Result> { let (stats, in_use, frames) = encoded; - metrics.report(workload, num_rows, stats); + metrics.report(stats); frames.into_iter().try_for_each(|frame| outgoing_frames.send(frame))?; Ok(in_use) } -/// Some stats about serialization and compression. -/// -/// Returned by [`ws_encode_message`]. -struct EncodeMetrics { +/// Metrics for one encoded websocket payload. +struct EncodedPayloadMetrics { /// Time it took to serialize and (potentially) compress a message. /// Does not include scheduling overhead. timing: Duration, /// Length in bytes of the serialized and (potentially) compressed message. encoded_len: usize, + /// Number of logical rows included in the payload, if known. + num_rows: Option, } /// Encodes `message` into zero or more WebSocket [`Frame`]s. @@ -1432,7 +1736,7 @@ struct EncodeMetrics { /// of payload each, according to the rules laid out in [RFC6455], Section /// 5.4 Fragmentation. /// -/// Returns [`EncodeMetrics`], the [`InUseSerializeBuffer`] that was passed in +/// Returns [`EncodedPayloadMetrics`], the [`InUseSerializeBuffer`] that was passed in /// as `buf` for later reuse, and the [`Frame`]s. /// /// NOTE: When sending, the frames of a single message MUST NOT be interleaved @@ -1446,62 +1750,75 @@ async fn ws_encode_message( message: impl ToProtocol + Send + 'static, is_large_message: bool, bsatn_rlb_pool: &BsatnRowListBuilderPool, -) -> (EncodeMetrics, InUseSerializeBuffer, impl Iterator) { - const FRAGMENT_SIZE: usize = 4096; - - fn serialize_and_compress( - bsatn_rlb_pool: &BsatnRowListBuilderPool, - serialize_buf: SerializeBuffer, - message: impl ToProtocol + Send + 'static, - config: ClientConfig, - ) -> (Duration, InUseSerializeBuffer, DataMessage) { - let start = Instant::now(); - let (msg_alloc, msg_data) = serialize(bsatn_rlb_pool, serialize_buf, message, config); - (start.elapsed(), msg_alloc, msg_data) - } - let (timing, msg_alloc, msg_data) = if is_large_message { - let bsatn_rlb_pool = bsatn_rlb_pool.clone(); - spawn_rayon(move || serialize_and_compress(&bsatn_rlb_pool, buf, message, config)).await - } else { - serialize_and_compress(bsatn_rlb_pool, buf, message, config) - }; - - let metrics = EncodeMetrics { - timing, - encoded_len: msg_data.len(), - }; + num_rows: Option, +) -> (EncodedPayloadMetrics, InUseSerializeBuffer, impl Iterator) { + let bsatn_rlb_pool = bsatn_rlb_pool.clone(); + // Serialization/compression can dominate large subscription or query + // responses, so large payloads are offloaded to Rayon. + let (timing, in_use, msg_data) = maybe_spawn_encode(is_large_message, move || { + time_encode(|| serialize(&bsatn_rlb_pool, buf, message, config)) + }) + .await; + let encoded_len = msg_data.len(); let (data, ty) = match msg_data { DataMessage::Text(text) => (bytestring_to_utf8bytes(text).into(), Data::Text), DataMessage::Binary(bin) => (bin, Data::Binary), }; - let frames = fragment(data, ty, FRAGMENT_SIZE); + ws_encode_frames(timing, in_use, encoded_len, data, ty, num_rows) +} - (metrics, msg_alloc, frames) +/// Run `encode` on Rayon when the payload is expected to be large. +/// +/// Small payloads stay on the async task to avoid Rayon scheduling overhead. +async fn maybe_spawn_encode(is_large: bool, encode: impl FnOnce() -> T + Send + 'static) -> T { + if is_large { + spawn_rayon(encode).await + } else { + encode() + } } -async fn ws_encode_binary_message( - config: ClientConfig, - buf: SerializeBuffer, - message: ws_v2::ServerMessage, - is_large_message: bool, - bsatn_rlb_pool: &BsatnRowListBuilderPool, -) -> (EncodeMetrics, InUseSerializeBuffer, impl Iterator + use<>) { +/// Measure serialization/compression time for one websocket payload. +fn time_encode(encode: impl FnOnce() -> (InUseSerializeBuffer, T)) -> (Duration, InUseSerializeBuffer, T) { let start = Instant::now(); - let compression = config.compression; + let (in_use, data) = encode(); + (start.elapsed(), in_use, data) +} - let (in_use, data) = if is_large_message { - let bsatn_rlb_pool = bsatn_rlb_pool.clone(); - spawn_rayon(move || serialize_v2(&bsatn_rlb_pool, buf, message, compression)).await - } else { - serialize_v2(bsatn_rlb_pool, buf, message, compression) - }; +/// Build binary websocket frames and payload metrics for encoded bytes. +fn ws_encode_binary_frames( + timing: Duration, + in_use: InUseSerializeBuffer, + data: Bytes, + num_rows: Option, +) -> ( + EncodedPayloadMetrics, + InUseSerializeBuffer, + impl Iterator + use<>, +) { + ws_encode_frames(timing, in_use, data.len(), data, Data::Binary, num_rows) +} - let metrics = EncodeMetrics { - timing: start.elapsed(), - encoded_len: data.len(), +/// Build websocket frames and payload metrics for already-serialized bytes. +fn ws_encode_frames( + timing: Duration, + in_use: InUseSerializeBuffer, + encoded_len: usize, + data: Bytes, + ty: Data, + num_rows: Option, +) -> ( + EncodedPayloadMetrics, + InUseSerializeBuffer, + impl Iterator + use<>, +) { + let metrics = EncodedPayloadMetrics { + timing, + encoded_len, + num_rows, }; - let frames = fragment(data, Data::Binary, 4096); + let frames = fragment(data, ty, 4096); (metrics, in_use, frames) } @@ -1545,33 +1862,32 @@ impl ClientMessage { } } +/// Cached metric handles for the websocket send path. struct SendMetrics { - database: Identity, encode_timing: Histogram, + payload_size: Histogram, + payload_num_rows: Histogram, } impl SendMetrics { + /// Resolve metric handles for one database once per websocket send loop. fn new(database: Identity) -> Self { Self { encode_timing: WORKER_METRICS.websocket_serialize_secs.with_label_values(&database), - database, + payload_size: WORKER_METRICS.websocket_sent_msg_size.with_label_values(&database), + payload_num_rows: WORKER_METRICS.websocket_sent_num_rows.with_label_values(&database), } } - fn report(&self, workload: Option, num_rows: Option, encode: EncodeMetrics) { + /// Report one encoded websocket payload. + fn report(&self, encode: EncodedPayloadMetrics) { self.encode_timing.observe(encode.timing.as_secs_f64()); + self.payload_size.observe(encode.encoded_len as f64); - // These metrics should be updated together, - // or not at all. - if let (Some(workload), Some(num_rows)) = (workload, num_rows) { - WORKER_METRICS - .websocket_sent_num_rows - .with_label_values(&self.database, &workload) - .observe(num_rows as f64); - WORKER_METRICS - .websocket_sent_msg_size - .with_label_values(&self.database, &workload) - .observe(encode.encoded_len as f64); + if let Some(num_rows) = encode.num_rows { + // Some websocket payloads, such as control or error messages, do + // not correspond to a known logical row count. + self.payload_num_rows.observe(num_rows as f64); } } } diff --git a/crates/core/src/client/messages.rs b/crates/core/src/client/messages.rs index 798596b5bca..123c1c75d4a 100644 --- a/crates/core/src/client/messages.rs +++ b/crates/core/src/client/messages.rs @@ -196,20 +196,46 @@ pub fn serialize( /// conditional compression when configured. pub fn serialize_v2( bsatn_rlb_pool: &BsatnRowListBuilderPool, - mut buffer: SerializeBuffer, + buffer: SerializeBuffer, msg: ws_v2::ServerMessage, compression: ws_v1::Compression, +) -> (InUseSerializeBuffer, Bytes) { + serialize_v2_messages(bsatn_rlb_pool, buffer, std::iter::once(msg), compression) +} + +/// Serialize one or more [`ws_v2::ServerMessage`]s into a v3 websocket payload. +/// +/// Protocol v3 keeps the v2 message schema, but allows the uncompressed payload +/// body to contain consecutive BSATN-encoded server messages. +pub fn serialize_v3( + bsatn_rlb_pool: &BsatnRowListBuilderPool, + buffer: SerializeBuffer, + msgs: impl IntoIterator, + compression: ws_v1::Compression, +) -> (InUseSerializeBuffer, Bytes) { + serialize_v2_messages(bsatn_rlb_pool, buffer, msgs, compression) +} + +fn serialize_v2_messages( + bsatn_rlb_pool: &BsatnRowListBuilderPool, + mut buffer: SerializeBuffer, + msgs: impl IntoIterator, + compression: ws_v1::Compression, ) -> (InUseSerializeBuffer, Bytes) { let srv_msg = buffer.write_with_tag(ws_common::SERVER_MSG_COMPRESSION_TAG_NONE, |w| { - bsatn::to_writer(w.into_inner(), &msg).expect("should be able to bsatn encode v2 message"); + let out = w.into_inner(); + for msg in msgs { + write_v2_server_message(bsatn_rlb_pool, out, msg); + } }); let srv_msg_len = srv_msg.len(); + finalize_binary_serialize_buffer(buffer, srv_msg_len, compression) +} - // At this point, we no longer have a use for `msg`, - // so try to reclaim its buffers. +fn write_v2_server_message(bsatn_rlb_pool: &BsatnRowListBuilderPool, out: &mut BytesMut, msg: ws_v2::ServerMessage) { + bsatn::to_writer(out, &msg).expect("should be able to bsatn encode v2 message"); + // At this point, we no longer have a use for `msg`, so try to reclaim its buffers. msg.consume_each_list(&mut |buffer| bsatn_rlb_pool.try_put(buffer)); - - finalize_binary_serialize_buffer(buffer, srv_msg_len, compression) } #[derive(Debug, From)] diff --git a/crates/core/src/worker_metrics/mod.rs b/crates/core/src/worker_metrics/mod.rs index 14ce95c5ccf..c1847fa6d1c 100644 --- a/crates/core/src/worker_metrics/mod.rs +++ b/crates/core/src/worker_metrics/mod.rs @@ -204,8 +204,8 @@ metrics_group!( pub tokio_mean_polls_per_park: GaugeVec, #[name = spacetime_websocket_sent_msg_size_bytes] - #[help = "The size of messages sent to connected sessions"] - #[labels(db: Identity, workload: WorkloadType)] + #[help = "The size of websocket payloads sent to connected sessions"] + #[labels(db: Identity)] // Prometheus histograms have default buckets, // which broadly speaking, // are tailored to measure the response time of a network service. @@ -219,8 +219,8 @@ metrics_group!( pub websocket_sent_msg_size: HistogramVec, #[name = spacetime_websocket_sent_num_rows] - #[help = "The number of rows sent to connected sessions"] - #[labels(db: Identity, workload: WorkloadType)] + #[help = "The number of rows sent in websocket payloads"] + #[labels(db: Identity)] // Prometheus histograms have default buckets, // which broadly speaking, // are tailored to measure the response time of a network service.