Skip to content

Commit bacb5c0

Browse files
apollo_network: using propeller in network stress test
1 parent 2907c4c commit bacb5c0

File tree

7 files changed

+248
-16
lines changed

7 files changed

+248
-16
lines changed

crates/apollo_network/src/bin/broadcast_network_stress_test_node/args.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ pub enum NetworkProtocol {
3333
/// Use Reversed SQMR where receivers initiate requests to broadcasters
3434
#[value(name = "reversed-sqmr")]
3535
ReveresedSqmr,
36+
/// Use Propeller for leader-based erasure-coded broadcasting
37+
#[value(name = "propeller")]
38+
Propeller,
3639
}
3740

3841
impl Display for Mode {
@@ -47,6 +50,41 @@ impl Display for NetworkProtocol {
4750
}
4851
}
4952

53+
impl NetworkProtocol {
54+
/// Validates protocol-specific requirements for the given message size.
55+
/// Returns an error message if validation fails, or Ok(()) if validation passes.
56+
pub fn validate_message_size(&self, message_size_bytes: Option<usize>) -> Result<(), String> {
57+
match self {
58+
NetworkProtocol::Propeller => {
59+
if let Some(message_size) = message_size_bytes {
60+
let default_config = libp2p::propeller::Config::default();
61+
let shreds =
62+
default_config.fec_coding_shreds() + default_config.fec_data_shreds();
63+
if message_size % shreds != 0 {
64+
return Err(format!(
65+
"Propeller protocol requires message size to be a multiple of 64 \
66+
bytes, got {}",
67+
message_size
68+
));
69+
}
70+
let shred_size = message_size / shreds;
71+
if !(shred_size).is_multiple_of(2) {
72+
return Err(format!(
73+
"Propeller protocol requires shred size to be a multiple of 2, got {}",
74+
shred_size
75+
));
76+
}
77+
}
78+
Ok(())
79+
}
80+
NetworkProtocol::Gossipsub | NetworkProtocol::Sqmr | NetworkProtocol::ReveresedSqmr => {
81+
// These protocols don't have specific message size requirements
82+
Ok(())
83+
}
84+
}
85+
}
86+
}
87+
5088
#[derive(Parser, Debug, Clone)]
5189
#[command(version, about, long_about = None)]
5290
pub struct Args {

crates/apollo_network/src/bin/broadcast_network_stress_test_node/main.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
4949
*METADATA_SIZE
5050
);
5151

52+
// Protocol-specific validation
53+
if let Err(validation_error) =
54+
args.network_protocol.validate_message_size(args.message_size_bytes)
55+
{
56+
panic!("{}", validation_error);
57+
}
58+
5259
// Set up metrics
5360
let builder = PrometheusBuilder::new().with_http_listener(SocketAddr::V4(SocketAddrV4::new(
5461
Ipv4Addr::UNSPECIFIED,

crates/apollo_network/src/bin/broadcast_network_stress_test_node/message_handling.rs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@ use apollo_network::network_manager::{
22
BroadcastTopicClient,
33
BroadcastTopicClientTrait,
44
BroadcastTopicServer,
5+
PropellerClient,
6+
PropellerClientTrait,
7+
PropellerMessageServer,
8+
ReceivedPropellerMessage,
59
SqmrClientSender,
610
SqmrServerReceiver,
711
};
@@ -16,6 +20,30 @@ pub enum MessageSender {
1620
Gossipsub(BroadcastTopicClient<TopicType>),
1721
Sqmr(SqmrClientSender<TopicType, TopicType>),
1822
ReveresedSqmr(ReveresedSqmrSender),
23+
Propeller(PropellerSender),
24+
}
25+
26+
/// Wrapper for Propeller client that handles message ID generation
27+
pub struct PropellerSender {
28+
client: PropellerClient<TopicType>,
29+
next_message_id: u64,
30+
}
31+
32+
impl PropellerSender {
33+
pub fn new(client: PropellerClient<TopicType>) -> Self {
34+
Self { client, next_message_id: 1 }
35+
}
36+
37+
async fn send_message(&mut self, message: TopicType) {
38+
let message_id = self.next_message_id;
39+
self.next_message_id += 1;
40+
41+
if let Err(e) = self.client.send_message(message, message_id).await {
42+
error!("Failed to send Propeller message {}: {:?}", message_id, e);
43+
} else {
44+
trace!("Sent Propeller message with ID: {}", message_id);
45+
}
46+
}
1947
}
2048

2149
/// Wrapper for ReveresedSqmr that maintains the last active query
@@ -88,6 +116,9 @@ impl MessageSender {
88116
// Then broadcast the message to all active queries
89117
sender.broadcast_to_queries(message).await;
90118
}
119+
MessageSender::Propeller(sender) => {
120+
sender.send_message(message).await;
121+
}
91122
}
92123
}
93124
}
@@ -96,6 +127,7 @@ pub enum MessageReceiver {
96127
Gossipsub(BroadcastTopicServer<TopicType>),
97128
Sqmr(SqmrServerReceiver<TopicType, TopicType>),
98129
ReveresedSqmr(SqmrClientSender<TopicType, TopicType>),
130+
Propeller(PropellerMessageServer<TopicType>),
99131
}
100132

101133
impl MessageReceiver {
@@ -146,6 +178,26 @@ impl MessageReceiver {
146178
}
147179
}
148180
},
181+
MessageReceiver::Propeller(receiver) => {
182+
receiver
183+
.for_each(
184+
|(message_id, result): ReceivedPropellerMessage<TopicType>| async move {
185+
match result {
186+
Ok(message) => {
187+
trace!("Received Propeller message with ID: {}", message_id);
188+
f(message);
189+
}
190+
Err(e) => {
191+
error!(
192+
"Failed to deserialize Propeller message {}: {:?}",
193+
message_id, e
194+
);
195+
}
196+
}
197+
},
198+
)
199+
.await
200+
}
149201
}
150202
}
151203
}

crates/apollo_network/src/bin/broadcast_network_stress_test_node/network_channels.rs

Lines changed: 114 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,73 @@ use apollo_network::network_manager::{
33
BroadcastTopicClient,
44
BroadcastTopicServer,
55
NetworkManager,
6+
PropellerChannels,
7+
PropellerClient,
8+
PropellerMessageServer,
69
SqmrClientSender,
710
SqmrServerReceiver,
811
};
912
use apollo_network::NetworkConfig;
1013

1114
use crate::args::NetworkProtocol;
12-
use crate::message_handling::{MessageReceiver, MessageSender};
15+
use crate::message_handling::{MessageReceiver, MessageSender, PropellerSender};
1316
use crate::metrics::{create_network_metrics, TOPIC};
1417

1518
pub type TopicType = Vec<u8>;
1619

1720
pub const SQMR_PROTOCOL_NAME: &str = "/stress-test/1.0.0";
1821

22+
/// Creates peer configurations for Propeller protocol from bootstrap addresses
23+
/// Each peer gets equal weight (1000) for simplicity
24+
fn create_propeller_peers_from_bootstrap(bootstrap_addrs: &[String]) -> Vec<(libp2p::PeerId, u64)> {
25+
use std::str::FromStr;
26+
27+
use libp2p::Multiaddr;
28+
29+
bootstrap_addrs
30+
.iter()
31+
.filter_map(|addr_str| {
32+
// Parse the multiaddr and extract the peer ID
33+
match Multiaddr::from_str(addr_str.trim()) {
34+
Ok(multiaddr) => {
35+
// Extract peer ID from the multiaddr (last component should be /p2p/<peer_id>)
36+
for protocol in multiaddr.iter() {
37+
if let libp2p::multiaddr::Protocol::P2p(peer_id) = protocol {
38+
return Some((peer_id, 1000)); // Equal weight for all peers
39+
}
40+
}
41+
tracing::warn!("No peer ID found in bootstrap address: {}", addr_str);
42+
None
43+
}
44+
Err(e) => {
45+
tracing::error!("Failed to parse bootstrap address '{}': {}", addr_str, e);
46+
None
47+
}
48+
}
49+
})
50+
.collect()
51+
}
52+
53+
/// Fallback function to create peer configurations when no bootstrap addresses are provided
54+
/// This creates deterministic peer IDs from node indices for testing
55+
fn create_propeller_peers_fallback(num_nodes: u64) -> Vec<(libp2p::PeerId, u64)> {
56+
use libp2p::identity::Keypair;
57+
58+
(0..num_nodes)
59+
.map(|node_id| {
60+
// Create deterministic peer ID from node ID (same as stress test node)
61+
let mut private_key = [0u8; 32];
62+
private_key[0..8].copy_from_slice(&node_id.to_le_bytes());
63+
64+
let keypair = Keypair::ed25519_from_bytes(private_key)
65+
.expect("Failed to create keypair from private key");
66+
let peer_id = keypair.public().to_peer_id();
67+
68+
(peer_id, 1000) // Equal weight for all peers
69+
})
70+
.collect()
71+
}
72+
1973
/// Network communication channels for different protocols
2074
pub enum NetworkChannels {
2175
Gossipsub {
@@ -30,6 +84,10 @@ pub enum NetworkChannels {
3084
sqmr_client: Option<SqmrClientSender<TopicType, TopicType>>,
3185
sqmr_server: Option<SqmrServerReceiver<TopicType, TopicType>>,
3286
},
87+
Propeller {
88+
propeller_client: Option<PropellerClient<TopicType>>,
89+
propeller_messages_receiver: Option<PropellerMessageServer<TopicType>>,
90+
},
3391
}
3492

3593
impl NetworkChannels {
@@ -49,6 +107,11 @@ impl NetworkChannels {
49107
sqmr_server.take().expect("sqmr_server should be available"),
50108
))
51109
}
110+
NetworkChannels::Propeller { propeller_client, propeller_messages_receiver: _ } => {
111+
MessageSender::Propeller(PropellerSender::new(
112+
propeller_client.take().expect("propeller_client should be available"),
113+
))
114+
}
52115
}
53116
}
54117

@@ -70,6 +133,13 @@ impl NetworkChannels {
70133
sqmr_client.take().expect("sqmr_client should be available"),
71134
)
72135
}
136+
NetworkChannels::Propeller { propeller_messages_receiver, propeller_client: _ } => {
137+
MessageReceiver::Propeller(
138+
propeller_messages_receiver
139+
.take()
140+
.expect("propeller_messages_receiver should be available"),
141+
)
142+
}
73143
}
74144
}
75145
}
@@ -80,6 +150,9 @@ pub fn create_network_manager_with_channels(
80150
network_config: &NetworkConfig,
81151
buffer_size: usize,
82152
protocol: &NetworkProtocol,
153+
num_nodes: u64,
154+
_current_node_id: u64,
155+
bootstrap_addrs: &[String],
83156
) -> (NetworkManager, NetworkChannels) {
84157
let network_metrics = create_network_metrics();
85158
let mut network_manager =
@@ -90,10 +163,8 @@ pub fn create_network_manager_with_channels(
90163
let network_channels = network_manager
91164
.register_broadcast_topic::<TopicType>(TOPIC.clone(), buffer_size)
92165
.expect("Failed to register broadcast topic");
93-
let BroadcastTopicChannels {
94-
broadcasted_messages_receiver,
95-
broadcast_topic_client,
96-
} = network_channels;
166+
let BroadcastTopicChannels { broadcasted_messages_receiver, broadcast_topic_client } =
167+
network_channels;
97168

98169
NetworkChannels::Gossipsub {
99170
broadcast_topic_client: Some(broadcast_topic_client),
@@ -112,10 +183,7 @@ pub fn create_network_manager_with_channels(
112183
buffer_size,
113184
);
114185

115-
NetworkChannels::Sqmr {
116-
sqmr_client: Some(sqmr_client),
117-
sqmr_server: Some(sqmr_server),
118-
}
186+
NetworkChannels::Sqmr { sqmr_client: Some(sqmr_client), sqmr_server: Some(sqmr_server) }
119187
}
120188
NetworkProtocol::ReveresedSqmr => {
121189
let sqmr_client = network_manager
@@ -134,6 +202,43 @@ pub fn create_network_manager_with_channels(
134202
sqmr_server: Some(sqmr_server),
135203
}
136204
}
205+
NetworkProtocol::Propeller => {
206+
// Create peer configurations from bootstrap addresses, or fallback to generated peers
207+
let peers = if !bootstrap_addrs.is_empty() {
208+
tracing::info!(
209+
"Creating Propeller peers from {} bootstrap addresses",
210+
bootstrap_addrs.len()
211+
);
212+
let peers = create_propeller_peers_from_bootstrap(bootstrap_addrs);
213+
tracing::info!(
214+
"Successfully created {} Propeller peers from bootstrap addresses",
215+
peers.len()
216+
);
217+
for (peer_id, weight) in &peers {
218+
tracing::debug!("Propeller peer: {} (weight: {})", peer_id, weight);
219+
}
220+
peers
221+
} else {
222+
tracing::info!(
223+
"No bootstrap addresses provided, using fallback peer generation for {} nodes",
224+
num_nodes
225+
);
226+
create_propeller_peers_fallback(num_nodes)
227+
};
228+
229+
// Register propeller channels
230+
let propeller_channels = network_manager
231+
.register_propeller_channels::<TopicType>(buffer_size, peers)
232+
.expect("Failed to register propeller channels");
233+
234+
let PropellerChannels { propeller_messages_receiver, propeller_client } =
235+
propeller_channels;
236+
237+
NetworkChannels::Propeller {
238+
propeller_client: Some(propeller_client),
239+
propeller_messages_receiver: Some(propeller_messages_receiver),
240+
}
241+
}
137242
};
138243

139244
(network_manager, channels)

crates/apollo_network/src/bin/broadcast_network_stress_test_node/run/args.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ def add_shared_args_to_parser(parser: argparse.ArgumentParser):
4141
parser.add_argument(
4242
"--network-protocol",
4343
help="The network protocol to use for communication.",
44-
choices=["gossipsub", "sqmr", "reversed-sqmr"],
44+
choices=["gossipsub", "sqmr", "reversed-sqmr", "propeller"],
4545
default="gossipsub",
4646
)
4747
parser.add_argument(

0 commit comments

Comments
 (0)