Skip to content

Commit 772332d

Browse files
apollo_network: applying backpressure to swarm broadcasting
1 parent 2d24b1c commit 772332d

File tree

3 files changed

+109
-26
lines changed

3 files changed

+109
-26
lines changed

crates/apollo_network/src/network_manager/mod.rs

Lines changed: 89 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,13 @@ use futures::future::{ready, BoxFuture, Ready};
1919
use futures::sink::With;
2020
use futures::stream::{FuturesUnordered, Map, Stream};
2121
use futures::{pin_mut, FutureExt, Sink, SinkExt, StreamExt};
22-
use libp2p::gossipsub::{SubscriptionError, TopicHash};
22+
use libp2p::gossipsub::{MessageId, PublishError, SubscriptionError, TopicHash};
2323
use libp2p::identity::Keypair;
2424
use libp2p::swarm::SwarmEvent;
2525
use libp2p::{noise, yamux, Multiaddr, PeerId, StreamProtocol, Swarm, SwarmBuilder};
2626
use metrics::NetworkMetrics;
2727
use tokio::time::{sleep_until, Instant};
28+
use tokio_retry::strategy::ExponentialBackoff;
2829
use tracing::{debug, error, trace, warn};
2930

3031
use self::swarm_trait::SwarmTrait;
@@ -43,6 +44,20 @@ pub enum NetworkError {
4344
#[error("Channels for broadcast topic with hash {topic_hash:?} were dropped.")]
4445
BroadcastChannelsDropped { topic_hash: TopicHash },
4546
}
47+
48+
/// Bookkeeping for the single broadcast that is currently pending: what to send, where to send
/// it, when the next attempt is due and how many attempts were already made.
struct BroadcastDetails {
49+
/// Instant at which the next broadcast attempt is due.
50+
time: Instant,
51+
/// The number of broadcast attempts performed so far.
52+
count: u64,
53+
/// The message to broadcast.
54+
message: Bytes,
55+
/// The topic to broadcast on.
56+
topic: TopicHash,
57+
/// Exponential backoff strategy that supplies the wait before the next attempt of this message.
58+
broadcast_retry_strategy: ExponentialBackoff,
59+
}
60+
4661
pub struct GenericNetworkManager<SwarmT: SwarmTrait> {
4762
swarm: SwarmT,
4863
inbound_protocol_to_buffer_size: HashMap<StreamProtocol, usize>,
@@ -64,29 +79,33 @@ pub struct GenericNetworkManager<SwarmT: SwarmTrait> {
6479
continue_propagation_receiver: Receiver<BroadcastedMessageMetadata>,
6580
metrics: Option<NetworkMetrics>,
6681
next_metrics_update: Instant,
82+
/// Next message to broadcast
83+
next_broadcast: Option<BroadcastDetails>,
6784
}
6885

6986
impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
7087
pub async fn run(mut self) -> Result<(), NetworkError> {
7188
if let Some(metrics) = self.metrics.as_ref() {
7289
metrics.register();
7390
}
91+
7492
loop {
93+
let should_broadcast = self.next_broadcast.is_some();
94+
let broadcast_time =
95+
self.next_broadcast.as_ref().map(|x| x.time).unwrap_or(Instant::now());
7596
tokio::select! {
7697
Some(event) = self.swarm.next() => self.handle_swarm_event(event)?,
7798
Some(res) = self.sqmr_inbound_response_receivers.next() => self.handle_response_for_inbound_query(res),
7899
Some((protocol, client_payload)) = self.sqmr_outbound_payload_receivers.next() => {
79100
let protocol = StreamProtocol::try_from_owned(protocol).expect("Invalid protocol should not appear");
80101
self.handle_local_sqmr_payload(protocol, client_payload.expect("An SQMR client channel should not be terminated."))
81102
}
82-
Some((topic_hash, message)) = self.messages_to_broadcast_receivers.next() => {
83-
self.broadcast_message(
84-
message.ok_or(NetworkError::BroadcastChannelsDropped {
85-
topic_hash: topic_hash.clone()
86-
})?,
87-
topic_hash,
88-
);
89-
}
103+
Some((topic_hash, message)) = self.messages_to_broadcast_receivers.next(), if !should_broadcast => {
104+
self.setup_broadcast(topic_hash, message)?;
105+
},
106+
_ = sleep_until(broadcast_time), if should_broadcast => {
107+
self.do_broadcast();
108+
},
90109
Some(Some(peer_id)) = self.reported_peer_receivers.next() => self.swarm.report_peer_as_malicious(peer_id, MisconductScore::MALICIOUS),
91110
Some(peer_id) = self.reported_peers_receiver.next() => self.swarm.report_peer_as_malicious(peer_id, MisconductScore::MALICIOUS),
92111
Some(broadcasted_message_metadata) = self.continue_propagation_receiver.next() => {
@@ -138,6 +157,7 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
138157
continue_propagation_receiver,
139158
metrics,
140159
next_metrics_update: Instant::now() + Duration::from_secs(1),
160+
next_broadcast: None,
141161
}
142162
}
143163

@@ -274,6 +294,60 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
274294
})
275295
}
276296

297+
/// Records `message` as the pending broadcast for `topic_hash`, due immediately.
///
/// Nothing is sent here; the message is only stored in `self.next_broadcast` together with a
/// fresh attempt counter and a fresh backoff strategy.
///
/// # Errors
/// Returns [`NetworkError::BroadcastChannelsDropped`] (carrying `topic_hash`) when `message` is
/// `None`.
fn setup_broadcast(
298+
&mut self,
299+
topic_hash: TopicHash,
300+
message: Option<Bytes>,
301+
) -> Result<(), NetworkError> {
302+
// `None` is reported to the caller as the broadcast channels having been dropped.
let message = message
303+
.ok_or(NetworkError::BroadcastChannelsDropped { topic_hash: topic_hash.clone() })?;
304+
self.next_broadcast = Some(BroadcastDetails {
305+
// The first attempt is due right away.
time: Instant::now(),
306+
// No attempts have been made yet.
count: 0,
307+
message,
308+
topic: topic_hash,
309+
// Retry delays grow exponentially and are capped at one second.
broadcast_retry_strategy: ExponentialBackoff::from_millis(2)
310+
.max_delay(Duration::from_secs(1)),
311+
});
312+
Ok(())
313+
}
314+
315+
/// Performs one attempt at sending the pending broadcast and decides what happens to it next.
///
/// The pending broadcast is taken out of `self.next_broadcast`. It stays cleared when the send
/// succeeds or fails with an error in the "drop" group; for an error in the "retry" group it is
/// stored back with `time` moved forward by the next backoff delay.
///
/// # Panics
/// Panics if `self.next_broadcast` is `None`, or if the backoff strategy yields no delay.
fn do_broadcast(&mut self) {
316+
let mut details =
317+
self.next_broadcast.take().expect("Broadcasting when next broadcast is None");
318+
// Count this attempt before making it, so the logs below report the total number of tries.
details.count += 1;
319+
// Message and topic are cloned because `details` may be stored again for a retry.
match self.broadcast_message(details.message.clone(), details.topic.clone()) {
320+
Ok(_) => {}
321+
Err(e) => match &e {
322+
// Errors handled by dropping the message: it is logged and not stored back.
PublishError::Duplicate
323+
| PublishError::SigningError(_)
324+
| PublishError::MessageTooLarge => {
325+
// NOTE(review): the text below reads "tries Dropping" - a sentence separator after
// "tries" appears to be missing.
error!(
326+
"Failed to broadcast message: `{e:?}` after {} tries Dropping message.",
327+
details.count
328+
);
329+
}
330+
// Errors handled by retrying: the same message is rescheduled after a backoff delay.
// NOTE(review): there is no cap on the number of retries here; confirm that retrying
// `NoPeersSubscribedToTopic` / `TransformFailed` indefinitely is intended.
PublishError::AllQueuesFull(_)
331+
| PublishError::NoPeersSubscribedToTopic
332+
| PublishError::TransformFailed(_) => {
333+
let wait_duration = details.broadcast_retry_strategy.next().expect(
334+
"Broadcast retry strategy ended even though it's an infinite iterator.",
335+
);
336+
warn!(
337+
"Failed to broadcast message: `{e:?}` after {} tries. Trying again in {} \
338+
milliseconds. Not reading more messages until then (Applying \
339+
backpressure).",
340+
details.count,
341+
wait_duration.as_millis()
342+
);
343+
344+
// Reschedule the same message and put it back as the pending broadcast.
details.time = Instant::now() + wait_duration;
345+
self.next_broadcast = Some(details)
346+
}
347+
},
348+
}
349+
}
350+
277351
fn handle_swarm_event(
278352
&mut self,
279353
event: SwarmEvent<mixed_behaviour::Event>,
@@ -661,7 +735,11 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
661735
.insert(outbound_session_id, report_receiver);
662736
}
663737

664-
fn broadcast_message(&mut self, message: Bytes, topic_hash: TopicHash) {
738+
fn broadcast_message(
739+
&mut self,
740+
message: Bytes,
741+
topic_hash: TopicHash,
742+
) -> Result<MessageId, PublishError> {
665743
if let Some(broadcast_metrics_by_topic) =
666744
self.metrics.as_ref().and_then(|metrics| metrics.broadcast_metrics_by_topic.as_ref())
667745
{
@@ -673,7 +751,7 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
673751
}
674752
}
675753
trace!("Sending broadcast message with topic hash: {topic_hash:?}");
676-
self.swarm.broadcast_message(message, topic_hash);
754+
self.swarm.broadcast_message(message, topic_hash)
677755
}
678756

679757
fn report_session_removed_to_metrics(&mut self, session_id: SessionId) {

crates/apollo_network/src/network_manager/swarm_trait.rs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
use futures::stream::Stream;
2-
use libp2p::gossipsub::{SubscriptionError, TopicHash};
2+
use libp2p::gossipsub::{MessageId, PublishError, SubscriptionError, TopicHash};
33
use libp2p::swarm::dial_opts::DialOpts;
44
use libp2p::swarm::{DialError, NetworkBehaviour, SwarmEvent};
55
use libp2p::{Multiaddr, PeerId, StreamProtocol, Swarm};
6-
use tracing::{info, warn};
6+
use tracing::info;
77

88
use super::BroadcastedMessageMetadata;
99
use crate::gossipsub_impl::Topic;
@@ -42,7 +42,11 @@ pub trait SwarmTrait: Stream<Item = Event> + Unpin {
4242

4343
fn subscribe_to_topic(&mut self, topic: &Topic) -> Result<(), SubscriptionError>;
4444

45-
fn broadcast_message(&mut self, message: Bytes, topic_hash: TopicHash);
45+
/// Broadcasts `message` on the topic identified by `topic_hash`.
///
/// The outcome is handed back to the caller: a `MessageId` on success, a `PublishError`
/// otherwise.
fn broadcast_message(
46+
&mut self,
47+
message: Bytes,
48+
topic_hash: TopicHash,
49+
) -> Result<MessageId, PublishError>;
4650

4751
fn report_peer_as_malicious(&mut self, peer_id: PeerId, misconduct_score: MisconductScore);
4852

@@ -100,16 +104,12 @@ impl SwarmTrait for Swarm<mixed_behaviour::MixedBehaviour> {
100104
self.behaviour_mut().gossipsub.subscribe(topic).map(|_| ())
101105
}
102106

103-
fn broadcast_message(&mut self, message: Bytes, topic_hash: TopicHash) {
104-
let result = self.behaviour_mut().gossipsub.publish(topic_hash.clone(), message);
105-
if let Err(err) = result {
106-
// TODO(shahak): Consider reporting to the subscriber broadcast failures or retrying
107-
// upon failure.
108-
warn!(
109-
"Error occured while broadcasting a message to the topic with hash \
110-
{topic_hash:?}: {err:?}"
111-
);
112-
}
107+
/// Publishes `message` on `topic_hash` through the gossipsub behaviour and returns the publish
/// result unchanged; no logging or error handling happens at this level.
fn broadcast_message(
108+
&mut self,
109+
message: Bytes,
110+
topic_hash: TopicHash,
111+
) -> Result<MessageId, PublishError> {
112+
// NOTE(review): `topic_hash` is owned and not used after this call, so the `clone()` looks
// unnecessary.
self.behaviour_mut().gossipsub.publish(topic_hash.clone(), message)
113113
}
114114

115115
fn report_peer_as_malicious(&mut self, peer_id: PeerId, misconduct_score: MisconductScore) {

crates/apollo_network/src/network_manager/test.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use futures::{pin_mut, Future, SinkExt, StreamExt};
1515
use lazy_static::lazy_static;
1616
use libp2p::core::transport::PortUse;
1717
use libp2p::core::ConnectedPoint;
18-
use libp2p::gossipsub::{SubscriptionError, TopicHash};
18+
use libp2p::gossipsub::{MessageId, PublishError, SubscriptionError, TopicHash};
1919
use libp2p::swarm::ConnectionId;
2020
use libp2p::{Multiaddr, PeerId, StreamProtocol};
2121
use tokio::select;
@@ -175,10 +175,15 @@ impl SwarmTrait for MockSwarm {
175175
Ok(())
176176
}
177177

178-
fn broadcast_message(&mut self, message: Bytes, topic_hash: TopicHash) {
178+
/// Mock broadcast: sends a copy of `(message, topic_hash)` to every sender in
/// `broadcasted_messages_senders` and always reports success.
///
/// The returned `MessageId` is built from the message bytes.
///
/// # Panics
/// Panics if sending to any of the senders fails.
fn broadcast_message(
179+
&mut self,
180+
message: Bytes,
181+
topic_hash: TopicHash,
182+
) -> Result<MessageId, PublishError> {
179183
for sender in &self.broadcasted_messages_senders {
180184
sender.unbounded_send((message.clone(), topic_hash.clone())).unwrap();
181185
}
186+
Ok(MessageId::new(&message))
182187
}
183188

184189
fn report_peer_as_malicious(&mut self, peer_id: PeerId, _: MisconductScore) {

0 commit comments

Comments
 (0)