Skip to content

Commit 0de2486

Browse files
apollo_network: applying backpressure to swarm broadcasting
1 parent 865d0ce commit 0de2486

File tree

3 files changed

+43
-25
lines changed

3 files changed

+43
-25
lines changed

crates/apollo_network/src/network_manager/mod.rs

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use futures::future::{ready, BoxFuture, Ready};
1919
use futures::sink::With;
2020
use futures::stream::{FuturesUnordered, Map, Stream};
2121
use futures::{pin_mut, FutureExt, Sink, SinkExt, StreamExt};
22-
use libp2p::gossipsub::{SubscriptionError, TopicHash};
22+
use libp2p::gossipsub::{MessageId, PublishError, SubscriptionError, TopicHash};
2323
use libp2p::identity::Keypair;
2424
use libp2p::metrics::{Metrics, Recorder, Registry};
2525
use libp2p::swarm::SwarmEvent;
@@ -72,21 +72,30 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
7272
if let Some(metrics) = self.metrics.as_ref() {
7373
metrics.register();
7474
}
75+
76+
let mut message_to_send: Option<(Bytes, TopicHash)> = None;
7577
loop {
78+
if let Some((message, topic_hash)) = message_to_send.as_ref() {
79+
match self.broadcast_message(message.clone(), topic_hash.clone()) {
80+
Ok(_) => {
81+
message_to_send = None;
82+
}
83+
Err(e) => {
84+
warn!("Failed to broadcast message: `{e:?}` Applying Backpressure.");
85+
}
86+
};
87+
}
7688
tokio::select! {
7789
Some(event) = self.swarm.next() => self.handle_swarm_event(event)?,
7890
Some(res) = self.sqmr_inbound_response_receivers.next() => self.handle_response_for_inbound_query(res),
7991
Some((protocol, client_payload)) = self.sqmr_outbound_payload_receivers.next() => {
8092
let protocol = StreamProtocol::try_from_owned(protocol).expect("Invalid protocol should not appear");
8193
self.handle_local_sqmr_payload(protocol, client_payload.expect("An SQMR client channel should not be terminated."))
8294
}
83-
Some((topic_hash, message)) = self.messages_to_broadcast_receivers.next() => {
84-
self.broadcast_message(
85-
message.ok_or(NetworkError::BroadcastChannelsDropped {
86-
topic_hash: topic_hash.clone()
87-
})?,
88-
topic_hash,
89-
);
95+
Some((topic_hash, message)) = self.messages_to_broadcast_receivers.next(), if message_to_send.is_none() => {
96+
message_to_send = Some((message.ok_or(NetworkError::BroadcastChannelsDropped {
97+
topic_hash: topic_hash.clone(),
98+
})?, topic_hash));
9099
}
91100
Some(Some(peer_id)) = self.reported_peer_receivers.next() => self.swarm.report_peer_as_malicious(peer_id, MisconductScore::MALICIOUS),
92101
Some(peer_id) = self.reported_peers_receiver.next() => self.swarm.report_peer_as_malicious(peer_id, MisconductScore::MALICIOUS),
@@ -642,7 +651,11 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
642651
.insert(outbound_session_id, report_receiver);
643652
}
644653

645-
fn broadcast_message(&mut self, message: Bytes, topic_hash: TopicHash) {
654+
fn broadcast_message(
655+
&mut self,
656+
message: Bytes,
657+
topic_hash: TopicHash,
658+
) -> Result<MessageId, PublishError> {
646659
if let Some(broadcast_metrics_by_topic) =
647660
self.metrics.as_ref().and_then(|metrics| metrics.broadcast_metrics_by_topic.as_ref())
648661
{
@@ -654,7 +667,7 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
654667
}
655668
}
656669
trace!("Sending broadcast message with topic hash: {topic_hash:?}");
657-
self.swarm.broadcast_message(message, topic_hash);
670+
self.swarm.broadcast_message(message, topic_hash)
658671
}
659672

660673
fn report_session_removed_to_metrics(&mut self, session_id: SessionId) {

crates/apollo_network/src/network_manager/swarm_trait.rs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
use futures::stream::Stream;
2-
use libp2p::gossipsub::{SubscriptionError, TopicHash};
2+
use libp2p::gossipsub::{MessageId, PublishError, SubscriptionError, TopicHash};
33
use libp2p::swarm::dial_opts::DialOpts;
44
use libp2p::swarm::{DialError, NetworkBehaviour, SwarmEvent};
55
use libp2p::{Multiaddr, PeerId, StreamProtocol, Swarm};
6-
use tracing::{info, warn};
6+
use tracing::info;
77

88
use super::BroadcastedMessageMetadata;
99
use crate::gossipsub_impl::Topic;
@@ -41,7 +41,11 @@ pub trait SwarmTrait: Stream<Item = Event> + Unpin {
4141

4242
fn subscribe_to_topic(&mut self, topic: &Topic) -> Result<(), SubscriptionError>;
4343

44-
fn broadcast_message(&mut self, message: Bytes, topic_hash: TopicHash);
44+
fn broadcast_message(
45+
&mut self,
46+
message: Bytes,
47+
topic_hash: TopicHash,
48+
) -> Result<MessageId, PublishError>;
4549

4650
fn report_peer_as_malicious(&mut self, peer_id: PeerId, misconduct_score: MisconductScore);
4751

@@ -97,16 +101,12 @@ impl SwarmTrait for Swarm<mixed_behaviour::MixedBehaviour> {
97101
self.behaviour_mut().gossipsub.subscribe(topic).map(|_| ())
98102
}
99103

100-
fn broadcast_message(&mut self, message: Bytes, topic_hash: TopicHash) {
101-
let result = self.behaviour_mut().gossipsub.publish(topic_hash.clone(), message);
102-
if let Err(err) = result {
103-
// TODO(shahak): Consider reporting to the subscriber broadcast failures or retrying
104-
// upon failure.
105-
warn!(
106-
"Error occured while broadcasting a message to the topic with hash \
107-
{topic_hash:?}: {err:?}"
108-
);
109-
}
104+
fn broadcast_message(
105+
&mut self,
106+
message: Bytes,
107+
topic_hash: TopicHash,
108+
) -> Result<MessageId, PublishError> {
109+
self.behaviour_mut().gossipsub.publish(topic_hash.clone(), message)
110110
}
111111

112112
fn report_peer_as_malicious(&mut self, peer_id: PeerId, misconduct_score: MisconductScore) {

crates/apollo_network/src/network_manager/test.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use futures::{pin_mut, Future, SinkExt, StreamExt};
1515
use lazy_static::lazy_static;
1616
use libp2p::core::transport::PortUse;
1717
use libp2p::core::ConnectedPoint;
18-
use libp2p::gossipsub::{SubscriptionError, TopicHash};
18+
use libp2p::gossipsub::{MessageId, PublishError, SubscriptionError, TopicHash};
1919
use libp2p::swarm::ConnectionId;
2020
use libp2p::{Multiaddr, PeerId, StreamProtocol};
2121
use tokio::select;
@@ -175,10 +175,15 @@ impl SwarmTrait for MockSwarm {
175175
Ok(())
176176
}
177177

178-
fn broadcast_message(&mut self, message: Bytes, topic_hash: TopicHash) {
178+
fn broadcast_message(
179+
&mut self,
180+
message: Bytes,
181+
topic_hash: TopicHash,
182+
) -> Result<MessageId, PublishError> {
179183
for sender in &self.broadcasted_messages_senders {
180184
sender.unbounded_send((message.clone(), topic_hash.clone())).unwrap();
181185
}
186+
Ok(MessageId::new(&message))
182187
}
183188

184189
fn report_peer_as_malicious(&mut self, peer_id: PeerId, _: MisconductScore) {

0 commit comments

Comments
 (0)