From 3b9ddcab0662df92a3ee15adc2a54ab8c17bbf3c Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Fri, 11 Jul 2025 12:31:22 +0200 Subject: [PATCH 01/11] client-api: Send WebSocket messages fragmented RFC 6455, Section 5.4 allows a message to be sent as a sequence of fragments, and tungstenite lets us do that. --- crates/client-api/src/routes/subscribe.rs | 45 ++++++++++++++++++++++- 1 file changed, 44 insertions(+), 1 deletion(-) diff --git a/crates/client-api/src/routes/subscribe.rs b/crates/client-api/src/routes/subscribe.rs index 06aa427ef09..f10273bb0df 100644 --- a/crates/client-api/src/routes/subscribe.rs +++ b/crates/client-api/src/routes/subscribe.rs @@ -41,6 +41,8 @@ use tokio::sync::{mpsc, watch}; use tokio::task::JoinHandle; use tokio::time::error::Elapsed; use tokio::time::{sleep_until, timeout}; +use tokio_tungstenite::tungstenite::protocol::frame::coding::{Data, OpCode}; +use tokio_tungstenite::tungstenite::protocol::frame::Frame; use tokio_tungstenite::tungstenite::Utf8Bytes; use crate::auth::SpacetimeAuth; @@ -1046,7 +1048,48 @@ async fn send_message + Unpin>( report_ws_sent_metrics(database_identity, workload, num_rows, timing, &msg_data); let res = async { - ws.feed(datamsg_to_wsmsg(msg_data)).await?; + // EXPERIMENT: Send fragmented messages (RFC 6455, Section 5.4). 
+ let (data, ty) = match datamsg_to_wsmsg(msg_data) { + WsMessage::Text(text) => (text.into(), Data::Text), + WsMessage::Binary(bin) => (bin, Data::Binary), + _ => unreachable!(), + }; + + const FRAGMENT_SIZE: usize = 4096; + + let total_len = data.len(); + + let mut frames = Vec::with_capacity(total_len / FRAGMENT_SIZE); + let mut offset = 0; + while offset < total_len { + let end = (offset + FRAGMENT_SIZE).min(total_len); + let chunk = data.slice(offset..end); + frames.push(Frame::message(chunk, OpCode::Data(Data::Continue), false)); + offset = end; + } + + match frames.as_mut_slice() { + [] => {} + [single] => { + let hdr = single.header_mut(); + hdr.is_final = true; + hdr.opcode = OpCode::Data(ty); + } + [first, .., last] => { + let hdr = first.header_mut(); + hdr.is_final = false; + hdr.opcode = OpCode::Data(ty); + + let hdr = last.header_mut(); + hdr.is_final = true; + hdr.opcode = OpCode::Data(Data::Continue); + } + } + + log::trace!("sending message in {} frames", frames.len()); + for frame in frames { + ws.feed(WsMessage::Frame(frame)).await?; + } // To reclaim the `msg_alloc` memory, we need `SplitSink` to push down // its item slot to the inner sink, which will copy the `Bytes` and // drop the reference. From 3095578df82e28cf04d24b7c4a786c8462d4e7d4 Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Fri, 11 Jul 2025 14:38:29 +0200 Subject: [PATCH 02/11] fixup! 
client-api: Send WebSocket messages fragmented --- crates/client-api/src/routes/subscribe.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/client-api/src/routes/subscribe.rs b/crates/client-api/src/routes/subscribe.rs index f10273bb0df..e3cdeb58ee8 100644 --- a/crates/client-api/src/routes/subscribe.rs +++ b/crates/client-api/src/routes/subscribe.rs @@ -1059,7 +1059,7 @@ async fn send_message + Unpin>( let total_len = data.len(); - let mut frames = Vec::with_capacity(total_len / FRAGMENT_SIZE); + let mut frames = Vec::with_capacity((total_len / FRAGMENT_SIZE) + 1); let mut offset = 0; while offset < total_len { let end = (offset + FRAGMENT_SIZE).min(total_len); From 6176e0735918e5dff74bdbbd8ef59bdb71720c66 Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Mon, 14 Jul 2025 15:58:43 +0200 Subject: [PATCH 03/11] Actually interleave control with data frames --- Cargo.lock | 1 + crates/client-api/Cargo.toml | 1 + crates/client-api/src/routes/subscribe.rs | 398 ++++++++++++++-------- crates/core/src/client/messages.rs | 7 + 4 files changed, 263 insertions(+), 144 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 31ebb4e8c2e..7b51af53bd4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5340,6 +5340,7 @@ dependencies = [ "bytes", "bytestring", "chrono", + "crossbeam-queue", "derive_more", "email_address", "futures", diff --git a/crates/client-api/Cargo.toml b/crates/client-api/Cargo.toml index 186c5455d20..51324e087ed 100644 --- a/crates/client-api/Cargo.toml +++ b/crates/client-api/Cargo.toml @@ -49,6 +49,7 @@ jsonwebtoken.workspace = true scopeguard.workspace = true serde_with.workspace = true async-stream.workspace = true +crossbeam-queue.workspace = true [target.'cfg(not(target_env = "msvc"))'.dependencies] jemalloc_pprof.workspace = true diff --git a/crates/client-api/src/routes/subscribe.rs b/crates/client-api/src/routes/subscribe.rs index e3cdeb58ee8..eaf8d23e5e5 100644 --- a/crates/client-api/src/routes/subscribe.rs +++ 
b/crates/client-api/src/routes/subscribe.rs @@ -1,5 +1,5 @@ use std::fmt::Display; -use std::future::{poll_fn, Future}; +use std::future::Future; use std::num::NonZeroUsize; use std::panic; use std::pin::{pin, Pin}; @@ -15,14 +15,16 @@ use axum::Extension; use axum_extra::TypedHeader; use bytes::Bytes; use bytestring::ByteString; +use crossbeam_queue::ArrayQueue; use derive_more::From; use futures::{pin_mut, Sink, SinkExt, Stream, StreamExt}; use http::{HeaderValue, StatusCode}; -use prometheus::IntGauge; +use prometheus::{Histogram, IntGauge}; use scopeguard::{defer, ScopeGuard}; use serde::Deserialize; use spacetimedb::client::messages::{ - serialize, IdentityTokenMessage, SerializableMessage, SerializeBuffer, SwitchedServerMessage, ToProtocol, + serialize, IdentityTokenMessage, InUseSerializeBuffer, SerializableMessage, SerializeBuffer, SwitchedServerMessage, + ToProtocol, }; use spacetimedb::client::{ ClientActorId, ClientConfig, ClientConnection, DataMessage, MessageExecutionError, MessageHandleError, @@ -705,6 +707,7 @@ fn ws_recv_loop( if !state.closed() { yield ClientMessage::from_message(m); + continue; } // If closed, keep polling until either: // @@ -927,22 +930,51 @@ async fn ws_send_loop( mut messages: MeteredReceiver, mut unordered: mpsc::UnboundedReceiver, ) { - let mut messages_buf = Vec::with_capacity(32); - let mut serialize_buf = SerializeBuffer::new(config); + // The number of frames we'll `feed` to the `ws` sink in one iteration + // of the `select!` loop. + // + // This batching is done to allow control messages appearing on `unordered` + // to be interleaved with the sending of large messages split across some + // number of frames. + // + // This allows clients with slow connections to respond to `Ping`s, and + // avoid timing out while receiving large messages. + // + // The default frame size is 4KiB, hence we write in batches of 32KiB. 
+ const FRAME_BATCH_SIZE: usize = 8; + let mut frames_batch = Vec::with_capacity(FRAME_BATCH_SIZE); + let (frames_tx, mut frames_rx) = mpsc::unbounded_channel(); - loop { + let (encode_tx, encode_rx) = mpsc::unbounded_channel(); + // Spawn the encode task. + // + // NOTE: It is not technically required to introduce parallelism for encoding. + // We spawn mainly to avoid having to manually poll the `ws_encode_task` + // future in the `select!` loop below, which would be quite error prone. + tokio::spawn(ws_encode_task( + SendMetrics::new(state.database), + config, + encode_rx, + frames_tx, + )); + + 'outer: loop { let closed = state.closed(); tokio::select! { - // `biased` towards the unordered queue, - // which may initiate a connection shutdown. + // `biased` because we want to: + // + // - give control messages precedence + // - and flush outstanding messages + // before taking on more encoding work biased; + // Check for control messages or execution errors. maybe_msg = unordered.recv() => { let Some(msg) = maybe_msg else { break; }; - // We shall not sent more data after a close frame, + // We shall not send more data after a close frame, // but keep polling `unordered` so that `ws_client_actor` keeps // waiting for an acknowledgement from the client, // even if it spuriously initiates another close itself. @@ -951,18 +983,33 @@ async fn ws_send_loop( } match msg { UnorderedWsMessage::Close(close_frame) => { + log::trace!("intiating close"); + state.close(); + // We won't be polling `messages` anymore, + // so let senders know. + messages.close(); + + // Send outstanding frames until one that has the FIN + // bit set. Ensures the client won't receive partial + // messages before we shut down. 
+ log::trace!("draining outgoing frames"); + while let Some(frame) = frames_rx.recv().await { + let eof = frame.header().is_final; + if let Err(e) = ws.feed(WsMessage::Frame(frame)).await { + log::warn!("error sending frame: {e:#}"); + break 'outer; + } + + if eof { + break; + } + } + // Then send the close frame. log::trace!("sending close frame"); if let Err(e) = ws.send(WsMessage::Close(Some(close_frame))).await { log::warn!("error sending close frame: {e:#}"); break; } - // NOTE: It's ok to not update the state if we fail to - // send the close frame, because we assume that the main - // loop will exit when this future terminates. - state.close(); - // We won't be polling `messages` anymore, - // so let senders know. - messages.close(); }, UnorderedWsMessage::Ping(bytes) => { log::trace!("sending ping"); @@ -972,47 +1019,39 @@ async fn ws_send_loop( } }, UnorderedWsMessage::Error(err) => { - log::trace!("sending error result"); - let (msg_alloc, res) = send_message( - &state.database, - config, - serialize_buf, - None, - &mut ws, - err - ).await; - serialize_buf = msg_alloc; - - if let Err(e) = res { - log::warn!("websocket send error: {e}"); - break; - } + log::trace!("encoding execution error"); + encode_tx.send(OutboundMessage::Error(err)).unwrap(); }, } }, - n = messages.recv_many(&mut messages_buf, 32), if !closed => { - if n == 0 { - continue; - } - log::trace!("sending {n} outgoing messages"); - for msg in messages_buf.drain(..n) { - let (msg_alloc, res) = send_message( - &state.database, - config, - serialize_buf, - msg.workload().zip(msg.num_rows()), - &mut ws, - msg - ).await; - serialize_buf = msg_alloc; - - if let Err(e) = res { - log::warn!("websocket send error: {e}"); - return; + // Send a batch of frames. + // + // Branch is disabled if we already sent a close frame. + // + // TODO: If the client sent us a close frame and we're in the middle + // of a large message, we may not send them the whole message. 
+ // If that turns out to be a problem, we'll need to keep track of + // which side initiated the close handshake. + // Unsure if `tungstenite` will support us here, i.e. allows to keep + // sending when the other side initiated the close. + n = frames_rx.recv_many(&mut frames_batch, FRAME_BATCH_SIZE), if !closed => { + log::trace!("sending batch of {n} frames"); + for frame in frames_batch.drain(..n) { + if let Err(e) = ws.feed(WsMessage::Frame(frame)).await { + log::warn!("error sending frame: {e:#}"); + break; } } }, + + // Take on more work. + // + // Branch is disabled if we already sent a close frame. + Some(message) = messages.recv(), if !closed => { + encode_tx.send(OutboundMessage::Message(message)).unwrap(); + }, + } if let Err(e) = ws.flush().await { @@ -1022,88 +1061,162 @@ async fn ws_send_loop( } } -/// Serialize and potentially compress `message`, and feed it to the `ws` sink. -async fn send_message + Unpin>( - database_identity: &Identity, +enum OutboundMessage { + Error(MessageExecutionError), + Message(SerializableMessage), +} + +/// Task that reads [`OutboundMessage`]s from `messages`, encodes them via +/// [`ws_encode_message`], and sends the resuling [`Frame`]s to `outgoing_frames`. +/// +/// Meant to be [`tokio::spawn`]ed. +/// +/// The function also takes care of reusing serialization buffers and reporting +/// metrics via [`SendMetrics`].. +async fn ws_encode_task( + metrics: SendMetrics, config: ClientConfig, - serialize_buf: SerializeBuffer, - metrics_metadata: Option<(WorkloadType, usize)>, - ws: &mut S, + mut messages: mpsc::UnboundedReceiver, + outgoing_frames: mpsc::UnboundedSender, +) { + // Serialize buffers can be reclaimed once all frames of a message are + // copied to the wire. Since we don't know when that will happen, we prepare + // for a few messages to be in-flight, i.e. encoded but not yet sent. 
+ const BUF_POOL_CAPACITY: usize = 16; + let buf_pool = ArrayQueue::new(BUF_POOL_CAPACITY); + let mut in_use_bufs: Vec> = Vec::with_capacity(BUF_POOL_CAPACITY); + + while let Some(message) = messages.recv().await { + // Drop serialize buffers with no external referent, + // returning them to the pool. + in_use_bufs.retain(|in_use| !in_use.is_unique()); + // Get a serialize buffer from the pool, + // or create a fresh one. + let buf = buf_pool.pop().unwrap_or_else(|| SerializeBuffer::new(config)); + + let (workload, num_rows, stats, in_use_buf, frames) = match message { + OutboundMessage::Error(message) => { + let (stats, in_use, frames) = ws_encode_message(config, buf, message, false).await; + (None, None, stats, in_use, frames) + } + OutboundMessage::Message(message) => { + let workload = message.workload(); + let num_rows = message.num_rows(); + let is_large = num_rows.is_some_and(|n| n > 1024); + + let (stats, in_use, frames) = ws_encode_message(config, buf, message, is_large).await; + + (workload, num_rows, stats, in_use, frames) + } + }; + + metrics.report(workload, num_rows, stats); + if in_use_bufs.len() <= BUF_POOL_CAPACITY { + in_use_bufs.push(scopeguard::guard(in_use_buf, |in_use| { + let buf = in_use.try_reclaim().expect("buffer should be unique"); + let _ = buf_pool.push(buf); + })); + } + + for frame in frames { + if outgoing_frames.send(frame).is_err() { + break; + } + } + } +} + +/// Some stats about serialization and compression. +/// +/// Returned by [`ws_encode_message`]. +struct EncodeMetrics { + /// Time it took to serialize and (potentially) compress a message. + /// Does not include scheduling overhead. + timing: Duration, + /// Length in bytes of the serialized and (potentially) compressed message. + encoded_len: usize, +} + +/// Encodes `message` into zero or more WebSocket [`Frame`]s. +/// +/// The `message` is first [`serialize`]d. 
Depending on the serialized size, +/// client `config` and format (see [`SwitchedServerMessage`]), compression may +/// be applied to the serialized bytes. +/// +/// If `is_large_message` is true, serialization and compression if performed +/// on a `rayon` thread. The value should be chosen s.t. the overhead of +/// scheduling is likely to be lower than the overhead of compression itself. +/// +/// The resulting bytes are then split into [`Frame`]s of at most 4096 bytes +/// of payload each, according to the rules laid out in [RFC6455], Section +/// 5.4 Fragmentation. +/// +/// Returns [`EncodeMetrics`], the [`InUseSerializeBuffer`] that was passed in +/// as `buf` for later reuse, and the [`Frame`]s. +/// +/// NOTE: When sending, the frames of a single message MUST NOT be interleaved +/// with the frames of another message, except for control frames (`Close`, +/// `Ping`, `Pong`). +/// +/// [RFC6455]: https://datatracker.ietf.org/doc/html/rfc6455#section-5.4 +async fn ws_encode_message( + config: ClientConfig, + buf: SerializeBuffer, message: impl ToProtocol + Send + 'static, -) -> (SerializeBuffer, Result<(), S::Error>) { - let (workload, num_rows) = metrics_metadata.unzip(); - // Move large messages to a rayon thread, - // as serialization and compression can take a long time. - // The threshold of 1024 rows is arbitrary, and may need to be refined. 
+ is_large_message: bool, +) -> (EncodeMetrics, InUseSerializeBuffer, Vec) { + const FRAGMENT_SIZE: usize = 4096; + let serialize_and_compress = |serialize_buf, message, config| { let start = Instant::now(); let (msg_alloc, msg_data) = serialize(serialize_buf, message, config); (start.elapsed(), msg_alloc, msg_data) }; - let (timing, msg_alloc, msg_data) = if num_rows.is_some_and(|n| n > 1024) { - spawn_rayon(move || serialize_and_compress(serialize_buf, message, config)).await + let (timing, msg_alloc, msg_data) = if is_large_message { + spawn_rayon(move || serialize_and_compress(buf, message, config)).await } else { - serialize_and_compress(serialize_buf, message, config) + serialize_and_compress(buf, message, config) }; - report_ws_sent_metrics(database_identity, workload, num_rows, timing, &msg_data); - - let res = async { - // EXPERIMENT: Send fragmented messages (RFC 6455, Section 5.4). - let (data, ty) = match datamsg_to_wsmsg(msg_data) { - WsMessage::Text(text) => (text.into(), Data::Text), - WsMessage::Binary(bin) => (bin, Data::Binary), - _ => unreachable!(), - }; - const FRAGMENT_SIZE: usize = 4096; + let metrics = EncodeMetrics { + timing, + encoded_len: msg_data.len(), + }; - let total_len = data.len(); + let (data, ty) = match msg_data { + DataMessage::Text(text) => (bytestring_to_utf8bytes(text).into(), Data::Text), + DataMessage::Binary(bin) => (bin, Data::Binary), + }; - let mut frames = Vec::with_capacity((total_len / FRAGMENT_SIZE) + 1); - let mut offset = 0; - while offset < total_len { - let end = (offset + FRAGMENT_SIZE).min(total_len); - let chunk = data.slice(offset..end); - frames.push(Frame::message(chunk, OpCode::Data(Data::Continue), false)); - offset = end; - } + let total_len = data.len(); + let mut frames = Vec::with_capacity((total_len / FRAGMENT_SIZE) + total_len % FRAGMENT_SIZE); + let mut offset = 0; + while offset < total_len { + let end = (offset + FRAGMENT_SIZE).min(total_len); + let chunk = data.slice(offset..end); + 
frames.push(Frame::message(chunk, OpCode::Data(Data::Continue), false)); + offset = end; + } - match frames.as_mut_slice() { - [] => {} - [single] => { - let hdr = single.header_mut(); - hdr.is_final = true; - hdr.opcode = OpCode::Data(ty); - } - [first, .., last] => { - let hdr = first.header_mut(); - hdr.is_final = false; - hdr.opcode = OpCode::Data(ty); - - let hdr = last.header_mut(); - hdr.is_final = true; - hdr.opcode = OpCode::Data(Data::Continue); - } + match frames.as_mut_slice() { + [] => {} + [single] => { + let hdr = single.header_mut(); + hdr.is_final = true; + hdr.opcode = OpCode::Data(ty); } - - log::trace!("sending message in {} frames", frames.len()); - for frame in frames { - ws.feed(WsMessage::Frame(frame)).await?; + [first, .., last] => { + let hdr = first.header_mut(); + hdr.is_final = false; + hdr.opcode = OpCode::Data(ty); + + let hdr = last.header_mut(); + hdr.is_final = true; + hdr.opcode = OpCode::Data(Data::Continue); } - // To reclaim the `msg_alloc` memory, we need `SplitSink` to push down - // its item slot to the inner sink, which will copy the `Bytes` and - // drop the reference. - // We don't want to flush the inner sink just yet, as we might be - // writing many messages. - // `SplitSink::poll_ready` does what we want. - poll_fn(|cx| ws.poll_ready_unpin(cx)).await } - .await; - // Reclaim can fail if we didn't succeed pushing down the data to the - // websocket. We must return a buffer, though, so create a fresh one. - let buf = msg_alloc.try_reclaim().unwrap_or_else(|| SerializeBuffer::new(config)); - (buf, res) + (metrics, msg_alloc, frames) } #[derive(Debug)] @@ -1128,37 +1241,34 @@ impl ClientMessage { } } -/// Report metrics on sent rows and message sizes to a websocket client. -fn report_ws_sent_metrics( - addr: &Identity, - workload: Option, - num_rows: Option, - serialize_duration: Duration, - msg_ws: &DataMessage, -) { - // These metrics should be updated together, - // or not at all. 
- if let (Some(workload), Some(num_rows)) = (workload, num_rows) { - WORKER_METRICS - .websocket_sent_num_rows - .with_label_values(addr, &workload) - .observe(num_rows as f64); - WORKER_METRICS - .websocket_sent_msg_size - .with_label_values(addr, &workload) - .observe(msg_ws.len() as f64); +struct SendMetrics { + database: Identity, + encode_timing: Histogram, +} + +impl SendMetrics { + fn new(database: Identity) -> Self { + Self { + encode_timing: WORKER_METRICS.websocket_serialize_secs.with_label_values(&database), + database, + } } - WORKER_METRICS - .websocket_serialize_secs - .with_label_values(addr) - .observe(serialize_duration.as_secs_f64()); -} + fn report(&self, workload: Option, num_rows: Option, encode: EncodeMetrics) { + self.encode_timing.observe(encode.timing.as_secs_f64()); -fn datamsg_to_wsmsg(msg: DataMessage) -> WsMessage { - match msg { - DataMessage::Text(text) => WsMessage::Text(bytestring_to_utf8bytes(text)), - DataMessage::Binary(bin) => WsMessage::Binary(bin), + // These metrics should be updated together, + // or not at all. + if let (Some(workload), Some(num_rows)) = (workload, num_rows) { + WORKER_METRICS + .websocket_sent_num_rows + .with_label_values(&self.database, &workload) + .observe(num_rows as f64); + WORKER_METRICS + .websocket_sent_msg_size + .with_label_values(&self.database, &workload) + .observe(encode.encoded_len as f64); + } } } @@ -1174,7 +1284,7 @@ fn bytestring_to_utf8bytes(s: ByteString) -> Utf8Bytes { #[cfg(test)] mod tests { use std::{ - future::Future, + future::{poll_fn, Future}, pin::Pin, sync::atomic::AtomicUsize, task::{Context, Poll}, diff --git a/crates/core/src/client/messages.rs b/crates/core/src/client/messages.rs index b37560469c6..b6c34761f9a 100644 --- a/crates/core/src/client/messages.rs +++ b/crates/core/src/client/messages.rs @@ -118,6 +118,13 @@ impl InUseSerializeBuffer { compressed, }) } + + pub fn is_unique(&self) -> bool { + match self { + InUseSerializeBuffer::Uncompressed { uncompressed, .. 
} => uncompressed.is_unique(), + InUseSerializeBuffer::Compressed { compressed, .. } => compressed.is_unique(), + } + } } /// Serialize `msg` into a [`DataMessage`] containing a [`ws::ServerMessage`]. From be30b1e75a3e02df38bf2308877a8c5a9d97e6e5 Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Mon, 14 Jul 2025 16:28:51 +0200 Subject: [PATCH 04/11] fixup! Actually interleave control with data frames --- crates/client-api/src/routes/subscribe.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/client-api/src/routes/subscribe.rs b/crates/client-api/src/routes/subscribe.rs index eaf8d23e5e5..ea78f8e271b 100644 --- a/crates/client-api/src/routes/subscribe.rs +++ b/crates/client-api/src/routes/subscribe.rs @@ -1143,9 +1143,9 @@ struct EncodeMetrics { /// client `config` and format (see [`SwitchedServerMessage`]), compression may /// be applied to the serialized bytes. /// -/// If `is_large_message` is true, serialization and compression if performed +/// If `is_large_message` is true, serialization and compression is performed /// on a `rayon` thread. The value should be chosen s.t. the overhead of -/// scheduling is likely to be lower than the overhead of compression itself. +/// scheduling is expected to be lower than the overhead of compression itself. /// /// The resulting bytes are then split into [`Frame`]s of at most 4096 bytes /// of payload each, according to the rules laid out in [RFC6455], Section From bf846fe3f513caee00ab5583758cf2df2d5c1e02 Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Mon, 14 Jul 2025 16:39:17 +0200 Subject: [PATCH 05/11] fixup! 
Actually interleave control with data frames --- crates/client-api/src/routes/subscribe.rs | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/crates/client-api/src/routes/subscribe.rs b/crates/client-api/src/routes/subscribe.rs index ea78f8e271b..5bcac7862de 100644 --- a/crates/client-api/src/routes/subscribe.rs +++ b/crates/client-api/src/routes/subscribe.rs @@ -938,7 +938,7 @@ async fn ws_send_loop( // number of frames. // // This allows clients with slow connections to respond to `Ping`s, and - // avoid timing out while receiving large messages. + // avoid timing out, while receiving large messages. // // The default frame size is 4KiB, hence we write in batches of 32KiB. const FRAME_BATCH_SIZE: usize = 8; @@ -1020,7 +1020,11 @@ async fn ws_send_loop( }, UnorderedWsMessage::Error(err) => { log::trace!("encoding execution error"); - encode_tx.send(OutboundMessage::Error(err)).unwrap(); + encode_tx + .send(OutboundMessage::Error(err)) + // `ws_encode_task` shouldn't terminate until + // `encode_tx` is dropped, except by panicking. + .expect("encode task panicked"); }, } }, @@ -1049,7 +1053,11 @@ async fn ws_send_loop( // // Branch is disabled if we already sent a close frame. Some(message) = messages.recv(), if !closed => { - encode_tx.send(OutboundMessage::Message(message)).unwrap(); + encode_tx + .send(OutboundMessage::Message(message)) + // `ws_encode_task` shouldn't terminate until + // `encode_tx` is dropped, except by panicking. 
+ .expect("encode task panicked"); }, } From e8df533a497033f0718acac3022740db35d5c3f5 Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Mon, 14 Jul 2025 20:29:56 +0200 Subject: [PATCH 06/11] Add test --- crates/client-api/src/routes/subscribe.rs | 123 +++++++++++++++++++--- 1 file changed, 106 insertions(+), 17 deletions(-) diff --git a/crates/client-api/src/routes/subscribe.rs b/crates/client-api/src/routes/subscribe.rs index 5bcac7862de..e9d38c4e37f 100644 --- a/crates/client-api/src/routes/subscribe.rs +++ b/crates/client-api/src/routes/subscribe.rs @@ -926,10 +926,28 @@ enum UnorderedWsMessage { async fn ws_send_loop( state: Arc, config: ClientConfig, + ws: impl Sink + Unpin, + messages: MeteredReceiver, + unordered: mpsc::UnboundedReceiver, +) { + let metrics = SendMetrics::new(state.database); + ws_send_loop_inner(state, ws, messages, unordered, |encode_rx, frames_tx| { + ws_encode_task(metrics, config, encode_rx, frames_tx) + }) + .await +} + +async fn ws_send_loop_inner( + state: Arc, mut ws: impl Sink + Unpin, - mut messages: MeteredReceiver, + mut messages: MeteredReceiver, mut unordered: mpsc::UnboundedReceiver, -) { + encoder: impl FnOnce(mpsc::UnboundedReceiver, mpsc::UnboundedSender) -> Encoder, +) where + T: Into, + U: From, + Encoder: Future + Send + 'static, +{ // The number of frames we'll `feed` to the `ws` sink in one iteration // of the `select!` loop. // @@ -948,15 +966,11 @@ async fn ws_send_loop( let (encode_tx, encode_rx) = mpsc::unbounded_channel(); // Spawn the encode task. // - // NOTE: It is not technically required to introduce parallelism for encoding. - // We spawn mainly to avoid having to manually poll the `ws_encode_task` - // future in the `select!` loop below, which would be quite error prone. - tokio::spawn(ws_encode_task( - SendMetrics::new(state.database), - config, - encode_rx, - frames_tx, - )); + // NOTE: It is not technically required to introduce parallelism for + // encoding. 
We spawn mainly to avoid having to manually poll the `Encoder` + // future in the `select!` loop below, which proved to be quite error + // prone in the past (looking at you, `also_poll`). + tokio::spawn(encoder(encode_rx, frames_tx)); 'outer: loop { let closed = state.closed(); @@ -993,7 +1007,7 @@ async fn ws_send_loop( // bit set. Ensures the client won't receive partial // messages before we shut down. log::trace!("draining outgoing frames"); - while let Some(frame) = frames_rx.recv().await { + while let Ok(frame) = frames_rx.try_recv() { let eof = frame.header().is_final; if let Err(e) = ws.feed(WsMessage::Frame(frame)).await { log::warn!("error sending frame: {e:#}"); @@ -1021,7 +1035,7 @@ async fn ws_send_loop( UnorderedWsMessage::Error(err) => { log::trace!("encoding execution error"); encode_tx - .send(OutboundMessage::Error(err)) + .send(err.into()) // `ws_encode_task` shouldn't terminate until // `encode_tx` is dropped, except by panicking. .expect("encode task panicked"); @@ -1054,7 +1068,7 @@ async fn ws_send_loop( // Branch is disabled if we already sent a close frame. Some(message) = messages.recv(), if !closed => { encode_tx - .send(OutboundMessage::Message(message)) + .send(message.into()) // `ws_encode_task` shouldn't terminate until // `encode_tx` is dropped, except by panicking. .expect("encode task panicked"); @@ -1069,6 +1083,7 @@ async fn ws_send_loop( } } +#[derive(From)] enum OutboundMessage { Error(MessageExecutionError), Message(SerializableMessage), @@ -1195,12 +1210,21 @@ async fn ws_encode_message( DataMessage::Text(text) => (bytestring_to_utf8bytes(text).into(), Data::Text), DataMessage::Binary(bin) => (bin, Data::Binary), }; + let frames = fragment(data, ty, FRAGMENT_SIZE); + (metrics, msg_alloc, frames) +} + +/// Split payload `data` of type `ty` into `fragment_size`d [`Frame`]s, +/// according to the rules laid out in [RFC6455], Section 5.4. 
+/// +/// [RFC6455]: https://datatracker.ietf.org/doc/html/rfc6455#section-5.4 +fn fragment(data: Bytes, ty: Data, fragment_size: usize) -> Vec { let total_len = data.len(); - let mut frames = Vec::with_capacity((total_len / FRAGMENT_SIZE) + total_len % FRAGMENT_SIZE); + let mut frames = Vec::with_capacity((total_len / fragment_size) + total_len % fragment_size); let mut offset = 0; while offset < total_len { - let end = (offset + FRAGMENT_SIZE).min(total_len); + let end = (offset + fragment_size).min(total_len); let chunk = data.slice(offset..end); frames.push(Frame::message(chunk, OpCode::Data(Data::Continue), false)); offset = end; @@ -1224,7 +1248,7 @@ async fn ws_encode_message( } } - (metrics, msg_alloc, frames) + frames } #[derive(Debug)] @@ -1809,6 +1833,71 @@ mod tests { assert_eq!(received.len(), 10); } + #[tokio::test] + async fn send_loop_interleaves_pings_with_frames() { + let state = Arc::new(dummy_actor_state()); + let mut received = Vec::new(); + let (messages_tx, messages_rx) = mpsc::channel(1); + let messages = MeteredReceiver::new(messages_rx); + let (unordered_tx, unordered_rx) = mpsc::unbounded_channel(); + + #[derive(From)] + enum OutgoingBytes { + #[allow(unused)] + Error(MessageExecutionError), + Bytes(Bytes), + } + + async fn encoder(mut rx: mpsc::UnboundedReceiver, tx: mpsc::UnboundedSender) { + while let Some(data) = rx.recv().await { + if let OutgoingBytes::Bytes(data) = data { + let frames = fragment(data, Data::Binary, 4096); + for frame in frames { + tx.send(frame).unwrap(); + } + } + } + } + + const MESSAGE_SIZE: usize = 10 * 1024 * 1024; + const FRAME_SIZE: usize = 4096; + const NUM_CONTROL_FRAMES: usize = 2; + + let send_loop = tokio::spawn(async move { + ws_send_loop_inner(state, &mut received, messages, unordered_rx, encoder).await; + received + }); + messages_tx.send(Bytes::from_static(&[1; MESSAGE_SIZE])).await.unwrap(); + // Yield task to give the send loop a chance to receive the message. 
+ tokio::task::yield_now().await; + // Send ping, then close. + unordered_tx.send(UnorderedWsMessage::Ping(Bytes::new())).unwrap(); + unordered_tx + .send(UnorderedWsMessage::Close(CloseFrame { + code: CloseCode::Away, + reason: "we're done".into(), + })) + .unwrap(); + + // Shut down the loop. + drop(messages_tx); + drop(unordered_tx); + let received = send_loop.await.unwrap(); + + let ping_pos = received + .iter() + .position(|message| matches!(message, WsMessage::Ping(_))) + .unwrap(); + log::info!("received={} ping-at={}", received.len(), ping_pos); + assert!(ping_pos > 0); + assert!(ping_pos < received.len() - NUM_CONTROL_FRAMES); + // All frames of the message should have been sent before the close frame. + assert_eq!(received.len(), (MESSAGE_SIZE / FRAME_SIZE) + NUM_CONTROL_FRAMES); + assert!(received + .last() + .is_some_and(|message| matches!(message, WsMessage::Close(_)))); + } + async fn is_pending(fut: &mut (impl Future + Unpin)) -> bool { poll_fn(|cx| Poll::Ready(fut.poll_unpin(cx).is_pending())).await } From aae4469e46f61ef0928089ae2c2fa5d81b7bf090 Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Mon, 14 Jul 2025 22:40:04 +0200 Subject: [PATCH 07/11] Fix send loop termination while sending frame batch --- crates/client-api/src/routes/subscribe.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/client-api/src/routes/subscribe.rs b/crates/client-api/src/routes/subscribe.rs index e9d38c4e37f..6c845e496c8 100644 --- a/crates/client-api/src/routes/subscribe.rs +++ b/crates/client-api/src/routes/subscribe.rs @@ -1058,7 +1058,7 @@ async fn ws_send_loop_inner( for frame in frames_batch.drain(..n) { if let Err(e) = ws.feed(WsMessage::Frame(frame)).await { log::warn!("error sending frame: {e:#}"); - break; + break 'outer; } } }, From 3b77c1a673ce97d51e3853b4434ce5fa279755f3 Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Mon, 14 Jul 2025 23:08:12 +0200 Subject: [PATCH 08/11] fixup! 
Actually interleave control with data frames --- crates/client-api/src/routes/subscribe.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/client-api/src/routes/subscribe.rs b/crates/client-api/src/routes/subscribe.rs index 6c845e496c8..ece935a9231 100644 --- a/crates/client-api/src/routes/subscribe.rs +++ b/crates/client-api/src/routes/subscribe.rs @@ -1134,7 +1134,7 @@ async fn ws_encode_task( }; metrics.report(workload, num_rows, stats); - if in_use_bufs.len() <= BUF_POOL_CAPACITY { + if in_use_bufs.len() < BUF_POOL_CAPACITY { in_use_bufs.push(scopeguard::guard(in_use_buf, |in_use| { let buf = in_use.try_reclaim().expect("buffer should be unique"); let _ = buf_pool.push(buf); From a9dbf2d8410ea194fc366fe79ce82156200c67c3 Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Mon, 14 Jul 2025 23:13:20 +0200 Subject: [PATCH 09/11] fixup! Actually interleave control with data frames --- crates/client-api/src/routes/subscribe.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/client-api/src/routes/subscribe.rs b/crates/client-api/src/routes/subscribe.rs index ece935a9231..363add3f898 100644 --- a/crates/client-api/src/routes/subscribe.rs +++ b/crates/client-api/src/routes/subscribe.rs @@ -1143,7 +1143,7 @@ async fn ws_encode_task( for frame in frames { if outgoing_frames.send(frame).is_err() { - break; + return; } } } From a83c38e88f3f7a42c8a165c0f7efd1b41b7115fd Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Wed, 16 Jul 2025 11:16:44 +0200 Subject: [PATCH 10/11] Avoid allocating when fragmenting + add property tests --- Cargo.lock | 1 + crates/client-api/Cargo.toml | 1 + crates/client-api/src/routes/subscribe.rs | 128 +++++++++++++++------- 3 files changed, 89 insertions(+), 41 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7b51af53bd4..4a749035641 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5355,6 +5355,7 @@ dependencies = [ "mime", "pretty_assertions", "prometheus", + "proptest", "rand 0.9.1", "regex", 
"scopeguard", diff --git a/crates/client-api/Cargo.toml b/crates/client-api/Cargo.toml index 51324e087ed..d94b26de3b9 100644 --- a/crates/client-api/Cargo.toml +++ b/crates/client-api/Cargo.toml @@ -57,3 +57,4 @@ jemalloc_pprof.workspace = true [dev-dependencies] jsonwebtoken.workspace = true pretty_assertions = { workspace = true, features = ["unstable"] } +proptest.workspace = true diff --git a/crates/client-api/src/routes/subscribe.rs b/crates/client-api/src/routes/subscribe.rs index 363add3f898..da7d82cb9b9 100644 --- a/crates/client-api/src/routes/subscribe.rs +++ b/crates/client-api/src/routes/subscribe.rs @@ -1117,35 +1117,37 @@ async fn ws_encode_task( // or create a fresh one. let buf = buf_pool.pop().unwrap_or_else(|| SerializeBuffer::new(config)); - let (workload, num_rows, stats, in_use_buf, frames) = match message { + let in_use_buf = match message { OutboundMessage::Error(message) => { - let (stats, in_use, frames) = ws_encode_message(config, buf, message, false).await; - (None, None, stats, in_use, frames) + let (stats, in_use, mut frames) = ws_encode_message(config, buf, message, false).await; + metrics.report(None, None, stats); + if frames.try_for_each(|frame| outgoing_frames.send(frame)).is_err() { + break; + } + + in_use } OutboundMessage::Message(message) => { let workload = message.workload(); let num_rows = message.num_rows(); let is_large = num_rows.is_some_and(|n| n > 1024); - let (stats, in_use, frames) = ws_encode_message(config, buf, message, is_large).await; + let (stats, in_use, mut frames) = ws_encode_message(config, buf, message, is_large).await; + metrics.report(workload, num_rows, stats); + if frames.try_for_each(|frame| outgoing_frames.send(frame)).is_err() { + break; + } - (workload, num_rows, stats, in_use, frames) + in_use } }; - metrics.report(workload, num_rows, stats); if in_use_bufs.len() < BUF_POOL_CAPACITY { in_use_bufs.push(scopeguard::guard(in_use_buf, |in_use| { let buf = in_use.try_reclaim().expect("buffer should be 
unique"); let _ = buf_pool.push(buf); })); } - - for frame in frames { - if outgoing_frames.send(frame).is_err() { - return; - } - } } } @@ -1187,7 +1189,7 @@ async fn ws_encode_message( buf: SerializeBuffer, message: impl ToProtocol + Send + 'static, is_large_message: bool, -) -> (EncodeMetrics, InUseSerializeBuffer, Vec) { +) -> (EncodeMetrics, InUseSerializeBuffer, impl Iterator) { const FRAGMENT_SIZE: usize = 4096; let serialize_and_compress = |serialize_buf, message, config| { @@ -1219,36 +1221,18 @@ async fn ws_encode_message( /// according to the rules laid out in [RFC6455], Section 5.4. /// /// [RFC6455]: https://datatracker.ietf.org/doc/html/rfc6455#section-5.4 -fn fragment(data: Bytes, ty: Data, fragment_size: usize) -> Vec { - let total_len = data.len(); - let mut frames = Vec::with_capacity((total_len / fragment_size) + total_len % fragment_size); - let mut offset = 0; - while offset < total_len { - let end = (offset + fragment_size).min(total_len); - let chunk = data.slice(offset..end); - frames.push(Frame::message(chunk, OpCode::Data(Data::Continue), false)); - offset = end; - } +fn fragment(data: Bytes, ty: Data, fragment_size: usize) -> impl Iterator { + let len = data.len(); - match frames.as_mut_slice() { - [] => {} - [single] => { - let hdr = single.header_mut(); - hdr.is_final = true; - hdr.opcode = OpCode::Data(ty); - } - [first, .., last] => { - let hdr = first.header_mut(); - hdr.is_final = false; - hdr.opcode = OpCode::Data(ty); - - let hdr = last.header_mut(); - hdr.is_final = true; - hdr.opcode = OpCode::Data(Data::Continue); - } - } + (0..len).step_by(fragment_size).enumerate().map(move |(i, start)| { + let end = (start + fragment_size).min(len); + let chunk = data.slice(start..end); + + let opcode = OpCode::Data(if i == 0 { ty } else { Data::Continue }); + let is_final = end == len; - frames + Frame::message(chunk, opcode, is_final) + }) } #[derive(Debug)] @@ -1323,11 +1307,13 @@ mod tests { }; use anyhow::anyhow; + use bytes::BytesMut; 
use futures::{ future::{self, Either, FutureExt as _}, sink, stream, }; use pretty_assertions::assert_matches; + use proptest::prelude::*; use spacetimedb::client::ClientName; use tokio::time::sleep; @@ -1898,6 +1884,66 @@ mod tests { .is_some_and(|message| matches!(message, WsMessage::Close(_)))); } + #[test] + fn fragment_yields_no_frames_if_input_is_empty() { + assert!(fragment(Bytes::new(), Data::Binary, 4096) + .collect::>() + .is_empty()); + } + + const MAX_DATA_SIZE: usize = 1024 * 1024; + const MAX_FRAME_SIZE: usize = 1024; + + proptest! { + #[test] + fn fragment_input_can_be_reconstructed_from_output( + input in prop::collection::vec(any::(), 1..MAX_DATA_SIZE), + fragment_size in 1..MAX_FRAME_SIZE, + ) { + let data = Bytes::from(input); + let mut payloads = BytesMut::new(); + for frame in fragment(data.clone(), Data::Binary, fragment_size) { + payloads.extend(Some(frame.into_payload())); + } + prop_assert_eq!(data, payloads.freeze()); + } + + #[test] + fn fragment_all_frames_except_last_do_not_have_the_fin_bit( + input in prop::collection::vec(any::(), 1..MAX_DATA_SIZE), + fragment_size in 1..MAX_FRAME_SIZE, + ) { + let mut frames = fragment(Bytes::from(input), Data::Binary, fragment_size).collect::>(); + prop_assert!(frames.pop().unwrap().header().is_final); + prop_assert!(frames.into_iter().all(|frame| !frame.header().is_final)); + } + + #[test] + fn fragment_first_frame_has_original_opcode_rest_are_continue( + input in prop::collection::vec(any::(), 1..MAX_DATA_SIZE), + fragment_size in 1..MAX_FRAME_SIZE, + ty in Just(Data::Text).prop_union(Just(Data::Binary)), + ) { + let mut frames = fragment(Bytes::from(input), ty, fragment_size); + prop_assert_eq!(frames.next().unwrap().header().opcode, OpCode::Data(ty)); + for frame in frames { + prop_assert_eq!(frame.header().opcode, OpCode::Data(Data::Continue)); + } + } + + #[test] + fn fragment_produces_expected_number_of_equal_sized_frames( + input in prop::collection::vec(any::(), 1..MAX_DATA_SIZE), + 
fragment_size in 1..MAX_FRAME_SIZE, + ) { + let input = Bytes::from(input); + let mut frames = fragment(input.clone(), Data::Binary, fragment_size).collect::>(); + prop_assert_eq!(frames.len(), input.len().div_ceil(fragment_size)); + prop_assert!(frames.pop().unwrap().payload().len() <= fragment_size); + prop_assert!(frames.iter().all(|frame| frame.payload().len() == fragment_size)); + } + } + async fn is_pending(fut: &mut (impl Future + Unpin)) -> bool { poll_fn(|cx| Poll::Ready(fut.poll_unpin(cx).is_pending())).await } From fe0ff032bd0d23742525f39a009e421e27b2c832 Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Thu, 17 Jul 2025 11:04:24 +0200 Subject: [PATCH 11/11] Put the state updates after close, like before --- crates/client-api/src/routes/subscribe.rs | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/crates/client-api/src/routes/subscribe.rs b/crates/client-api/src/routes/subscribe.rs index 7aea07a1b6e..107d99efd6d 100644 --- a/crates/client-api/src/routes/subscribe.rs +++ b/crates/client-api/src/routes/subscribe.rs @@ -1048,11 +1048,6 @@ async fn ws_send_loop_inner( match msg { UnorderedWsMessage::Close(close_frame) => { log::trace!("intiating close"); - state.close(); - // We won't be polling `messages` anymore, - // so let senders know. - messages.close(); - // Send outstanding frames until one that has the FIN // bit set. Ensures the client won't receive partial // messages before we shut down. @@ -1074,6 +1069,20 @@ async fn ws_send_loop_inner( log::warn!("error sending close frame: {e:#}"); break; } + + // Lastly, update the state. + // + // NOTE: It's ok to not update the state if we fail to + // send the close frame, because we assume that the main + // loop will exit when this future terminates. + // We shouldn't set the state to closed before sending + // the close frame, however, as we would start dropping + // messages immediately (defeating the purpose of the + // close handshake).
+ state.close(); + // We won't be polling `messages` anymore, + // so let senders know. + messages.close(); }, UnorderedWsMessage::Ping(bytes) => { log::trace!("sending ping");