Skip to content

Commit f203485

Browse files
tmp fixes:
* apollo_network: ping and disconnect on failure (#10200)
1 parent 5125bb4 commit f203485

File tree

5 files changed

+159
-1
lines changed

5 files changed

+159
-1
lines changed

Cargo.lock

Lines changed: 18 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/apollo_network/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ libp2p = { workspace = true, features = [
2727
"kad",
2828
"macros",
2929
"noise",
30+
"ping",
3031
"quic",
3132
"serde",
3233
"tcp",

crates/apollo_network/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@ mod mixed_behaviour;
205205
pub mod network_manager;
206206
pub mod peer_manager;
207207
pub mod propeller_impl;
208+
mod prune_dead_connections;
208209
mod sqmr;
209210
#[cfg(test)]
210211
mod test_utils;

crates/apollo_network/src/mixed_behaviour.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,17 @@ use crate::discovery::DiscoveryConfig;
1919
use crate::event_tracker::EventMetricsTracker;
2020
use crate::network_manager::metrics::EventMetrics;
2121
use crate::peer_manager::PeerManagerConfig;
22-
use crate::{discovery, gossipsub_impl, peer_manager, propeller_impl, sqmr};
22+
use crate::{
23+
discovery,
24+
gossipsub_impl,
25+
gossipsub_impl,
26+
peer_manager,
27+
peer_manager,
28+
propeller_impl,
29+
prune_dead_connections,
30+
sqmr,
31+
sqmr,
32+
};
2333

2434
// TODO(Shahak): consider reducing the pulicity of all behaviour to pub(crate)
2535
#[derive(NetworkBehaviour)]
@@ -34,6 +44,7 @@ pub struct MixedBehaviour {
3444
pub sqmr: sqmr::Behaviour,
3545
pub gossipsub: gossipsub::Behaviour,
3646
pub propeller: propeller::Behaviour,
47+
pub prune_dead_connections: prune_dead_connections::Behaviour,
3748
pub event_tracker_metrics: Toggle<EventMetricsTracker>,
3849
}
3950

@@ -145,6 +156,7 @@ impl MixedBehaviour {
145156
propeller::ConfigBuilder::default().build(),
146157
propeller_metrics,
147158
),
159+
prune_dead_connections: Default::default(),
148160
event_tracker_metrics: event_metrics.map(EventMetricsTracker::new).into(),
149161
}
150162
}
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
//! This module is responsible for monitoring connection health using ping and disconnecting
2+
//! unhealthy connections.
3+
4+
use std::collections::VecDeque;
5+
use std::convert::Infallible;
6+
use std::task::{Context, Poll};
7+
use std::time::Duration;
8+
9+
use libp2p::core::transport::PortUse;
10+
use libp2p::core::Endpoint;
11+
use libp2p::swarm::{
12+
CloseConnection,
13+
ConnectionDenied,
14+
ConnectionHandler,
15+
ConnectionId,
16+
FromSwarm,
17+
NetworkBehaviour,
18+
ToSwarm,
19+
};
20+
use libp2p::{ping, Multiaddr, PeerId};
21+
use tracing::warn;
22+
23+
/// A behaviour that monitors connection health using ping and disconnects unhealthy connections.
24+
///
25+
/// This behaviour wraps libp2p's ping protocol and immediately disconnects on first ping failure.
26+
/// This prevents split-brain scenarios where one side thinks it's connected while the other
27+
/// doesn't.
28+
///
29+
/// This behaviour is self-contained and does not emit any events. It silently manages
30+
/// connection health in the background.
31+
#[derive(Default)]
32+
pub struct Behaviour {
33+
ping: ping::Behaviour,
34+
pending_close_connections: VecDeque<(PeerId, ConnectionId)>,
35+
}
36+
37+
impl Behaviour {
38+
pub fn new(interval: Duration, timeout: Duration) -> Self {
39+
let ping_config = ping::Config::new().with_interval(interval).with_timeout(timeout);
40+
Self {
41+
ping: ping::Behaviour::new(ping_config),
42+
pending_close_connections: Default::default(),
43+
}
44+
}
45+
}
46+
47+
impl NetworkBehaviour for Behaviour {
48+
type ConnectionHandler = <ping::Behaviour as NetworkBehaviour>::ConnectionHandler;
49+
type ToSwarm = Infallible;
50+
51+
fn handle_established_inbound_connection(
52+
&mut self,
53+
connection_id: ConnectionId,
54+
peer: PeerId,
55+
local_addr: &Multiaddr,
56+
remote_addr: &Multiaddr,
57+
) -> Result<Self::ConnectionHandler, ConnectionDenied> {
58+
self.ping.handle_established_inbound_connection(
59+
connection_id,
60+
peer,
61+
local_addr,
62+
remote_addr,
63+
)
64+
}
65+
66+
fn handle_established_outbound_connection(
67+
&mut self,
68+
connection_id: ConnectionId,
69+
peer: PeerId,
70+
addr: &Multiaddr,
71+
role_override: Endpoint,
72+
port_use: PortUse,
73+
) -> Result<Self::ConnectionHandler, ConnectionDenied> {
74+
self.ping.handle_established_outbound_connection(
75+
connection_id,
76+
peer,
77+
addr,
78+
role_override,
79+
port_use,
80+
)
81+
}
82+
83+
fn on_swarm_event(&mut self, event: FromSwarm<'_>) {
84+
self.ping.on_swarm_event(event);
85+
}
86+
87+
fn on_connection_handler_event(
88+
&mut self,
89+
peer_id: PeerId,
90+
connection_id: ConnectionId,
91+
event: <Self::ConnectionHandler as ConnectionHandler>::ToBehaviour,
92+
) {
93+
self.ping.on_connection_handler_event(peer_id, connection_id, event);
94+
}
95+
96+
fn poll(
97+
&mut self,
98+
cx: &mut Context<'_>,
99+
) -> Poll<ToSwarm<Self::ToSwarm, <Self::ConnectionHandler as ConnectionHandler>::FromBehaviour>>
100+
{
101+
loop {
102+
match self.ping.poll(cx) {
103+
Poll::Ready(ToSwarm::GenerateEvent(ping_event)) => match ping_event {
104+
ping::Event { result: Ok(_), .. } => {}
105+
ping::Event { peer, connection, result: Err(failure) } => {
106+
warn!(?peer, ?connection, ?failure, "Ping failed, closing connection.");
107+
self.pending_close_connections.push_back((peer, connection));
108+
}
109+
},
110+
Poll::Ready(other) => {
111+
unreachable!("Ping behaviour should not generate swarm events: {other:?}.");
112+
}
113+
Poll::Pending => break,
114+
}
115+
}
116+
117+
if let Some((peer_id, connection_id)) = self.pending_close_connections.pop_front() {
118+
return Poll::Ready(ToSwarm::CloseConnection {
119+
peer_id,
120+
connection: CloseConnection::One(connection_id),
121+
});
122+
}
123+
124+
Poll::Pending
125+
}
126+
}

0 commit comments

Comments
 (0)