diff --git a/Cargo.lock b/Cargo.lock index 5265b1cdacc..29568ab1fc3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7443,6 +7443,7 @@ dependencies = [ "bytes", "bytestring", "chrono", + "crossbeam-queue", "derive_more 0.99.20", "email_address", "futures", @@ -7458,6 +7459,7 @@ dependencies = [ "mime", "pretty_assertions", "prometheus", + "proptest", "rand 0.9.2", "regex", "scopeguard", diff --git a/crates/client-api/Cargo.toml b/crates/client-api/Cargo.toml index 59c9e636d94..874dab0ccbd 100644 --- a/crates/client-api/Cargo.toml +++ b/crates/client-api/Cargo.toml @@ -52,6 +52,7 @@ jsonwebtoken.workspace = true scopeguard.workspace = true serde_with.workspace = true async-stream.workspace = true +crossbeam-queue.workspace = true humantime.workspace = true thiserror.workspace = true @@ -61,6 +62,7 @@ jemalloc_pprof.workspace = true [dev-dependencies] jsonwebtoken.workspace = true pretty_assertions = { workspace = true, features = ["unstable"] } +proptest.workspace = true toml.workspace = true [lints] diff --git a/crates/client-api/src/routes/subscribe.rs b/crates/client-api/src/routes/subscribe.rs index 94c152d64db..377c598fb14 100644 --- a/crates/client-api/src/routes/subscribe.rs +++ b/crates/client-api/src/routes/subscribe.rs @@ -1,5 +1,5 @@ use std::fmt::Display; -use std::future::{poll_fn, Future}; +use std::future::Future; use std::num::NonZeroUsize; use std::panic; use std::pin::{pin, Pin}; @@ -15,14 +15,16 @@ use axum::Extension; use axum_extra::TypedHeader; use bytes::Bytes; use bytestring::ByteString; +use crossbeam_queue::ArrayQueue; use derive_more::From; use futures::{pin_mut, Sink, SinkExt, Stream, StreamExt}; use http::{HeaderValue, StatusCode}; -use prometheus::IntGauge; +use prometheus::{Histogram, IntGauge}; use scopeguard::{defer, ScopeGuard}; use serde::Deserialize; use spacetimedb::client::messages::{ - serialize, IdentityTokenMessage, SerializableMessage, SerializeBuffer, SwitchedServerMessage, ToProtocol, + serialize, IdentityTokenMessage, InUseSerializeBuffer, SerializableMessage, SerializeBuffer, SwitchedServerMessage, + ToProtocol, }; use spacetimedb::client::{ ClientActorId, ClientConfig, ClientConnection, ClientConnectionReceiver, DataMessage, MessageExecutionError, @@ -41,6 +43,8 @@ use tokio::sync::{mpsc, watch}; use tokio::task::JoinHandle; use tokio::time::error::Elapsed; use tokio::time::{sleep_until, timeout}; +use tokio_tungstenite::tungstenite::protocol::frame::coding::{Data, OpCode}; +use tokio_tungstenite::tungstenite::protocol::frame::Frame; use tokio_tungstenite::tungstenite::Utf8Bytes; use crate::auth::SpacetimeAuth; @@ -796,6 +800,7 @@ fn ws_recv_loop( if !state.closed() { yield ClientMessage::from_message(m); + continue; } // If closed, keep polling until either: // @@ -993,12 +998,12 @@ enum UnorderedWsMessage { /// Abstraction over [`ClientConnectionReceiver`], so tests can use a plain /// [`mpsc::Receiver`]. -trait Receiver { - fn recv(&mut self) -> impl Future> + Send; +trait Receiver { + fn recv(&mut self) -> impl Future> + Send; fn close(&mut self); } -impl Receiver for ClientConnectionReceiver { +impl Receiver for ClientConnectionReceiver { async fn recv(&mut self) -> Option { ClientConnectionReceiver::recv(self).await } @@ -1008,8 +1013,8 @@ impl Receiver for ClientConnectionReceiver { } } -impl Receiver for mpsc::Receiver { - async fn recv(&mut self) -> Option { +impl Receiver for mpsc::Receiver { + async fn recv(&mut self) -> Option { mpsc::Receiver::recv(self).await } @@ -1042,25 +1047,69 @@ impl Receiver for mpsc::Receiver { async fn ws_send_loop( state: Arc, config: ClientConfig, + ws: impl Sink + Unpin, + messages: impl Receiver, + unordered: mpsc::UnboundedReceiver, +) { + let metrics = SendMetrics::new(state.database); + ws_send_loop_inner(state, ws, messages, unordered, |encode_rx, frames_tx| { + ws_encode_task(metrics, config, encode_rx, frames_tx) + }) + .await +} + +async fn ws_send_loop_inner( + state: Arc, mut ws: impl Sink + Unpin, - mut messages: impl Receiver, + mut messages: impl Receiver, mut unordered: mpsc::UnboundedReceiver, -) { - let mut serialize_buf = SerializeBuffer::new(config); + encoder: impl FnOnce(mpsc::UnboundedReceiver, mpsc::UnboundedSender) -> Encoder, +) where + T: Into, + U: From, + Encoder: Future + Send + 'static, +{ + // The number of frames we'll `feed` to the `ws` sink in one iteration + // of the `select!` loop. + // + // This batching is done to allow control messages appearing on `unordered` + // to be interleaved with the sending of large messages split across some + // number of frames. + // + // This allows clients with slow connections to respond to `Ping`s, and + // avoid timing out, while receiving large messages. + // + // The default frame size is 4KiB, hence we write in batches of 32KiB. + const FRAME_BATCH_SIZE: usize = 8; + let mut frames_batch = Vec::with_capacity(FRAME_BATCH_SIZE); + let (frames_tx, mut frames_rx) = mpsc::unbounded_channel(); - loop { + let (encode_tx, encode_rx) = mpsc::unbounded_channel(); + // Spawn the encode task. + // + // NOTE: It is not technically required to introduce parallelism for + // encoding. We spawn mainly to avoid having to manually poll the `Encoder` + // future in the `select!` loop below, which proved to be quite error + // prone in the past (looking at you, `also_poll`). + tokio::spawn(encoder(encode_rx, frames_tx)); + + 'outer: loop { let closed = state.closed(); tokio::select! { - // `biased` towards the unordered queue, - // which may initiate a connection shutdown. + // `biased` because we want to: + // + // - give control messages precedence + // - and flush outstanding messages + // before taking on more encoding work biased; + // Check for control messages or execution errors. maybe_msg = unordered.recv() => { let Some(msg) = maybe_msg else { break; }; - // We shall not sent more data after a close frame, + // We shall not send more data after a close frame, // but keep polling `unordered` so that `ws_client_actor` keeps // waiting for an acknowledgement from the client, // even if it spuriously initiates another close itself. @@ -1069,14 +1118,38 @@ async fn ws_send_loop( } match msg { UnorderedWsMessage::Close(close_frame) => { + log::trace!("intiating close"); + // Send outstanding frames until one that has the FIN + // bit set. Ensures the client won't receive partial + // messages before we shut down. + log::trace!("draining outgoing frames"); + while let Ok(frame) = frames_rx.try_recv() { + let eof = frame.header().is_final; + if let Err(e) = ws.feed(WsMessage::Frame(frame)).await { + log::warn!("error sending frame: {e:#}"); + break 'outer; + } + + if eof { + break; + } + } + // Then send the close frame. log::trace!("sending close frame"); if let Err(e) = ws.send(WsMessage::Close(Some(close_frame))).await { log::warn!("error sending close frame: {e:#}"); break; } + + // Lastly, update the state. + // // NOTE: It's ok to not update the state if we fail to // send the close frame, because we assume that the main - // loop will exit when this future terminates. + // loop with exit when this future terminates. + // We shouldn't set the state to closed before sending + // the close frame, however, as we would start dropping + // messages immediately (defeating the purpose of the + // close handshake). state.close(); // We won't be polling `messages` anymore, // so let senders know. @@ -1090,56 +1163,47 @@ async fn ws_send_loop( } }, UnorderedWsMessage::Error(err) => { - log::trace!("sending error result"); - let (msg_alloc, res) = send_message( - &state.database, - config, - serialize_buf, - None, - &mut ws, - err - ).await; - serialize_buf = msg_alloc; - - if let Err(e) = res { - log::warn!("websocket send error: {e}"); - break; - } + log::trace!("encoding execution error"); + encode_tx + .send(err.into()) + // `ws_encode_task` shouldn't terminate until + // `encode_tx` is dropped, except by panicking. + .expect("encode task panicked"); }, } }, - maybe_message = messages.recv(), if !closed => { - let Some(message) = maybe_message else { - // The message sender was dropped, even though no close - // handshake is in progress. This should not normally happen, - // but initiating close seems like the correct thing to do. - log::warn!("message sender dropped without close handshake"); - if let Err(e) = ws.send(WsMessage::Close(None)).await { - log::warn!("error sending close frame: {e:#}"); - break; + // Send a batch of frames. + // + // Branch is disabled if we already sent a close frame. + // + // TODO: If the client sent us a close frame and we're in the middle + // of a large message, we may not send them the whole message. + // If that turns out to be a problem, we'll need to keep track of + // which side initiated the close handshake. + // Unsure if `tungstenite` will support us here, i.e. allows to keep + // sending when the other side initiated the close. + n = frames_rx.recv_many(&mut frames_batch, FRAME_BATCH_SIZE), if !closed => { + log::trace!("sending batch of {n} frames"); + for frame in frames_batch.drain(..n) { + if let Err(e) = ws.feed(WsMessage::Frame(frame)).await { + log::warn!("error sending frame: {e:#}"); + break 'outer; } - state.close(); - // Continue so that `ws_client_actor` keeps waiting for an - // acknowledgement from the client. - continue; - }; - log::trace!("sending outgoing message"); - let (msg_alloc, res) = send_message( - &state.database, - config, - serialize_buf, - message.workload().zip(message.num_rows()), - &mut ws, - message - ).await; - serialize_buf = msg_alloc; - - if let Err(e) = res { - log::warn!("websocket send error: {e}"); - return; } }, + + // Take on more work. + // + // Branch is disabled if we already sent a close frame. + Some(message) = messages.recv(), if !closed => { + encode_tx + .send(message.into()) + // `ws_encode_task` shouldn't terminate until + // `encode_tx` is dropped, except by panicking. + .expect("encode task panicked"); + }, + } if let Err(e) = ws.flush().await { @@ -1149,47 +1213,156 @@ async fn ws_send_loop( } } -/// Serialize and potentially compress `message`, and feed it to the `ws` sink. -async fn send_message + Unpin>( - database_identity: &Identity, +#[derive(From)] +enum OutboundMessage { + Error(MessageExecutionError), + Message(SerializableMessage), +} + +/// Task that reads [`OutboundMessage`]s from `messages`, encodes them via +/// [`ws_encode_message`], and sends the resuling [`Frame`]s to `outgoing_frames`. +/// +/// Meant to be [`tokio::spawn`]ed. +/// +/// The function also takes care of reusing serialization buffers and reporting +/// metrics via [`SendMetrics`].. +async fn ws_encode_task( + metrics: SendMetrics, + config: ClientConfig, + mut messages: mpsc::UnboundedReceiver, + outgoing_frames: mpsc::UnboundedSender, +) { + // Serialize buffers can be reclaimed once all frames of a message are + // copied to the wire. Since we don't know when that will happen, we prepare + // for a few messages to be in-flight, i.e. encoded but not yet sent. + const BUF_POOL_CAPACITY: usize = 16; + let buf_pool = ArrayQueue::new(BUF_POOL_CAPACITY); + let mut in_use_bufs: Vec> = Vec::with_capacity(BUF_POOL_CAPACITY); + + while let Some(message) = messages.recv().await { + // Drop serialize buffers with no external referent, + // returning them to the pool. + in_use_bufs.retain(|in_use| !in_use.is_unique()); + // Get a serialize buffer from the pool, + // or create a fresh one. + let buf = buf_pool.pop().unwrap_or_else(|| SerializeBuffer::new(config)); + + let in_use_buf = match message { + OutboundMessage::Error(message) => { + let (stats, in_use, mut frames) = ws_encode_message(config, buf, message, false).await; + metrics.report(None, None, stats); + if frames.try_for_each(|frame| outgoing_frames.send(frame)).is_err() { + break; + } + + in_use + } + OutboundMessage::Message(message) => { + let workload = message.workload(); + let num_rows = message.num_rows(); + let is_large = num_rows.is_some_and(|n| n > 1024); + + let (stats, in_use, mut frames) = ws_encode_message(config, buf, message, is_large).await; + metrics.report(workload, num_rows, stats); + if frames.try_for_each(|frame| outgoing_frames.send(frame)).is_err() { + break; + } + + in_use + } + }; + + if in_use_bufs.len() < BUF_POOL_CAPACITY { + in_use_bufs.push(scopeguard::guard(in_use_buf, |in_use| { + let buf = in_use.try_reclaim().expect("buffer should be unique"); + let _ = buf_pool.push(buf); + })); + } + } +} + +/// Some stats about serialization and compression. +/// +/// Returned by [`ws_encode_message`]. +struct EncodeMetrics { + /// Time it took to serialize and (potentially) compress a message. + /// Does not include scheduling overhead. + timing: Duration, + /// Length in bytes of the serialized and (potentially) compressed message. + encoded_len: usize, +} + +/// Encodes `message` into zero or more WebSocket [`Frame`]s. +/// +/// The `message` is first [`serialize`]d. Depending on the serialized size, +/// client `config` and format (see [`SwitchedServerMessage`]), compression may +/// be applied to the serialized bytes. +/// +/// If `is_large_message` is true, serialization and compression is performed +/// on a `rayon` thread. The value should be chosen s.t. the overhead of +/// scheduling is expected to be lower than the overhead of compression itself. +/// +/// The resulting bytes are then split into [`Frame`]s of at most 4096 bytes +/// of payload each, according to the rules laid out in [RFC6455], Section +/// 5.4 Fragmentation. +/// +/// Returns [`EncodeMetrics`], the [`InUseSerializeBuffer`] that was passed in +/// as `buf` for later reuse, and the [`Frame`]s. +/// +/// NOTE: When sending, the frames of a single message MUST NOT be interleaved +/// with the frames of another message, except for control frames (`Close`, +/// `Ping`, `Pong`). +/// +/// [RFC6455]: https://datatracker.ietf.org/doc/html/rfc6455#section-5.4 +async fn ws_encode_message( config: ClientConfig, - serialize_buf: SerializeBuffer, - metrics_metadata: Option<(WorkloadType, usize)>, - ws: &mut S, + buf: SerializeBuffer, message: impl ToProtocol + Send + 'static, -) -> (SerializeBuffer, Result<(), S::Error>) { - let (workload, num_rows) = metrics_metadata.unzip(); - // Move large messages to a rayon thread, - // as serialization and compression can take a long time. - // The threshold of 1024 rows is arbitrary, and may need to be refined. + is_large_message: bool, +) -> (EncodeMetrics, InUseSerializeBuffer, impl Iterator) { + const FRAGMENT_SIZE: usize = 4096; + let serialize_and_compress = |serialize_buf, message, config| { let start = Instant::now(); let (msg_alloc, msg_data) = serialize(serialize_buf, message, config); (start.elapsed(), msg_alloc, msg_data) }; - let (timing, msg_alloc, msg_data) = if num_rows.is_some_and(|n| n > 1024) { - spawn_rayon(move || serialize_and_compress(serialize_buf, message, config)).await + let (timing, msg_alloc, msg_data) = if is_large_message { + spawn_rayon(move || serialize_and_compress(buf, message, config)).await } else { - serialize_and_compress(serialize_buf, message, config) + serialize_and_compress(buf, message, config) }; - report_ws_sent_metrics(database_identity, workload, num_rows, timing, &msg_data); - - let res = async { - ws.feed(datamsg_to_wsmsg(msg_data)).await?; - // To reclaim the `msg_alloc` memory, we need `SplitSink` to push down - // its item slot to the inner sink, which will copy the `Bytes` and - // drop the reference. - // We don't want to flush the inner sink just yet, as we might be - // writing many messages. - // `SplitSink::poll_ready` does what we want. - poll_fn(|cx| ws.poll_ready_unpin(cx)).await - } - .await; - // Reclaim can fail if we didn't succeed pushing down the data to the - // websocket. We must return a buffer, though, so create a fresh one. - let buf = msg_alloc.try_reclaim().unwrap_or_else(|| SerializeBuffer::new(config)); - (buf, res) + let metrics = EncodeMetrics { + timing, + encoded_len: msg_data.len(), + }; + + let (data, ty) = match msg_data { + DataMessage::Text(text) => (bytestring_to_utf8bytes(text).into(), Data::Text), + DataMessage::Binary(bin) => (bin, Data::Binary), + }; + let frames = fragment(data, ty, FRAGMENT_SIZE); + + (metrics, msg_alloc, frames) +} + +/// Split payload `data` of type `ty` into `fragment_size`d [`Frame`]s, +/// according to the rules laid out in [RFC6455], Section 5.4. +/// +/// [RFC6455]: https://datatracker.ietf.org/doc/html/rfc6455#section-5.4 +fn fragment(data: Bytes, ty: Data, fragment_size: usize) -> impl Iterator { + let len = data.len(); + + (0..len).step_by(fragment_size).enumerate().map(move |(i, start)| { + let end = (start + fragment_size).min(len); + let chunk = data.slice(start..end); + + let opcode = OpCode::Data(if i == 0 { ty } else { Data::Continue }); + let is_final = end == len; + + Frame::message(chunk, opcode, is_final) + }) } #[derive(Debug)] @@ -1214,37 +1387,34 @@ impl ClientMessage { } } -/// Report metrics on sent rows and message sizes to a websocket client. -fn report_ws_sent_metrics( - addr: &Identity, - workload: Option, - num_rows: Option, - serialize_duration: Duration, - msg_ws: &DataMessage, -) { - // These metrics should be updated together, - // or not at all. - if let (Some(workload), Some(num_rows)) = (workload, num_rows) { - WORKER_METRICS - .websocket_sent_num_rows - .with_label_values(addr, &workload) - .observe(num_rows as f64); - WORKER_METRICS - .websocket_sent_msg_size - .with_label_values(addr, &workload) - .observe(msg_ws.len() as f64); - } - - WORKER_METRICS - .websocket_serialize_secs - .with_label_values(addr) - .observe(serialize_duration.as_secs_f64()); +struct SendMetrics { + database: Identity, + encode_timing: Histogram, } -fn datamsg_to_wsmsg(msg: DataMessage) -> WsMessage { - match msg { - DataMessage::Text(text) => WsMessage::Text(bytestring_to_utf8bytes(text)), - DataMessage::Binary(bin) => WsMessage::Binary(bin), +impl SendMetrics { + fn new(database: Identity) -> Self { + Self { + encode_timing: WORKER_METRICS.websocket_serialize_secs.with_label_values(&database), + database, + } + } + + fn report(&self, workload: Option, num_rows: Option, encode: EncodeMetrics) { + self.encode_timing.observe(encode.timing.as_secs_f64()); + + // These metrics should be updated together, + // or not at all. + if let (Some(workload), Some(num_rows)) = (workload, num_rows) { + WORKER_METRICS + .websocket_sent_num_rows + .with_label_values(&self.database, &workload) + .observe(num_rows as f64); + WORKER_METRICS + .websocket_sent_msg_size + .with_label_values(&self.database, &workload) + .observe(encode.encoded_len as f64); + } } } @@ -1260,18 +1430,20 @@ fn bytestring_to_utf8bytes(s: ByteString) -> Utf8Bytes { #[cfg(test)] mod tests { use std::{ - future::Future, + future::{poll_fn, Future}, pin::Pin, sync::atomic::AtomicUsize, task::{Context, Poll}, }; use anyhow::anyhow; + use bytes::BytesMut; use futures::{ future::{self, Either, FutureExt as _}, sink, stream, }; use pretty_assertions::assert_matches; + use proptest::prelude::*; use spacetimedb::client::{messages::SerializableMessage, ClientName}; use tokio::time::sleep; @@ -1789,6 +1961,130 @@ mod tests { assert_eq!(received.len(), 10); } + #[tokio::test] + async fn send_loop_interleaves_pings_with_frames() { + let state = Arc::new(dummy_actor_state()); + let mut received = Vec::new(); + let (messages_tx, messages_rx) = mpsc::channel(1); + let (unordered_tx, unordered_rx) = mpsc::unbounded_channel(); + + #[derive(From)] + enum OutgoingBytes { + #[allow(unused)] + Error(MessageExecutionError), + Bytes(Bytes), + } + + async fn encoder(mut rx: mpsc::UnboundedReceiver, tx: mpsc::UnboundedSender) { + while let Some(data) = rx.recv().await { + if let OutgoingBytes::Bytes(data) = data { + let frames = fragment(data, Data::Binary, 4096); + for frame in frames { + tx.send(frame).unwrap(); + } + } + } + } + + const MESSAGE_SIZE: usize = 10 * 1024 * 1024; + const FRAME_SIZE: usize = 4096; + const NUM_CONTROL_FRAMES: usize = 2; + + let send_loop = tokio::spawn(async move { + ws_send_loop_inner(state, &mut received, messages_rx, unordered_rx, encoder).await; + received + }); + messages_tx.send(Bytes::from_static(&[1; MESSAGE_SIZE])).await.unwrap(); + // Yield task to give the send loop a chance to receive the message. + tokio::task::yield_now().await; + // Send ping, then close. + unordered_tx.send(UnorderedWsMessage::Ping(Bytes::new())).unwrap(); + unordered_tx + .send(UnorderedWsMessage::Close(CloseFrame { + code: CloseCode::Away, + reason: "we're done".into(), + })) + .unwrap(); + + // Shut down the loop. + drop(messages_tx); + drop(unordered_tx); + let received = send_loop.await.unwrap(); + + let ping_pos = received + .iter() + .position(|message| matches!(message, WsMessage::Ping(_))) + .unwrap(); + log::info!("received={} ping-at={}", received.len(), ping_pos); + assert!(ping_pos > 0); + assert!(ping_pos < received.len() - NUM_CONTROL_FRAMES); + // All frames of the message should have been sent before the close frame. + assert_eq!(received.len(), (MESSAGE_SIZE / FRAME_SIZE) + NUM_CONTROL_FRAMES); + assert!(received + .last() + .is_some_and(|message| matches!(message, WsMessage::Close(_)))); + } + + #[test] + fn fragment_yields_no_frames_if_input_is_empty() { + assert!(fragment(Bytes::new(), Data::Binary, 4096) + .collect::>() + .is_empty()); + } + + const MAX_DATA_SIZE: usize = 1024 * 1024; + const MAX_FRAME_SIZE: usize = 1024; + + proptest! { + #[test] + fn fragment_input_can_be_reconstructed_from_output( + input in prop::collection::vec(any::(), 1..MAX_DATA_SIZE), + fragment_size in 1..MAX_FRAME_SIZE, + ) { + let data = Bytes::from(input); + let mut payloads = BytesMut::new(); + for frame in fragment(data.clone(), Data::Binary, fragment_size) { + payloads.extend(Some(frame.into_payload())); + } + prop_assert_eq!(data, payloads.freeze()); + } + + #[test] + fn fragment_all_frames_except_last_do_not_have_the_fin_bit( + input in prop::collection::vec(any::(), 1..MAX_DATA_SIZE), + fragment_size in 1..MAX_FRAME_SIZE, + ) { + let mut frames = fragment(Bytes::from(input), Data::Binary, fragment_size).collect::>(); + prop_assert!(frames.pop().unwrap().header().is_final); + prop_assert!(frames.into_iter().all(|frame| !frame.header().is_final)); + } + + #[test] + fn fragment_first_frame_has_original_opcode_rest_are_continue( + input in prop::collection::vec(any::(), 1..MAX_DATA_SIZE), + fragment_size in 1..MAX_FRAME_SIZE, + ty in Just(Data::Text).prop_union(Just(Data::Binary)), + ) { + let mut frames = fragment(Bytes::from(input), ty, fragment_size); + prop_assert_eq!(frames.next().unwrap().header().opcode, OpCode::Data(ty)); + for frame in frames { + prop_assert_eq!(frame.header().opcode, OpCode::Data(Data::Continue)); + } + } + + #[test] + fn fragment_produces_expected_number_of_equal_sized_frames( + input in prop::collection::vec(any::(), 1..MAX_DATA_SIZE), + fragment_size in 1..MAX_FRAME_SIZE, + ) { + let input = Bytes::from(input); + let mut frames = fragment(input.clone(), Data::Binary, fragment_size).collect::>(); + prop_assert_eq!(frames.len(), input.len().div_ceil(fragment_size)); + prop_assert!(frames.pop().unwrap().payload().len() <= fragment_size); + prop_assert!(frames.iter().all(|frame| frame.payload().len() == fragment_size)); + } + } + async fn is_pending(fut: &mut (impl Future + Unpin)) -> bool { poll_fn(|cx| Poll::Ready(fut.poll_unpin(cx).is_pending())).await } diff --git a/crates/core/src/client/messages.rs b/crates/core/src/client/messages.rs index eb2ace727e5..811d87dc1cb 100644 --- a/crates/core/src/client/messages.rs +++ b/crates/core/src/client/messages.rs @@ -119,6 +119,13 @@ impl InUseSerializeBuffer { compressed, }) } + + pub fn is_unique(&self) -> bool { + match self { + InUseSerializeBuffer::Uncompressed { uncompressed, .. } => uncompressed.is_unique(), + InUseSerializeBuffer::Compressed { compressed, .. } => compressed.is_unique(), + } + } } /// Serialize `msg` into a [`DataMessage`] containing a [`ws::ServerMessage`].