Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 27 additions & 13 deletions crates/client-api/src/routes/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ where

let identity_token = auth.creds.token().into();

let module_rx = leader.module_watcher().await.map_err(log_and_500)?;
let mut module_rx = leader.module_watcher().await.map_err(log_and_500)?;

let client_id = ClientActorId {
identity: auth.identity,
Expand All @@ -163,32 +163,46 @@ where
let ws = match ws_upgrade.upgrade(ws_config).await {
Ok(ws) => ws,
Err(err) => {
log::error!("WebSocket init error: {err}");
log::error!("websocket: WebSocket init error: {err}");
return;
}
};

match forwarded_for {
let identity = client_id.identity;
let client_log_string = match forwarded_for {
Some(TypedHeader(XForwardedFor(ip))) => {
log::debug!("New client connected from ip {ip}")
format!("ip {ip} with Identity {identity} and ConnectionId {connection_id}")
}
None => log::debug!("New client connected from unknown ip"),
}
None => format!("unknown ip with Identity {identity} and ConnectionId {connection_id}"),
};

let actor = |client, sendrx| ws_client_actor(ws_opts, client, ws, sendrx);
let client = match ClientConnection::spawn(client_id, client_config, leader.replica_id, module_rx, actor).await
{
Ok(s) => s,
log::debug!("websocket: New client connected from {client_log_string}");

let connected = match ClientConnection::call_client_connected_maybe_reject(&mut module_rx, client_id).await {
Ok(connected) => {
log::debug!("websocket: client_connected returned Ok for {client_log_string}");
connected
}
Err(e @ (ClientConnectedError::Rejected(_) | ClientConnectedError::OutOfEnergy)) => {
log::info!("{e}");
log::info!(
"websocket: Rejecting connection for {client_log_string} due to error from client_connected reducer: {e}"
);
return;
}
Err(e @ (ClientConnectedError::DBError(_) | ClientConnectedError::ReducerCall(_))) => {
log::warn!("ModuleHost died while we were connecting: {e:#}");
log::warn!("websocket: ModuleHost died while {client_log_string} was connecting: {e:#}");
return;
}
};

log::debug!(
"websocket: Database accepted connection from {client_log_string}; spawning ws_client_actor and ClientConnection"
);

let actor = |client, sendrx| ws_client_actor(ws_opts, client, ws, sendrx);
let client =
ClientConnection::spawn(client_id, client_config, leader.replica_id, module_rx, actor, connected).await;

// Send the client their identity token message as the first message
// NOTE: We're adding this to the protocol because some client libraries are
// unable to access the http response headers.
Expand All @@ -200,7 +214,7 @@ where
connection_id,
};
if let Err(e) = client.send_message(message) {
log::warn!("{e}, before identity token was sent")
log::warn!("websocket: Error sending IdentityToken message to {client_log_string}: {e}");
}
});

Expand Down
41 changes: 37 additions & 4 deletions crates/core/src/client/client_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,15 +392,49 @@ impl<T> Drop for MeteredReceiver<T> {
const CLIENT_CHANNEL_CAPACITY: usize = 16 * KB;
const KB: usize = 1024;

/// Value returned by [`ClientConnection::call_client_connected_maybe_reject`]
/// and consumed by [`ClientConnection::spawn`] which acts as a proof that the client is authorized.
///
/// Because this struct does not capture the module or database info or the client connection info,
/// a malicious caller could [`ClientConnected::call_client_connected_maybe_reject`] for one client
/// and then use the resulting `Connected` token to [`ClientConnection::spawn`] for a different client.
/// We're not particularly worried about that.
/// This token exists as a sanity check that non-malicious callers don't accidentally [`ClientConnection::spawn`]
/// for an unauthorized client.
#[non_exhaustive]
pub struct Connected {
_private: (),
}

impl ClientConnection {
/// Returns an error if ModuleHost closed
/// Call the database at `module_rx`'s `client_connection` reducer, if any,
/// and return `Err` if it signals rejecting this client's connection.
///
/// Call this method before [`Self::spawn`]
/// and pass the returned [`Connected`] to [`Self::spawn`] as proof that the client is authorized.
pub async fn call_client_connected_maybe_reject(
Comment thread
gefjon marked this conversation as resolved.
module_rx: &mut watch::Receiver<ModuleHost>,
id: ClientActorId,
) -> Result<Connected, ClientConnectedError> {
let module = module_rx.borrow_and_update().clone();
module.call_identity_connected(id.identity, id.connection_id).await?;
Ok(Connected { _private: () })
}

/// Spawn a new [`ClientConnection`] for a WebSocket subscriber.
///
/// Callers should first call [`Self::call_client_connected_maybe_reject`]
/// to verify that the database at `module_rx` approves of this connection,
/// and should not invoke this method if that call returns an error,
/// and pass the returned [`Connected`] as `_proof_of_client_connected_call`.
pub async fn spawn<Fut>(
id: ClientActorId,
config: ClientConfig,
replica_id: u64,
mut module_rx: watch::Receiver<ModuleHost>,
actor: impl FnOnce(ClientConnection, MeteredReceiver<SerializableMessage>) -> Fut,
) -> Result<ClientConnection, ClientConnectedError>
_proof_of_client_connected_call: Connected,
) -> ClientConnection
where
Fut: Future<Output = ()> + Send + 'static,
{
Expand All @@ -409,7 +443,6 @@ impl ClientConnection {
// logically subscribed to the database, not any particular replica. We should handle failover for
// them and stuff. Not right now though.
let module = module_rx.borrow_and_update().clone();
module.call_identity_connected(id.identity, id.connection_id).await?;

let (sendtx, sendrx) = mpsc::channel::<SerializableMessage>(CLIENT_CHANNEL_CAPACITY);

Expand Down Expand Up @@ -455,7 +488,7 @@ impl ClientConnection {
// if this fails, the actor() function called .abort(), which like... okay, I guess?
let _ = fut_tx.send(actor_fut);

Ok(this)
this
}

pub fn dummy(
Expand Down
Loading