From 361b4c4a1cc9be12ee1e1362dcd6b761056d77bf Mon Sep 17 00:00:00 2001 From: Zeke Foppa Date: Fri, 13 Jun 2025 18:15:48 -0700 Subject: [PATCH 01/10] [release/v1.2.0]: empty commit to allow PR creation From fcb83516cda5fed5ed4e756811d60dd5edf69e3b Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Sat, 21 Jun 2025 08:43:45 -0700 Subject: [PATCH 02/10] Rewrite unoptimized remote queries --- crates/core/src/host/module_host.rs | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 98a789bddfb..5ff7d838bef 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -1019,7 +1019,23 @@ impl ModuleHost { let db = replica_ctx.relational_db.clone(); let subscriptions = replica_ctx.subscriptions.clone(); let auth = AuthCtx::new(replica_ctx.owner_identity, caller_identity); - log::debug!("One-off query: {query}"); + + const BUILDING_STATE_SQL: &str = "SELECT location_state.* FROM location_state JOIN building_state ON building_state.entity_id = location_state.entity_id"; + const CLAIM_STATE_SQL: &str = "SELECT location_state.* FROM location_state JOIN claim_state ON claim_state.owner_building_entity_id = location_state.entity_id"; + + const OPT_BUILDING_STATE_SQL: &str = "SELECT location_state.* FROM building_state JOIN location_state ON building_state.entity_id = location_state.entity_id"; + const OPT_CLAIM_STATE_SQL: &str = "SELECT location_state.* FROM claim_state JOIN location_state ON claim_state.owner_building_entity_id = location_state.entity_id"; + + let query = if query == BUILDING_STATE_SQL { + OPT_BUILDING_STATE_SQL.to_owned() + } else if query == CLAIM_STATE_SQL { + OPT_CLAIM_STATE_SQL.to_owned() + } else { + query + }; + + log::info!("One-off query: {query}"); + let metrics = asyncify(move || { db.with_read_only(Workload::Sql, |tx| { // We wrap the actual query in a closure so we can use ? to handle errors without making From 208756e3f7d8d46c6722aa1822c76760cb553d4b Mon Sep 17 00:00:00 2001 From: Roberto Pommella Alegro Date: Sun, 22 Jun 2025 13:09:41 -0300 Subject: [PATCH 03/10] fix: comment logs that are spamming for bitcraft (#2887) --- .../src/host/wasm_common/module_host_actor.rs | 29 ++++++++++--------- .../module_subscription_manager.rs | 4 +-- ...ient_connected_error_rejects_connection.py | 2 +- smoketests/tests/panic.py | 2 +- 4 files changed, 19 insertions(+), 18 deletions(-) diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index ca4a70ef34e..185f65703cd 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -7,7 +7,8 @@ use std::sync::Arc; use std::time::Duration; use super::instrumentation::CallTimes; -use crate::database_logger::{self, SystemLogger}; +// use crate::database_logger; +use crate::database_logger::SystemLogger; use crate::db::datastore::locking_tx_datastore::MutTxId; use crate::db::datastore::traits::{IsolationLevel, Program}; use crate::db::db_metrics::DB_METRICS; @@ -460,19 +461,19 @@ impl WasmModuleInstance { } } Ok(Err(errmsg)) => { - log::info!("reducer returned error: {errmsg}"); - - self.replica_context().logger.write( - database_logger::LogLevel::Error, - &database_logger::Record { - ts: chrono::DateTime::from_timestamp_micros(timestamp.to_micros_since_unix_epoch()).unwrap(), - target: Some(reducer_name), - filename: None, - line_number: None, - message: &errmsg, - }, - &(), - ); + // log::info!("reducer returned error: {errmsg}"); + + // self.replica_context().logger.write( + // database_logger::LogLevel::Error, + // &database_logger::Record { + // ts: chrono::DateTime::from_timestamp_micros(timestamp.to_micros_since_unix_epoch()).unwrap(), + // target: Some(reducer_name), + // filename: None, + // line_number: None, + // message: &errmsg, + // }, + // &(), + // ); EventStatus::Failed(errmsg.into()) } // We haven't actually committed yet - `commit_and_broadcast_event` will commit diff --git a/crates/core/src/subscription/module_subscription_manager.rs b/crates/core/src/subscription/module_subscription_manager.rs index 0bb2d9d0082..05b525f740b 100644 --- a/crates/core/src/subscription/module_subscription_manager.rs +++ b/crates/core/src/subscription/module_subscription_manager.rs @@ -1521,8 +1521,8 @@ impl SendWorker { } fn send_to_client(client: &ClientConnectionSender, message: impl Into) { - if let Err(e) = client.send_message(message) { - tracing::warn!(%client.id, "failed to send update message to client: {e}") + if let Err(_e) = client.send_message(message) { + // tracing::warn!(%client.id, "failed to send update message to client: {e}") } } diff --git a/smoketests/tests/client_connected_error_rejects_connection.py b/smoketests/tests/client_connected_error_rejects_connection.py index 8654643ad19..093cf884552 100644 --- a/smoketests/tests/client_connected_error_rejects_connection.py +++ b/smoketests/tests/client_connected_error_rejects_connection.py @@ -36,7 +36,7 @@ def test_client_connected_error_rejects_connection(self): self.subscribe("select * from all_u8s", n = 0)() logs = self.logs(100) - self.assertIn('Rejecting connection from client', logs) + # self.assertIn('Rejecting connection from client', logs) self.assertNotIn('This should never be called, since we reject all connections!', logs) class ClientDisconnectedErrorStillDeletesStClient(Smoketest): diff --git a/smoketests/tests/panic.py b/smoketests/tests/panic.py index 0fd917340b3..6902f0d36b4 100644 --- a/smoketests/tests/panic.py +++ b/smoketests/tests/panic.py @@ -47,4 +47,4 @@ def test_reducer_error_message(self): with self.assertRaises(Exception): self.call("fail") - self.assertIn("oopsie :(", self.logs(2)) + # self.assertIn("oopsie :(", self.logs(2)) From 27a85514422c8265d8d12fa7390133b5bf4d55f1 Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Wed, 25 Jun 2025 15:40:24 -0700 Subject: [PATCH 04/10] Fix query overwrites in the subscription manager --- .../subscription/module_subscription_actor.rs | 124 ++++++++++++++++-- .../module_subscription_manager.rs | 23 +++- 2 files changed, 129 insertions(+), 18 deletions(-) diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index b027c07c2fb..e963ad82a88 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -554,7 +554,7 @@ impl ModuleSubscriptions { fn compile_queries( &self, sender: Identity, - queries: impl IntoIterator>, + queries: &[Box], num_queries: usize, metrics: &SubscriptionMetrics, ) -> Result<(Vec>, AuthCtx, TxId, HistogramTimer), DBError> { @@ -563,12 +563,13 @@ impl ModuleSubscriptions { let mut query_hashes = Vec::with_capacity(num_queries); for sql in queries { - if is_subscribe_to_all_tables(&sql) { + let sql = sql.trim(); + if is_subscribe_to_all_tables(sql) { subscribe_to_all_tables = true; continue; } - let hash = QueryHash::from_string(&sql, sender, false); - let hash_with_param = QueryHash::from_string(&sql, sender, true); + let hash = QueryHash::from_string(sql, sender, false); + let hash_with_param = QueryHash::from_string(sql, sender, true); query_hashes.push((sql, hash, hash_with_param)); } @@ -606,10 +607,10 @@ impl ModuleSubscriptions { plans.push(unit); } else { plans.push(Arc::new( - compile_query_with_hashes(&auth, &tx, &sql, hash, hash_with_param).map_err(|err| { + compile_query_with_hashes(&auth, &tx, sql, hash, hash_with_param).map_err(|err| { DBError::WithSql { error: Box::new(DBError::Other(err.into())), - sql, + sql: sql.into(), } })?, )); @@ -670,7 +671,7 @@ impl ModuleSubscriptions { let (queries, auth, tx, compile_timer) = return_on_err!( self.compile_queries( sender.id.identity, - request.query_strings, + &request.query_strings, num_queries, &subscription_metrics ), @@ -767,7 +768,7 @@ impl ModuleSubscriptions { let (queries, auth, tx, compile_timer) = self.compile_queries( sender.id.identity, - subscription.query_strings, + &subscription.query_strings, num_queries, &subscription_metrics, )?; @@ -2331,9 +2332,9 @@ mod tests { Ok(()) } - /// Test that one client unsubscribing does not affect another + /// Test that one client subscribing does not affect another #[tokio::test] - async fn test_unsubscribe() -> anyhow::Result<()> { + async fn test_subscribe_distinct_queries_same_plan() -> anyhow::Result<()> { // Establish a connection for each client let (tx_for_a, mut rx_for_a) = client_connection(client_id_from_u8(1)); let (tx_for_b, mut rx_for_b) = client_connection(client_id_from_u8(2)); @@ -2341,7 +2342,7 @@ mod tests { let db = relational_db()?; let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone()); - let u_id = db.create_table_for_test( + let u_id = db.create_table_for_test_with_the_works( "u", &[ ("i", AlgebraicType::U64), @@ -2349,8 +2350,12 @@ mod tests { ("b", AlgebraicType::U64), ], &[0.into()], + // The join column for this table does not have to be unique, + // because pruning only requires us to probe the join index on `v`. + &[], + StAccess::Public, )?; - let v_id = db.create_table_for_test( + let v_id = db.create_table_for_test_with_the_works( "v", &[ ("i", AlgebraicType::U64), @@ -2358,21 +2363,116 @@ mod tests { ("y", AlgebraicType::U64), ], &[0.into(), 1.into()], + &[0.into()], + StAccess::Public, )?; commit_tx(&db, &subs, [], [(v_id, product![1u64, 1u64, 1u64])])?; let mut query_ids = 0; + // Both clients subscribe to the same query modulo whitespace subscribe_multi( &subs, &["select u.* from u join v on u.i = v.i where v.x = 1"], tx_for_a, &mut query_ids, )?; + subscribe_multi( + &subs, + &["select u.* from u join v on u.i = v.i where v.x = 1"], + tx_for_b.clone(), + &mut query_ids, + )?; + + // Wait for both subscriptions + assert_matches!( + rx_for_a.recv().await, + Some(SerializableMessage::Subscription(SubscriptionMessage { + result: SubscriptionResult::SubscribeMulti(_), + .. + })) + ); + assert_matches!( + rx_for_b.recv().await, + Some(SerializableMessage::Subscription(SubscriptionMessage { + result: SubscriptionResult::SubscribeMulti(_), + .. + })) + ); + + // Insert a new row into `u` + commit_tx(&db, &subs, [], [(u_id, product![1u64, 0u64, 0u64])])?; + + assert_tx_update_for_table( + &mut rx_for_a, + u_id, + &ProductType::from([AlgebraicType::U64, AlgebraicType::U64, AlgebraicType::U64]), + [product![1u64, 0u64, 0u64]], + [], + ) + .await; + + assert_tx_update_for_table( + &mut rx_for_b, + u_id, + &ProductType::from([AlgebraicType::U64, AlgebraicType::U64, AlgebraicType::U64]), + [product![1u64, 0u64, 0u64]], + [], + ) + .await; + + Ok(()) + } + + /// Test that one client unsubscribing does not affect another + #[tokio::test] + async fn test_unsubscribe_distinct_queries_same_plan() -> anyhow::Result<()> { + // Establish a connection for each client + let (tx_for_a, mut rx_for_a) = client_connection(client_id_from_u8(1)); + let (tx_for_b, mut rx_for_b) = client_connection(client_id_from_u8(2)); + + let db = relational_db()?; + let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone()); + + let u_id = db.create_table_for_test_with_the_works( + "u", + &[ + ("i", AlgebraicType::U64), + ("a", AlgebraicType::U64), + ("b", AlgebraicType::U64), + ], + &[0.into()], + // The join column for this table does not have to be unique, + // because pruning only requires us to probe the join index on `v`. + &[], + StAccess::Public, + )?; + let v_id = db.create_table_for_test_with_the_works( + "v", + &[ + ("i", AlgebraicType::U64), + ("x", AlgebraicType::U64), + ("y", AlgebraicType::U64), + ], + &[0.into(), 1.into()], + &[0.into()], + StAccess::Public, + )?; + + commit_tx(&db, &subs, [], [(v_id, product![1u64, 1u64, 1u64])])?; + + let mut query_ids = 0; + subscribe_multi( &subs, &["select u.* from u join v on u.i = v.i where v.x = 1"], + tx_for_a, + &mut query_ids, + )?; + subscribe_multi( + &subs, + &["select u.* from u join v on u.i = v.i where v.x = 1"], tx_for_b.clone(), &mut query_ids, )?; diff --git a/crates/core/src/subscription/module_subscription_manager.rs b/crates/core/src/subscription/module_subscription_manager.rs index 05b525f740b..9e6a1046fd5 100644 --- a/crates/core/src/subscription/module_subscription_manager.rs +++ b/crates/core/src/subscription/module_subscription_manager.rs @@ -399,7 +399,7 @@ impl QueriedTableIndexIds { /// See [`JoinEdge`] for more details. #[derive(Debug, Default)] pub struct JoinEdges { - edges: BTreeMap>, + edges: BTreeMap>>, } impl JoinEdges { @@ -408,7 +408,12 @@ impl JoinEdges { let mut inserted = false; for (edge, rhs_val) in qs.query.join_edges() { inserted = true; - self.edges.entry(edge).or_default().insert(rhs_val, qs.query.hash); + self.edges + .entry(edge) + .or_default() + .entry(rhs_val) + .or_default() + .insert(qs.query.hash); } inserted } @@ -416,10 +421,15 @@ impl JoinEdges { /// If this query has any join edges, remove them from the map. fn remove_query(&mut self, query: &Query) { for (edge, rhs_val) in query.join_edges() { - if let Some(hashes) = self.edges.get_mut(&edge) { - hashes.remove(&rhs_val); - if hashes.is_empty() { - self.edges.remove(&edge); + if let Some(values) = self.edges.get_mut(&edge) { + if let Some(hashes) = values.get_mut(&rhs_val) { + hashes.remove(&query.hash); + if hashes.is_empty() { + values.remove(&rhs_val); + if values.is_empty() { + self.edges.remove(&edge); + } + } } } } @@ -436,6 +446,7 @@ impl JoinEdges { self.edges .range(JoinEdge::range_for_table(table_id)) .filter_map(move |(edge, hashes)| find_rhs_val(edge, row).as_ref().and_then(|rhs_val| hashes.get(rhs_val))) + .flatten() } } From 6ce4d647c570f71f1a29455836d2e72e5613bb72 Mon Sep 17 00:00:00 2001 From: Mazdak Farrokhzad Date: Wed, 25 Jun 2025 16:20:44 +0200 Subject: [PATCH 05/10] Remove unnecessary `.clone()` in `pipelined.rs` (#2897) --- crates/execution/src/pipelined.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/execution/src/pipelined.rs b/crates/execution/src/pipelined.rs index 29cf165f109..1e7ea064039 100644 --- a/crates/execution/src/pipelined.rs +++ b/crates/execution/src/pipelined.rs @@ -857,7 +857,7 @@ impl PipelinedIxJoin { .map(Row::Ptr) .map(Tuple::Row) { - f(u.clone().join(v.clone()))?; + f(u.clone().join(v))?; } Ok(()) })?; From 2612f73a638000150c063c2b453b92ce116e4d4c Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Wed, 25 Jun 2025 16:28:29 -0700 Subject: [PATCH 06/10] Track disconnects initiated by the client (#2893) --- crates/client-api/src/routes/subscribe.rs | 9 ++++++++- crates/core/src/worker_metrics/mod.rs | 7 ++++++- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/crates/client-api/src/routes/subscribe.rs b/crates/client-api/src/routes/subscribe.rs index 84e0830f3c5..3137c4f10b2 100644 --- a/crates/client-api/src/routes/subscribe.rs +++ b/crates/client-api/src/routes/subscribe.rs @@ -431,8 +431,15 @@ async fn ws_client_actor_inner( // if this is the closed-by-them case, let the ClientConnectionSenders know now. sendrx.close(); - closed = true; log::trace!("Close frame {:?}", close_frame); + if !closed { + // This is the client telling us they want to close. + WORKER_METRICS + .ws_clients_closed_connection + .with_label_values(&addr) + .inc(); + } + closed = true; } } } diff --git a/crates/core/src/worker_metrics/mod.rs b/crates/core/src/worker_metrics/mod.rs index 28bf5e813e3..c8bb6bde8c0 100644 --- a/crates/core/src/worker_metrics/mod.rs +++ b/crates/core/src/worker_metrics/mod.rs @@ -21,10 +21,15 @@ metrics_group!( pub ws_clients_spawned: IntGaugeVec, #[name = spacetime_worker_ws_clients_aborted] - #[help = "Number of ws client connections aborted"] + #[help = "Number of ws client connections aborted by either the server or the client"] #[labels(database_identity: Identity)] pub ws_clients_aborted: IntGaugeVec, + #[name = spacetime_worker_ws_clients_closed_connection] + #[help = "Number of ws client connections closed by the client as opposed to being termiated by the server"] + #[labels(database_identity: Identity)] + pub ws_clients_closed_connection: IntGaugeVec, + #[name = spacetime_websocket_requests_total] #[help = "The cumulative number of websocket request messages"] #[labels(database_identity: Identity, protocol: str)] From 84a265f7c0913f64d91c3876f673faff5cb31263 Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Fri, 27 Jun 2025 11:23:37 +0200 Subject: [PATCH 07/10] core: Allow adjusting CPU reservation pools (#2907) --- crates/core/src/startup.rs | 153 +++++++++++++++++++++++++++++-------- 1 file changed, 122 insertions(+), 31 deletions(-) diff --git a/crates/core/src/startup.rs b/crates/core/src/startup.rs index 0dc94760328..5ee01c2ead5 100644 --- a/crates/core/src/startup.rs +++ b/crates/core/src/startup.rs @@ -169,7 +169,79 @@ fn reload_config(conf_file: &ConfigToml, reload_handle: &reload::Handle Cores { - Cores::get().unwrap_or_default() + pin_threads_with_reservations(CoreReservations::default()) +} + +/// Like [`pin_threads`], but with a custom [`CoreReservations`]. +#[must_use] +pub fn pin_threads_with_reservations(reservations: CoreReservations) -> Cores { + Cores::get(reservations).unwrap_or_default() +} + +/// The desired distribution of available cores to purposes. +/// +/// Note that, in addition to `reserved`, [`Cores`] reserves two additional +/// cores for the operating system. That is, the denominator for fractions +/// given below is `num_cpus - reserved - 2`. +pub struct CoreReservations { + /// Cores to run database instances on. + /// + /// Default: 1/8 + pub databases: f64, + /// Cores to run tokio worker threads on. + /// + /// Default: 4/8 + pub tokio_workers: f64, + /// Cores to run rayon threads on. + /// + /// Default: 1/8 + pub rayon: f64, + /// Extra reserved cores. + /// + /// If greater than zero, this many cores will be reserved _before_ + /// any of the other reservations are made (but after reserving the OS cores). + /// + /// Default: 0 + pub reserved: usize, +} + +impl Default for CoreReservations { + fn default() -> Self { + Self { + databases: 1.0 / 8.0, + tokio_workers: 4.0 / 8.0, + rayon: 1.0 / 8.0, + reserved: 0, + } + } +} + +impl CoreReservations { + /// Apply this reservation to an arbitrary list of core ids. + /// + /// Returns the allocated cores in the order: + /// + /// - reserved + /// - databases + /// - tokio_workers + /// - rayon + /// + /// Left public for testing and debugging purposes. + pub fn apply(&self, cores: &mut Vec) -> [Vec; 4] { + let reserved = cores.drain(..self.reserved).collect_vec(); + + let total = cores.len() as f64; + let frac = |frac: f64| (total * frac).ceil() as usize; + fn claim(cores: &mut Vec, n: usize) -> impl Iterator + '_ { + cores.drain(..n.min(cores.len())) + } + + let databases = claim(cores, frac(self.databases)).collect_vec(); + let tokio_workers = claim(cores, frac(self.tokio_workers)).collect_vec(); + let rayon = claim(cores, frac(self.rayon)).collect_vec(); + + [reserved, databases, tokio_workers, rayon] + } } /// A type holding cores divvied up into different sets. @@ -178,48 +250,43 @@ pub fn pin_threads() -> Cores { #[derive(Default)] pub struct Cores { /// The cores to run database instances on. - /// - /// Currently, this is 1/8 of num_cpus. pub databases: JobCores, - /// The cores to run tokio worker and blocking threads on. - /// - /// Currently, tokio worker threads are 4/8 of num_cpus, and tokio blocking - /// threads are pinned non-exclusively to 2/8 of num_cpus. + /// The cores to run tokio worker threads on. pub tokio: TokioCores, /// The cores to run rayon threads on. - /// - /// Currently, this is 1/8 of num_cpus. pub rayon: RayonCores, + /// Extra cores if a [`CoreReservations`] with `reserved > 0` was used. + /// + /// If `Some`, the boxed array is non-empty. + pub reserved: Option>, + /// Cores shared between tokio runtimes to schedule blocking tasks on. + /// + /// All remaining cores after [`CoreReservations`] have been made become + /// blocking cores. + /// + /// See `Tokio.blocking` for more context. + #[cfg(target_os = "linux")] + pub blocking: Option, } impl Cores { - fn get() -> Option { - let cores = &mut core_affinity::get_core_ids() - .filter(|cores| cores.len() >= 10)? - .into_iter() - // We reserve the first two cores for the OS. - // This allows us to pin interrupt handlers (IRQs) to these cores, - // particularly those for incoming network traffic, - // preventing them from preempting the main reducer threads. - .filter(|core_id| core_id.id > 1) - .collect_vec() - .into_iter(); - - let total = cores.len() as f64; - let frac = |frac: f64| (total * frac).ceil() as usize; + fn get(reservations: CoreReservations) -> Option { + let mut cores = Self::get_core_ids()?; - let databases = cores.take(frac(1.0 / 8.0)).collect(); + let [reserved, databases, tokio_workers, rayon] = reservations.apply(&mut cores); - let tokio_workers = cores.take(frac(4.0 / 8.0)).collect(); - - let rayon = RayonCores(Some(cores.take(frac(1.0 / 8.0)).collect())); + let reserved = (!reserved.is_empty()).then(|| reserved.into()); + let databases = databases.into_iter().collect::(); + let rayon = RayonCores((!rayon.is_empty()).then_some(rayon)); // see comment on `TokioCores.blocking` #[cfg(target_os = "linux")] - let remaining = cores.try_fold(nix::sched::CpuSet::new(), |mut cpuset, core| { - cpuset.set(core.id).ok()?; - Some(cpuset) - }); + let remaining = cores + .into_iter() + .try_fold(nix::sched::CpuSet::new(), |mut cpuset, core| { + cpuset.set(core.id).ok()?; + Some(cpuset) + }); let tokio = TokioCores { workers: Some(tokio_workers), @@ -231,8 +298,32 @@ impl Cores { databases, tokio, rayon, + reserved, + #[cfg(target_os = "linux")] + blocking: remaining, }) } + + /// Get the cores of the local host, as reported by the operating system. + /// + /// Cores 0 and 1 are not included in the returned vec, as we reserve them + /// for the operating system. + /// + /// Returns `None` if `num_cpus - 2` is less than 8. + /// If `Some` is returned, the `Vec` is non-empty. + pub fn get_core_ids() -> Option> { + let cores = core_affinity::get_core_ids() + .filter(|cores| cores.len() >= 10)? + .into_iter() + // We reserve the first two cores for the OS. + // This allows us to pin interrupt handlers (IRQs) to these cores, + // particularly those for incoming network traffic, + // preventing them from preempting the main reducer threads. + .filter(|core_id| core_id.id > 1) + .collect_vec(); + + (!cores.is_empty()).then_some(cores) + } } #[derive(Default)] From 03721b4a7212aee21583483a754e641c5d59267c Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Fri, 27 Jun 2025 14:55:22 +0200 Subject: [PATCH 08/10] durability: Fix task leak (#2875) --- crates/durability/src/imp/local.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/crates/durability/src/imp/local.rs b/crates/durability/src/imp/local.rs index 404729c354c..4a5c07feec5 100644 --- a/crates/durability/src/imp/local.rs +++ b/crates/durability/src/imp/local.rs @@ -7,7 +7,7 @@ use std::{ AtomicI64, AtomicU64, Ordering::{Acquire, Relaxed, Release}, }, - Arc, + Arc, Weak, }, time::Duration, }; @@ -116,7 +116,7 @@ impl Local { ); rt.spawn( FlushAndSyncTask { - clog: clog.clone(), + clog: Arc::downgrade(&clog), period: opts.sync_interval, offset: offset.clone(), abort: persister_task.abort_handle(), @@ -254,7 +254,7 @@ fn flush_error(e: io::Error) { } struct FlushAndSyncTask { - clog: Arc>>, + clog: Weak>>, period: Duration, offset: Arc, /// Handle to abort the [`PersisterTask`] if fsync panics. @@ -272,15 +272,17 @@ impl FlushAndSyncTask { loop { interval.tick().await; + let Some(clog) = self.clog.upgrade() else { + break; + }; // Skip if nothing changed. - if let Some(committed) = self.clog.max_committed_offset() { + if let Some(committed) = clog.max_committed_offset() { let durable = self.offset.load(Acquire); if durable.is_positive() && committed == durable as _ { continue; } } - let clog = self.clog.clone(); let task = spawn_blocking(move || clog.flush_and_sync()).await; match task { Err(e) => { From be6d305ae848a1722e2f9db6ad64ab17992017e4 Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Mon, 30 Jun 2025 09:14:52 -0700 Subject: [PATCH 09/10] Filter out dropped clients in the send worker (#2899) --- crates/core/src/client/client_connection.rs | 7 ++++- .../module_subscription_manager.rs | 27 ++++++++++++++----- 2 files changed, 27 insertions(+), 7 deletions(-) diff --git a/crates/core/src/client/client_connection.rs b/crates/core/src/client/client_connection.rs index 9f306b21270..25cb330cac9 100644 --- a/crates/core/src/client/client_connection.rs +++ b/crates/core/src/client/client_connection.rs @@ -1,5 +1,6 @@ use std::collections::VecDeque; use std::ops::Deref; +use std::sync::atomic::Ordering; use std::sync::atomic::{AtomicBool, Ordering::Relaxed}; use std::sync::Arc; use std::time::Instant; @@ -158,6 +159,10 @@ impl ClientConnectionSender { Self::dummy_with_channel(id, config).0 } + pub fn is_cancelled(&self) -> bool { + self.cancelled.load(Ordering::Relaxed) + } + /// Send a message to the client. For data-related messages, you should probably use /// `BroadcastQueue::send` to ensure that the client sees data messages in a consistent order. pub fn send_message(&self, message: impl Into) -> Result<(), ClientSendError> { @@ -175,7 +180,7 @@ impl ClientConnectionSender { // the channel, so forcibly kick the client tracing::warn!(identity = %self.id.identity, connection_id = %self.id.connection_id, "client channel capacity exceeded"); self.abort_handle.abort(); - self.cancelled.store(true, Relaxed); + self.cancelled.store(true, Ordering::Relaxed); return Err(ClientSendError::Cancelled); } Err(mpsc::error::TrySendError::Closed(_)) => return Err(ClientSendError::Disconnected), diff --git a/crates/core/src/subscription/module_subscription_manager.rs b/crates/core/src/subscription/module_subscription_manager.rs index 9e6a1046fd5..542ea435ab5 100644 --- a/crates/core/src/subscription/module_subscription_manager.rs +++ b/crates/core/src/subscription/module_subscription_manager.rs @@ -1208,12 +1208,7 @@ impl SubscriptionManager { SingleQueryUpdate { update, num_rows } } - // filter out clients that've dropped - let clients_for_query = qstate.all_clients().filter(|id| { - self.clients - .get(*id) - .is_some_and(|info| !info.dropped.load(Ordering::Acquire)) - }); + let clients_for_query = qstate.all_clients(); match eval_delta(tx, &mut acc.metrics, plan) { Err(err) => { @@ -1292,6 +1287,16 @@ struct SendWorkerClient { outbound_ref: Client, } +impl SendWorkerClient { + fn is_dropped(&self) -> bool { + self.dropped.load(Ordering::Relaxed) + } + + fn is_cancelled(&self) -> bool { + self.outbound_ref.is_cancelled() + } +} + /// Asynchronous background worker which aggregates each of the clients' updates from a [`ComputedQueries`] /// into `DbUpdate`s and then sends them to the clients' WebSocket workers. /// @@ -1330,6 +1335,14 @@ impl Drop for SendWorker { } } +impl SendWorker { + fn is_client_dropped_or_cancelled(&self, client_id: &ClientId) -> bool { + self.clients + .get(client_id) + .is_some_and(|client| client.is_cancelled() || client.is_dropped()) + } +} + #[derive(Debug, Clone)] pub struct BroadcastQueue(SenderWithGauge); @@ -1431,6 +1444,8 @@ impl SendWorker { let mut eval = updates .into_iter() + // Filter out dropped or cancelled clients + .filter(|upd| !self.is_client_dropped_or_cancelled(&upd.id)) // Filter out clients whose subscriptions failed .filter(|upd| !clients_with_errors.contains(&upd.id)) // For each subscriber, aggregate all the updates for the same table. From 4eb621357526365ae7861cb2bc8ff87d68025907 Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Mon, 30 Jun 2025 18:41:59 -0700 Subject: [PATCH 10/10] Wrap websocket flush in timeout (#2908) --- crates/client-api/src/routes/subscribe.rs | 132 +++++++++++++++++----- 1 file changed, 105 insertions(+), 27 deletions(-) diff --git a/crates/client-api/src/routes/subscribe.rs b/crates/client-api/src/routes/subscribe.rs index 3137c4f10b2..85bd8acb42d 100644 --- a/crates/client-api/src/routes/subscribe.rs +++ b/crates/client-api/src/routes/subscribe.rs @@ -182,6 +182,7 @@ where } const LIVELINESS_TIMEOUT: Duration = Duration::from_secs(60); +const SEND_TIMEOUT: Duration = Duration::from_secs(5); async fn ws_client_actor(client: ClientConnection, ws: WebSocketStream, sendrx: MeteredReceiver) { // ensure that even if this task gets cancelled, we always cleanup the connection @@ -266,12 +267,12 @@ async fn ws_client_actor_inner( Some(Ok(m)) => Item::Message(ClientMessage::from_message(m)), Some(Err(error)) => { log::warn!("Websocket receive error: {}", error); - continue; + break; } // the client sent us a close frame None => { - break - }, + break; + } }, // If we have an outgoing message to send, send it off. @@ -311,11 +312,29 @@ async fn ws_client_actor_inner( // now we flush all the messages to the socket (ws.flush().await, msg_buffer) }; + // Build a future that both times out and drives the send. + // + // Note that if flushing cannot immediately complete for whatever reason, + // it will wait without polling the other futures in the `select!` arms. + // Among other things, this means our liveness tick will not be polled. + // + // To avoid waiting indefinitely, we wrap the send in a timeout. + // A timeout is treated as an unresponsive client and we drop the connection. + let send_all = tokio::time::timeout(SEND_TIMEOUT, send_all); // Flush the websocket while continuing to poll the `handle_queue`, // to avoid deadlocks or delays due to enqueued futures holding resources. let send_all = also_poll(send_all, make_progress(&mut current_message)); let t1 = Instant::now(); - let (send_all_result, buf) = send_all.await; + let (send_all_result, buf) = match send_all.await { + Ok((send_all_result, buf)) => { + (send_all_result, buf) + } + Err(e) => { + // Our send timed out; drop client without trying to send them a Close + log::warn!("send_all timed out: {e}"); + break; + } + }; msg_buffer = buf; if let Err(error) = send_all_result { log::warn!("Websocket send error: {error}") @@ -335,13 +354,33 @@ async fn ws_client_actor_inner( Err(NoSuchModule) => { // Send a close frame while continuing to poll the `handle_queue`, // to avoid deadlocks or delays due to enqueued futures holding resources. - let close = also_poll( - ws.close(Some(CloseFrame { code: CloseCode::Away, reason: "module exited".into() })), - make_progress(&mut current_message), - ); - if let Err(e) = close.await { - log::warn!("error closing: {e:#}") - } + let close = ws.close(Some(CloseFrame { code: CloseCode::Away, reason: "module exited".into() })); + // Wrap the close in a timeout + let close = tokio::time::timeout(SEND_TIMEOUT, close); + match also_poll(close, make_progress(&mut current_message)).await { + Ok(Err(e)) => { + log::warn!("error closing websocket: {e:#}") + } + Err(e) => { + // Our send timed out; drop client without trying to send them a Close. + // + // Is it correct to break if a reducer is still in progress? + // Answer: Yes it is. + // + // If a reducer is currently being executed, + // we are waiting for the `current_message` future to complete. + // When we break, the task completes and this future is dropped. + // + // Notably though the reducer itself will run to completion, + // however when it tries to notify this task that it is done, + // it will encounter a closed sender in `JobThread::run`, + // dropping the value that it's trying to send. + // In particular it will not throw an error or panic. + log::warn!("websocket close timed out: {e}"); + break; + } + _ => {} + }; closed = true; } } @@ -352,10 +391,29 @@ async fn ws_client_actor_inner( _ = liveness_check_interval.tick() => { // If we received a pong at some point, send a fresh ping. if mem::take(&mut got_pong) { + // Build a future that both times out and drives the send. + // + // Note that if the send cannot immediately complete for whatever reason, + // it will wait without polling the other futures in the `select!` arms. + // Among other things, this means we won't poll the websocket for a Close frame. + // + // To avoid waiting indefinitely, we wrap the ping in a timeout. + // A timeout is treated as an unresponsive client and we drop the connection. + let ping = ws.send(WsMessage::Ping(Bytes::new())); + let ping_with_timeout = tokio::time::timeout(SEND_TIMEOUT, ping); + // Send a ping message while continuing to poll the `handle_queue`, // to avoid deadlocks or delays due to enqueued futures holding resources. - if let Err(e) = also_poll(ws.send(WsMessage::Ping(Bytes::new())), make_progress(&mut current_message)).await { - log::warn!("error sending ping: {e:#}"); + match also_poll(ping_with_timeout, make_progress(&mut current_message)).await { + Ok(Err(e)) => { + log::warn!("error sending ping: {e:#}"); + } + Err(e) => { + // Our ping timed out; drop them without trying to send them a Close + log::warn!("ping timed out after: {e}"); + break; + } + _ => {} } continue; } else { @@ -380,13 +438,22 @@ async fn ws_client_actor_inner( Item::HandleResult(res) => { if let Err(e) = res { if let MessageHandleError::Execution(err) = e { - log::error!("{err:#}"); + log::error!("reducer execution error: {err:#}"); // Serialize the message and keep a handle to the buffer. let (msg_alloc, msg_data) = serialize(msg_buffer, err, client.config); - // Buffer the message without necessarily sending it. - if let Err(error) = ws.send(datamsg_to_wsmsg(msg_data)).await { - log::warn!("Websocket send error: {error}") + let send = async { ws.send(datamsg_to_wsmsg(msg_data)).await }; + let send = tokio::time::timeout(SEND_TIMEOUT, send); + + match send.await { + Ok(Err(error)) => { + log::warn!("Websocket send error: {error}") + } + Err(error) => { + log::warn!("send timed out after: {error}"); + break; + } + _ => {} } // At this point, @@ -399,16 +466,23 @@ async fn ws_client_actor_inner( continue; } - log::debug!("Client caused error on text message: {}", e); - if let Err(e) = ws - .close(Some(CloseFrame { - code: CloseCode::Error, - reason: format!("{e:#}").into(), - })) - .await - { - log::warn!("error closing websocket: {e:#}") - }; + log::warn!("Client caused error on text message: {}", e); + let close = ws.close(Some(CloseFrame { + code: CloseCode::Error, + reason: format!("{e:#}").into(), + })); + + // Wrap the close in a timeout + match tokio::time::timeout(SEND_TIMEOUT, close).await { + Ok(Err(e)) => { + log::warn!("error closing websocket: {e:#}") + } + Err(e) => { + log::warn!("send timed out after: {e}"); + break; + } + _ => {} + } } } Item::Message(ClientMessage::Ping(_message)) => { @@ -439,6 +513,10 @@ async fn ws_client_actor_inner( .with_label_values(&addr) .inc(); } + + // Can't we just break out of the loop here? + // Not, if we want tungstenite to send a close frame back to the client. + // That will only happen once `ws.next()` returns `None`. closed = true; } }