Skip to content

Commit 9696d9d

Browse files
committed
Add additional logging to subscribe route and simplify calling client_connected
Out-of-band discussions with the BitCraft team brought up questions about whether it was possible for a rejected client connection to start an expensive computation like a subscription before their connection was killed, e.g. by sending a `Subscribe` message along the WebSocket before `client_connected` had finished returning `Err`. I don't believe this was actually possible, as `ClientConnection::spawn` called and awaited `call_identity_connected` before invoking its `actor` closure, and it was that `actor` which processed `Subscribe` messages. But it was somewhat difficult to verify that behavior, and so I re-organized the code so that the outer layer of the `subscribe` handler obviously had that property without having to step into `ClientConnection::spawn`. I also added some additional logging to the subscribe route, including the `X-Forwarded-For` header in more messages, as the BitCraft team complained about having difficulty correlating IP addresses with connections. The log levels remain the same as before, just with additional information added: - Successful connections are at `debug` level, - Rejected connections are at `info` level (these are the ones BitCraft cares about in this case). - Failed connections are at `warn`. As the levels are unchanged, this should not add undesirable log noise.
1 parent e214f73 commit 9696d9d

2 files changed

Lines changed: 43 additions & 17 deletions

File tree

crates/client-api/src/routes/subscribe.rs

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ where
145145

146146
let identity_token = auth.creds.token().into();
147147

148-
let module_rx = leader.module_watcher().await.map_err(log_and_500)?;
148+
let mut module_rx = leader.module_watcher().await.map_err(log_and_500)?;
149149

150150
let client_id = ClientActorId {
151151
identity: auth.identity,
@@ -168,26 +168,36 @@ where
168168
}
169169
};
170170

171-
match forwarded_for {
171+
let identity = client_id.identity;
172+
let client_log_string = match forwarded_for {
172173
Some(TypedHeader(XForwardedFor(ip))) => {
173-
log::debug!("New client connected from ip {ip}")
174+
format!("ip {ip} with Identity {identity} and ConnectionId {connection_id}")
174175
}
175-
None => log::debug!("New client connected from unknown ip"),
176-
}
176+
None => format!("unknown ip with Identity {identity} and ConnectionId {connection_id}"),
177+
};
177178

178-
let actor = |client, sendrx| ws_client_actor(ws_opts, client, ws, sendrx);
179-
let client = match ClientConnection::spawn(client_id, client_config, leader.replica_id, module_rx, actor).await
180-
{
181-
Ok(s) => s,
179+
log::debug!("New client connected from {client_log_string}");
180+
181+
match ClientConnection::call_client_connected_maybe_reject(&mut module_rx, client_id).await {
182+
Ok(()) => log::info!("client_connected returned Ok for {client_log_string}"),
182183
Err(e @ (ClientConnectedError::Rejected(_) | ClientConnectedError::OutOfEnergy)) => {
183-
log::info!("{e}");
184+
log::info!(
185+
"Rejecting connection for {client_log_string} due to error from client_connected reducer: {e}"
186+
);
184187
return;
185188
}
186189
Err(e @ (ClientConnectedError::DBError(_) | ClientConnectedError::ReducerCall(_))) => {
187-
log::warn!("ModuleHost died while we were connecting: {e:#}");
190+
log::warn!("ModuleHost died while {client_log_string} was connecting: {e:#}");
188191
return;
189192
}
190-
};
193+
}
194+
195+
log::debug!(
196+
"Database accepted connection from {client_log_string}; spawning ws_client_actor and ClientConnection"
197+
);
198+
199+
let actor = |client, sendrx| ws_client_actor(ws_opts, client, ws, sendrx);
200+
let client = ClientConnection::spawn(client_id, client_config, leader.replica_id, module_rx, actor).await;
191201

192202
// Send the client their identity token message as the first message
193203
// NOTE: We're adding this to the protocol because some client libraries are
@@ -200,7 +210,7 @@ where
200210
connection_id,
201211
};
202212
if let Err(e) = client.send_message(message) {
203-
log::warn!("{e}, before identity token was sent")
213+
log::warn!("Error sending IdentityToken message to {client_log_string}: {e}");
204214
}
205215
});
206216

crates/core/src/client/client_connection.rs

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -393,14 +393,31 @@ const CLIENT_CHANNEL_CAPACITY: usize = 16 * KB;
393393
const KB: usize = 1024;
394394

395395
impl ClientConnection {
396-
/// Returns an error if ModuleHost closed
396+
/// Call the database at `module_rx`'s `client_connection` reducer, if any,
397+
/// and return `Err` if it signals rejecting this client's connection.
398+
///
399+
/// Call this method before [`Self::spawn`],
400+
/// and do not call [`Self::spawn`] if this method returns `Err`.
401+
pub async fn call_client_connected_maybe_reject(
402+
module_rx: &mut watch::Receiver<ModuleHost>,
403+
id: ClientActorId,
404+
) -> Result<(), ClientConnectedError> {
405+
let module = module_rx.borrow_and_update().clone();
406+
module.call_identity_connected(id.identity, id.connection_id).await
407+
}
408+
409+
/// Spawn a new [`ClientConnection`] for a WebSocket subscriber.
410+
///
411+
/// Callers should first call [`Self::call_client_connected_maybe_reject`]
412+
/// to verify that the database at `module_rx` approves of this connection,
413+
/// and should not invoke this method if that call returns an error.
397414
pub async fn spawn<Fut>(
398415
id: ClientActorId,
399416
config: ClientConfig,
400417
replica_id: u64,
401418
mut module_rx: watch::Receiver<ModuleHost>,
402419
actor: impl FnOnce(ClientConnection, MeteredReceiver<SerializableMessage>) -> Fut,
403-
) -> Result<ClientConnection, ClientConnectedError>
420+
) -> ClientConnection
404421
where
405422
Fut: Future<Output = ()> + Send + 'static,
406423
{
@@ -409,7 +426,6 @@ impl ClientConnection {
409426
// logically subscribed to the database, not any particular replica. We should handle failover for
410427
// them and stuff. Not right now though.
411428
let module = module_rx.borrow_and_update().clone();
412-
module.call_identity_connected(id.identity, id.connection_id).await?;
413429

414430
let (sendtx, sendrx) = mpsc::channel::<SerializableMessage>(CLIENT_CHANNEL_CAPACITY);
415431

@@ -455,7 +471,7 @@ impl ClientConnection {
455471
// if this fails, the actor() function called .abort(), which like... okay, I guess?
456472
let _ = fut_tx.send(actor_fut);
457473

458-
Ok(this)
474+
this
459475
}
460476

461477
pub fn dummy(

0 commit comments

Comments
 (0)