Skip to content

Commit 1dd11f1

Browse files
apollo_network: gossipsub event metrics
1 parent 02cb2f3 commit 1dd11f1

File tree

8 files changed

+240
-9
lines changed

8 files changed

+240
-9
lines changed

crates/apollo_consensus_manager/src/consensus_manager.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ impl ConsensusManager {
9898
num_blacklisted_peers: CONSENSUS_NUM_BLACKLISTED_PEERS,
9999
broadcast_metrics_by_topic: Some(broadcast_metrics_by_topic),
100100
sqmr_metrics: None,
101+
gossipsub_metrics: None,
101102
});
102103
let mut network_manager =
103104
NetworkManager::new(self.config.network_config.clone(), None, network_manager_metrics);

crates/apollo_mempool_p2p/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ pub fn create_p2p_propagator_and_runner(
5050
num_blacklisted_peers: MEMPOOL_P2P_NUM_BLACKLISTED_PEERS,
5151
broadcast_metrics_by_topic: Some(broadcast_metrics_by_topic),
5252
sqmr_metrics: None,
53+
gossipsub_metrics: None,
5354
});
5455
let mut network_manager = NetworkManager::new(
5556
mempool_p2p_config.network_config,

crates/apollo_network/src/gossipsub_impl.rs

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,11 @@ pub type Topic = gossipsub::Sha256Topic;
1212

1313
#[derive(Debug)]
1414
pub enum ExternalEvent {
15-
#[allow(dead_code)]
1615
Received { originated_peer_id: PeerId, message: Bytes, topic_hash: TopicHash },
16+
Subscribed { peer_id: PeerId, topic_hash: TopicHash },
17+
Unsubscribed { peer_id: PeerId, topic_hash: TopicHash },
18+
GossipsubNotSupported { peer_id: PeerId },
19+
SlowPeer { peer_id: PeerId, failed_messages: gossipsub::FailedMessages },
1720
}
1821

1922
impl From<gossipsub::Event> for mixed_behaviour::Event {
@@ -40,9 +43,26 @@ impl From<gossipsub::Event> for mixed_behaviour::Event {
4043
},
4144
))
4245
}
43-
_ => mixed_behaviour::Event::ToOtherBehaviourEvent(
44-
mixed_behaviour::ToOtherBehaviourEvent::NoOp,
45-
),
46+
gossipsub::Event::Subscribed { peer_id, topic } => {
47+
mixed_behaviour::Event::ExternalEvent(mixed_behaviour::ExternalEvent::GossipSub(
48+
ExternalEvent::Subscribed { peer_id, topic_hash: topic },
49+
))
50+
}
51+
gossipsub::Event::Unsubscribed { peer_id, topic } => {
52+
mixed_behaviour::Event::ExternalEvent(mixed_behaviour::ExternalEvent::GossipSub(
53+
ExternalEvent::Unsubscribed { peer_id, topic_hash: topic },
54+
))
55+
}
56+
gossipsub::Event::GossipsubNotSupported { peer_id } => {
57+
mixed_behaviour::Event::ExternalEvent(mixed_behaviour::ExternalEvent::GossipSub(
58+
ExternalEvent::GossipsubNotSupported { peer_id },
59+
))
60+
}
61+
gossipsub::Event::SlowPeer { peer_id, failed_messages } => {
62+
mixed_behaviour::Event::ExternalEvent(mixed_behaviour::ExternalEvent::GossipSub(
63+
ExternalEvent::SlowPeer { peer_id, failed_messages },
64+
))
65+
}
4666
}
4767
}
4868
}

crates/apollo_network/src/network_manager/metrics.rs

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,79 @@ impl SqmrNetworkMetrics {
2929
}
3030
}
3131

32+
pub struct GossipsubMetrics {
33+
/// Number of peers in the mesh network (peers we directly exchange messages with)
34+
pub num_mesh_peers: MetricGauge,
35+
/// Total number of known peers and their subscribed topics
36+
pub num_all_peers: MetricGauge,
37+
/// Number of topics we are currently subscribed to
38+
pub num_subscribed_topics: MetricGauge,
39+
/// Number of peers with gossipsub protocol support
40+
pub num_gossipsub_peers: MetricGauge,
41+
/// Number of peers with floodsub protocol support
42+
pub num_floodsub_peers: MetricGauge,
43+
/// Average number of subscribed topics per peer
44+
pub avg_topics_per_peer: MetricGauge,
45+
/// Maximum number of subscribed topics by any single peer
46+
pub max_topics_per_peer: MetricGauge,
47+
/// Minimum number of subscribed topics by any single peer (for peers with >0 topics)
48+
pub min_topics_per_peer: MetricGauge,
49+
/// Total number of topic subscriptions across all peers
50+
pub total_topic_subscriptions: MetricGauge,
51+
/// Average mesh peers per topic that we're subscribed to
52+
pub avg_mesh_peers_per_topic: MetricGauge,
53+
/// Maximum mesh peers for any single topic we're subscribed to
54+
pub max_mesh_peers_per_topic: MetricGauge,
55+
/// Minimum mesh peers for any single topic we're subscribed to
56+
pub min_mesh_peers_per_topic: MetricGauge,
57+
/// Number of peers with positive peer scores (if peer scoring is enabled)
58+
pub num_peers_with_positive_score: MetricGauge,
59+
/// Number of peers with negative peer scores (if peer scoring is enabled)
60+
pub num_peers_with_negative_score: MetricGauge,
61+
/// Average peer score across all scored peers (if peer scoring is enabled)
62+
pub avg_peer_score: MetricGauge,
63+
64+
// event metrics
65+
pub count_event_messages_received: MetricCounter,
66+
pub count_event_peer_subscribed: MetricCounter,
67+
pub count_event_peer_unsubscribed: MetricCounter,
68+
pub count_event_gossipsub_not_supported: MetricCounter,
69+
pub count_event_slow_peers: MetricCounter,
70+
}
71+
72+
impl GossipsubMetrics {
73+
pub fn register(&self) {
74+
self.num_mesh_peers.register();
75+
self.num_all_peers.register();
76+
self.num_subscribed_topics.register();
77+
self.num_gossipsub_peers.register();
78+
self.num_floodsub_peers.register();
79+
self.avg_topics_per_peer.register();
80+
self.max_topics_per_peer.register();
81+
self.min_topics_per_peer.register();
82+
self.total_topic_subscriptions.register();
83+
self.avg_mesh_peers_per_topic.register();
84+
self.max_mesh_peers_per_topic.register();
85+
self.min_mesh_peers_per_topic.register();
86+
self.num_peers_with_positive_score.register();
87+
self.num_peers_with_negative_score.register();
88+
self.avg_peer_score.register();
89+
self.count_event_messages_received.register();
90+
self.count_event_peer_subscribed.register();
91+
self.count_event_peer_unsubscribed.register();
92+
self.count_event_gossipsub_not_supported.register();
93+
self.count_event_slow_peers.register();
94+
}
95+
}
96+
3297
// TODO(alonl, shahak): Consider making these fields private and receive Topics instead of
3398
// TopicHashes in the constructor
3499
pub struct NetworkMetrics {
35100
pub num_connected_peers: MetricGauge,
36101
pub num_blacklisted_peers: MetricGauge,
37102
pub broadcast_metrics_by_topic: Option<HashMap<TopicHash, BroadcastNetworkMetrics>>,
38103
pub sqmr_metrics: Option<SqmrNetworkMetrics>,
104+
pub gossipsub_metrics: Option<GossipsubMetrics>,
39105
}
40106

41107
impl NetworkMetrics {
@@ -52,5 +118,8 @@ impl NetworkMetrics {
52118
if let Some(sqmr_metrics) = self.sqmr_metrics.as_ref() {
53119
sqmr_metrics.register();
54120
}
121+
if let Some(gossipsub_metrics) = self.gossipsub_metrics.as_ref() {
122+
gossipsub_metrics.register();
123+
}
55124
}
56125
}

crates/apollo_network/src/network_manager/mod.rs

Lines changed: 50 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use std::collections::{BTreeMap, HashMap};
99
use std::net::Ipv4Addr;
1010
use std::pin::Pin;
1111
use std::task::{Context, Poll};
12+
use std::time::Duration;
1213

1314
use apollo_network_types::network_types::{BroadcastedMessageMetadata, OpaquePeerId};
1415
use async_trait::async_trait;
@@ -23,6 +24,7 @@ use libp2p::identity::Keypair;
2324
use libp2p::swarm::SwarmEvent;
2425
use libp2p::{noise, yamux, Multiaddr, PeerId, StreamProtocol, Swarm, SwarmBuilder};
2526
use metrics::NetworkMetrics;
27+
use tokio::time::{sleep_until, Instant};
2628
use tracing::{debug, error, trace, warn};
2729

2830
use self::swarm_trait::SwarmTrait;
@@ -61,6 +63,7 @@ pub struct GenericNetworkManager<SwarmT: SwarmTrait> {
6163
continue_propagation_sender: Sender<BroadcastedMessageMetadata>,
6264
continue_propagation_receiver: Receiver<BroadcastedMessageMetadata>,
6365
metrics: Option<NetworkMetrics>,
66+
next_metrics_update: Instant,
6467
}
6568

6669
impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
@@ -89,6 +92,12 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
8992
Some(broadcasted_message_metadata) = self.continue_propagation_receiver.next() => {
9093
self.swarm.continue_propagation(broadcasted_message_metadata);
9194
}
95+
_ = sleep_until(self.next_metrics_update) => {
96+
if let Some(ref metrics) = self.metrics {
97+
self.swarm.update_metrics(metrics);
98+
}
99+
self.next_metrics_update = Instant::now() + Duration::from_secs(1);
100+
}
92101
}
93102
}
94103
}
@@ -128,6 +137,7 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
128137
continue_propagation_sender,
129138
continue_propagation_receiver,
130139
metrics,
140+
next_metrics_update: Instant::now() + Duration::from_secs(1),
131141
}
132142
}
133143

@@ -527,23 +537,58 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
527537
}
528538
}
529539

540+
/// Update the metrics for gossipsub events if these metrics were requested.
541+
fn update_gossipsub_metrics_on_event(&self, event: &gossipsub_impl::ExternalEvent) {
542+
if let Some(gossipsub_metrics) =
543+
self.metrics.as_ref().and_then(|metrics| metrics.gossipsub_metrics.as_ref())
544+
{
545+
match &event {
546+
gossipsub_impl::ExternalEvent::Received { .. } => {
547+
gossipsub_metrics.count_event_messages_received.increment(1);
548+
}
549+
gossipsub_impl::ExternalEvent::Subscribed { .. } => {
550+
gossipsub_metrics.count_event_peer_subscribed.increment(1);
551+
}
552+
gossipsub_impl::ExternalEvent::Unsubscribed { .. } => {
553+
gossipsub_metrics.count_event_peer_unsubscribed.increment(1);
554+
}
555+
gossipsub_impl::ExternalEvent::GossipsubNotSupported { .. } => {
556+
gossipsub_metrics.count_event_gossipsub_not_supported.increment(1);
557+
}
558+
gossipsub_impl::ExternalEvent::SlowPeer { .. } => {
559+
gossipsub_metrics.count_event_slow_peers.increment(1);
560+
}
561+
}
562+
}
563+
}
564+
530565
fn handle_gossipsub_behaviour_event(
531566
&mut self,
532567
event: gossipsub_impl::ExternalEvent,
533568
) -> Result<(), NetworkError> {
569+
// Record gossipsub metrics if available
570+
self.update_gossipsub_metrics_on_event(&event);
571+
572+
let gossipsub_impl::ExternalEvent::Received { originated_peer_id, message, topic_hash } =
573+
event
574+
else {
575+
return Ok(());
576+
};
577+
578+
// Record broadcast metrics for legacy compatibility
534579
if let Some(broadcast_metrics_by_topic) =
535580
self.metrics.as_ref().and_then(|metrics| metrics.broadcast_metrics_by_topic.as_ref())
536581
{
537-
let gossipsub_impl::ExternalEvent::Received { ref topic_hash, .. } = event;
538-
match broadcast_metrics_by_topic.get(topic_hash) {
582+
match broadcast_metrics_by_topic.get(&topic_hash) {
539583
Some(broadcast_metrics) => {
540584
broadcast_metrics.num_received_broadcast_messages.increment(1)
541585
}
542-
None => error!("Attempted to update topic metric with unregistered topic_hash"),
586+
None => {
587+
error!("Attempted to update topic metric with unregistered topic_hash")
588+
}
543589
}
544590
}
545-
let gossipsub_impl::ExternalEvent::Received { originated_peer_id, message, topic_hash } =
546-
event;
591+
547592
trace!("Received broadcast message with topic hash: {topic_hash:?}");
548593
let broadcasted_message_metadata = BroadcastedMessageMetadata {
549594
originator_id: OpaquePeerId::private_new(originated_peer_id),

crates/apollo_network/src/network_manager/swarm_trait.rs

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use tracing::{info, warn};
88
use super::BroadcastedMessageMetadata;
99
use crate::gossipsub_impl::Topic;
1010
use crate::misconduct_score::MisconductScore;
11+
use crate::network_manager::metrics::NetworkMetrics;
1112
use crate::peer_manager::ReputationModifier;
1213
use crate::sqmr::behaviour::SessionIdNotFoundError;
1314
use crate::sqmr::{InboundSessionId, OutboundSessionId, SessionId};
@@ -48,6 +49,8 @@ pub trait SwarmTrait: Stream<Item = Event> + Unpin {
4849
fn add_new_supported_inbound_protocol(&mut self, protocol_name: StreamProtocol);
4950

5051
fn continue_propagation(&mut self, message_metadata: BroadcastedMessageMetadata);
52+
53+
fn update_metrics(&self, metrics: &NetworkMetrics);
5154
}
5255

5356
impl SwarmTrait for Swarm<mixed_behaviour::MixedBehaviour> {
@@ -122,4 +125,93 @@ impl SwarmTrait for Swarm<mixed_behaviour::MixedBehaviour> {
122125

123126
// TODO(shahak): Implement this function.
124127
fn continue_propagation(&mut self, _message_metadata: BroadcastedMessageMetadata) {}
128+
129+
fn update_metrics(&self, metrics: &NetworkMetrics) {
130+
let Some(gossipsub_metrics) = &metrics.gossipsub_metrics else { return };
131+
let gossipsub = &self.behaviour().gossipsub;
132+
133+
// Helper to convert usize counts to f64 metrics
134+
let set_count = |gauge: &apollo_metrics::metrics::MetricGauge, count: usize| {
135+
gauge.set(f64::from(u32::try_from(count).unwrap_or(u32::MAX)));
136+
};
137+
138+
// Basic counts
139+
set_count(&gossipsub_metrics.num_mesh_peers, gossipsub.all_mesh_peers().count());
140+
set_count(&gossipsub_metrics.num_subscribed_topics, gossipsub.topics().count());
141+
142+
// Collect peer data once for analysis
143+
let all_peers: Vec<_> = gossipsub.all_peers().collect();
144+
set_count(&gossipsub_metrics.num_all_peers, all_peers.len());
145+
set_count(&gossipsub_metrics.num_gossipsub_peers, gossipsub.peer_protocol().count());
146+
gossipsub_metrics.num_floodsub_peers.set(0.0); // Currently all peers are gossipsub
147+
148+
// Topic subscription analysis
149+
let topic_counts: Vec<usize> = all_peers.iter().map(|(_, topics)| topics.len()).collect();
150+
let total_subscriptions: usize = topic_counts.iter().sum();
151+
set_count(&gossipsub_metrics.total_topic_subscriptions, total_subscriptions);
152+
153+
if topic_counts.is_empty() {
154+
[
155+
&gossipsub_metrics.avg_topics_per_peer,
156+
&gossipsub_metrics.max_topics_per_peer,
157+
&gossipsub_metrics.min_topics_per_peer,
158+
]
159+
.iter()
160+
.for_each(|metric| metric.set(0.0));
161+
} else {
162+
let avg = f64::from(u32::try_from(total_subscriptions).unwrap_or(u32::MAX)) / f64::from(u32::try_from(topic_counts.len()).unwrap_or(u32::MAX));
163+
gossipsub_metrics.avg_topics_per_peer.set(avg);
164+
165+
if let (Some(&max), Some(&min_non_zero)) =
166+
(topic_counts.iter().max(), topic_counts.iter().filter(|&&c| c > 0).min())
167+
{
168+
set_count(&gossipsub_metrics.max_topics_per_peer, max);
169+
set_count(&gossipsub_metrics.min_topics_per_peer, min_non_zero);
170+
}
171+
}
172+
173+
// Mesh analysis per topic
174+
let our_topics: Vec<_> = gossipsub.topics().collect();
175+
if our_topics.is_empty() {
176+
[
177+
&gossipsub_metrics.avg_mesh_peers_per_topic,
178+
&gossipsub_metrics.max_mesh_peers_per_topic,
179+
&gossipsub_metrics.min_mesh_peers_per_topic,
180+
]
181+
.iter()
182+
.for_each(|metric| metric.set(0.0));
183+
} else {
184+
let mesh_counts: Vec<usize> =
185+
our_topics.iter().map(|topic| gossipsub.mesh_peers(topic).count()).collect();
186+
let total_mesh = mesh_counts.iter().sum::<usize>();
187+
let avg_mesh = f64::from(u32::try_from(total_mesh).unwrap_or(u32::MAX)) / f64::from(u32::try_from(our_topics.len()).unwrap_or(u32::MAX));
188+
gossipsub_metrics.avg_mesh_peers_per_topic.set(avg_mesh);
189+
190+
if let (Some(&min), Some(&max)) = (mesh_counts.iter().min(), mesh_counts.iter().max()) {
191+
set_count(&gossipsub_metrics.min_mesh_peers_per_topic, min);
192+
set_count(&gossipsub_metrics.max_mesh_peers_per_topic, max);
193+
}
194+
}
195+
196+
// Peer scoring analysis
197+
let peer_scores: Vec<f64> =
198+
all_peers.iter().filter_map(|(peer_id, _)| gossipsub.peer_score(peer_id)).collect();
199+
if peer_scores.is_empty() {
200+
[
201+
&gossipsub_metrics.num_peers_with_positive_score,
202+
&gossipsub_metrics.num_peers_with_negative_score,
203+
&gossipsub_metrics.avg_peer_score,
204+
]
205+
.iter()
206+
.for_each(|metric| metric.set(0.0));
207+
} else {
208+
let positive_count = peer_scores.iter().filter(|&&score| score > 0.0).count();
209+
let negative_count = peer_scores.iter().filter(|&&score| score < 0.0).count();
210+
let avg_score = peer_scores.iter().sum::<f64>() / f64::from(u32::try_from(peer_scores.len()).unwrap_or(u32::MAX));
211+
212+
set_count(&gossipsub_metrics.num_peers_with_positive_score, positive_count);
213+
set_count(&gossipsub_metrics.num_peers_with_negative_score, negative_count);
214+
gossipsub_metrics.avg_peer_score.set(avg_score);
215+
}
216+
}
125217
}

crates/apollo_network/src/network_manager/test.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,8 @@ impl SwarmTrait for MockSwarm {
203203
fn continue_propagation(&mut self, _message_metadata: super::BroadcastedMessageMetadata) {
204204
unimplemented!()
205205
}
206+
207+
// No-op: the mock swarm ignores metric update requests.
fn update_metrics(&self, _: &super::metrics::NetworkMetrics) {}
206208
}
207209

208210
const BUFFER_SIZE: usize = 100;

crates/apollo_state_sync/src/runner/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,7 @@ impl StateSyncRunner {
201201
num_active_inbound_sessions: P2P_SYNC_NUM_ACTIVE_INBOUND_SESSIONS,
202202
num_active_outbound_sessions: P2P_SYNC_NUM_ACTIVE_OUTBOUND_SESSIONS,
203203
}),
204+
gossipsub_metrics: None,
204205
});
205206
NetworkManager::new(
206207
network_config.clone(),

0 commit comments

Comments
 (0)