Skip to content

Commit de3d50e

Browse files
apollo_network: propeller
1 parent 8c54b64 commit de3d50e

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+5814
-31
lines changed

Cargo.lock

Lines changed: 81 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ members = [
6464
"crates/apollo_p2p_sync_config",
6565
"crates/apollo_proc_macros",
6666
"crates/apollo_proc_macros_tests",
67+
"crates/apollo_propeller",
6768
"crates/apollo_protobuf",
6869
"crates/apollo_reverts",
6970
"crates/apollo_rpc",
@@ -179,6 +180,7 @@ apollo_p2p_sync.path = "crates/apollo_p2p_sync"
179180
apollo_p2p_sync_config.path = "crates/apollo_p2p_sync_config"
180181
apollo_proc_macros = { path = "crates/apollo_proc_macros", version = "0.0.0" }
181182
apollo_proc_macros_tests.path = "crates/apollo_proc_macros_tests"
183+
apollo_propeller.path = "crates/apollo_propeller"
182184
apollo_protobuf.path = "crates/apollo_protobuf"
183185
apollo_reverts.path = "crates/apollo_reverts"
184186
apollo_rpc.path = "crates/apollo_rpc"
@@ -209,6 +211,7 @@ assert_matches = "1.5.0"
209211
async-recursion = "1.1.0"
210212
async-stream = "0.3.3"
211213
async-trait = "0.1.79"
214+
asynchronous-codec = "0.7.0"
212215
atomic_refcell = "0.1.13"
213216
axum = "0.6.12"
214217
base64 = "0.13.0"
@@ -279,6 +282,7 @@ libp2p = "0.56.0"
279282
libp2p-swarm-test = "0.6.0"
280283
log = "0.4"
281284
lru = "0.12.0"
285+
lru_time_cache = "0.11.11"
282286
memmap2 = "0.8.0"
283287
mempool_test_utils.path = "crates/mempool_test_utils"
284288
metrics = "0.24.1"
@@ -315,10 +319,14 @@ prost-types = "0.12.1"
315319
protoc-prebuilt = "0.3.0"
316320
pyo3 = "0.19.1"
317321
pyo3-log = "0.8.1"
322+
quick-protobuf = "0.8.1"
323+
quick-protobuf-codec = "0.3.1"
324+
quickcheck = "1.0.3"
318325
quote = "1.0.26"
319326
rand = "0.8.5"
320327
rand_chacha = "0.3.1"
321328
rand_distr = "0.4.3"
329+
reed-solomon-simd = "3.1.0"
322330
regex = "1.10.4"
323331
replace_with = "0.1.7"
324332
reqwest = "0.12"

crates/apollo_consensus_manager/src/consensus_manager.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ impl ConsensusManager {
169169
broadcast_metrics_by_topic: Some(broadcast_metrics_by_topic),
170170
sqmr_metrics: None,
171171
event_metrics: Some(EventMetrics { event_counter: CONSENSUS_NETWORK_EVENTS }),
172+
propeller_metrics: None,
172173
});
173174

174175
NetworkManager::new(self.config.network_config.clone(), None, network_manager_metrics)

crates/apollo_mempool_p2p/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ pub fn create_p2p_propagator_and_runner(
5757
broadcast_metrics_by_topic: Some(broadcast_metrics_by_topic),
5858
sqmr_metrics: None,
5959
event_metrics: Some(EventMetrics { event_counter: MEMPOOL_P2P_NETWORK_EVENTS }),
60+
propeller_metrics: None,
6061
});
6162
let mut network_manager = NetworkManager::new(
6263
mempool_p2p_config.network_config,

crates/apollo_network/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ testing = []
1212
apollo_config.workspace = true
1313
apollo_metrics.workspace = true
1414
apollo_network_types.workspace = true
15+
apollo_propeller.workspace = true
1516
async-stream.workspace = true
1617
async-trait.workspace = true
1718
bytes.workspace = true

crates/apollo_network/src/bin/broadcast_network_stress_test_node/args.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ pub enum NetworkProtocol {
3333
/// Use Reversed SQMR where receivers initiate requests to broadcasters
3434
#[value(name = "reversed-sqmr")]
3535
ReveresedSqmr,
36+
/// Use Propeller for leader-based erasure-coded broadcasting
37+
#[value(name = "propeller")]
38+
Propeller,
3639
}
3740

3841
impl Display for Mode {

crates/apollo_network/src/bin/broadcast_network_stress_test_node/message_handling.rs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@ use apollo_network::network_manager::{
22
BroadcastTopicClient,
33
BroadcastTopicClientTrait,
44
BroadcastTopicServer,
5+
PropellerClient,
6+
PropellerClientTrait,
7+
PropellerMessageServer,
8+
ReceivedPropellerMessage,
59
SqmrClientSender,
610
SqmrServerReceiver,
711
};
@@ -16,6 +20,26 @@ pub enum MessageSender {
1620
Gossipsub(BroadcastTopicClient<TopicType>),
1721
Sqmr(SqmrClientSender<TopicType, TopicType>),
1822
ReveresedSqmr(ReveresedSqmrSender),
23+
Propeller(PropellerSender),
24+
}
25+
26+
/// Wrapper for Propeller client that handles message ID generation
27+
pub struct PropellerSender {
28+
client: PropellerClient<TopicType>,
29+
}
30+
31+
impl PropellerSender {
32+
pub fn new(client: PropellerClient<TopicType>) -> Self {
33+
Self { client }
34+
}
35+
36+
async fn send_message(&mut self, message: TopicType) {
37+
if let Err(e) = self.client.send_message(message).await {
38+
error!("Failed to send Propeller message: {:?}", e);
39+
} else {
40+
trace!("Sent Propeller message");
41+
}
42+
}
1943
}
2044

2145
/// Wrapper for ReveresedSqmr that maintains the last active query
@@ -88,6 +112,9 @@ impl MessageSender {
88112
// Then broadcast the message to all active queries
89113
sender.broadcast_to_queries(message).await;
90114
}
115+
MessageSender::Propeller(sender) => {
116+
sender.send_message(message).await;
117+
}
91118
}
92119
}
93120
}
@@ -96,6 +123,7 @@ pub enum MessageReceiver {
96123
Gossipsub(BroadcastTopicServer<TopicType>),
97124
Sqmr(SqmrServerReceiver<TopicType, TopicType>),
98125
ReveresedSqmr(SqmrClientSender<TopicType, TopicType>),
126+
Propeller(PropellerMessageServer<TopicType>),
99127
}
100128

101129
impl MessageReceiver {
@@ -146,6 +174,24 @@ impl MessageReceiver {
146174
}
147175
}
148176
},
177+
MessageReceiver::Propeller(receiver) => receiver
178+
.for_each(
179+
|(_, message_root, result): ReceivedPropellerMessage<TopicType>| async move {
180+
match result {
181+
Ok(message) => {
182+
trace!("Received Propeller message with ID: {}", message_root);
183+
f(message);
184+
}
185+
Err(e) => {
186+
error!(
187+
"Failed to deserialize Propeller message {}: {:?}",
188+
message_root, e
189+
);
190+
}
191+
}
192+
},
193+
)
194+
.await,
149195
}
150196
}
151197
}

0 commit comments

Comments
 (0)