From aefdc13d7d4334e377bc7f7522094ead9029ee73 Mon Sep 17 00:00:00 2001 From: Babur Makhmudov Date: Thu, 15 May 2025 12:24:38 +0400 Subject: [PATCH 1/7] fix: use multiple connections for ws subscriptions This fix rewrites the way account subscriptions work in the cloning pipeline: instead of using a single connection per shard, we now use a pool of connections to distribute subscriptions, giving us more subscription capacity before the threshold imposed by the RPC provider is reached. --- .../src/remote_account_updates_shard.rs | 154 ++++++++++++++---- .../tests/remote_account_updates.rs | 6 + 2 files changed, 124 insertions(+), 36 deletions(-) diff --git a/magicblock-account-updates/src/remote_account_updates_shard.rs b/magicblock-account-updates/src/remote_account_updates_shard.rs index 3062a830..d6a44994 100644 --- a/magicblock-account-updates/src/remote_account_updates_shard.rs +++ b/magicblock-account-updates/src/remote_account_updates_shard.rs @@ -7,12 +7,12 @@ use std::{ }; use conjunto_transwise::RpcProviderConfig; -use futures_util::StreamExt; +use futures_util::{stream::FuturesUnordered, Stream, StreamExt}; use log::*; use magicblock_metrics::metrics; -use solana_account_decoder::{UiAccountEncoding, UiDataSliceConfig}; +use solana_account_decoder::{UiAccount, UiAccountEncoding, UiDataSliceConfig}; use solana_pubsub_client::nonblocking::pubsub_client::PubsubClient; -use solana_rpc_client_api::config::RpcAccountInfoConfig; +use solana_rpc_client_api::{config::RpcAccountInfoConfig, response::Response}; use solana_sdk::{ clock::{Clock, Slot}, commitment_config::CommitmentConfig, @@ -24,6 +24,13 @@ use tokio::sync::mpsc::Receiver; use tokio_stream::StreamMap; use tokio_util::sync::CancellationToken; +type BoxFn = Box< + dyn FnOnce() -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> + Send, +>; + +type SubscriptionStream = + Pin<Box<dyn Stream<Item = Response<UiAccount>> + Send + 'static>>; + #[derive(Debug, Error)] pub enum RemoteAccountUpdatesShardError { #[error(transparent)] @@ -65,11 +72,8 @@ impl RemoteAccountUpdatesShard { // Create a pubsub client info!("Shard {}: Starting", self.shard_id); let ws_url = self.rpc_provider_config.ws_url(); - let pubsub_client = PubsubClient::new(ws_url) - .await - .map_err(RemoteAccountUpdatesShardError::PubsubClientError)?; // For every account, we only want the updates, not the actual content of the accounts - let rpc_account_info_config = Some(RpcAccountInfoConfig { + let config = RpcAccountInfoConfig { commitment: self .rpc_provider_config .commitment() @@ -80,21 +84,15 @@ impl RemoteAccountUpdatesShard { length: 0, }), min_context_slot: None, - }); + }; + let mut pool = PubsubPool::new(ws_url, config).await?; // Subscribe to the clock from the RPC (to figure out the latest slot) - let (mut clock_stream, clock_unsubscribe) = pubsub_client - .account_subscribe(&clock::ID, rpc_account_info_config.clone()) - .await - .map_err(RemoteAccountUpdatesShardError::PubsubClientError)?; + let mut clock_stream = pool.subscribe(clock::ID).await?; + println!("subscribed to clock subscription"); let mut clock_slot = 0; // We'll store useful maps for each of the account subscriptions let mut account_streams = StreamMap::new(); // rust compiler is not yet smart enough to figure out the exact type - type BoxFn = Box< - dyn FnOnce() -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> - + Send, - >; - let mut account_unsubscribes: HashMap<Pubkey, BoxFn> = HashMap::new(); const LOG_CLOCK_FREQ: u64 = 100; let mut log_clock_count = 0; @@ -121,16 +119,14 @@ impl RemoteAccountUpdatesShard { } // When we receive a message to start monitoring an account Some((pubkey, 
unsub)) = self.monitoring_request_receiver.recv() => { + println!("received subscription"); if unsub { - let Some(request) = account_unsubscribes.remove(&pubkey) else { - continue; - }; account_streams.remove(&pubkey); metrics::set_subscriptions_count(account_streams.len(), &self.shard_id); - request().await; + pool.unsubscribe(&pubkey).await; continue; } - if account_unsubscribes.contains_key(&pubkey) { + if pool.subscribed(&pubkey) { continue; } debug!( "Shard {}: Account monitoring started: {:?}, clock_slot: {:?}", self.shard_id, pubkey, clock_slot ); - let (stream, unsubscribe) = pubsub_client - .account_subscribe(&pubkey, rpc_account_info_config.clone()) - .await - .map_err(RemoteAccountUpdatesShardError::PubsubClientError)?; + let stream = pool + .subscribe(pubkey) + .await?; account_streams.insert(pubkey, stream); - account_unsubscribes.insert(pubkey, unsubscribe); metrics::set_subscriptions_count(account_streams.len(), &self.shard_id); self.try_to_override_first_subscribed_slot(pubkey, clock_slot); } @@ -164,17 +158,9 @@ impl RemoteAccountUpdatesShard { } } // Cleanup all subscriptions and wait for proper shutdown - for (pubkey, account_unsubscribes) in account_unsubscribes.into_iter() { - info!( - "Shard {}: Account monitoring killed: {:?}", - self.shard_id, pubkey - ); - account_unsubscribes().await; - } - clock_unsubscribe().await; drop(account_streams); drop(clock_stream); - pubsub_client.shutdown().await?; + pool.shutdown().await; info!("Shard {}: Stopped", self.shard_id); // Done Ok(()) @@ -236,3 +222,99 @@ impl RemoteAccountUpdatesShard { } } } + +struct PubsubPool { + clients: Vec<PubSubConnection>, + unusbscribes: HashMap<Pubkey, (usize, BoxFn)>, + config: RpcAccountInfoConfig, +} + +impl PubsubPool { + async fn new( + url: &str, + config: RpcAccountInfoConfig, + ) -> Result<Self, RemoteAccountUpdatesShardError> { + // 8 is pretty much arbitrary, but a sane value for the number + // of connections per RPC upstream, we don't overcomplicate things + // here, as the whole cloning pipeline will be rewritten quite soon + const CONNECTIONS_PER_POOL: usize = 8; + let mut clients = Vec::with_capacity(CONNECTIONS_PER_POOL); + let mut connections: FuturesUnordered<_> = (0..CONNECTIONS_PER_POOL) + .map(|_| PubSubConnection::new(url)) + .collect(); + while let Some(c) = connections.next().await { + clients.push(c?); + println!("established connection"); + } + Ok(Self { + clients, + unusbscribes: HashMap::new(), + config, + }) + } + + async fn subscribe( + &mut self, + pubkey: Pubkey, + ) -> Result<SubscriptionStream, RemoteAccountUpdatesShardError> { + let (index, client) = self + .clients + .iter_mut() + .enumerate() + .min_by(|a, b| a.1.subs.cmp(&b.1.subs)) + .expect("clients vec is always greater than 0"); + let (stream, unsubscribe) = client + .inner + .account_subscribe(&pubkey, Some(self.config.clone())) + .await + .map_err(RemoteAccountUpdatesShardError::PubsubClientError)?; + client.subs += 1; + // SAFETY: + // we never drop the PubsubPool before the returned subscription stream + // so the lifetime of the stream can be safely extended to 'static + #[allow(clippy::missing_transmute_annotations)] + let stream = unsafe { std::mem::transmute(stream) }; + self.unusbscribes.insert(pubkey, (index, unsubscribe)); + Ok(stream) + } + + async fn unsubscribe(&mut self, pubkey: &Pubkey) { + let Some((index, callback)) = self.unusbscribes.remove(pubkey) else { + return; + }; + callback().await; + let Some(client) = self.clients.get_mut(index) else { + return; + }; + client.subs = client.subs.saturating_sub(1); + } + + fn subscribed(&mut self, pubkey: &Pubkey) -> bool { + self.unusbscribes.contains_key(pubkey) + } + + async fn shutdown(&mut 
self) { + // Cleanup all subscriptions and wait for proper shutdown + for (pubkey, (_, callback)) in self.unusbscribes.drain() { + info!("Account monitoring killed: {:?}", pubkey); + callback().await; + } + for client in self.clients.drain(..) { + let _ = client.inner.shutdown().await; + } + } +} + +struct PubSubConnection { + inner: PubsubClient, + subs: usize, +} + +impl PubSubConnection { + async fn new(url: &str) -> Result<Self, RemoteAccountUpdatesShardError> { + let inner = PubsubClient::new(url) + .await + .map_err(RemoteAccountUpdatesShardError::PubsubClientError)?; + Ok(Self { inner, subs: 0 }) + } +} diff --git a/magicblock-account-updates/tests/remote_account_updates.rs b/magicblock-account-updates/tests/remote_account_updates.rs index 67cf9df3..15c96ef6 100644 --- a/magicblock-account-updates/tests/remote_account_updates.rs +++ b/magicblock-account-updates/tests/remote_account_updates.rs @@ -44,6 +44,8 @@ async fn test_devnet_monitoring_clock_sysvar_changes_over_time() { skip_if_devnet_down!(); // Create account updates worker and client let (client, cancellation_token, worker_handle) = setup(); + // wait a bit for websocket connections to establish + sleep(Duration::from_millis(5_000)).await; // The clock will change every slots, perfect for testing updates let sysvar_clock = clock::ID; // Before starting the monitoring, we should know nothing about the clock @@ -75,6 +77,8 @@ async fn test_devnet_monitoring_multiple_accounts_at_the_same_time() { skip_if_devnet_down!(); // Create account updates worker and client let (client, cancellation_token, worker_handle) = setup(); + // wait a bit for websocket connections to establish + sleep(Duration::from_millis(5_000)).await; // Devnet accounts to be monitored for this test let sysvar_rent = rent::ID; let sysvar_sh = slot_hashes::ID; @@ -106,6 +110,8 @@ async fn test_devnet_monitoring_some_accounts_only() { skip_if_devnet_down!(); // Create account updates worker and client let (client, cancellation_token, worker_handle) = setup(); + // wait a bit for websocket connections to establish + sleep(Duration::from_millis(5_000)).await; // Devnet accounts for this test let sysvar_rent = rent::ID; let sysvar_sh = slot_hashes::ID; From 59aaf663cb23efcd923c9bb3bbedf9a25bc25903 Mon Sep 17 00:00:00 2001 From: Babur Makhmudov <31780624+bmuddha@users.noreply.github.com> Date: Thu, 15 May 2025 12:30:56 +0400 Subject: [PATCH 2/7] Update magicblock-account-updates/src/remote_account_updates_shard.rs Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com> --- magicblock-account-updates/src/remote_account_updates_shard.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/magicblock-account-updates/src/remote_account_updates_shard.rs b/magicblock-account-updates/src/remote_account_updates_shard.rs index d6a44994..a50f7d29 100644 --- a/magicblock-account-updates/src/remote_account_updates_shard.rs +++ b/magicblock-account-updates/src/remote_account_updates_shard.rs @@ -225,7 +225,7 @@ impl RemoteAccountUpdatesShard { struct PubsubPool { clients: Vec<PubSubConnection>, - unusbscribes: HashMap<Pubkey, (usize, BoxFn)>, + unsubscribes: HashMap<Pubkey, (usize, BoxFn)>, config: RpcAccountInfoConfig, } From 00f866b9d8c98d6b3fdcab11acbdff48fb5a9ba9 Mon Sep 17 00:00:00 2001 From: Babur Makhmudov <31780624+bmuddha@users.noreply.github.com> Date: Thu, 15 May 2025 12:31:17 +0400 Subject: [PATCH 3/7] Update magicblock-account-updates/src/remote_account_updates_shard.rs Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com> --- 
magicblock-account-updates/src/remote_account_updates_shard.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/magicblock-account-updates/src/remote_account_updates_shard.rs b/magicblock-account-updates/src/remote_account_updates_shard.rs index a50f7d29..2d216b7c 100644 --- a/magicblock-account-updates/src/remote_account_updates_shard.rs +++ b/magicblock-account-updates/src/remote_account_updates_shard.rs @@ -88,7 +88,6 @@ impl RemoteAccountUpdatesShard { let mut pool = PubsubPool::new(ws_url, config).await?; // Subscribe to the clock from the RPC (to figure out the latest slot) let mut clock_stream = pool.subscribe(clock::ID).await?; - println!("subscribed to clock subscription"); let mut clock_slot = 0; // We'll store useful maps for each of the account subscriptions let mut account_streams = StreamMap::new(); From 1b8659a61df8d5f50fd264b3a13d1c941e26e7c4 Mon Sep 17 00:00:00 2001 From: Babur Makhmudov Date: Thu, 15 May 2025 12:33:14 +0400 Subject: [PATCH 4/7] fix: remove extra logs and fix typo --- .../src/remote_account_updates_shard.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/magicblock-account-updates/src/remote_account_updates_shard.rs b/magicblock-account-updates/src/remote_account_updates_shard.rs index 2d216b7c..a74ee51a 100644 --- a/magicblock-account-updates/src/remote_account_updates_shard.rs +++ b/magicblock-account-updates/src/remote_account_updates_shard.rs @@ -118,7 +118,6 @@ impl RemoteAccountUpdatesShard { } // When we receive a message to start monitoring an account Some((pubkey, unsub)) = self.monitoring_request_receiver.recv() => { - println!("received subscription"); if unsub { account_streams.remove(&pubkey); metrics::set_subscriptions_count(account_streams.len(), &self.shard_id); @@ -243,11 +242,10 @@ impl PubsubPool { .collect(); while let Some(c) = connections.next().await { clients.push(c?); - println!("established connection"); } Ok(Self { clients, - unusbscribes: HashMap::new(), + unsubscribes: HashMap::new(), config, }) } @@ -273,12 +271,12 @@ impl PubsubPool { // so the lifetime of the stream can be safely extended to 'static #[allow(clippy::missing_transmute_annotations)] let stream = unsafe { std::mem::transmute(stream) }; - self.unusbscribes.insert(pubkey, (index, unsubscribe)); + self.unsubscribes.insert(pubkey, (index, unsubscribe)); Ok(stream) } async fn unsubscribe(&mut self, pubkey: &Pubkey) { - let Some((index, callback)) = self.unusbscribes.remove(pubkey) else { + let Some((index, callback)) = self.unsubscribes.remove(pubkey) else { return; }; callback().await; @@ -289,12 +287,12 @@ impl PubsubPool { fn subscribed(&mut self, pubkey: &Pubkey) -> bool { - self.unusbscribes.contains_key(pubkey) + self.unsubscribes.contains_key(pubkey) } async fn shutdown(&mut self) { // Cleanup all subscriptions and wait for proper shutdown - for (pubkey, (_, callback)) in self.unusbscribes.drain() { + for (pubkey, (_, callback)) in self.unsubscribes.drain() { info!("Account monitoring killed: {:?}", pubkey); callback().await; } From 731fdf022900112ff9cd307155a285a3e679ffac Mon Sep 17 00:00:00 2001 From: Babur Makhmudov Date: Thu, 15 May 2025 13:35:20 +0400 Subject: [PATCH 5/7] fix(test): add more time to sleep to let connections be established --- magicblock-account-updates/tests/remote_account_updates.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/magicblock-account-updates/tests/remote_account_updates.rs b/magicblock-account-updates/tests/remote_account_updates.rs index 15c96ef6..a5d028bd 100644 
--- a/magicblock-account-updates/tests/remote_account_updates.rs +++ b/magicblock-account-updates/tests/remote_account_updates.rs @@ -45,7 +45,7 @@ async fn test_devnet_monitoring_clock_sysvar_changes_over_time() { // Create account updates worker and client let (client, cancellation_token, worker_handle) = setup(); // wait a bit for websocket connections to establish - sleep(Duration::from_millis(5_000)).await; + sleep(Duration::from_millis(20_000)).await; // The clock will change every slots, perfect for testing updates let sysvar_clock = clock::ID; // Before starting the monitoring, we should know nothing about the clock @@ -78,7 +78,7 @@ async fn test_devnet_monitoring_multiple_accounts_at_the_same_time() { // Create account updates worker and client let (client, cancellation_token, worker_handle) = setup(); // wait a bit for websocket connections to establish - sleep(Duration::from_millis(5_000)).await; + sleep(Duration::from_millis(20_000)).await; // Devnet accounts to be monitored for this test let sysvar_rent = rent::ID; let sysvar_sh = slot_hashes::ID; @@ -111,7 +111,7 @@ async fn test_devnet_monitoring_some_accounts_only() { // Create account updates worker and client let (client, cancellation_token, worker_handle) = setup(); // wait a bit for websocket connections to establish - sleep(Duration::from_millis(5_000)).await; + sleep(Duration::from_millis(20_000)).await; // Devnet accounts for this test let sysvar_rent = rent::ID; let sysvar_sh = slot_hashes::ID; From 957d924e8fe7768ebbc6b591d833fa80e8bc4cb6 Mon Sep 17 00:00:00 2001 From: Babur Makhmudov Date: Thu, 15 May 2025 17:15:29 +0400 Subject: [PATCH 6/7] fix(test): tweak sleep times for monitoring subscriptions --- .../src/remote_account_updates_shard.rs | 2 +- .../tests/remote_account_updates.rs | 25 +++++++------------ 2 files changed, 10 insertions(+), 17 deletions(-) diff --git a/magicblock-account-updates/src/remote_account_updates_shard.rs b/magicblock-account-updates/src/remote_account_updates_shard.rs index a74ee51a..370fb758 100644 --- a/magicblock-account-updates/src/remote_account_updates_shard.rs +++ b/magicblock-account-updates/src/remote_account_updates_shard.rs @@ -91,7 +91,6 @@ impl RemoteAccountUpdatesShard { let mut clock_slot = 0; // We'll store useful maps for each of the account subscriptions let mut account_streams = StreamMap::new(); - // rust compiler is not yet smart enough to figure out the exact type const LOG_CLOCK_FREQ: u64 = 100; let mut log_clock_count = 0; @@ -115,6 +114,7 @@ impl RemoteAccountUpdatesShard { } else { warn!("Shard {}: Received empty clock data", self.shard_id); } + self.try_to_override_last_known_update_slot(clock::ID, clock_slot); } // When we receive a message to start monitoring an account Some((pubkey, unsub)) = self.monitoring_request_receiver.recv() => { diff --git a/magicblock-account-updates/tests/remote_account_updates.rs b/magicblock-account-updates/tests/remote_account_updates.rs index a5d028bd..9b8eb9c4 100644 --- a/magicblock-account-updates/tests/remote_account_updates.rs +++ b/magicblock-account-updates/tests/remote_account_updates.rs @@ -14,7 +14,7 @@ use test_tools::skip_if_devnet_down; use tokio::time::sleep; use tokio_util::sync::CancellationToken; -fn setup() -> ( +async fn setup() -> ( RemoteAccountUpdatesClient, CancellationToken, tokio::task::JoinHandle<()>, @@ -22,7 +22,7 @@ fn setup() -> ( // Create account updates worker and client let mut worker = RemoteAccountUpdatesWorker::new( vec![RpcProviderConfig::devnet(), RpcProviderConfig::devnet()], - 
Duration::from_secs(1), // We constantly refresh stuff to make it struggle + Duration::from_secs(50 * 60), // We use the same config as in production ); let client = RemoteAccountUpdatesClient::new(&worker); // Run the worker in a separate task @@ -35,6 +35,8 @@ fn setup() -> ( .await }) }; + // wait a bit for websocket connections to establish + sleep(Duration::from_millis(5_000)).await; // Ready to run (client, cancellation_token, worker_handle) } @@ -43,13 +45,9 @@ async fn test_devnet_monitoring_clock_sysvar_changes_over_time() { skip_if_devnet_down!(); // Create account updates worker and client - let (client, cancellation_token, worker_handle) = setup(); - // wait a bit for websocket connections to establish - sleep(Duration::from_millis(20_000)).await; + let (client, cancellation_token, worker_handle) = setup().await; // The clock will change every slots, perfect for testing updates let sysvar_clock = clock::ID; - // Before starting the monitoring, we should know nothing about the clock - assert!(client.get_last_known_update_slot(&sysvar_clock).is_none()); // Start the monitoring assert!(client .ensure_account_monitoring(&sysvar_clock) @@ -76,9 +74,7 @@ async fn test_devnet_monitoring_multiple_accounts_at_the_same_time() { skip_if_devnet_down!(); // Create account updates worker and client - let (client, cancellation_token, worker_handle) = setup(); - // wait a bit for websocket connections to establish - sleep(Duration::from_millis(20_000)).await; + let (client, cancellation_token, worker_handle) = setup().await; // Devnet accounts to be monitored for this test let sysvar_rent = rent::ID; let sysvar_sh = slot_hashes::ID; @@ -109,9 +105,7 @@ async fn test_devnet_monitoring_some_accounts_only() { skip_if_devnet_down!(); // Create account updates worker and client - let (client, cancellation_token, worker_handle) = setup(); - // wait a bit for websocket connections to establish - sleep(Duration::from_millis(20_000)).await; + let (client, cancellation_token, worker_handle) = setup().await; // Devnet accounts for this test let sysvar_rent = rent::ID; let sysvar_sh = slot_hashes::ID; @@ -119,7 +113,6 @@ async fn test_devnet_monitoring_some_accounts_only() { // We shouldnt known anything about the accounts until we subscribe assert!(client.get_last_known_update_slot(&sysvar_rent).is_none()); assert!(client.get_last_known_update_slot(&sysvar_sh).is_none()); - assert!(client.get_last_known_update_slot(&sysvar_clock).is_none()); // Start monitoring only some of the accounts assert!(client.ensure_account_monitoring(&sysvar_rent).await.is_ok()); assert!(client.ensure_account_monitoring(&sysvar_sh).await.is_ok()); @@ -128,7 +121,7 @@ async fn test_devnet_monitoring_some_accounts_only() { // Check that we detected the accounts changes only on the accounts we monitored assert!(client.get_last_known_update_slot(&sysvar_rent).is_none()); // Rent doesn't change assert!(client.get_last_known_update_slot(&sysvar_sh).is_some()); - assert!(client.get_last_known_update_slot(&sysvar_clock).is_none()); + assert!(client.get_last_known_update_slot(&sysvar_clock).is_some()); // Cleanup everything correctly cancellation_token.cancel(); assert!(worker_handle.await.is_ok()); @@ -138,7 +131,7 @@ async fn test_devnet_monitoring_invalid_and_immutable_and_program_account() { skip_if_devnet_down!(); // Create 
account updates worker and client - let (client, cancellation_token, worker_handle) = setup(); + let (client, cancellation_token, worker_handle) = setup().await; // Devnet accounts for this test (none of them should change) let new_account = Keypair::new().pubkey(); let system_program = system_program::ID; From 8f955db9152388221ba48f3d6f6d616608778c77 Mon Sep 17 00:00:00 2001 From: Babur Makhmudov Date: Thu, 15 May 2025 19:30:10 +0400 Subject: [PATCH 7/7] fix(test): remove sysvar clock check from assertion --- magicblock-account-updates/tests/remote_account_updates.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/magicblock-account-updates/tests/remote_account_updates.rs b/magicblock-account-updates/tests/remote_account_updates.rs index 9b8eb9c4..aa5cd1a2 100644 --- a/magicblock-account-updates/tests/remote_account_updates.rs +++ b/magicblock-account-updates/tests/remote_account_updates.rs @@ -82,7 +82,6 @@ async fn test_devnet_monitoring_multiple_accounts_at_the_same_time() { // We shouldnt known anything about the accounts until we subscribe assert!(client.get_last_known_update_slot(&sysvar_rent).is_none()); assert!(client.get_last_known_update_slot(&sysvar_sh).is_none()); - assert!(client.get_last_known_update_slot(&sysvar_clock).is_none()); // Start monitoring the accounts now assert!(client.ensure_account_monitoring(&sysvar_rent).await.is_ok()); assert!(client.ensure_account_monitoring(&sysvar_sh).await.is_ok());