Skip to content

Commit 08ae5c7

Browse files
fix: use mutiple connections for ws subscriptions (#365)
This fix rewrites the way account subscriptions work in the cloning pipeline, instead of using a single connection per shard, we now use pool of connections, which are used to distribute subscriptions, giving us more subscription capacity, before threshold imposed by RPC provider is reached. closes gh-362 <!-- greptile_comment --> ## Greptile Summary This PR implements a WebSocket connection pool for account subscriptions to increase capacity before hitting RPC provider limits. The changes replace single-connection architecture with a distributed approach. - Added `PubsubPool` in `remote_account_updates_shard.rs` to manage multiple WebSocket connections (8 per RPC upstream) - Modified test setup in `remote_account_updates.rs` to include 5-second delay for connection establishment - Added connection tracking and subscription distribution logic across pool - Uses unsafe `transmute` for stream lifetime extension which should be reviewed carefully - Implements proper cleanup of subscriptions and connections during shutdown <!-- /greptile_comment --> --------- Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
1 parent 6cda205 commit 08ae5c7

File tree

2 files changed

+126
-48
lines changed

2 files changed

+126
-48
lines changed

magicblock-account-updates/src/remote_account_updates_shard.rs

Lines changed: 117 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,12 @@ use std::{
66
sync::{Arc, RwLock},
77
};
88

9-
use futures_util::StreamExt;
9+
use futures_util::{stream::FuturesUnordered, Stream, StreamExt};
1010
use log::*;
1111
use magicblock_metrics::metrics;
12-
use solana_account_decoder::{UiAccountEncoding, UiDataSliceConfig};
12+
use solana_account_decoder::{UiAccount, UiAccountEncoding, UiDataSliceConfig};
1313
use solana_pubsub_client::nonblocking::pubsub_client::PubsubClient;
14-
use solana_rpc_client_api::config::RpcAccountInfoConfig;
14+
use solana_rpc_client_api::{config::RpcAccountInfoConfig, response::Response};
1515
use solana_sdk::{
1616
clock::{Clock, Slot},
1717
commitment_config::{CommitmentConfig, CommitmentLevel},
@@ -23,6 +23,13 @@ use tokio::sync::mpsc::Receiver;
2323
use tokio_stream::StreamMap;
2424
use tokio_util::sync::CancellationToken;
2525

26+
type BoxFn = Box<
27+
dyn FnOnce() -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> + Send,
28+
>;
29+
30+
type SubscriptionStream =
31+
Pin<Box<dyn Stream<Item = Response<UiAccount>> + Send + 'static>>;
32+
2633
#[derive(Debug, Error)]
2734
pub enum RemoteAccountUpdatesShardError {
2835
#[error(transparent)]
@@ -66,11 +73,9 @@ impl RemoteAccountUpdatesShard {
6673
) -> Result<(), RemoteAccountUpdatesShardError> {
6774
// Create a pubsub client
6875
info!("Shard {}: Starting", self.shard_id);
69-
let pubsub_client = PubsubClient::new(&self.url)
70-
.await
71-
.map_err(RemoteAccountUpdatesShardError::PubsubClientError)?;
76+
let ws_url = self.url.as_str();
7277
// For every account, we only want the updates, not the actual content of the accounts
73-
let rpc_account_info_config = Some(RpcAccountInfoConfig {
78+
let config = RpcAccountInfoConfig {
7479
commitment: self
7580
.commitment
7681
.map(|commitment| CommitmentConfig { commitment }),
@@ -80,21 +85,13 @@ impl RemoteAccountUpdatesShard {
8085
length: 0,
8186
}),
8287
min_context_slot: None,
83-
});
88+
};
89+
let mut pool = PubsubPool::new(ws_url, config).await?;
8490
// Subscribe to the clock from the RPC (to figure out the latest slot)
85-
let (mut clock_stream, clock_unsubscribe) = pubsub_client
86-
.account_subscribe(&clock::ID, rpc_account_info_config.clone())
87-
.await
88-
.map_err(RemoteAccountUpdatesShardError::PubsubClientError)?;
91+
let mut clock_stream = pool.subscribe(clock::ID).await?;
8992
let mut clock_slot = 0;
9093
// We'll store useful maps for each of the account subscriptions
9194
let mut account_streams = StreamMap::new();
92-
// rust compiler is not yet smart enough to figure out the exact type
93-
type BoxFn = Box<
94-
dyn FnOnce() -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>
95-
+ Send,
96-
>;
97-
let mut account_unsubscribes: HashMap<Pubkey, BoxFn> = HashMap::new();
9895
const LOG_CLOCK_FREQ: u64 = 100;
9996
let mut log_clock_count = 0;
10097

@@ -118,19 +115,17 @@ impl RemoteAccountUpdatesShard {
118115
} else {
119116
warn!("Shard {}: Received empty clock data", self.shard_id);
120117
}
118+
self.try_to_override_last_known_update_slot(clock::ID, clock_slot);
121119
}
122120
// When we receive a message to start monitoring an account
123121
Some((pubkey, unsub)) = self.monitoring_request_receiver.recv() => {
124122
if unsub {
125-
let Some(request) = account_unsubscribes.remove(&pubkey) else {
126-
continue;
127-
};
128123
account_streams.remove(&pubkey);
129124
metrics::set_subscriptions_count(account_streams.len(), &self.shard_id);
130-
request().await;
125+
pool.unsubscribe(&pubkey).await;
131126
continue;
132127
}
133-
if account_unsubscribes.contains_key(&pubkey) {
128+
if pool.subscribed(&pubkey) {
134129
continue;
135130
}
136131
debug!(
@@ -139,12 +134,10 @@ impl RemoteAccountUpdatesShard {
139134
pubkey,
140135
clock_slot
141136
);
142-
let (stream, unsubscribe) = pubsub_client
143-
.account_subscribe(&pubkey, rpc_account_info_config.clone())
144-
.await
145-
.map_err(RemoteAccountUpdatesShardError::PubsubClientError)?;
137+
let stream = pool
138+
.subscribe(pubkey)
139+
.await?;
146140
account_streams.insert(pubkey, stream);
147-
account_unsubscribes.insert(pubkey, unsubscribe);
148141
metrics::set_subscriptions_count(account_streams.len(), &self.shard_id);
149142
self.try_to_override_first_subscribed_slot(pubkey, clock_slot);
150143
}
@@ -164,17 +157,9 @@ impl RemoteAccountUpdatesShard {
164157
}
165158
}
166159
// Cleanup all subscriptions and wait for proper shutdown
167-
for (pubkey, account_unsubscribes) in account_unsubscribes.into_iter() {
168-
info!(
169-
"Shard {}: Account monitoring killed: {:?}",
170-
self.shard_id, pubkey
171-
);
172-
account_unsubscribes().await;
173-
}
174-
clock_unsubscribe().await;
175160
drop(account_streams);
176161
drop(clock_stream);
177-
pubsub_client.shutdown().await?;
162+
pool.shutdown().await;
178163
info!("Shard {}: Stopped", self.shard_id);
179164
// Done
180165
Ok(())
@@ -236,3 +221,98 @@ impl RemoteAccountUpdatesShard {
236221
}
237222
}
238223
}
224+
225+
struct PubsubPool {
226+
clients: Vec<PubSubConnection>,
227+
unsubscribes: HashMap<Pubkey, (usize, BoxFn)>,
228+
config: RpcAccountInfoConfig,
229+
}
230+
231+
impl PubsubPool {
232+
async fn new(
233+
url: &str,
234+
config: RpcAccountInfoConfig,
235+
) -> Result<Self, RemoteAccountUpdatesShardError> {
236+
// 8 is pretty much arbitrary, but a sane value for the number
237+
// of connections per RPC upstream, we don't overcomplicate things
238+
// here, as the whole cloning pipeline will be rewritten quite soon
239+
const CONNECTIONS_PER_POOL: usize = 8;
240+
let mut clients = Vec::with_capacity(CONNECTIONS_PER_POOL);
241+
let mut connections: FuturesUnordered<_> = (0..CONNECTIONS_PER_POOL)
242+
.map(|_| PubSubConnection::new(url))
243+
.collect();
244+
while let Some(c) = connections.next().await {
245+
clients.push(c?);
246+
}
247+
Ok(Self {
248+
clients,
249+
unsubscribes: HashMap::new(),
250+
config,
251+
})
252+
}
253+
254+
async fn subscribe(
255+
&mut self,
256+
pubkey: Pubkey,
257+
) -> Result<SubscriptionStream, RemoteAccountUpdatesShardError> {
258+
let (index, client) = self
259+
.clients
260+
.iter_mut()
261+
.enumerate()
262+
.min_by(|a, b| a.1.subs.cmp(&b.1.subs))
263+
.expect("clients vec is always greater than 0");
264+
let (stream, unsubscribe) = client
265+
.inner
266+
.account_subscribe(&pubkey, Some(self.config.clone()))
267+
.await
268+
.map_err(RemoteAccountUpdatesShardError::PubsubClientError)?;
269+
client.subs += 1;
270+
// SAFETY:
271+
// we never drop the PubsubPool before the returned subscription stream
272+
// so the lifetime of the stream can be safely extended to 'static
273+
#[allow(clippy::missing_transmute_annotations)]
274+
let stream = unsafe { std::mem::transmute(stream) };
275+
self.unsubscribes.insert(pubkey, (index, unsubscribe));
276+
Ok(stream)
277+
}
278+
279+
async fn unsubscribe(&mut self, pubkey: &Pubkey) {
280+
let Some((index, callback)) = self.unsubscribes.remove(pubkey) else {
281+
return;
282+
};
283+
callback().await;
284+
let Some(client) = self.clients.get_mut(index) else {
285+
return;
286+
};
287+
client.subs = client.subs.saturating_sub(1);
288+
}
289+
290+
fn subscribed(&mut self, pubkey: &Pubkey) -> bool {
291+
self.unsubscribes.contains_key(pubkey)
292+
}
293+
294+
async fn shutdown(&mut self) {
295+
// Cleanup all subscriptions and wait for proper shutdown
296+
for (pubkey, (_, callback)) in self.unsubscribes.drain() {
297+
info!("Account monitoring killed: {:?}", pubkey);
298+
callback().await;
299+
}
300+
for client in self.clients.drain(..) {
301+
let _ = client.inner.shutdown().await;
302+
}
303+
}
304+
}
305+
306+
struct PubSubConnection {
307+
inner: PubsubClient,
308+
subs: usize,
309+
}
310+
311+
impl PubSubConnection {
312+
async fn new(url: &str) -> Result<Self, RemoteAccountUpdatesShardError> {
313+
let inner = PubsubClient::new(url)
314+
.await
315+
.map_err(RemoteAccountUpdatesShardError::PubsubClientError)?;
316+
Ok(Self { inner, subs: 0 })
317+
}
318+
}

magicblock-account-updates/tests/remote_account_updates.rs

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use test_tools::skip_if_devnet_down;
1414
use tokio::time::sleep;
1515
use tokio_util::sync::CancellationToken;
1616

17-
fn setup() -> (
17+
async fn setup() -> (
1818
RemoteAccountUpdatesClient,
1919
CancellationToken,
2020
tokio::task::JoinHandle<()>,
@@ -23,7 +23,7 @@ fn setup() -> (
2323
let mut worker = RemoteAccountUpdatesWorker::new(
2424
vec![RpcProviderConfig::devnet().ws_url().into(); 2],
2525
Some(solana_sdk::commitment_config::CommitmentLevel::Confirmed),
26-
Duration::from_secs(1), // We constantly refresh stuff to make it struggle
26+
Duration::from_secs(50 * 60),
2727
);
2828
let client = RemoteAccountUpdatesClient::new(&worker);
2929
// Run the worker in a separate task
@@ -36,6 +36,8 @@ fn setup() -> (
3636
.await
3737
})
3838
};
39+
// wait a bit for websocket connections to establish
40+
sleep(Duration::from_millis(5_000)).await;
3941
// Ready to run
4042
(client, cancellation_token, worker_handle)
4143
}
@@ -44,11 +46,9 @@ fn setup() -> (
4446
async fn test_devnet_monitoring_clock_sysvar_changes_over_time() {
4547
skip_if_devnet_down!();
4648
// Create account updates worker and client
47-
let (client, cancellation_token, worker_handle) = setup();
49+
let (client, cancellation_token, worker_handle) = setup().await;
4850
// The clock will change every slots, perfect for testing updates
4951
let sysvar_clock = clock::ID;
50-
// Before starting the monitoring, we should know nothing about the clock
51-
assert!(client.get_last_known_update_slot(&sysvar_clock).is_none());
5252
// Start the monitoring
5353
assert!(client
5454
.ensure_account_monitoring(&sysvar_clock)
@@ -75,15 +75,14 @@ async fn test_devnet_monitoring_clock_sysvar_changes_over_time() {
7575
async fn test_devnet_monitoring_multiple_accounts_at_the_same_time() {
7676
skip_if_devnet_down!();
7777
// Create account updates worker and client
78-
let (client, cancellation_token, worker_handle) = setup();
78+
let (client, cancellation_token, worker_handle) = setup().await;
7979
// Devnet accounts to be monitored for this test
8080
let sysvar_rent = rent::ID;
8181
let sysvar_sh = slot_hashes::ID;
8282
let sysvar_clock = clock::ID;
8383
// We shouldnt known anything about the accounts until we subscribe
8484
assert!(client.get_last_known_update_slot(&sysvar_rent).is_none());
8585
assert!(client.get_last_known_update_slot(&sysvar_sh).is_none());
86-
assert!(client.get_last_known_update_slot(&sysvar_clock).is_none());
8786
// Start monitoring the accounts now
8887
assert!(client.ensure_account_monitoring(&sysvar_rent).await.is_ok());
8988
assert!(client.ensure_account_monitoring(&sysvar_sh).await.is_ok());
@@ -106,15 +105,14 @@ async fn test_devnet_monitoring_multiple_accounts_at_the_same_time() {
106105
async fn test_devnet_monitoring_some_accounts_only() {
107106
skip_if_devnet_down!();
108107
// Create account updates worker and client
109-
let (client, cancellation_token, worker_handle) = setup();
108+
let (client, cancellation_token, worker_handle) = setup().await;
110109
// Devnet accounts for this test
111110
let sysvar_rent = rent::ID;
112111
let sysvar_sh = slot_hashes::ID;
113112
let sysvar_clock = clock::ID;
114113
// We shouldnt known anything about the accounts until we subscribe
115114
assert!(client.get_last_known_update_slot(&sysvar_rent).is_none());
116115
assert!(client.get_last_known_update_slot(&sysvar_sh).is_none());
117-
assert!(client.get_last_known_update_slot(&sysvar_clock).is_none());
118116
// Start monitoring only some of the accounts
119117
assert!(client.ensure_account_monitoring(&sysvar_rent).await.is_ok());
120118
assert!(client.ensure_account_monitoring(&sysvar_sh).await.is_ok());
@@ -123,7 +121,7 @@ async fn test_devnet_monitoring_some_accounts_only() {
123121
// Check that we detected the accounts changes only on the accounts we monitored
124122
assert!(client.get_last_known_update_slot(&sysvar_rent).is_none()); // Rent doesn't change
125123
assert!(client.get_last_known_update_slot(&sysvar_sh).is_some());
126-
assert!(client.get_last_known_update_slot(&sysvar_clock).is_none());
124+
assert!(client.get_last_known_update_slot(&sysvar_clock).is_some());
127125
// Cleanup everything correctly
128126
cancellation_token.cancel();
129127
assert!(worker_handle.await.is_ok());
@@ -133,7 +131,7 @@ async fn test_devnet_monitoring_some_accounts_only() {
133131
async fn test_devnet_monitoring_invalid_and_immutable_and_program_account() {
134132
skip_if_devnet_down!();
135133
// Create account updates worker and client
136-
let (client, cancellation_token, worker_handle) = setup();
134+
let (client, cancellation_token, worker_handle) = setup().await;
137135
// Devnet accounts for this test (none of them should change)
138136
let new_account = Keypair::new().pubkey();
139137
let system_program = system_program::ID;

0 commit comments

Comments
 (0)