From 9696d9decfd7edf2feb2e4470b0ca781090d74c2 Mon Sep 17 00:00:00 2001 From: Phoebe Goldman Date: Tue, 29 Jul 2025 11:47:27 -0400 Subject: [PATCH 1/2] Add additional logging to subscribe route and simplify calling `client_connected` Out-of-band discussions with the BitCraft team brought up questions about whether it was possible for a rejected client connection to start an expensive computation like a subscription before their connection was killed, e.g. by sending a `Subscribe` message along the WebSocket before `client_connected` had finished returning `Err`. I don't believe this was actually possible, as `ClientConnection::spawn` called and awaited `call_identity_connected` before invoking its `actor` closure, and it was that `actor` which processed `Subscribe` messages. But it was somewhat difficult to verify that behavior, and so I re-organized the code so that the outer layer of the `subscribe` handler obviously had that property without having to step into `ClientConnection::spawn`. I also added some additional logging to the subscribe route, including the `X-Forwarded-For` header in more messages, as the BitCraft team complained about having difficulty correlating IP addresses with connections. The log levels remain the same as before, just with additional information added: - Successful connections are at `debug` level, - Rejected connections are at `info` level (these are the ones BitCraft cares about in this case). - Failed connections are at `warn`. As the levels are unchanged, this should not add undesirable log noise. --- crates/client-api/src/routes/subscribe.rs | 36 +++++++++++++-------- crates/core/src/client/client_connection.rs | 24 +++++++++++--- 2 files changed, 43 insertions(+), 17 deletions(-) diff --git a/crates/client-api/src/routes/subscribe.rs b/crates/client-api/src/routes/subscribe.rs index 2e718528afd..c4c16f7a3d4 100644 --- a/crates/client-api/src/routes/subscribe.rs +++ b/crates/client-api/src/routes/subscribe.rs @@ -145,7 +145,7 @@ where let identity_token = auth.creds.token().into(); - let module_rx = leader.module_watcher().await.map_err(log_and_500)?; + let mut module_rx = leader.module_watcher().await.map_err(log_and_500)?; let client_id = ClientActorId { identity: auth.identity, @@ -168,26 +168,36 @@ where } }; - match forwarded_for { + let identity = client_id.identity; + let client_log_string = match forwarded_for { Some(TypedHeader(XForwardedFor(ip))) => { - log::debug!("New client connected from ip {ip}") + format!("ip {ip} with Identity {identity} and ConnectionId {connection_id}") } - None => log::debug!("New client connected from unknown ip"), - } + None => format!("unknown ip with Identity {identity} and ConnectionId {connection_id}"), + }; - let actor = |client, sendrx| ws_client_actor(ws_opts, client, ws, sendrx); - let client = match ClientConnection::spawn(client_id, client_config, leader.replica_id, module_rx, actor).await - { - Ok(s) => s, + log::debug!("New client connected from {client_log_string}"); + + match ClientConnection::call_client_connected_maybe_reject(&mut module_rx, client_id).await { + Ok(()) => log::info!("client_connected returned Ok for {client_log_string}"), Err(e @ (ClientConnectedError::Rejected(_) | ClientConnectedError::OutOfEnergy)) => { - log::info!("{e}"); + log::info!( + "Rejecting connection for {client_log_string} due to error from client_connected reducer: {e}" + ); return; } Err(e @ (ClientConnectedError::DBError(_) | ClientConnectedError::ReducerCall(_))) => { - log::warn!("ModuleHost died while we were connecting: {e:#}"); + log::warn!("ModuleHost died while {client_log_string} was connecting: {e:#}"); return; } - }; + } + + log::debug!( + "Database accepted connection from {client_log_string}; spawning ws_client_actor and ClientConnection" + ); + + let actor = |client, sendrx| ws_client_actor(ws_opts, client, ws, sendrx); + let client = ClientConnection::spawn(client_id, client_config, leader.replica_id, module_rx, actor).await; // Send the client their identity token message as the first message // NOTE: We're adding this to the protocol because some client libraries are @@ -200,7 +210,7 @@ where connection_id, }; if let Err(e) = client.send_message(message) { - log::warn!("{e}, before identity token was sent") + log::warn!("Error sending IdentityToken message to {client_log_string}: {e}"); } }); diff --git a/crates/core/src/client/client_connection.rs b/crates/core/src/client/client_connection.rs index acd9466d5b1..e915753d1db 100644 --- a/crates/core/src/client/client_connection.rs +++ b/crates/core/src/client/client_connection.rs @@ -393,14 +393,31 @@ const CLIENT_CHANNEL_CAPACITY: usize = 16 * KB; const KB: usize = 1024; impl ClientConnection { - /// Returns an error if ModuleHost closed + /// Call the database at `module_rx`'s `client_connection` reducer, if any, + /// and return `Err` if it signals rejecting this client's connection. + /// + /// Call this method before [`Self::spawn`], + /// and do not call [`Self::spawn`] if this method returns `Err`. + pub async fn call_client_connected_maybe_reject( + module_rx: &mut watch::Receiver, + id: ClientActorId, + ) -> Result<(), ClientConnectedError> { + let module = module_rx.borrow_and_update().clone(); + module.call_identity_connected(id.identity, id.connection_id).await + } + + /// Spawn a new [`ClientConnection`] for a WebSocket subscriber. + /// + /// Callers should first call [`Self::call_client_connected_maybe_reject`] + /// to verify that the database at `module_rx` approves of this connection, + /// and should not invoke this method if that call returns an error. pub async fn spawn( id: ClientActorId, config: ClientConfig, replica_id: u64, mut module_rx: watch::Receiver, actor: impl FnOnce(ClientConnection, MeteredReceiver) -> Fut, - ) -> Result + ) -> ClientConnection where Fut: Future + Send + 'static, { @@ -409,7 +426,6 @@ impl ClientConnection { // logically subscribed to the database, not any particular replica. We should handle failover for // them and stuff. Not right now though. let module = module_rx.borrow_and_update().clone(); - module.call_identity_connected(id.identity, id.connection_id).await?; let (sendtx, sendrx) = mpsc::channel::(CLIENT_CHANNEL_CAPACITY); @@ -455,7 +471,7 @@ impl ClientConnection { // if this fails, the actor() function called .abort(), which like... okay, I guess? let _ = fut_tx.send(actor_fut); - Ok(this) + this } pub fn dummy( From 508e24598d609a3e6ca81b9a8bdd56677b4fa42d Mon Sep 17 00:00:00 2001 From: Phoebe Goldman Date: Tue, 29 Jul 2025 14:00:27 -0400 Subject: [PATCH 2/2] Address Kim's review - Add `struct Connected` which acts as a "proof" of having called `client_connected`, rather than just relying on docstrings to enforce correct usage. - Drop a log that prints on successful connections to `debug`, in keeping with the PR description. - Slap the prefix `websocket: ` into log messages for easier filtering. --- crates/client-api/src/routes/subscribe.rs | 24 ++++++++++-------- crates/core/src/client/client_connection.rs | 27 +++++++++++++++++---- 2 files changed, 36 insertions(+), 15 deletions(-) diff --git a/crates/client-api/src/routes/subscribe.rs b/crates/client-api/src/routes/subscribe.rs index c4c16f7a3d4..f59f8cd68dd 100644 --- a/crates/client-api/src/routes/subscribe.rs +++ b/crates/client-api/src/routes/subscribe.rs @@ -163,7 +163,7 @@ where let ws = match ws_upgrade.upgrade(ws_config).await { Ok(ws) => ws, Err(err) => { - log::error!("WebSocket init error: {err}"); + log::error!("websocket: WebSocket init error: {err}"); return; } }; @@ -176,28 +176,32 @@ where None => format!("unknown ip with Identity {identity} and ConnectionId {connection_id}"), }; - log::debug!("New client connected from {client_log_string}"); + log::debug!("websocket: New client connected from {client_log_string}"); - match ClientConnection::call_client_connected_maybe_reject(&mut module_rx, client_id).await { - Ok(()) => log::info!("client_connected returned Ok for {client_log_string}"), + let connected = match ClientConnection::call_client_connected_maybe_reject(&mut module_rx, client_id).await { + Ok(connected) => { + log::debug!("websocket: client_connected returned Ok for {client_log_string}"); + connected + } Err(e @ (ClientConnectedError::Rejected(_) | ClientConnectedError::OutOfEnergy)) => { log::info!( - "Rejecting connection for {client_log_string} due to error from client_connected reducer: {e}" + "websocket: Rejecting connection for {client_log_string} due to error from client_connected reducer: {e}" ); return; } Err(e @ (ClientConnectedError::DBError(_) | ClientConnectedError::ReducerCall(_))) => { - log::warn!("ModuleHost died while {client_log_string} was connecting: {e:#}"); + log::warn!("websocket: ModuleHost died while {client_log_string} was connecting: {e:#}"); return; } - } + }; log::debug!( - "Database accepted connection from {client_log_string}; spawning ws_client_actor and ClientConnection" + "websocket: Database accepted connection from {client_log_string}; spawning ws_client_actor and ClientConnection" ); let actor = |client, sendrx| ws_client_actor(ws_opts, client, ws, sendrx); - let client = ClientConnection::spawn(client_id, client_config, leader.replica_id, module_rx, actor).await; + let client = + ClientConnection::spawn(client_id, client_config, leader.replica_id, module_rx, actor, connected).await; // Send the client their identity token message as the first message // NOTE: We're adding this to the protocol because some client libraries are @@ -210,7 +214,7 @@ where connection_id, }; if let Err(e) = client.send_message(message) { - log::warn!("Error sending IdentityToken message to {client_log_string}: {e}"); + log::warn!("websocket: Error sending IdentityToken message to {client_log_string}: {e}"); } }); diff --git a/crates/core/src/client/client_connection.rs b/crates/core/src/client/client_connection.rs index e915753d1db..cf3e5511fff 100644 --- a/crates/core/src/client/client_connection.rs +++ b/crates/core/src/client/client_connection.rs @@ -392,31 +392,48 @@ impl Drop for MeteredReceiver { const CLIENT_CHANNEL_CAPACITY: usize = 16 * KB; const KB: usize = 1024; +/// Value returned by [`ClientConnection::call_client_connected_maybe_reject`] +/// and consumed by [`ClientConnection::spawn`] which acts as a proof that the client is authorized. +/// +/// Because this struct does not capture the module or database info or the client connection info, +/// a malicious caller could [`ClientConnected::call_client_connected_maybe_reject`] for one client +/// and then use the resulting `Connected` token to [`ClientConnection::spawn`] for a different client. +/// We're not particularly worried about that. +/// This token exists as a sanity check that non-malicious callers don't accidentally [`ClientConnection::spawn`] +/// for an unauthorized client. +#[non_exhaustive] +pub struct Connected { + _private: (), +} + impl ClientConnection { /// Call the database at `module_rx`'s `client_connection` reducer, if any, /// and return `Err` if it signals rejecting this client's connection. /// - /// Call this method before [`Self::spawn`], - /// and do not call [`Self::spawn`] if this method returns `Err`. + /// Call this method before [`Self::spawn`] + /// and pass the returned [`Connected`] to [`Self::spawn`] as proof that the client is authorized. pub async fn call_client_connected_maybe_reject( module_rx: &mut watch::Receiver, id: ClientActorId, - ) -> Result<(), ClientConnectedError> { + ) -> Result { let module = module_rx.borrow_and_update().clone(); - module.call_identity_connected(id.identity, id.connection_id).await + module.call_identity_connected(id.identity, id.connection_id).await?; + Ok(Connected { _private: () }) } /// Spawn a new [`ClientConnection`] for a WebSocket subscriber. /// /// Callers should first call [`Self::call_client_connected_maybe_reject`] /// to verify that the database at `module_rx` approves of this connection, - /// and should not invoke this method if that call returns an error. + /// and should not invoke this method if that call returns an error, + /// and pass the returned [`Connected`] as `_proof_of_client_connected_call`. pub async fn spawn( id: ClientActorId, config: ClientConfig, replica_id: u64, mut module_rx: watch::Receiver, actor: impl FnOnce(ClientConnection, MeteredReceiver) -> Fut, + _proof_of_client_connected_call: Connected, ) -> ClientConnection where Fut: Future + Send + 'static,