Skip to content

Commit 6cda205

Browse files
feat: allow for multiple ws endpoints in remote config (#372)
<!-- greptile_comment --> ## Greptile Summary This PR adds support for multiple WebSocket endpoints in the remote configuration, enabling more flexible validator setup and improved connection management. - Added new `CustomWithMultipleWs` variant in `magicblock-config/src/accounts.rs` to handle multiple WebSocket URLs - Modified `RemoteAccountUpdatesShard` and `RemoteAccountUpdatesWorker` to use direct URL and commitment fields instead of `RpcProviderConfig` - Updated `magicblock-mutator/src/cluster.rs` needs improvement in WebSocket URL handling and error cases - Added test coverage in `magicblock-config/tests/parse_config.rs` for multiple WebSocket configurations - Simplified configuration parsing by removing custom deserialization in favor of serde defaults <!-- /greptile_comment --> --------- Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
1 parent 5512f23 commit 6cda205

File tree

9 files changed

+106
-115
lines changed

9 files changed

+106
-115
lines changed

magicblock-account-updates/src/remote_account_updates_shard.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ use std::{
66
sync::{Arc, RwLock},
77
};
88

9-
use conjunto_transwise::RpcProviderConfig;
109
use futures_util::StreamExt;
1110
use log::*;
1211
use magicblock_metrics::metrics;
@@ -15,7 +14,7 @@ use solana_pubsub_client::nonblocking::pubsub_client::PubsubClient;
1514
use solana_rpc_client_api::config::RpcAccountInfoConfig;
1615
use solana_sdk::{
1716
clock::{Clock, Slot},
18-
commitment_config::CommitmentConfig,
17+
commitment_config::{CommitmentConfig, CommitmentLevel},
1918
pubkey::Pubkey,
2019
sysvar::clock,
2120
};
@@ -35,7 +34,8 @@ pub enum RemoteAccountUpdatesShardError {
3534

3635
pub struct RemoteAccountUpdatesShard {
3736
shard_id: String,
38-
rpc_provider_config: RpcProviderConfig,
37+
url: String,
38+
commitment: Option<CommitmentLevel>,
3939
monitoring_request_receiver: Receiver<(Pubkey, bool)>,
4040
first_subscribed_slots: Arc<RwLock<HashMap<Pubkey, Slot>>>,
4141
last_known_update_slots: Arc<RwLock<HashMap<Pubkey, Slot>>>,
@@ -44,14 +44,16 @@ pub struct RemoteAccountUpdatesShard {
4444
impl RemoteAccountUpdatesShard {
4545
pub fn new(
4646
shard_id: String,
47-
rpc_provider_config: RpcProviderConfig,
47+
url: String,
48+
commitment: Option<CommitmentLevel>,
4849
monitoring_request_receiver: Receiver<(Pubkey, bool)>,
4950
first_subscribed_slots: Arc<RwLock<HashMap<Pubkey, Slot>>>,
5051
last_known_update_slots: Arc<RwLock<HashMap<Pubkey, Slot>>>,
5152
) -> Self {
5253
Self {
5354
shard_id,
54-
rpc_provider_config,
55+
url,
56+
commitment,
5557
monitoring_request_receiver,
5658
first_subscribed_slots,
5759
last_known_update_slots,
@@ -64,15 +66,13 @@ impl RemoteAccountUpdatesShard {
6466
) -> Result<(), RemoteAccountUpdatesShardError> {
6567
// Create a pubsub client
6668
info!("Shard {}: Starting", self.shard_id);
67-
let ws_url = self.rpc_provider_config.ws_url();
68-
let pubsub_client = PubsubClient::new(ws_url)
69+
let pubsub_client = PubsubClient::new(&self.url)
6970
.await
7071
.map_err(RemoteAccountUpdatesShardError::PubsubClientError)?;
7172
// For every account, we only want the updates, not the actual content of the accounts
7273
let rpc_account_info_config = Some(RpcAccountInfoConfig {
7374
commitment: self
74-
.rpc_provider_config
75-
.commitment()
75+
.commitment
7676
.map(|commitment| CommitmentConfig { commitment }),
7777
encoding: Some(UiAccountEncoding::Base64),
7878
data_slice: Some(UiDataSliceConfig {

magicblock-account-updates/src/remote_account_updates_worker.rs

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,10 @@ use std::{
77
time::Duration,
88
};
99

10-
use conjunto_transwise::RpcProviderConfig;
1110
use log::*;
12-
use solana_sdk::{clock::Slot, pubkey::Pubkey};
11+
use solana_sdk::{
12+
clock::Slot, commitment_config::CommitmentLevel, pubkey::Pubkey,
13+
};
1314
use thiserror::Error;
1415
use tokio::{
1516
sync::mpsc::{channel, Receiver, Sender},
@@ -42,7 +43,8 @@ struct RemoteAccountUpdatesWorkerRunner {
4243
}
4344

4445
pub struct RemoteAccountUpdatesWorker {
45-
rpc_provider_configs: Vec<RpcProviderConfig>,
46+
ws_urls: Vec<String>,
47+
commitment: Option<CommitmentLevel>,
4648
refresh_interval: Duration,
4749
monitoring_request_receiver: Receiver<(Pubkey, bool)>,
4850
monitoring_request_sender: Sender<(Pubkey, bool)>,
@@ -52,13 +54,15 @@ pub struct RemoteAccountUpdatesWorker {
5254

5355
impl RemoteAccountUpdatesWorker {
5456
pub fn new(
55-
rpc_provider_configs: Vec<RpcProviderConfig>,
57+
ws_urls: Vec<String>,
58+
commitment: Option<CommitmentLevel>,
5659
refresh_interval: Duration,
5760
) -> Self {
5861
let (monitoring_request_sender, monitoring_request_receiver) =
5962
channel(INFLIGHT_ACCOUNT_FETCHES_LIMIT);
6063
Self {
61-
rpc_provider_configs,
64+
ws_urls,
65+
commitment,
6266
refresh_interval,
6367
monitoring_request_receiver,
6468
monitoring_request_sender,
@@ -91,13 +95,12 @@ impl RemoteAccountUpdatesWorker {
9195
let mut runners = vec![];
9296
let mut monitored_accounts = HashSet::new();
9397
// Initialize all the runners for all configs
94-
for (index, rpc_provider_config) in
95-
self.rpc_provider_configs.iter().enumerate()
96-
{
98+
for (index, url) in self.ws_urls.iter().enumerate() {
9799
runners.push(
98100
self.create_runner_from_config(
99101
index,
100-
rpc_provider_config.clone(),
102+
url.clone(),
103+
self.commitment,
101104
&monitored_accounts,
102105
)
103106
.await,
@@ -127,14 +130,15 @@ impl RemoteAccountUpdatesWorker {
127130
}
128131
// Periodically we refresh runners to keep them fresh
129132
_ = refresh_interval.tick() => {
130-
current_refresh_index = (current_refresh_index + 1) % self.rpc_provider_configs.len();
131-
let rpc_provider_config = self.rpc_provider_configs
133+
current_refresh_index = (current_refresh_index + 1) % self.ws_urls.len();
134+
let url = self.ws_urls
132135
.get(current_refresh_index)
133136
.unwrap()
134137
.clone();
135138
let new_runner = self.create_runner_from_config(
136139
current_refresh_index,
137-
rpc_provider_config,
140+
url,
141+
self.commitment,
138142
&monitored_accounts
139143
).await;
140144
let old_runner = std::mem::replace(&mut runners[current_refresh_index], new_runner);
@@ -157,7 +161,8 @@ impl RemoteAccountUpdatesWorker {
157161
async fn create_runner_from_config(
158162
&self,
159163
index: usize,
160-
rpc_provider_config: RpcProviderConfig,
164+
url: String,
165+
commitment: Option<CommitmentLevel>,
161166
monitored_accounts: &HashSet<Pubkey>,
162167
) -> RemoteAccountUpdatesWorkerRunner {
163168
let (monitoring_request_sender, monitoring_request_receiver) =
@@ -171,7 +176,8 @@ impl RemoteAccountUpdatesWorker {
171176
let join_handle = tokio::spawn(async move {
172177
let mut shard = RemoteAccountUpdatesShard::new(
173178
shard_id.clone(),
174-
rpc_provider_config,
179+
url,
180+
commitment,
175181
monitoring_request_receiver,
176182
first_subscribed_slots,
177183
last_known_update_slots,

magicblock-account-updates/tests/remote_account_updates.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ fn setup() -> (
2121
) {
2222
// Create account updates worker and client
2323
let mut worker = RemoteAccountUpdatesWorker::new(
24-
vec![RpcProviderConfig::devnet(), RpcProviderConfig::devnet()],
24+
vec![RpcProviderConfig::devnet().ws_url().into(); 2],
25+
Some(solana_sdk::commitment_config::CommitmentLevel::Confirmed),
2526
Duration::from_secs(1), // We constantly refresh stuff to make it struggle
2627
);
2728
let client = RemoteAccountUpdatesClient::new(&worker);

magicblock-accounts/src/utils/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ pub fn try_rpc_cluster_from_cluster(
3333
Cluster::CustomWithWs(http, ws) => {
3434
Ok(RpcCluster::Custom(http.to_string(), ws.to_string()))
3535
}
36+
Cluster::CustomWithMultipleWs { http, ws } => {
37+
Ok(RpcCluster::Custom(http.to_string(), ws[0].to_string()))
38+
}
3639
}
3740
}
3841

magicblock-api/src/external_config.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ pub(crate) fn cluster_from_remote(
3030
CustomWithWs(http, ws) => {
3131
Cluster::CustomWithWs(http.clone(), ws.clone())
3232
}
33+
CustomWithMultipleWs { http, ws } => Cluster::CustomWithMultipleWs {
34+
http: http.clone(),
35+
ws: ws.clone(),
36+
},
3337
}
3438
}
3539

magicblock-api/src/magic_validator.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -271,12 +271,8 @@ impl MagicValidator {
271271
RemoteAccountFetcherWorker::new(remote_rpc_config.clone());
272272

273273
let remote_account_updates_worker = RemoteAccountUpdatesWorker::new(
274-
// We'll maintain 3 connections constantly (those could be on different nodes if we wanted to)
275-
vec![
276-
remote_rpc_config.clone(),
277-
remote_rpc_config.clone(),
278-
remote_rpc_config.clone(),
279-
],
274+
accounts_config.remote_cluster.ws_urls(),
275+
remote_rpc_config.commitment(),
280276
// We'll kill/refresh one connection every 50 minutes
281277
Duration::from_secs(60 * 50),
282278
);

magicblock-config/src/accounts.rs

Lines changed: 9 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
1-
use std::{fmt, str::FromStr};
1+
use std::str::FromStr;
22

33
use magicblock_accounts_db::config::AccountsDbConfig;
4-
use serde::{
5-
de::{self, Deserializer, SeqAccess, Visitor},
6-
Deserialize, Serialize,
7-
};
4+
use serde::{Deserialize, Serialize};
85
use solana_sdk::{native_token::LAMPORTS_PER_SOL, pubkey::Pubkey};
96
use strum_macros::EnumString;
107
use url::Url;
@@ -62,90 +59,17 @@ pub enum RemoteConfig {
6259
#[serde(alias = "local")]
6360
#[serde(alias = "localhost")]
6461
Development,
65-
#[serde(untagged, deserialize_with = "deserialize_url")]
62+
#[serde(untagged)]
6663
Custom(Url),
67-
#[serde(untagged, deserialize_with = "deserialize_tuple_url")]
64+
#[serde(untagged)]
6865
CustomWithWs(Url, Url),
66+
#[serde(untagged)]
67+
CustomWithMultipleWs {
68+
http: Url,
69+
ws: Vec<Url>,
70+
},
6971
}
7072

71-
pub fn deserialize_url<'de, D>(deserializer: D) -> Result<Url, D::Error>
72-
where
73-
D: Deserializer<'de>,
74-
{
75-
struct UrlVisitor;
76-
77-
impl Visitor<'_> for UrlVisitor {
78-
type Value = Url;
79-
80-
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
81-
formatter.write_str("a valid URL string")
82-
}
83-
84-
fn visit_str<E>(self, value: &str) -> Result<Url, E>
85-
where
86-
E: de::Error,
87-
{
88-
Url::parse(value).map_err(|e| {
89-
// The error returned here by serde is a bit unhelpful so we help out
90-
// by logging a bit more information.
91-
eprintln!(
92-
"RemoteConfig encountered invalid URL '{value}', err: ({e}).",
93-
);
94-
de::Error::custom(e)
95-
})
96-
}
97-
}
98-
99-
deserializer.deserialize_str(UrlVisitor)
100-
}
101-
102-
pub fn deserialize_tuple_url<'de, D>(
103-
deserializer: D,
104-
) -> Result<(Url, Url), D::Error>
105-
where
106-
D: Deserializer<'de>,
107-
{
108-
struct UrlTupleVisitor;
109-
110-
impl<'de> Visitor<'de> for UrlTupleVisitor {
111-
type Value = (Url, Url);
112-
113-
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
114-
formatter.write_str("a sequence of two URL strings")
115-
}
116-
117-
fn visit_seq<A>(self, mut seq: A) -> Result<(Url, Url), A::Error>
118-
where
119-
A: SeqAccess<'de>,
120-
{
121-
let first: String = seq.next_element()?.ok_or_else(|| {
122-
eprintln!("expected a sequence of two URLs: http and ws");
123-
de::Error::invalid_length(0, &self)
124-
})?;
125-
let second: String = seq.next_element()?.ok_or_else(|| {
126-
eprintln!("expected a sequence of two URLs: http and ws");
127-
de::Error::invalid_length(1, &self)
128-
})?;
129-
130-
let http = Url::parse(&first).map_err(|e| {
131-
eprintln!(
132-
"Invalid HTTP URL in RemoteConfig '{first}', err: ({e}).",
133-
);
134-
de::Error::custom(e)
135-
})?;
136-
let ws = Url::parse(&second).map_err(|e| {
137-
eprintln!(
138-
"Invalid WS URL in RemoteConfig '{second}', err: ({e}).",
139-
);
140-
de::Error::custom(e)
141-
})?;
142-
143-
Ok((http, ws))
144-
}
145-
}
146-
147-
deserializer.deserialize_seq(UrlTupleVisitor)
148-
}
14973
// -----------------
15074
// LifecycleMode
15175
// -----------------

magicblock-config/tests/parse_config.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,3 +244,15 @@ payer = { init_sol = 2000, init_lamports = 300_000 }
244244
let config = toml::from_str::<EphemeralConfig>(toml).unwrap();
245245
assert!(config.accounts.payer.try_init_lamports().is_err());
246246
}
247+
248+
#[test]
249+
fn test_custom_remote_with_multiple_ws() {
250+
let toml = r#"
251+
[accounts]
252+
remote = { http = "http://localhost:8899", ws = ["ws://awesomews1.com:933", "wss://awesomews2.com:944"] }
253+
"#;
254+
255+
let res = toml::from_str::<EphemeralConfig>(toml);
256+
println!("{res:?}");
257+
assert!(res.is_ok());
258+
}

magicblock-mutator/src/cluster.rs

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,11 @@ pub const MAINNET_URL: &str = "https://api.mainnet-beta.solana.com";
66
pub const DEVNET_URL: &str = "https://api.devnet.solana.com";
77
pub const DEVELOPMENT_URL: &str = "http://127.0.0.1:8899";
88

9+
const WS_MAINNET: &str = "wss://api.mainnet-beta.solana.com/";
10+
const WS_TESTNET: &str = "wss://api.testnet.solana.com/";
11+
pub const WS_DEVNET: &str = "wss://api.devnet.solana.com/";
12+
const WS_DEVELOPMENT: &str = "ws://localhost:8900";
13+
914
/// TODO(vbrunet)
1015
/// - this probably belong in a different crate, "mutator" is specific to the data dump mechanisms
1116
/// - conjunto_addresses::cluster::RpcCluster already achieve this and is a full duplicate
@@ -15,6 +20,7 @@ pub enum Cluster {
1520
Known(ClusterType),
1621
Custom(Url),
1722
CustomWithWs(Url, Url),
23+
CustomWithMultipleWs { http: Url, ws: Vec<Url> },
1824
}
1925

2026
impl From<ClusterType> for Cluster {
@@ -35,6 +41,45 @@ impl Cluster {
3541
},
3642
Cluster::Custom(url) => url.as_str(),
3743
Cluster::CustomWithWs(url, _) => url.as_str(),
44+
Cluster::CustomWithMultipleWs { http, .. } => http.as_str(),
45+
}
46+
}
47+
48+
pub fn ws_urls(&self) -> Vec<String> {
49+
use ClusterType::*;
50+
const WS_SHARD_COUNT: usize = 3;
51+
match self {
52+
Cluster::Known(cluster) => vec![
53+
match cluster {
54+
Testnet => WS_TESTNET.into(),
55+
MainnetBeta => WS_MAINNET.into(),
56+
Devnet => WS_DEVNET.into(),
57+
Development => WS_DEVELOPMENT.into(),
58+
};
59+
WS_SHARD_COUNT
60+
],
61+
Cluster::Custom(url) => {
62+
let mut ws_url = url.clone();
63+
ws_url
64+
.set_scheme(if url.scheme() == "https" {
65+
"wss"
66+
} else {
67+
"ws"
68+
})
69+
.expect("valid scheme");
70+
if let Some(port) = ws_url.port() {
71+
ws_url
72+
.set_port(Some(port + 1))
73+
.expect("valid url with port");
74+
}
75+
vec![ws_url.to_string(); WS_SHARD_COUNT]
76+
}
77+
Cluster::CustomWithWs(_, ws) => {
78+
vec![ws.to_string(); WS_SHARD_COUNT]
79+
}
80+
Cluster::CustomWithMultipleWs { ws, .. } => {
81+
ws.iter().map(Url::to_string).collect()
82+
}
3883
}
3984
}
4085
}

0 commit comments

Comments
 (0)