diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index 5724d076913..1010495eb32 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -3,10 +3,14 @@ - Remove `Rpc` from the public API. See [PR 6091](https://github.com/libp2p/rust-libp2p/pull/6091) +- Replace the internal `async-channel` used to dispatch messages from `NetworkBehaviour` to the `ConnectionHandler` + with an internal priority queue. See [PR XXXX](https://github.com/libp2p/rust-libp2p/pull/XXXX) + ## 0.49.0 - Feature gate metrics related code. This changes some `Behaviour` constructor methods. See [PR 6020](https://github.com/libp2p/rust-libp2p/pull/6020) + - Send IDONTWANT before Publishing a new message. See [PR 6017](https://github.com/libp2p/rust-libp2p/pull/6017) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 45c84a39ffd..f7e23dc31f8 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -61,7 +61,7 @@ use crate::{ mcache::MessageCache, peer_score::{PeerScore, PeerScoreParams, PeerScoreState, PeerScoreThresholds, RejectReason}, protocol::SIGNING_PREFIX, - rpc::Sender, + queue::Queue, rpc_proto::proto, subscription_filter::{AllowAllSubscriptionFilter, TopicSubscriptionFilter}, time_cache::DuplicateCache, @@ -766,6 +766,7 @@ where if self.send_message( *peer_id, RpcOut::Publish { + message_id: msg_id.clone(), message: raw_message.clone(), timeout: Delay::new(self.config.publish_queue_duration()), }, @@ -1355,6 +1356,7 @@ where self.send_message( *peer_id, RpcOut::Forward { + message_id: id.clone(), message: msg, timeout: Delay::new(self.config.forward_queue_duration()), }, @@ -2096,9 +2098,8 @@ where // steady-state size of the queues. #[cfg(feature = "metrics")] if let Some(m) = &mut self.metrics { - for sender_queue in self.connected_peers.values().map(|v| &v.sender) { - m.observe_priority_queue_size(sender_queue.priority_queue_len()); - m.observe_non_priority_queue_size(sender_queue.non_priority_queue_len()); + for sender_queue in self.connected_peers.values().map(|v| &v.messages) { + m.observe_priority_queue_size(sender_queue.len()); } } @@ -2760,6 +2761,7 @@ where self.send_message( *peer_id, RpcOut::Forward { + message_id: msg_id.clone(), message: message.clone(), timeout: Delay::new(self.config.forward_queue_duration()), }, @@ -2889,7 +2891,12 @@ where } // Try sending the message to the connection handler. - match peer.sender.send_message(rpc) { + if rpc.high_priority() { + peer.messages.push(rpc); + return true; + } + + match peer.messages.try_push(rpc) { Ok(()) => true, Err(rpc) => { // Sending failed because the channel is full. @@ -2897,25 +2904,7 @@ where // Update failed message counter. let failed_messages = self.failed_messages.entry(peer_id).or_default(); - match rpc { - RpcOut::Publish { .. } => { - failed_messages.priority += 1; - failed_messages.publish += 1; - } - RpcOut::Forward { .. } => { - failed_messages.non_priority += 1; - failed_messages.forward += 1; - } - RpcOut::IWant(_) | RpcOut::IHave(_) | RpcOut::IDontWant(_) => { - failed_messages.non_priority += 1; - } - RpcOut::Graft(_) - | RpcOut::Prune(_) - | RpcOut::Subscribe(_) - | RpcOut::Unsubscribe(_) => { - unreachable!("Channel for highpriority control messages is unbounded and should always be open.") - } - } + failed_messages.queue_full += 1; // Update peer score. if let PeerScoreState::Active(peer_score) = &mut self.peer_score { @@ -3141,18 +3130,20 @@ where // occur.
let connected_peer = self.connected_peers.entry(peer_id).or_insert(PeerDetails { kind: PeerKind::Floodsub, - connections: vec![], outbound: false, - sender: Sender::new(self.config.connection_handler_queue_len()), + connections: vec![], topics: Default::default(), dont_send: LinkedHashMap::new(), + messages: Queue::new(self.config.connection_handler_queue_len()), }); // Add the new connection connected_peer.connections.push(connection_id); + // This clones a reference to the Queue so any new handlers reference the same underlying + // queue. No data is actually cloned here. Ok(Handler::new( self.config.protocol_config(), - connected_peer.sender.new_receiver(), + connected_peer.messages.clone(), )) } @@ -3166,20 +3157,22 @@ ) -> Result<THandler<Self>, ConnectionDenied> { let connected_peer = self.connected_peers.entry(peer_id).or_insert(PeerDetails { kind: PeerKind::Floodsub, - connections: vec![], // Diverging from the go implementation we only want to consider a peer as outbound peer // if its first connection is outbound. outbound: !self.px_peers.contains(&peer_id), - sender: Sender::new(self.config.connection_handler_queue_len()), + connections: vec![], topics: Default::default(), dont_send: LinkedHashMap::new(), + messages: Queue::new(self.config.connection_handler_queue_len()), }); // Add the new connection connected_peer.connections.push(connection_id); + // This clones a reference to the Queue so any new handlers reference the same underlying + // queue. No data is actually cloned here. Ok(Handler::new( self.config.protocol_config(), - connected_peer.sender.new_receiver(), + connected_peer.messages.clone(), )) } @@ -3221,7 +3214,9 @@ where } } } - HandlerEvent::MessageDropped(rpc) => { + // rpc is only used for metrics code + #[allow(unused_variables)] + HandlerEvent::MessagesDropped(rpc) => { // Account for this in the scoring logic if let PeerScoreState::Active(peer_score) = &mut self.peer_score { peer_score.failed_message_slow_peer(&propagation_source); @@ -3230,29 +3225,12 @@ // Keep track of expired messages for the application layer. let failed_messages = self.failed_messages.entry(propagation_source).or_default(); failed_messages.timeout += 1; - match rpc { - RpcOut::Publish { .. } => { - failed_messages.publish += 1; - } - RpcOut::Forward { .. } => { - failed_messages.forward += 1; - } - _ => {} - } // Record metrics on the failure. #[cfg(feature = "metrics")] if let Some(metrics) = self.metrics.as_mut() { - match rpc { - RpcOut::Publish { message, .. } => { - metrics.publish_msg_dropped(&message.topic); - metrics.timeout_msg_dropped(&message.topic); - } - RpcOut::Forward { message, .. } => { - metrics.forward_msg_dropped(&message.topic); - metrics.timeout_msg_dropped(&message.topic); - } - _ => {} + if let RpcOut::Publish { message, .. } | RpcOut::Forward { message, .. } = rpc { + metrics.timeout_msg_dropped(&message.topic); } } } @@ -3357,6 +3335,16 @@ where if let Some(metrics) = self.metrics.as_mut() { metrics.register_idontwant(message_ids.len()); } + + // Remove messages from the queue. + peer.messages.retain(|rpc| match rpc { + RpcOut::Publish { message_id, .. } + | RpcOut::Forward { message_id, .. } => { + !message_ids.contains(message_id) + } + _ => true, + }); + for message_id in message_ids { peer.dont_send.insert(message_id, Instant::now()); // Don't exceed capacity.
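Note for reviewers: the new `Queue` type itself lives in `protocols/gossipsub/src/queue.rs`, which is not part of this excerpt. The behaviour code above only relies on a small surface: clones share one underlying buffer (so the `NetworkBehaviour` and every `ConnectionHandler` for a peer see the same queue), `push` is infallible for high-priority RPCs, `try_push` hands the RPC back when the bounded side is full, and `retain`/`try_pop`/`len`/`is_empty` support the IDONTWANT pruning, the handler, and the tests. A minimal sketch of those semantics follows; the names and internals are illustrative assumptions only, and the real implementation is additionally asynchronous (it wakes the handler task when items arrive) rather than polled like this:

```rust
use std::{
    collections::VecDeque,
    sync::{Arc, Mutex},
};

/// Hypothetical, simplified model of the shared per-peer queue.
pub struct Queue<T> {
    shared: Arc<Mutex<Shared<T>>>,
    capacity: usize,
}

struct Shared<T> {
    priority: VecDeque<T>,     // control messages, never dropped
    non_priority: VecDeque<T>, // publish/forward/gossip, bounded
}

// Cloning shares the same buffer; no queued data is copied.
impl<T> Clone for Queue<T> {
    fn clone(&self) -> Self {
        Self { shared: self.shared.clone(), capacity: self.capacity }
    }
}

impl<T> Queue<T> {
    pub fn new(capacity: usize) -> Self {
        Self {
            shared: Arc::new(Mutex::new(Shared {
                priority: VecDeque::new(),
                non_priority: VecDeque::new(),
            })),
            capacity,
        }
    }

    /// Infallible push used by `send_message` for high-priority RPCs.
    pub fn push(&mut self, item: T) {
        self.shared.lock().unwrap().priority.push_back(item);
    }

    /// Bounded push; returns the item on failure so the caller can
    /// count it (`failed_messages.queue_full` above).
    pub fn try_push(&mut self, item: T) -> Result<(), T> {
        let mut s = self.shared.lock().unwrap();
        if s.non_priority.len() >= self.capacity {
            return Err(item);
        }
        s.non_priority.push_back(item);
        Ok(())
    }

    /// Pop the next RPC, preferring the priority buffer.
    pub fn try_pop(&mut self) -> Result<T, ()> {
        let mut s = self.shared.lock().unwrap();
        s.priority
            .pop_front()
            .or_else(|| s.non_priority.pop_front())
            .ok_or(())
    }

    /// Drop queued RPCs that no longer need sending (IDONTWANT handling).
    pub fn retain<F: FnMut(&T) -> bool>(&mut self, mut f: F) {
        let mut s = self.shared.lock().unwrap();
        s.priority.retain(|i| f(i));
        s.non_priority.retain(|i| f(i));
    }

    pub fn len(&self) -> usize {
        let s = self.shared.lock().unwrap();
        s.priority.len() + s.non_priority.len()
    }

    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
```

Sharing one queue per peer across all of that peer's connections is what makes the IDONTWANT-driven `retain` in the hunk above effective: a message cancelled by the behaviour disappears for every handler of that peer at once.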
diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index 85aead47911..1368c027044 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -20,7 +20,7 @@ // Collection of tests for the gossipsub network behaviour -use std::{future, net::Ipv4Addr, thread::sleep}; +use std::{net::Ipv4Addr, thread::sleep}; use asynchronous_codec::{Decoder, Encoder}; use byteorder::{BigEndian, ByteOrder}; @@ -32,7 +32,6 @@ use super::*; use crate::{ config::{ConfigBuilder, TopicMeshConfig}, protocol::GossipsubCodec, - rpc::Receiver, subscription_filter::WhitelistSubscriptionFilter, types::RpcIn, IdentTopic as Topic, @@ -63,7 +62,7 @@ where ) -> ( Behaviour<D, F>, Vec<PeerId>, - HashMap<PeerId, Receiver>, + HashMap<PeerId, Queue<RpcOut>>, Vec<TopicHash>, ) { let keypair = libp2p_identity::Keypair::generate_ed25519(); @@ -92,11 +91,11 @@ where // build and connect peer_no random peers let mut peers = vec![]; - let mut receivers = HashMap::new(); + let mut queues = HashMap::new(); let empty = vec![]; for i in 0..self.peer_no { - let (peer, receiver) = add_peer_with_addr_and_kind( + let (peer, queue) = add_peer_with_addr_and_kind( &mut gs, if self.to_subscribe { &topic_hashes } else { &empty }, i < self.outbound, i < self.explicit, self.peer_kind.or(Some(PeerKind::Gossipsubv1_1)), ); peers.push(peer); - receivers.insert(peer, receiver); + queues.insert(peer, queue); } - (gs, peers, receivers, topic_hashes) + (gs, peers, queues, topic_hashes) } fn peer_no(mut self, peer_no: usize) -> Self { @@ -181,7 +180,7 @@ fn add_peer<D, F>( topic_hashes: &[TopicHash], outbound: bool, explicit: bool, -) -> (PeerId, Receiver) +) -> (PeerId, Queue<RpcOut>) where D: DataTransform + Default + Clone + Send + 'static, F: TopicSubscriptionFilter + Clone + Default + Send + 'static, @@ -195,7 +194,7 @@ fn add_peer_with_addr<D, F>( outbound: bool, explicit: bool, address: Multiaddr, -) -> (PeerId, Receiver) +) -> (PeerId, Queue<RpcOut>) where D: DataTransform + Default + Clone + Send + 'static, F: TopicSubscriptionFilter + Clone + Default + Send + 'static, @@ -217,7 +216,7 @@ fn add_peer_with_addr_and_kind<D, F>( explicit: bool, address: Multiaddr, kind: Option<PeerKind>, -) -> (PeerId, Receiver) +) -> (PeerId, Queue<RpcOut>) where D: DataTransform + Default + Clone + Send + 'static, F: TopicSubscriptionFilter + Clone + Default + Send + 'static, @@ -236,8 +235,8 @@ where } }; - let sender = Sender::new(gs.config.connection_handler_queue_len()); - let receiver = sender.new_receiver(); + let queue = Queue::new(gs.config.connection_handler_queue_len()); + let receiver_queue = queue.clone(); let connection_id = ConnectionId::new_unchecked(0); gs.connected_peers.insert( peer, @@ -246,7 +245,7 @@ where outbound, connections: vec![connection_id], topics: Default::default(), - sender, + messages: queue, dont_send: LinkedHashMap::new(), }, ); @@ -281,7 +280,7 @@ where &peer, ); } - (peer, receiver) + (peer, receiver_queue) } fn disconnect_peer<D, F>(gs: &mut Behaviour<D, F>, peer_id: &PeerId) where @@ -438,7 +437,7 @@ fn test_subscribe() { // - run JOIN(topic) let subscribe_topic = vec![String::from("test_subscribe")]; - let (gs, _, receivers, topic_hashes) = inject_nodes1() + let (gs, _, queues, topic_hashes) = inject_nodes1() .peer_no(20) .topics(subscribe_topic) .to_subscribe(true) @@ -450,12 +449,11 @@ ); // collect all the subscriptions - let subscriptions = receivers + let subscriptions = queues .into_values() - .fold(0, |mut collected_subscriptions, c| { - let priority = c.priority.get_ref(); - while !priority.is_empty() { - if let Ok(RpcOut::Subscribe(_)) = priority.try_recv() {
+ .fold(0, |mut collected_subscriptions, mut queue| { + while !queue.is_empty() { + if let Ok(RpcOut::Subscribe(_)) = queue.try_pop() { collected_subscriptions += 1 } } @@ -481,7 +479,7 @@ fn test_unsubscribe() { .collect::<Vec<Topic>>(); // subscribe to topic_strings - let (mut gs, _, receivers, topic_hashes) = inject_nodes1() + let (mut gs, _, queues, topic_hashes) = inject_nodes1() .peer_no(20) .topics(topic_strings) .to_subscribe(true) @@ -511,12 +509,11 @@ ); // collect all the subscriptions - let subscriptions = receivers + let subscriptions = queues .into_values() - .fold(0, |mut collected_subscriptions, c| { - let priority = c.priority.get_ref(); - while !priority.is_empty() { - if let Ok(RpcOut::Subscribe(_)) = priority.try_recv() { + .fold(0, |mut collected_subscriptions, mut queue| { + while !queue.is_empty() { + if let Ok(RpcOut::Subscribe(_)) = queue.try_pop() { collected_subscriptions += 1 } } @@ -553,14 +550,14 @@ .map(|t| Topic::new(t.clone())) .collect::<Vec<Topic>>(); - let (mut gs, _, mut receivers, topic_hashes) = inject_nodes1() + let (mut gs, _, mut queues, topic_hashes) = inject_nodes1() .peer_no(20) .topics(topic_strings) .to_subscribe(true) .create_network(); // Flush previous GRAFT messages. - receivers = flush_events(&mut gs, receivers); + queues = flush_events(&mut gs, queues); // unsubscribe, then call join to invoke functionality assert!( @@ -584,31 +581,25 @@ "Should have added 6 nodes to the mesh" ); - fn count_grafts(receivers: HashMap<PeerId, Receiver>) -> (usize, HashMap<PeerId, Receiver>) { - let mut new_receivers = HashMap::new(); + fn count_grafts( + queues: HashMap<PeerId, Queue<RpcOut>>, + ) -> (usize, HashMap<PeerId, Queue<RpcOut>>) { + let mut new_queues = HashMap::new(); let mut acc = 0; - for (peer_id, c) in receivers.into_iter() { - let priority = c.priority.get_ref(); - while !priority.is_empty() { - if let Ok(RpcOut::Graft(_)) = priority.try_recv() { + for (peer_id, mut queue) in queues.into_iter() { + while !queue.is_empty() { + if let Ok(RpcOut::Graft(_)) = queue.try_pop() { acc += 1; } } - new_receivers.insert( - peer_id, - Receiver { - priority_queue_len: c.priority_queue_len, - priority: c.priority, - non_priority: c.non_priority, - }, - ); + new_queues.insert(peer_id, queue); } - (acc, new_receivers) + (acc, new_queues) } // there should be mesh_n GRAFT messages.
- let (graft_messages, mut receivers) = count_grafts(receivers); + let (graft_messages, mut queues) = count_grafts(queues); assert_eq!( graft_messages, 6, @@ -632,8 +623,8 @@ fn test_join() { &address, ) .unwrap(); - let sender = Sender::new(gs.config.connection_handler_queue_len()); - let receiver = sender.new_receiver(); + let queue = Queue::new(gs.config.connection_handler_queue_len()); + let receiver_queue = queue.clone(); let connection_id = ConnectionId::new_unchecked(0); gs.connected_peers.insert( random_peer, @@ -642,11 +633,11 @@ fn test_join() { outbound: false, connections: vec![connection_id], topics: Default::default(), - sender, + messages: queue, dont_send: LinkedHashMap::new(), }, ); - receivers.insert(random_peer, receiver); + queues.insert(random_peer, receiver_queue); gs.on_swarm_event(FromSwarm::ConnectionEstablished(ConnectionEstablished { peer_id: random_peer, @@ -683,7 +674,7 @@ fn test_join() { } // there should now 6 graft messages to be sent - let (graft_messages, _) = count_grafts(receivers); + let (graft_messages, _) = count_grafts(queues); assert_eq!( graft_messages, 6, @@ -705,7 +696,7 @@ fn test_publish_without_flood_publishing() { .unwrap(); let publish_topic = String::from("test_publish"); - let (mut gs, _, receivers, topic_hashes) = inject_nodes1() + let (mut gs, _, queues, topic_hashes) = inject_nodes1() .peer_no(20) .topics(vec![publish_topic.clone()]) .to_subscribe(true) @@ -732,12 +723,11 @@ fn test_publish_without_flood_publishing() { gs.publish(Topic::new(publish_topic), publish_data).unwrap(); // Collect all publish messages - let publishes = receivers + let publishes = queues .into_values() - .fold(vec![], |mut collected_publish, c| { - let priority = c.priority.get_ref(); - while !priority.is_empty() { - if let Ok(RpcOut::Publish { message, .. }) = priority.try_recv() { + .fold(vec![], |mut collected_publish, mut queue| { + while !queue.is_empty() { + if let Ok(RpcOut::Publish { message, .. }) = queue.try_pop() { collected_publish.push(message); } } @@ -785,7 +775,7 @@ fn test_fanout() { .unwrap(); let fanout_topic = String::from("test_fanout"); - let (mut gs, _, receivers, topic_hashes) = inject_nodes1() + let (mut gs, _, queues, topic_hashes) = inject_nodes1() .peer_no(20) .topics(vec![fanout_topic.clone()]) .to_subscribe(true) @@ -817,12 +807,11 @@ fn test_fanout() { ); // Collect all publish messages - let publishes = receivers + let publishes = queues .into_values() - .fold(vec![], |mut collected_publish, c| { - let priority = c.priority.get_ref(); - while !priority.is_empty() { - if let Ok(RpcOut::Publish { message, .. }) = priority.try_recv() { + .fold(vec![], |mut collected_publish, mut queue| { + while !queue.is_empty() { + if let Ok(RpcOut::Publish { message, .. }) = queue.try_pop() { collected_publish.push(message); } } @@ -857,7 +846,7 @@ fn test_fanout() { /// Test the gossipsub NetworkBehaviour peer connection logic. 
#[test] fn test_inject_connected() { - let (gs, peers, receivers, topic_hashes) = inject_nodes1() + let (gs, peers, queues, topic_hashes) = inject_nodes1() .peer_no(20) .topics(vec![String::from("topic1"), String::from("topic2")]) .to_subscribe(true) @@ -865,12 +854,11 @@ // check that our subscriptions are sent to each of the peers // collect all the SendEvents - let subscriptions = receivers.into_iter().fold( + let subscriptions = queues.into_iter().fold( HashMap::<PeerId, Vec<String>>::new(), - |mut collected_subscriptions, (peer, c)| { - let priority = c.priority.get_ref(); - while !priority.is_empty() { - if let Ok(RpcOut::Subscribe(topic)) = priority.try_recv() { + |mut collected_subscriptions, (peer, mut queue)| { + while !queue.is_empty() { + if let Ok(RpcOut::Subscribe(topic)) = queue.try_pop() { let mut peer_subs = collected_subscriptions.remove(&peer).unwrap_or_default(); peer_subs.push(topic.into_string()); collected_subscriptions.insert(peer, peer_subs); @@ -913,7 +901,7 @@ fn test_handle_received_subscriptions() { .iter() .map(|&t| String::from(t)) .collect(); - let (mut gs, peers, _receivers, topic_hashes) = inject_nodes1() + let (mut gs, peers, _queues, topic_hashes) = inject_nodes1() .peer_no(20) .topics(topics) .to_subscribe(false) @@ -1039,7 +1027,7 @@ fn test_get_random_peers() { connections: vec![ConnectionId::new_unchecked(0)], outbound: false, topics: topics.clone(), - sender: Sender::new(gs.config.connection_handler_queue_len()), + messages: Queue::new(gs.config.connection_handler_queue_len()), dont_send: LinkedHashMap::new(), }, ); @@ -1073,7 +1061,7 @@ /// Tests that the correct message is sent when a peer asks for a message in our cache. #[test] fn test_handle_iwant_msg_cached() { - let (mut gs, peers, receivers, _) = inject_nodes1() + let (mut gs, peers, queues, _) = inject_nodes1() .peer_no(20) .topics(Vec::new()) .to_subscribe(true) @@ -1101,12 +1089,11 @@ gs.handle_iwant(&peers[7], vec![msg_id.clone()]); // the messages we are sending - let sent_messages = receivers + let sent_messages = queues .into_values() - .fold(vec![], |mut collected_messages, c| { - let non_priority = c.non_priority.get_ref(); - while !non_priority.is_empty() { - if let Ok(RpcOut::Forward { message, .. }) = non_priority.try_recv() { + .fold(vec![], |mut collected_messages, mut queue| { + while !queue.is_empty() { + if let Ok(RpcOut::Forward { message, .. }) = queue.try_pop() { collected_messages.push(message) } } @@ -1125,7 +1112,7 @@ /// Tests that messages are sent correctly depending on the shifting of the message cache. #[test] fn test_handle_iwant_msg_cached_shifted() { - let (mut gs, peers, mut receivers, _) = inject_nodes1() + let (mut gs, peers, mut queues, _) = inject_nodes1() .peer_no(20) .topics(Vec::new()) .to_subscribe(true) @@ -1159,28 +1146,23 @@ // is the message is being sent?
let mut message_exists = false; - receivers = receivers.into_iter().map(|(peer_id, c)| { - let non_priority = c.non_priority.get_ref(); - while !non_priority.is_empty() { - if matches!(non_priority.try_recv(), Ok(RpcOut::Forward{message, timeout: _ }) if + queues = queues + .into_iter() + .map(|(peer_id, mut queue)| { + while !queue.is_empty() { + if matches!(queue.try_pop(), Ok(RpcOut::Forward{message, ..}) if gs.config.message_id( &gs.data_transform .inbound_transform(message.clone()) .unwrap(), ) == msg_id) - { - message_exists = true; + { + message_exists = true; + } } - } - ( - peer_id, - Receiver { - priority_queue_len: c.priority_queue_len, - priority: c.priority, - non_priority: c.non_priority, - }, - ) - }).collect(); + (peer_id, queue) + }) + .collect(); // default history_length is 5, expect no messages after shift > 5 if shift < 5 { assert!( @@ -1217,7 +1199,7 @@ #[test] fn test_handle_iwant_msg_but_already_sent_idontwant() { - let (mut gs, peers, receivers, _) = inject_nodes1() + let (mut gs, peers, queues, _) = inject_nodes1() .peer_no(20) .topics(Vec::new()) .to_subscribe(true) @@ -1263,15 +1245,15 @@ gs.handle_iwant(&peers[1], vec![msg_id.clone()]); // Check that no messages are sent. - receivers.iter().for_each(|(_, receiver)| { - assert!(receiver.non_priority.get_ref().is_empty()); + queues.iter().for_each(|(_, receiver_queue)| { + assert!(receiver_queue.is_empty()); }); } /// tests that an event is created when a peer shares that it has a message we want #[test] fn test_handle_ihave_subscribed_and_msg_not_cached() { - let (mut gs, peers, mut receivers, topic_hashes) = inject_nodes1() + let (mut gs, peers, mut queues, topic_hashes) = inject_nodes1() .peer_no(20) .topics(vec![String::from("topic1")]) .to_subscribe(true) @@ -1284,10 +1266,9 @@ // check that we sent an IWANT request for `unknown id` let mut iwant_exists = false; - let receiver = receivers.remove(&peers[7]).unwrap(); - let non_priority = receiver.non_priority.get_ref(); - while !non_priority.is_empty() { - if let Ok(RpcOut::IWant(IWant { message_ids })) = non_priority.try_recv() { + let mut receiver_queue = queues.remove(&peers[7]).unwrap(); + while !receiver_queue.is_empty() { + if let Ok(RpcOut::IWant(IWant { message_ids })) = receiver_queue.try_pop() { if message_ids .iter() .any(|m| *m == MessageId::new(b"unknown id")) @@ -1457,61 +1438,37 @@ fn test_handle_prune_peer_in_mesh() { } fn count_control_msgs( - receivers: HashMap<PeerId, Receiver>, + queues: HashMap<PeerId, Queue<RpcOut>>, mut filter: impl FnMut(&PeerId, &RpcOut) -> bool, -) -> (usize, HashMap<PeerId, Receiver>) { - let mut new_receivers = HashMap::new(); +) -> (usize, HashMap<PeerId, Queue<RpcOut>>) { + let mut new_queues = HashMap::new(); let mut collected_messages = 0; - for (peer_id, c) in receivers.into_iter() { - let priority = c.priority.get_ref(); - let non_priority = c.non_priority.get_ref(); - while !priority.is_empty() || !non_priority.is_empty() { - if let Ok(rpc) = priority.try_recv() { - if filter(&peer_id, &rpc) { - collected_messages += 1; - } - } - if let Ok(rpc) = non_priority.try_recv() { + for (peer_id, mut queue) in queues.into_iter() { + while !queue.is_empty() { + if let Ok(rpc) = queue.try_pop() { if filter(&peer_id, &rpc) { collected_messages += 1; } } } - new_receivers.insert( - peer_id, - Receiver { - priority_queue_len: c.priority_queue_len, - priority: c.priority, - non_priority: c.non_priority, - }, - ); + new_queues.insert(peer_id, queue); } -
(collected_messages, new_receivers) + (collected_messages, new_queues) } fn flush_events<D: DataTransform, F: TopicSubscriptionFilter>( gs: &mut Behaviour<D, F>, - receivers: HashMap<PeerId, Receiver>, -) -> HashMap<PeerId, Receiver> { + queues: HashMap<PeerId, Queue<RpcOut>>, +) -> HashMap<PeerId, Queue<RpcOut>> { gs.events.clear(); - let mut new_receivers = HashMap::new(); - for (peer_id, c) in receivers.into_iter() { - let priority = c.priority.get_ref(); - let non_priority = c.non_priority.get_ref(); - while !priority.is_empty() || !non_priority.is_empty() { - let _ = priority.try_recv(); - let _ = non_priority.try_recv(); + let mut new_queues = HashMap::new(); + for (peer_id, mut queue) in queues.into_iter() { + while !queue.is_empty() { + let _ = queue.try_pop(); } - new_receivers.insert( - peer_id, - Receiver { - priority_queue_len: c.priority_queue_len, - priority: c.priority, - non_priority: c.non_priority, - }, - ); + new_queues.insert(peer_id, queue); } - new_receivers + new_queues } /// tests that a peer added as explicit peer gets connected to @@ -1550,7 +1507,7 @@ .check_explicit_peers_ticks(2) .build() .unwrap(); - let (mut gs, others, receivers, _) = inject_nodes1() + let (mut gs, others, queues, _) = inject_nodes1() .peer_no(1) .topics(Vec::new()) .to_subscribe(true) @@ -1562,7 +1519,7 @@ // add peer as explicit peer gs.add_explicit_peer(peer); - flush_events(&mut gs, receivers); + flush_events(&mut gs, queues); // disconnect peer disconnect_peer(&mut gs, peer); @@ -1600,7 +1557,7 @@ #[test] fn test_handle_graft_explicit_peer() { - let (mut gs, peers, receivers, topic_hashes) = inject_nodes1() + let (mut gs, peers, queues, topic_hashes) = inject_nodes1() .peer_no(1) .topics(vec![String::from("topic1"), String::from("topic2")]) .to_subscribe(true) @@ -1617,7 +1574,7 @@ assert!(gs.mesh[&topic_hashes[1]].is_empty()); // check prunes - let (control_msgs, _) = count_control_msgs(receivers, |peer_id, m| { + let (control_msgs, _) = count_control_msgs(queues, |peer_id, m| { peer_id == peer && match m { RpcOut::Prune(Prune { topic_hash, .. }) => { @@ -1634,7 +1591,7 @@ #[test] fn explicit_peers_not_added_to_mesh_on_receiving_subscription() { - let (gs, peers, receivers, topic_hashes) = inject_nodes1() + let (gs, peers, queues, topic_hashes) = inject_nodes1() .peer_no(2) .topics(vec![String::from("topic1")]) .to_subscribe(true) @@ -1649,7 +1606,7 @@ ); // assert that graft gets created to non-explicit peer - let (control_msgs, receivers) = count_control_msgs(receivers, |peer_id, m| { + let (control_msgs, queues) = count_control_msgs(queues, |peer_id, m| { peer_id == &peers[1] && matches!(m, RpcOut::Graft { .. }) }); assert!( @@ -1658,7 +1615,7 @@ ); // assert that no graft gets created to explicit peer - let (control_msgs, _) = count_control_msgs(receivers, |peer_id, m| { + let (control_msgs, _) = count_control_msgs(queues, |peer_id, m| { peer_id == &peers[0] && matches!(m, RpcOut::Graft { ..
}) }); assert_eq!( @@ -1669,7 +1626,7 @@ fn explicit_peers_not_added_to_mesh_on_receiving_subscription() { #[test] fn do_not_graft_explicit_peer() { - let (mut gs, others, receivers, topic_hashes) = inject_nodes1() + let (mut gs, others, queues, topic_hashes) = inject_nodes1() .peer_no(1) .topics(vec![String::from("topic")]) .to_subscribe(true) @@ -1683,7 +1640,7 @@ fn do_not_graft_explicit_peer() { assert_eq!(gs.mesh[&topic_hashes[0]], BTreeSet::new()); // assert that no graft gets created to explicit peer - let (control_msgs, _) = count_control_msgs(receivers, |peer_id, m| { + let (control_msgs, _) = count_control_msgs(queues, |peer_id, m| { peer_id == &others[0] && matches!(m, RpcOut::Graft { .. }) }); assert_eq!( @@ -1694,7 +1651,7 @@ fn do_not_graft_explicit_peer() { #[test] fn do_forward_messages_to_explicit_peers() { - let (mut gs, peers, receivers, topic_hashes) = inject_nodes1() + let (mut gs, peers, queues, topic_hashes) = inject_nodes1() .peer_no(2) .topics(vec![String::from("topic1"), String::from("topic2")]) .to_subscribe(true) @@ -1715,10 +1672,9 @@ fn do_forward_messages_to_explicit_peers() { }; gs.handle_received_message(message.clone(), &local_id); assert_eq!( - receivers.into_iter().fold(0, |mut fwds, (peer_id, c)| { - let non_priority = c.non_priority.get_ref(); - while !non_priority.is_empty() { - if matches!(non_priority.try_recv(), Ok(RpcOut::Forward{message: m, timeout: _}) if peer_id == peers[0] && m.data == message.data) { + queues.into_iter().fold(0, |mut fwds, (peer_id, mut queue)| { + while !queue.is_empty() { + if matches!(queue.try_pop(), Ok(RpcOut::Forward{message: m, ..}) if peer_id == peers[0] && m.data == message.data) { fwds +=1; } } @@ -1731,7 +1687,7 @@ fn do_forward_messages_to_explicit_peers() { #[test] fn explicit_peers_not_added_to_mesh_on_subscribe() { - let (mut gs, peers, receivers, _) = inject_nodes1() + let (mut gs, peers, queues, _) = inject_nodes1() .peer_no(2) .topics(Vec::new()) .to_subscribe(true) @@ -1759,7 +1715,7 @@ fn explicit_peers_not_added_to_mesh_on_subscribe() { assert_eq!(gs.mesh[&topic_hash], vec![peers[1]].into_iter().collect()); // assert that graft gets created to non-explicit peer - let (control_msgs, receivers) = count_control_msgs(receivers, |peer_id, m| { + let (control_msgs, queues) = count_control_msgs(queues, |peer_id, m| { peer_id == &peers[1] && matches!(m, RpcOut::Graft { .. }) }); assert!( @@ -1768,7 +1724,7 @@ fn explicit_peers_not_added_to_mesh_on_subscribe() { ); // assert that no graft gets created to explicit peer - let (control_msgs, _) = count_control_msgs(receivers, |peer_id, m| { + let (control_msgs, _) = count_control_msgs(queues, |peer_id, m| { peer_id == &peers[0] && matches!(m, RpcOut::Graft { .. }) }); assert_eq!( @@ -1779,7 +1735,7 @@ fn explicit_peers_not_added_to_mesh_on_subscribe() { #[test] fn explicit_peers_not_added_to_mesh_from_fanout_on_subscribe() { - let (mut gs, peers, receivers, _) = inject_nodes1() + let (mut gs, peers, queues, _) = inject_nodes1() .peer_no(2) .topics(Vec::new()) .to_subscribe(true) @@ -1810,7 +1766,7 @@ fn explicit_peers_not_added_to_mesh_from_fanout_on_subscribe() { assert_eq!(gs.mesh[&topic_hash], vec![peers[1]].into_iter().collect()); // assert that graft gets created to non-explicit peer - let (control_msgs, receivers) = count_control_msgs(receivers, |peer_id, m| { + let (control_msgs, queues) = count_control_msgs(queues, |peer_id, m| { peer_id == &peers[1] && matches!(m, RpcOut::Graft { .. 
}) }); assert!( @@ -1819,7 +1775,7 @@ fn explicit_peers_not_added_to_mesh_from_fanout_on_subscribe() { ); // assert that no graft gets created to explicit peer - let (control_msgs, _) = count_control_msgs(receivers, |peer_id, m| { + let (control_msgs, _) = count_control_msgs(queues, |peer_id, m| { peer_id == &peers[0] && matches!(m, RpcOut::Graft { .. }) }); assert_eq!( @@ -1830,7 +1786,7 @@ fn explicit_peers_not_added_to_mesh_from_fanout_on_subscribe() { #[test] fn no_gossip_gets_sent_to_explicit_peers() { - let (mut gs, peers, mut receivers, topic_hashes) = inject_nodes1() + let (mut gs, peers, mut queues, topic_hashes) = inject_nodes1() .peer_no(2) .topics(vec![String::from("topic1"), String::from("topic2")]) .to_subscribe(true) @@ -1859,11 +1815,10 @@ fn no_gossip_gets_sent_to_explicit_peers() { } // assert that no gossip gets sent to explicit peer - let receiver = receivers.remove(&peers[0]).unwrap(); + let mut receiver_queue = queues.remove(&peers[0]).unwrap(); let mut gossips = 0; - let non_priority = receiver.non_priority.get_ref(); - while !non_priority.is_empty() { - if let Ok(RpcOut::IHave(_)) = non_priority.try_recv() { + while !receiver_queue.is_empty() { + if let Ok(RpcOut::IHave(_)) = receiver_queue.try_pop() { gossips += 1; } } @@ -1876,7 +1831,7 @@ fn test_mesh_addition() { let config: Config = Config::default(); // Adds mesh_low peers and PRUNE 2 giving us a deficit. - let (mut gs, peers, _receivers, topics) = inject_nodes1() + let (mut gs, peers, _queues, topics) = inject_nodes1() .peer_no(config.mesh_n() + 1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -1912,7 +1867,7 @@ fn test_mesh_subtraction() { // Adds mesh_low peers and PRUNE 2 giving us a deficit. let n = config.mesh_n_high() + 10; // make all outbound connections so that we allow grafting to all - let (mut gs, peers, _receivers, topics) = inject_nodes1() + let (mut gs, peers, _queues, topics) = inject_nodes1() .peer_no(n) .topics(vec!["test".into()]) .to_subscribe(true) @@ -1992,7 +1947,7 @@ fn test_send_px_and_backoff_in_prune() { let config: Config = Config::default(); // build mesh with enough peers for px - let (mut gs, peers, receivers, topics) = inject_nodes1() + let (mut gs, peers, queues, topics) = inject_nodes1() .peer_no(config.prune_peers() + 1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -2008,7 +1963,7 @@ fn test_send_px_and_backoff_in_prune() { ); // check prune message - let (control_msgs, _) = count_control_msgs(receivers, |peer_id, m| { + let (control_msgs, _) = count_control_msgs(queues, |peer_id, m| { peer_id == &peers[0] && match m { RpcOut::Prune(Prune { @@ -2034,7 +1989,7 @@ fn test_prune_backoffed_peer_on_graft() { let config: Config = Config::default(); // build mesh with enough peers for px - let (mut gs, peers, receivers, topics) = inject_nodes1() + let (mut gs, peers, queues, topics) = inject_nodes1() .peer_no(config.prune_peers() + 1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -2051,13 +2006,13 @@ fn test_prune_backoffed_peer_on_graft() { ); // ignore all messages until now - let receivers = flush_events(&mut gs, receivers); + let queues = flush_events(&mut gs, queues); // handle graft gs.handle_graft(&peers[0], vec![topics[0].clone()]); // check prune message - let (control_msgs, _) = count_control_msgs(receivers, |peer_id, m| { + let (control_msgs, _) = count_control_msgs(queues, |peer_id, m| { peer_id == &peers[0] && match m { RpcOut::Prune(Prune { @@ -2084,7 +2039,7 @@ fn test_do_not_graft_within_backoff_period() { .build() .unwrap(); // only one peer => 
mesh too small and will try to regraft as early as possible - let (mut gs, peers, receivers, topics) = inject_nodes1() + let (mut gs, peers, queues, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -2095,7 +2050,7 @@ fn test_do_not_graft_within_backoff_period() { gs.handle_prune(&peers[0], vec![(topics[0].clone(), Vec::new(), Some(1))]); // forget all events until now - let receivers = flush_events(&mut gs, receivers); + let queues = flush_events(&mut gs, queues); // call heartbeat gs.heartbeat(); @@ -2108,8 +2063,8 @@ fn test_do_not_graft_within_backoff_period() { // Check that no graft got created (we have backoff_slack = 1 therefore one more heartbeat // is needed). - let (control_msgs, receivers) = - count_control_msgs(receivers, |_, m| matches!(m, RpcOut::Graft { .. })); + let (control_msgs, queues) = + count_control_msgs(queues, |_, m| matches!(m, RpcOut::Graft { .. })); assert_eq!( control_msgs, 0, "Graft message created too early within backoff period" @@ -2120,7 +2075,7 @@ fn test_do_not_graft_within_backoff_period() { gs.heartbeat(); // check that graft got created - let (control_msgs, _) = count_control_msgs(receivers, |_, m| matches!(m, RpcOut::Graft { .. })); + let (control_msgs, _) = count_control_msgs(queues, |_, m| matches!(m, RpcOut::Graft { .. })); assert!( control_msgs > 0, "No graft message was created after backoff period" @@ -2137,7 +2092,7 @@ fn test_do_not_graft_within_default_backoff_period_after_receiving_prune_without .build() .unwrap(); // only one peer => mesh too small and will try to regraft as early as possible - let (mut gs, peers, receivers, topics) = inject_nodes1() + let (mut gs, peers, queues, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -2148,7 +2103,7 @@ fn test_do_not_graft_within_default_backoff_period_after_receiving_prune_without gs.handle_prune(&peers[0], vec![(topics[0].clone(), Vec::new(), None)]); // forget all events until now - let receivers = flush_events(&mut gs, receivers); + let queues = flush_events(&mut gs, queues); // call heartbeat gs.heartbeat(); @@ -2159,8 +2114,8 @@ fn test_do_not_graft_within_default_backoff_period_after_receiving_prune_without // Check that no graft got created (we have backoff_slack = 1 therefore one more heartbeat // is needed). - let (control_msgs, receivers) = - count_control_msgs(receivers, |_, m| matches!(m, RpcOut::Graft { .. })); + let (control_msgs, queues) = + count_control_msgs(queues, |_, m| matches!(m, RpcOut::Graft { .. })); assert_eq!( control_msgs, 0, "Graft message created too early within backoff period" @@ -2171,7 +2126,7 @@ fn test_do_not_graft_within_default_backoff_period_after_receiving_prune_without gs.heartbeat(); // check that graft got created - let (control_msgs, _) = count_control_msgs(receivers, |_, m| matches!(m, RpcOut::Graft { .. })); + let (control_msgs, _) = count_control_msgs(queues, |_, m| matches!(m, RpcOut::Graft { .. 
})); assert!( control_msgs > 0, "No graft message was created after backoff period" @@ -2192,7 +2147,7 @@ fn test_unsubscribe_backoff() { let topic = String::from("test"); // only one peer => mesh too small and will try to regraft as early as possible - let (mut gs, _, receivers, topics) = inject_nodes1() + let (mut gs, _, queues, topics) = inject_nodes1() .peer_no(1) .topics(vec![topic.clone()]) .to_subscribe(true) @@ -2201,7 +2156,7 @@ fn test_unsubscribe_backoff() { let _ = gs.unsubscribe(&Topic::new(topic)); - let (control_msgs, receivers) = count_control_msgs(receivers, |_, m| match m { + let (control_msgs, queues) = count_control_msgs(queues, |_, m| match m { RpcOut::Prune(Prune { backoff, .. }) => backoff == &Some(1), _ => false, }); @@ -2213,7 +2168,7 @@ fn test_unsubscribe_backoff() { let _ = gs.subscribe(&Topic::new(topics[0].to_string())); // forget all events until now - let receivers = flush_events(&mut gs, receivers); + let queues = flush_events(&mut gs, queues); // call heartbeat gs.heartbeat(); @@ -2226,8 +2181,8 @@ fn test_unsubscribe_backoff() { // Check that no graft got created (we have backoff_slack = 1 therefore one more heartbeat // is needed). - let (control_msgs, receivers) = - count_control_msgs(receivers, |_, m| matches!(m, RpcOut::Graft { .. })); + let (control_msgs, queues) = + count_control_msgs(queues, |_, m| matches!(m, RpcOut::Graft { .. })); assert_eq!( control_msgs, 0, "Graft message created too early within backoff period" @@ -2238,7 +2193,7 @@ fn test_unsubscribe_backoff() { gs.heartbeat(); // check that graft got created - let (control_msgs, _) = count_control_msgs(receivers, |_, m| matches!(m, RpcOut::Graft { .. })); + let (control_msgs, _) = count_control_msgs(queues, |_, m| matches!(m, RpcOut::Graft { .. })); assert!( control_msgs > 0, "No graft message was created after backoff period" @@ -2251,7 +2206,7 @@ fn test_flood_publish() { let topic = "test"; // Adds more peers than mesh can hold to test flood publishing - let (mut gs, _, receivers, _) = inject_nodes1() + let (mut gs, _, queues, _) = inject_nodes1() .peer_no(config.mesh_n_high() + 10) .topics(vec![topic.into()]) .to_subscribe(true) @@ -2262,12 +2217,11 @@ fn test_flood_publish() { gs.publish(Topic::new(topic), publish_data).unwrap(); // Collect all publish messages - let publishes = receivers + let publishes = queues .into_values() - .fold(vec![], |mut collected_publish, c| { - let priority = c.priority.get_ref(); - while !priority.is_empty() { - if let Ok(RpcOut::Publish { message, .. }) = priority.try_recv() { + .fold(vec![], |mut collected_publish, mut queue| { + while !queue.is_empty() { + if let Ok(RpcOut::Publish { message, .. }) = queue.try_pop() { collected_publish.push(message); } } @@ -2306,7 +2260,7 @@ fn test_gossip_to_at_least_gossip_lazy_peers() { // add more peers than in mesh to test gossipping // by default only mesh_n_low peers will get added to mesh - let (mut gs, _, receivers, topic_hashes) = inject_nodes1() + let (mut gs, _, queues, topic_hashes) = inject_nodes1() .peer_no(config.mesh_n_low() + config.gossip_lazy() + 1) .topics(vec!["topic".into()]) .to_subscribe(true) @@ -2333,7 +2287,7 @@ fn test_gossip_to_at_least_gossip_lazy_peers() { let msg_id = gs.config.message_id(message); // check that exactly config.gossip_lazy() many gossip messages were sent. 
- let (control_msgs, _) = count_control_msgs(receivers, |_, action| match action { + let (control_msgs, _) = count_control_msgs(queues, |_, action| match action { RpcOut::IHave(IHave { topic_hash, message_ids, @@ -2349,7 +2303,7 @@ fn test_gossip_to_at_most_gossip_factor_peers() { // add a lot of peers let m = config.mesh_n_low() + config.gossip_lazy() * (2.0 / config.gossip_factor()) as usize; - let (mut gs, _, receivers, topic_hashes) = inject_nodes1() + let (mut gs, _, queues, topic_hashes) = inject_nodes1() .peer_no(m) .topics(vec!["topic".into()]) .to_subscribe(true) @@ -2375,7 +2329,7 @@ fn test_gossip_to_at_most_gossip_factor_peers() { let msg_id = gs.config.message_id(message); // check that exactly config.gossip_lazy() many gossip messages were sent. - let (control_msgs, _) = count_control_msgs(receivers, |_, action| match action { + let (control_msgs, _) = count_control_msgs(queues, |_, action| match action { RpcOut::IHave(IHave { topic_hash, message_ids, @@ -2408,8 +2362,8 @@ fn test_accept_only_outbound_peer_grafts_when_mesh_full() { assert_eq!(gs.mesh[&topics[0]].len(), config.mesh_n_high()); // create an outbound and an inbound peer - let (inbound, _in_receiver) = add_peer(&mut gs, &topics, false, false); - let (outbound, _out_receiver) = add_peer(&mut gs, &topics, true, false); + let (inbound, _in_queue) = add_peer(&mut gs, &topics, false, false); + let (outbound, _out_queue) = add_peer(&mut gs, &topics, true, false); // send grafts gs.handle_graft(&inbound, vec![topics[0].clone()]); @@ -2439,7 +2393,7 @@ fn test_do_not_remove_too_many_outbound_peers() { .unwrap(); // fill the mesh with inbound connections - let (mut gs, peers, _receivers, topics) = inject_nodes1() + let (mut gs, peers, _queues, topics) = inject_nodes1() .peer_no(n) .topics(vec!["test".into()]) .to_subscribe(true) @@ -2512,7 +2466,7 @@ fn test_prune_negative_scored_peers() { let config = Config::default(); // build mesh with one peer - let (mut gs, peers, receivers, topics) = inject_nodes1() + let (mut gs, peers, queues, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -2535,7 +2489,7 @@ fn test_prune_negative_scored_peers() { assert!(gs.mesh[&topics[0]].is_empty()); // check prune message - let (control_msgs, _) = count_control_msgs(receivers, |peer_id, m| { + let (control_msgs, _) = count_control_msgs(queues, |peer_id, m| { peer_id == &peers[0] && match m { RpcOut::Prune(Prune { @@ -2570,8 +2524,8 @@ fn test_dont_graft_to_negative_scored_peers() { .create_network(); // add two additional peers that will not be part of the mesh - let (p1, _receiver1) = add_peer(&mut gs, &topics, false, false); - let (p2, _receiver2) = add_peer(&mut gs, &topics, false, false); + let (p1, _queue1) = add_peer(&mut gs, &topics, false, false); + let (p2, _queue2) = add_peer(&mut gs, &topics, false, false); // reduce score of p1 to negative gs.as_peer_score_mut().add_penalty(&p1, 1); @@ -2644,7 +2598,7 @@ fn test_only_send_nonnegative_scoring_peers_in_px() { .unwrap(); // Build mesh with three peer - let (mut gs, peers, receivers, topics) = inject_nodes1() + let (mut gs, peers, queues, topics) = inject_nodes1() .peer_no(3) .topics(vec!["test".into()]) .to_subscribe(true) @@ -2670,7 +2624,7 @@ fn test_only_send_nonnegative_scoring_peers_in_px() { ); // Check that px in prune message only contains third peer - let (control_msgs, _) = count_control_msgs(receivers, |peer_id, m| { + let (control_msgs, _) = count_control_msgs(queues, |peer_id, m| { peer_id == &peers[1] && match m { 
RpcOut::Prune(Prune { @@ -2698,7 +2652,7 @@ fn test_do_not_gossip_to_peers_below_gossip_threshold() { }; // Build full mesh - let (mut gs, peers, mut receivers, topics) = inject_nodes1() + let (mut gs, peers, mut queues, topics) = inject_nodes1() .peer_no(config.mesh_n_high()) .topics(vec!["test".into()]) .to_subscribe(true) @@ -2712,10 +2666,10 @@ fn test_do_not_gossip_to_peers_below_gossip_threshold() { } // Add two additional peers that will not be part of the mesh - let (p1, receiver1) = add_peer(&mut gs, &topics, false, false); - receivers.insert(p1, receiver1); - let (p2, receiver2) = add_peer(&mut gs, &topics, false, false); - receivers.insert(p2, receiver2); + let (p1, queue1) = add_peer(&mut gs, &topics, false, false); + queues.insert(p1, queue1); + let (p2, queue2) = add_peer(&mut gs, &topics, false, false); + queues.insert(p2, queue2); // Reduce score of p1 below peer_score_thresholds.gossip_threshold // note that penalties get squared so two penalties means a score of @@ -2746,7 +2700,7 @@ fn test_do_not_gossip_to_peers_below_gossip_threshold() { gs.emit_gossip(); // Check that exactly one gossip messages got sent and it got sent to p2 - let (control_msgs, _) = count_control_msgs(receivers, |peer, action| match action { + let (control_msgs, _) = count_control_msgs(queues, |peer, action| match action { RpcOut::IHave(IHave { topic_hash, message_ids, @@ -2773,7 +2727,7 @@ fn test_iwant_msg_from_peer_below_gossip_threshold_gets_ignored() { }; // Build full mesh - let (mut gs, peers, mut receivers, topics) = inject_nodes1() + let (mut gs, peers, mut queues, topics) = inject_nodes1() .peer_no(config.mesh_n_high()) .topics(vec!["test".into()]) .to_subscribe(true) @@ -2789,10 +2743,10 @@ fn test_iwant_msg_from_peer_below_gossip_threshold_gets_ignored() { } // Add two additional peers that will not be part of the mesh - let (p1, receiver1) = add_peer(&mut gs, &topics, false, false); - receivers.insert(p1, receiver1); - let (p2, receiver2) = add_peer(&mut gs, &topics, false, false); - receivers.insert(p2, receiver2); + let (p1, queue1) = add_peer(&mut gs, &topics, false, false); + queues.insert(p1, queue1); + let (p2, queue2) = add_peer(&mut gs, &topics, false, false); + queues.insert(p2, queue2); // Reduce score of p1 below peer_score_thresholds.gossip_threshold // note that penalties get squared so two penalties means a score of @@ -2824,12 +2778,11 @@ fn test_iwant_msg_from_peer_below_gossip_threshold_gets_ignored() { // the messages we are sending let sent_messages = - receivers + queues .into_iter() - .fold(vec![], |mut collected_messages, (peer_id, c)| { - let non_priority = c.non_priority.get_ref(); - while !non_priority.is_empty() { - if let Ok(RpcOut::Forward { message, .. }) = non_priority.try_recv() { + .fold(vec![], |mut collected_messages, (peer_id, mut queue)| { + while !queue.is_empty() { + if let Ok(RpcOut::Forward { message, .. 
}) = queue.try_pop() { collected_messages.push((peer_id, message)); } } @@ -2863,7 +2816,7 @@ fn test_ihave_msg_from_peer_below_gossip_threshold_gets_ignored() { ..PeerScoreThresholds::default() }; // build full mesh - let (mut gs, peers, mut receivers, topics) = inject_nodes1() + let (mut gs, peers, mut queues, topics) = inject_nodes1() .peer_no(config.mesh_n_high()) .topics(vec!["test".into()]) .to_subscribe(true) @@ -2879,10 +2832,10 @@ fn test_ihave_msg_from_peer_below_gossip_threshold_gets_ignored() { } // add two additional peers that will not be part of the mesh - let (p1, receiver1) = add_peer(&mut gs, &topics, false, false); - receivers.insert(p1, receiver1); - let (p2, receiver2) = add_peer(&mut gs, &topics, false, false); - receivers.insert(p2, receiver2); + let (p1, queue1) = add_peer(&mut gs, &topics, false, false); + queues.insert(p1, queue1); + let (p2, queue2) = add_peer(&mut gs, &topics, false, false); + queues.insert(p2, queue2); // reduce score of p1 below peer_score_thresholds.gossip_threshold // note that penalties get squared so two penalties means a score of @@ -2912,7 +2865,7 @@ fn test_ihave_msg_from_peer_below_gossip_threshold_gets_ignored() { gs.handle_ihave(&p2, vec![(topics[0].clone(), vec![msg_id.clone()])]); // check that we sent exactly one IWANT request to p2 - let (control_msgs, _) = count_control_msgs(receivers, |peer, c| match c { + let (control_msgs, _) = count_control_msgs(queues, |peer, c| match c { RpcOut::IWant(IWant { message_ids }) => { if message_ids.iter().any(|m| m == &msg_id) { assert_eq!(peer, &p2); @@ -2940,7 +2893,7 @@ fn test_do_not_publish_to_peer_below_publish_threshold() { }; // build mesh with no peers and no subscribed topics - let (mut gs, _, mut receivers, _) = inject_nodes1() + let (mut gs, _, mut queues, _) = inject_nodes1() .gs_config(config) .scoring(Some((peer_score_params, peer_score_thresholds))) .create_network(); @@ -2950,10 +2903,10 @@ fn test_do_not_publish_to_peer_below_publish_threshold() { let topics = vec![topic.hash()]; // add two additional peers that will be added to the mesh - let (p1, receiver1) = add_peer(&mut gs, &topics, false, false); - receivers.insert(p1, receiver1); - let (p2, receiver2) = add_peer(&mut gs, &topics, false, false); - receivers.insert(p2, receiver2); + let (p1, queue1) = add_peer(&mut gs, &topics, false, false); + queues.insert(p1, queue1); + let (p2, queue2) = add_peer(&mut gs, &topics, false, false); + queues.insert(p2, queue2); // reduce score of p1 below peer_score_thresholds.publish_threshold // note that penalties get squared so two penalties means a score of @@ -2971,17 +2924,17 @@ fn test_do_not_publish_to_peer_below_publish_threshold() { gs.publish(topic, publish_data).unwrap(); // Collect all publish messages - let publishes = receivers - .into_iter() - .fold(vec![], |mut collected_publish, (peer_id, c)| { - let priority = c.priority.get_ref(); - while !priority.is_empty() { - if let Ok(RpcOut::Publish { message, .. }) = priority.try_recv() { - collected_publish.push((peer_id, message)); + let publishes = + queues + .into_iter() + .fold(vec![], |mut collected_publish, (peer_id, mut queue)| { + while !queue.is_empty() { + if let Ok(RpcOut::Publish { message, .. 
}) = queue.try_pop() { + collected_publish.push((peer_id, message)); + } } - } - collected_publish - }); + collected_publish + }); // assert only published to p2 assert_eq!(publishes.len(), 1); @@ -2998,17 +2951,17 @@ fn test_do_not_flood_publish_to_peer_below_publish_threshold() { ..PeerScoreThresholds::default() }; // build mesh with no peers - let (mut gs, _, mut receivers, topics) = inject_nodes1() + let (mut gs, _, mut queues, topics) = inject_nodes1() .topics(vec!["test".into()]) .gs_config(config) .scoring(Some((peer_score_params, peer_score_thresholds))) .create_network(); // add two additional peers that will be added to the mesh - let (p1, receiver1) = add_peer(&mut gs, &topics, false, false); - receivers.insert(p1, receiver1); - let (p2, receiver2) = add_peer(&mut gs, &topics, false, false); - receivers.insert(p2, receiver2); + let (p1, queue1) = add_peer(&mut gs, &topics, false, false); + queues.insert(p1, queue1); + let (p2, queue2) = add_peer(&mut gs, &topics, false, false); + queues.insert(p2, queue2); // reduce score of p1 below peer_score_thresholds.publish_threshold // note that penalties get squared so two penalties means a score of @@ -3026,17 +2979,17 @@ fn test_do_not_flood_publish_to_peer_below_publish_threshold() { gs.publish(Topic::new("test"), publish_data).unwrap(); // Collect all publish messages - let publishes = receivers - .into_iter() - .fold(vec![], |mut collected_publish, (peer_id, c)| { - let priority = c.priority.get_ref(); - while !priority.is_empty() { - if let Ok(RpcOut::Publish { message, .. }) = priority.try_recv() { - collected_publish.push((peer_id, message)) + let publishes = + queues + .into_iter() + .fold(vec![], |mut collected_publish, (peer_id, mut queue)| { + while !queue.is_empty() { + if let Ok(RpcOut::Publish { message, .. 
}) = queue.try_pop() { + collected_publish.push((peer_id, message)) + } } - } - collected_publish - }); + collected_publish + }); // assert only published to p2 assert_eq!(publishes.len(), 1); @@ -3062,8 +3015,8 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() { .create_network(); // add two additional peers that will be added to the mesh - let (p1, _receiver1) = add_peer(&mut gs, &topics, false, false); - let (p2, _receiver2) = add_peer(&mut gs, &topics, false, false); + let (p1, _queue1) = add_peer(&mut gs, &topics, false, false); + let (p2, _queue2) = add_peer(&mut gs, &topics, false, false); // reduce score of p1 below peer_score_thresholds.graylist_threshold // note that penalties get squared so two penalties means a score of @@ -3256,7 +3209,7 @@ fn test_keep_best_scoring_peers_on_oversubscription() { // build mesh with more peers than mesh can hold let n = config.mesh_n_high() + 1; - let (mut gs, peers, _receivers, topics) = inject_nodes1() + let (mut gs, peers, _queues, topics) = inject_nodes1() .peer_no(n) .topics(vec!["test".into()]) .to_subscribe(true) @@ -4369,7 +4322,7 @@ fn test_scoring_p7_grafts_before_backoff() { ..Default::default() }; - let (mut gs, peers, _receivers, topics) = inject_nodes1() + let (mut gs, peers, _queues, topics) = inject_nodes1() .peer_no(2) .topics(vec!["test".into()]) .to_subscribe(false) @@ -4446,7 +4399,7 @@ fn test_opportunistic_grafting() { ..Default::default() }; - let (mut gs, peers, _receivers, topics) = inject_nodes1() + let (mut gs, peers, _queues, topics) = inject_nodes1() .peer_no(5) .topics(vec!["test".into()]) .to_subscribe(false) @@ -4475,7 +4428,7 @@ fn test_opportunistic_grafting() { } // set scores for peers in the mesh - for (i, (peer, _receiver)) in others.iter().enumerate().take(5) { + for (i, (peer, _queue)) in others.iter().enumerate().take(5) { gs.set_application_score(peer, 0.0 + i as f64); } @@ -4523,7 +4476,7 @@ fn test_opportunistic_grafting() { #[test] fn test_ignore_graft_from_unknown_topic() { // build gossipsub without subscribing to any topics - let (mut gs, peers, receivers, _) = inject_nodes1() + let (mut gs, peers, queues, _) = inject_nodes1() .peer_no(1) .topics(vec![]) .to_subscribe(false) @@ -4533,7 +4486,7 @@ fn test_ignore_graft_from_unknown_topic() { gs.handle_graft(&peers[0], vec![Topic::new("test").hash()]); // assert that no prune got created - let (control_msgs, _) = count_control_msgs(receivers, |_, a| matches!(a, RpcOut::Prune { .. })); + let (control_msgs, _) = count_control_msgs(queues, |_, a| matches!(a, RpcOut::Prune { .. 
})); assert_eq!( control_msgs, 0, "we should not prune after graft in unknown topic" @@ -4544,15 +4497,15 @@ fn test_ignore_graft_from_unknown_topic() { fn test_ignore_too_many_iwants_from_same_peer_for_same_message() { let config = Config::default(); // build gossipsub with full mesh - let (mut gs, _, mut receivers, topics) = inject_nodes1() + let (mut gs, _, mut queues, topics) = inject_nodes1() .peer_no(config.mesh_n_high()) .topics(vec!["test".into()]) .to_subscribe(false) .create_network(); // add another peer not in the mesh - let (peer, receiver) = add_peer(&mut gs, &topics, false, false); - receivers.insert(peer, receiver); + let (peer, queue) = add_peer(&mut gs, &topics, false, false); + queues.insert(peer, queue); // receive a message let mut seq = 0; @@ -4566,7 +4519,7 @@ fn test_ignore_too_many_iwants_from_same_peer_for_same_message() { gs.handle_received_message(m1, &PeerId::random()); // clear events - let receivers = flush_events(&mut gs, receivers); + let queues = flush_events(&mut gs, queues); // the first gossip_retransimission many iwants return the valid message, all others are // ignored. @@ -4575,10 +4528,9 @@ fn test_ignore_too_many_iwants_from_same_peer_for_same_message() { } assert_eq!( - receivers.into_values().fold(0, |mut fwds, c| { - let non_priority = c.non_priority.get_ref(); - while !non_priority.is_empty() { - if let Ok(RpcOut::Forward { .. }) = non_priority.try_recv() { + queues.into_values().fold(0, |mut fwds, mut queue| { + while !queue.is_empty() { + if let Ok(RpcOut::Forward { .. }) = queue.try_pop() { fwds += 1; } } @@ -4596,7 +4548,7 @@ fn test_ignore_too_many_ihaves() { .build() .unwrap(); // build gossipsub with full mesh - let (mut gs, _, mut receivers, topics) = inject_nodes1() + let (mut gs, _, mut queues, topics) = inject_nodes1() .peer_no(config.mesh_n_high()) .topics(vec!["test".into()]) .to_subscribe(false) @@ -4604,8 +4556,8 @@ fn test_ignore_too_many_ihaves() { .create_network(); // add another peer not in the mesh - let (peer, receiver) = add_peer(&mut gs, &topics, false, false); - receivers.insert(peer, receiver); + let (peer, queue) = add_peer(&mut gs, &topics, false, false); + queues.insert(peer, queue); // peer has 20 messages let mut seq = 0; @@ -4633,7 +4585,7 @@ fn test_ignore_too_many_ihaves() { .collect(); // we send iwant only for the first 10 messages - let (control_msgs, receivers) = count_control_msgs(receivers, |p, action| { + let (control_msgs, queues) = count_control_msgs(queues, |p, action| { p == &peer && matches!(action, RpcOut::IWant(IWant { message_ids }) if message_ids.len() == 1 && first_ten.contains(&message_ids[0])) }); @@ -4659,7 +4611,7 @@ fn test_ignore_too_many_ihaves() { } // we sent iwant for all 10 messages - let (control_msgs, _) = count_control_msgs(receivers, |p, action| { + let (control_msgs, _) = count_control_msgs(queues, |p, action| { p == &peer && matches!(action, RpcOut::IWant(IWant { message_ids }) if message_ids.len() == 1) }); @@ -4674,7 +4626,7 @@ fn test_ignore_too_many_messages_in_ihave() { .build() .unwrap(); // build gossipsub with full mesh - let (mut gs, _, mut receivers, topics) = inject_nodes1() + let (mut gs, _, mut queues, topics) = inject_nodes1() .peer_no(config.mesh_n_high()) .topics(vec!["test".into()]) .to_subscribe(false) @@ -4682,8 +4634,8 @@ fn test_ignore_too_many_messages_in_ihave() { .create_network(); // add another peer not in the mesh - let (peer, receiver) = add_peer(&mut gs, &topics, false, false); - receivers.insert(peer, receiver); + let (peer, queue) = add_peer(&mut 
gs, &topics, false, false); + queues.insert(peer, queue); // peer has 30 messages let mut seq = 0; @@ -4708,7 +4660,7 @@ fn test_ignore_too_many_messages_in_ihave() { // we send iwant only for the first 10 messages let mut sum = 0; - let (control_msgs, receivers) = count_control_msgs(receivers, |p, rpc| match rpc { + let (control_msgs, queues) = count_control_msgs(queues, |p, rpc| match rpc { RpcOut::IWant(IWant { message_ids }) => { p == &peer && { assert!(first_twelve.is_superset(&message_ids.iter().collect())); @@ -4734,7 +4686,7 @@ fn test_ignore_too_many_messages_in_ihave() { // we sent 10 iwant messages ids via a IWANT rpc. let mut sum = 0; - let (control_msgs, _) = count_control_msgs(receivers, |p, rpc| match rpc { + let (control_msgs, _) = count_control_msgs(queues, |p, rpc| match rpc { RpcOut::IWant(IWant { message_ids }) => { p == &peer && { sum += message_ids.len(); @@ -4755,7 +4707,7 @@ fn test_limit_number_of_message_ids_inside_ihave() { .build() .unwrap(); // build gossipsub with full mesh - let (mut gs, peers, mut receivers, topics) = inject_nodes1() + let (mut gs, peers, mut queues, topics) = inject_nodes1() .peer_no(config.mesh_n_high()) .topics(vec!["test".into()]) .to_subscribe(false) @@ -4768,10 +4720,10 @@ fn test_limit_number_of_message_ids_inside_ihave() { } // add two other peers not in the mesh - let (p1, receiver1) = add_peer(&mut gs, &topics, false, false); - receivers.insert(p1, receiver1); - let (p2, receiver2) = add_peer(&mut gs, &topics, false, false); - receivers.insert(p2, receiver2); + let (p1, queue1) = add_peer(&mut gs, &topics, false, false); + queues.insert(p1, queue1); + let (p2, queue2) = add_peer(&mut gs, &topics, false, false); + queues.insert(p2, queue2); // receive 200 messages from another peer let mut seq = 0; @@ -4789,7 +4741,7 @@ fn test_limit_number_of_message_ids_inside_ihave() { let mut ihaves1 = HashSet::new(); let mut ihaves2 = HashSet::new(); - let (control_msgs, _) = count_control_msgs(receivers, |p, action| match action { + let (control_msgs, _) = count_control_msgs(queues, |p, action| match action { RpcOut::IHave(IHave { message_ids, .. }) => { if p == &p1 { ihaves1 = message_ids.iter().cloned().collect(); @@ -4870,7 +4822,7 @@ fn test_iwant_penalties() { let mut first_messages = Vec::new(); let mut second_messages = Vec::new(); let mut seq = 0; - for (peer, _receiver) in &other_peers { + for (peer, _queue) in &other_peers { let msg1 = random_message(&mut seq, &topics); let msg2 = random_message(&mut seq, &topics); @@ -4893,19 +4845,19 @@ fn test_iwant_penalties() { } // the peers send us all the first message ids in time - for (index, (peer, _receiver)) in other_peers.iter().enumerate() { + for (index, (peer, _queue)) in other_peers.iter().enumerate() { gs.handle_received_message(first_messages[index].clone(), peer); } // now we do a heartbeat no penalization should have been applied yet gs.heartbeat(); - for (peer, _receiver) in &other_peers { + for (peer, _queue) in &other_peers { assert_eq!(gs.as_peer_score_mut().score_report(peer).score, 0.0); } // receive the first twenty of the other peers then send their response - for (index, (peer, _receiver)) in other_peers.iter().enumerate().take(20) { + for (index, (peer, _queue)) in other_peers.iter().enumerate().take(20) { gs.handle_received_message(second_messages[index].clone(), peer); } @@ -4916,7 +4868,7 @@ fn test_iwant_penalties() { gs.heartbeat(); // now we get the second messages from the last 80 peers. 
- for (index, (peer, _receiver)) in other_peers.iter().enumerate() { + for (index, (peer, _queue)) in other_peers.iter().enumerate() { if index > 19 { gs.handle_received_message(second_messages[index].clone(), peer); } @@ -4930,7 +4882,7 @@ fn test_iwant_penalties() { let mut single_penalized = 0; let mut double_penalized = 0; - for (i, (peer, _receiver)) in other_peers.iter().enumerate() { + for (i, (peer, _queue)) in other_peers.iter().enumerate() { let score = gs.as_peer_score_mut().score_report(peer).score; if score == 0.0 { not_penalized += 1; @@ -4958,7 +4910,7 @@ fn test_publish_to_floodsub_peers_without_flood_publish() { .flood_publish(false) .build() .unwrap(); - let (mut gs, _, mut receivers, topics) = inject_nodes1() + let (mut gs, _, mut queues, topics) = inject_nodes1() .peer_no(config.mesh_n_low() - 1) .topics(vec!["test".into()]) .to_subscribe(false) @@ -4966,7 +4918,7 @@ fn test_publish_to_floodsub_peers_without_flood_publish() { .create_network(); // add two floodsub peer, one explicit, one implicit - let (p1, receiver1) = add_peer_with_addr_and_kind( + let (p1, queue1) = add_peer_with_addr_and_kind( &mut gs, &topics, false, @@ -4974,11 +4926,11 @@ fn test_publish_to_floodsub_peers_without_flood_publish() { Multiaddr::empty(), Some(PeerKind::Floodsub), ); - receivers.insert(p1, receiver1); + queues.insert(p1, queue1); - let (p2, receiver2) = + let (p2, queue2) = add_peer_with_addr_and_kind(&mut gs, &topics, false, false, Multiaddr::empty(), None); - receivers.insert(p2, receiver2); + queues.insert(p2, queue2); // p1 and p2 are not in the mesh assert!(!gs.mesh[&topics[0]].contains(&p1) && !gs.mesh[&topics[0]].contains(&p2)); @@ -4988,12 +4940,11 @@ fn test_publish_to_floodsub_peers_without_flood_publish() { gs.publish(Topic::new("test"), publish_data).unwrap(); // Collect publish messages to floodsub peers - let publishes = receivers + let publishes = queues .into_iter() - .fold(0, |mut collected_publish, (peer_id, c)| { - let priority = c.priority.get_ref(); - while !priority.is_empty() { - if matches!(priority.try_recv(), + .fold(0, |mut collected_publish, (peer_id, mut queue)| { + while !queue.is_empty() { + if matches!(queue.try_pop(), Ok(RpcOut::Publish{..}) if peer_id == p1 || peer_id == p2) { collected_publish += 1; @@ -5014,7 +4965,7 @@ fn test_do_not_use_floodsub_in_fanout() { .flood_publish(false) .build() .unwrap(); - let (mut gs, _, mut receivers, _) = inject_nodes1() + let (mut gs, _, mut queues, _) = inject_nodes1() .peer_no(config.mesh_n_low() - 1) .topics(Vec::new()) .to_subscribe(false) @@ -5025,7 +4976,7 @@ fn test_do_not_use_floodsub_in_fanout() { let topics = vec![topic.hash()]; // add two floodsub peer, one explicit, one implicit - let (p1, receiver1) = add_peer_with_addr_and_kind( + let (p1, queue1) = add_peer_with_addr_and_kind( &mut gs, &topics, false, @@ -5034,22 +4985,21 @@ fn test_do_not_use_floodsub_in_fanout() { Some(PeerKind::Floodsub), ); - receivers.insert(p1, receiver1); - let (p2, receiver2) = + queues.insert(p1, queue1); + let (p2, queue2) = add_peer_with_addr_and_kind(&mut gs, &topics, false, false, Multiaddr::empty(), None); - receivers.insert(p2, receiver2); + queues.insert(p2, queue2); // publish a message let publish_data = vec![0; 42]; gs.publish(Topic::new("test"), publish_data).unwrap(); // Collect publish messages to floodsub peers - let publishes = receivers + let publishes = queues .into_iter() - .fold(0, |mut collected_publish, (peer_id, c)| { - let priority = c.priority.get_ref(); - while !priority.is_empty() { - if 
matches!(priority.try_recv(),
+            .fold(0, |mut collected_publish, (peer_id, mut queue)| {
+                while !queue.is_empty() {
+                    if matches!(queue.try_pop(),
                     Ok(RpcOut::Publish{..}) if peer_id == p1 || peer_id == p2)
                 {
                     collected_publish += 1;
@@ -5101,14 +5051,14 @@ fn test_dont_add_floodsub_peers_to_mesh_on_join() {
 
 #[test]
 fn test_dont_send_px_to_old_gossipsub_peers() {
-    let (mut gs, _, receivers, topics) = inject_nodes1()
+    let (mut gs, _, queues, topics) = inject_nodes1()
         .peer_no(0)
         .topics(vec!["test".into()])
         .to_subscribe(false)
         .create_network();
 
     // add an old gossipsub peer
-    let (p1, _receiver1) = add_peer_with_addr_and_kind(
+    let (p1, _queue1) = add_peer_with_addr_and_kind(
         &mut gs,
         &topics,
         false,
@@ -5125,7 +5075,7 @@ fn test_dont_send_px_to_old_gossipsub_peers() {
     );
 
     // check that prune does not contain px
-    let (control_msgs, _) = count_control_msgs(receivers, |_, m| match m {
+    let (control_msgs, _) = count_control_msgs(queues, |_, m| match m {
         RpcOut::Prune(Prune { peers: px, .. }) => !px.is_empty(),
         _ => false,
     });
@@ -5135,7 +5085,7 @@ fn test_dont_send_px_to_old_gossipsub_peers() {
 #[test]
 fn test_dont_send_floodsub_peers_in_px() {
     // build mesh with one peer
-    let (mut gs, peers, receivers, topics) = inject_nodes1()
+    let (mut gs, peers, queues, topics) = inject_nodes1()
         .peer_no(1)
         .topics(vec!["test".into()])
         .to_subscribe(true)
@@ -5160,7 +5110,7 @@ fn test_dont_send_floodsub_peers_in_px() {
     );
 
     // check that px in prune message is empty
-    let (control_msgs, _) = count_control_msgs(receivers, |_, m| match m {
+    let (control_msgs, _) = count_control_msgs(queues, |_, m| match m {
         RpcOut::Prune(Prune { peers: px, .. }) => !px.is_empty(),
         _ => false,
     });
@@ -5251,14 +5201,14 @@ fn test_subscribe_and_graft_with_negative_score() {
         )))
         .create_network();
 
-    let (mut gs2, _, receivers, _) = inject_nodes1().create_network();
+    let (mut gs2, _, queues, _) = inject_nodes1().create_network();
 
     let connection_id = ConnectionId::new_unchecked(0);
 
     let topic = Topic::new("test");
 
-    let (p2, _receiver1) = add_peer(&mut gs1, &Vec::new(), true, false);
-    let (p1, _receiver2) = add_peer(&mut gs2, &topic_hashes, false, false);
+    let (p2, _queue1) = add_peer(&mut gs1, &Vec::new(), true, false);
+    let (p1, _queue2) = add_peer(&mut gs2, &topic_hashes, false, false);
 
     // add penalty to peer p2
    gs1.as_peer_score_mut().add_penalty(&p2, 1);
@@ -5272,12 +5222,11 @@ fn test_subscribe_and_graft_with_negative_score() {
                                   p1: PeerId,
                                   p2: PeerId,
                                   connection_id: ConnectionId,
-                                  receivers: HashMap<PeerId, Receiver>|
-     -> HashMap<PeerId, Receiver> {
-        let new_receivers = HashMap::new();
-        for (peer_id, receiver) in receivers.into_iter() {
-            let non_priority = receiver.non_priority.get_ref();
-            match non_priority.try_recv() {
+                                  queues: HashMap<PeerId, Queue<RpcOut>>|
+     -> HashMap<PeerId, Queue<RpcOut>> {
+        let new_queues = HashMap::new();
+        for (peer_id, mut receiver_queue) in queues.into_iter() {
+            match receiver_queue.try_pop() {
                 Ok(rpc) if peer_id == p1 => {
                     gs1.on_connection_handler_event(
                         p2,
@@ -5291,18 +5240,18 @@ fn test_subscribe_and_graft_with_negative_score() {
                 _ => {}
             }
         }
-        new_receivers
+        new_queues
     };
 
     // forward the subscribe message
-    let receivers = forward_messages_to_p1(&mut gs1, p1, p2, connection_id, receivers);
+    let queues = forward_messages_to_p1(&mut gs1, p1, p2, connection_id, queues);
 
     // heartbeats on both
     gs1.heartbeat();
     gs2.heartbeat();
 
     // forward messages again
-    forward_messages_to_p1(&mut gs1, p1, p2, connection_id, receivers);
+    forward_messages_to_p1(&mut gs1, p1, p2, connection_id, queues);
 
     // nobody got penalized
assert!(gs1.as_peer_score_mut().score_report(&p2).score >= original_score); @@ -5344,7 +5293,7 @@ fn test_graft_without_subscribe() { /// that run Gossipsub v1.2. #[test] fn sends_idontwant() { - let (mut gs, peers, receivers, topic_hashes) = inject_nodes1() + let (mut gs, peers, queues, topic_hashes) = inject_nodes1() .peer_no(5) .topics(vec![String::from("topic1")]) .to_subscribe(true) @@ -5366,12 +5315,11 @@ fn sends_idontwant() { }; gs.handle_received_message(message.clone(), &local_id); assert_eq!( - receivers + queues .into_iter() - .fold(0, |mut idontwants, (peer_id, c)| { - let non_priority = c.non_priority.get_ref(); - while !non_priority.is_empty() { - if let Ok(RpcOut::IDontWant(_)) = non_priority.try_recv() { + .fold(0, |mut idontwants, (peer_id, mut queue)| { + while !queue.is_empty() { + if let Ok(RpcOut::IDontWant(_)) = queue.try_pop() { assert_ne!(peer_id, peers[1]); idontwants += 1; } @@ -5385,7 +5333,7 @@ fn sends_idontwant() { #[test] fn doesnt_sends_idontwant_for_lower_message_size() { - let (mut gs, peers, receivers, topic_hashes) = inject_nodes1() + let (mut gs, peers, queues, topic_hashes) = inject_nodes1() .peer_no(5) .topics(vec![String::from("topic1")]) .to_subscribe(true) @@ -5408,12 +5356,11 @@ fn doesnt_sends_idontwant_for_lower_message_size() { gs.handle_received_message(message.clone(), &local_id); assert_eq!( - receivers + queues .into_iter() - .fold(0, |mut idontwants, (peer_id, c)| { - let non_priority = c.non_priority.get_ref(); - while !non_priority.is_empty() { - if let Ok(RpcOut::IDontWant(_)) = non_priority.try_recv() { + .fold(0, |mut idontwants, (peer_id, mut queue)| { + while !queue.is_empty() { + if let Ok(RpcOut::IDontWant(_)) = queue.try_pop() { assert_ne!(peer_id, peers[1]); idontwants += 1; } @@ -5429,7 +5376,7 @@ fn doesnt_sends_idontwant_for_lower_message_size() { /// that don't run Gossipsub v1.2. #[test] fn doesnt_send_idontwant() { - let (mut gs, peers, receivers, topic_hashes) = inject_nodes1() + let (mut gs, peers, queues, topic_hashes) = inject_nodes1() .peer_no(5) .topics(vec![String::from("topic1")]) .to_subscribe(true) @@ -5451,12 +5398,11 @@ fn doesnt_send_idontwant() { }; gs.handle_received_message(message.clone(), &local_id); assert_eq!( - receivers + queues .into_iter() - .fold(0, |mut idontwants, (peer_id, c)| { - let non_priority = c.non_priority.get_ref(); - while !non_priority.is_empty() { - if matches!(non_priority.try_recv(), Ok(RpcOut::IDontWant(_)) if peer_id != peers[1]) { + .fold(0, |mut idontwants, (peer_id, mut queue)| { + while !queue.is_empty() { + if matches!(queue.try_pop(), Ok(RpcOut::IDontWant(_)) if peer_id != peers[1]) { idontwants += 1; } } @@ -5471,7 +5417,7 @@ fn doesnt_send_idontwant() { /// that sent IDONTWANT. #[test] fn doesnt_forward_idontwant() { - let (mut gs, peers, receivers, topic_hashes) = inject_nodes1() + let (mut gs, peers, queues, topic_hashes) = inject_nodes1() .peer_no(4) .topics(vec![String::from("topic1")]) .to_subscribe(true) @@ -5501,16 +5447,17 @@ fn doesnt_forward_idontwant() { gs.handle_received_message(raw_message.clone(), &local_id); assert_eq!( - receivers.into_iter().fold(0, |mut fwds, (peer_id, c)| { - let non_priority = c.non_priority.get_ref(); - while !non_priority.is_empty() { - if let Ok(RpcOut::Forward { .. }) = non_priority.try_recv() { - assert_ne!(peer_id, peers[2]); - fwds += 1; + queues + .into_iter() + .fold(0, |mut fwds, (peer_id, mut queue)| { + while !queue.is_empty() { + if let Ok(RpcOut::Forward { .. 
}) = queue.try_pop() { + assert_ne!(peer_id, peers[2]); + fwds += 1; + } } - } - fwds - }), + fwds + }), 2, "IDONTWANT was not sent" ); @@ -5520,7 +5467,7 @@ fn doesnt_forward_idontwant() { /// IDONTWANT message to the respective peer. #[test] fn parses_idontwant() { - let (mut gs, peers, _receivers, _topic_hashes) = inject_nodes1() + let (mut gs, peers, _queues, _topic_hashes) = inject_nodes1() .peer_no(2) .topics(vec![String::from("topic1")]) .to_subscribe(true) @@ -5552,7 +5499,7 @@ fn parses_idontwant() { /// Test that a node clears stale IDONTWANT messages. #[test] fn clear_stale_idontwant() { - let (mut gs, peers, _receivers, _topic_hashes) = inject_nodes1() + let (mut gs, peers, _queues, _topic_hashes) = inject_nodes1() .peer_no(4) .topics(vec![String::from("topic1")]) .to_subscribe(true) @@ -5593,15 +5540,14 @@ fn test_all_queues_full() { connections: vec![ConnectionId::new_unchecked(0)], outbound: false, topics: topics.clone(), - sender: Sender::new(2), + messages: Queue::new(1), dont_send: LinkedHashMap::new(), }, ); - let publish_data = vec![0; 42]; - gs.publish(topic_hash.clone(), publish_data.clone()) - .unwrap(); let publish_data = vec![2; 59]; + let result = gs.publish(topic_hash.clone(), publish_data.clone()); + assert!(result.is_ok()); let err = gs.publish(topic_hash, publish_data).unwrap_err(); assert!(matches!(err, PublishError::AllQueuesFull(f) if f == 1)); } @@ -5622,6 +5568,8 @@ fn test_slow_peer_returns_failed_publish() { let slow_peer_id = PeerId::random(); peers.push(slow_peer_id); + let mesh = gs.mesh.entry(topic_hash.clone()).or_default(); + mesh.insert(slow_peer_id); gs.connected_peers.insert( slow_peer_id, PeerDetails { @@ -5629,7 +5577,7 @@ fn test_slow_peer_returns_failed_publish() { connections: vec![ConnectionId::new_unchecked(0)], outbound: false, topics: topics.clone(), - sender: Sender::new(2), + messages: Queue::new(1), dont_send: LinkedHashMap::new(), }, ); @@ -5642,43 +5590,34 @@ fn test_slow_peer_returns_failed_publish() { connections: vec![ConnectionId::new_unchecked(0)], outbound: false, topics: topics.clone(), - sender: Sender::new(gs.config.connection_handler_queue_len()), + messages: Queue::new(gs.config.connection_handler_queue_len()), dont_send: LinkedHashMap::new(), }, ); let publish_data = vec![0; 42]; - gs.publish(topic_hash.clone(), publish_data.clone()) - .unwrap(); - let publish_data = vec![2; 59]; - gs.publish(topic_hash.clone(), publish_data).unwrap(); - gs.heartbeat(); - + let _failed_publish = gs.publish(topic_hash.clone(), publish_data.clone()); + let _failed_publish = gs.publish(topic_hash.clone(), publish_data.clone()); gs.heartbeat(); - let slow_peer_failed_messages = match gs.events.pop_front().unwrap() { - ToSwarm::GenerateEvent(Event::SlowPeer { - peer_id, - failed_messages, - }) if peer_id == slow_peer_id => failed_messages, - _ => panic!("invalid event"), - }; + let slow_peer_failed_messages = gs + .events + .into_iter() + .find_map(|e| match e { + ToSwarm::GenerateEvent(Event::SlowPeer { + peer_id, + failed_messages, + }) if peer_id == slow_peer_id => Some(failed_messages), + _ => None, + }) + .expect("No SlowPeer event found"); let failed_messages = FailedMessages { - publish: 1, - forward: 0, - priority: 1, - non_priority: 0, + queue_full: 1, timeout: 0, }; - assert_eq!(slow_peer_failed_messages.priority, failed_messages.priority); - assert_eq!( - slow_peer_failed_messages.non_priority, - failed_messages.non_priority - ); - assert_eq!(slow_peer_failed_messages.publish, failed_messages.publish); - 
assert_eq!(slow_peer_failed_messages.forward, failed_messages.forward); + assert_eq!(slow_peer_failed_messages, failed_messages); } #[test] @@ -5703,7 +5642,7 @@ fn test_slow_peer_returns_failed_ihave_handling() { connections: vec![ConnectionId::new_unchecked(0)], outbound: false, topics: topics.clone(), - sender: Sender::new(2), + messages: Queue::new(1), dont_send: LinkedHashMap::new(), }, ); @@ -5720,7 +5659,7 @@ fn test_slow_peer_returns_failed_ihave_handling() { connections: vec![ConnectionId::new_unchecked(0)], outbound: false, topics: topics.clone(), - sender: Sender::new(gs.config.connection_handler_queue_len()), + messages: Queue::new(gs.config.connection_handler_queue_len()), dont_send: LinkedHashMap::new(), }, ); @@ -5778,20 +5717,11 @@ fn test_slow_peer_returns_failed_ihave_handling() { .unwrap(); let failed_messages = FailedMessages { - publish: 0, - forward: 0, - priority: 0, - non_priority: 1, timeout: 0, + queue_full: 1, }; - assert_eq!(slow_peer_failed_messages.priority, failed_messages.priority); - assert_eq!( - slow_peer_failed_messages.non_priority, - failed_messages.non_priority - ); - assert_eq!(slow_peer_failed_messages.publish, failed_messages.publish); - assert_eq!(slow_peer_failed_messages.forward, failed_messages.forward); + assert_eq!(slow_peer_failed_messages, failed_messages); } #[test] @@ -5817,7 +5747,7 @@ fn test_slow_peer_returns_failed_iwant_handling() { connections: vec![ConnectionId::new_unchecked(0)], outbound: false, topics: topics.clone(), - sender: Sender::new(2), + messages: Queue::new(1), dont_send: LinkedHashMap::new(), }, ); @@ -5834,7 +5764,7 @@ fn test_slow_peer_returns_failed_iwant_handling() { connections: vec![ConnectionId::new_unchecked(0)], outbound: false, topics: topics.clone(), - sender: Sender::new(gs.config.connection_handler_queue_len()), + messages: Queue::new(gs.config.connection_handler_queue_len()), dont_send: LinkedHashMap::new(), }, ); @@ -5872,20 +5802,11 @@ fn test_slow_peer_returns_failed_iwant_handling() { .unwrap(); let failed_messages = FailedMessages { - publish: 0, - forward: 1, - priority: 0, - non_priority: 1, + queue_full: 1, timeout: 0, }; - assert_eq!(slow_peer_failed_messages.priority, failed_messages.priority); - assert_eq!( - slow_peer_failed_messages.non_priority, - failed_messages.non_priority - ); - assert_eq!(slow_peer_failed_messages.publish, failed_messages.publish); - assert_eq!(slow_peer_failed_messages.forward, failed_messages.forward); + assert_eq!(slow_peer_failed_messages, failed_messages); } #[test] @@ -5911,7 +5832,7 @@ fn test_slow_peer_returns_failed_forward() { connections: vec![ConnectionId::new_unchecked(0)], outbound: false, topics: topics.clone(), - sender: Sender::new(2), + messages: Queue::new(1), dont_send: LinkedHashMap::new(), }, ); @@ -5928,7 +5849,7 @@ fn test_slow_peer_returns_failed_forward() { connections: vec![ConnectionId::new_unchecked(0)], outbound: false, topics: topics.clone(), - sender: Sender::new(gs.config.connection_handler_queue_len()), + messages: Queue::new(gs.config.connection_handler_queue_len()), dont_send: LinkedHashMap::new(), }, ); @@ -5966,20 +5887,11 @@ fn test_slow_peer_returns_failed_forward() { .unwrap(); let failed_messages = FailedMessages { - publish: 0, - forward: 1, - priority: 0, - non_priority: 1, + queue_full: 1, timeout: 0, }; - assert_eq!(slow_peer_failed_messages.priority, failed_messages.priority); - assert_eq!( - slow_peer_failed_messages.non_priority, - failed_messages.non_priority - ); - assert_eq!(slow_peer_failed_messages.publish, 
failed_messages.publish); - assert_eq!(slow_peer_failed_messages.forward, failed_messages.forward); + assert_eq!(slow_peer_failed_messages, failed_messages); } #[test] @@ -6010,7 +5922,7 @@ fn test_slow_peer_is_downscored_on_publish() { connections: vec![ConnectionId::new_unchecked(0)], outbound: false, topics: topics.clone(), - sender: Sender::new(2), + messages: Queue::new(1), dont_send: LinkedHashMap::new(), }, ); @@ -6024,7 +5936,7 @@ fn test_slow_peer_is_downscored_on_publish() { connections: vec![ConnectionId::new_unchecked(0)], outbound: false, topics: topics.clone(), - sender: Sender::new(gs.config.connection_handler_queue_len()), + messages: Queue::new(gs.config.connection_handler_queue_len()), dont_send: LinkedHashMap::new(), }, ); @@ -6035,47 +5947,9 @@ fn test_slow_peer_is_downscored_on_publish() { let publish_data = vec![2; 59]; gs.publish(topic_hash.clone(), publish_data).unwrap(); gs.heartbeat(); - let slow_peer_score = gs.as_peer_score_mut().score_report(&slow_peer_id).score; - assert_eq!(slow_peer_score, slow_peer_params.slow_peer_weight); -} - -#[tokio::test] -async fn test_timedout_messages_are_reported() { - let gs_config = ConfigBuilder::default() - .validation_mode(ValidationMode::Permissive) - .build() - .unwrap(); - - let mut gs: Behaviour = Behaviour::new(MessageAuthenticity::RandomAuthor, gs_config).unwrap(); - - let sender = Sender::new(2); - let topic_hash = Topic::new("Test").hash(); - let publish_data = vec![2; 59]; - let raw_message = gs.build_raw_message(topic_hash, publish_data).unwrap(); - - sender - .send_message(RpcOut::Publish { - message: raw_message, - timeout: Delay::new(Duration::from_nanos(1)), - }) - .unwrap(); - let mut receiver = sender.new_receiver(); - let stale = future::poll_fn(|cx| receiver.poll_stale(cx)).await.unwrap(); - assert!(matches!(stale, RpcOut::Publish { .. })); -} - -#[test] -fn test_priority_messages_are_always_sent() { - let sender = Sender::new(2); - let topic_hash = Topic::new("Test").hash(); - // Fill the buffer with the first message. - assert!(sender - .send_message(RpcOut::Subscribe(topic_hash.clone())) - .is_ok()); - assert!(sender - .send_message(RpcOut::Subscribe(topic_hash.clone())) - .is_ok()); - assert!(sender.send_message(RpcOut::Unsubscribe(topic_hash)).is_ok()); + let slow_peer_score = gs.peer_score(&slow_peer_id).unwrap(); + // There should be two penalties for the two failed messages. + assert_eq!(slow_peer_score, slow_peer_params.slow_peer_weight * 2.0); } /// Test that specific topic configurations are correctly applied @@ -6422,7 +6296,7 @@ fn test_fanout_with_topic_config() { .build() .unwrap(); - let (mut gs, _, receivers, topic_hashes) = inject_nodes1() + let (mut gs, _, queues, topic_hashes) = inject_nodes1() .peer_no(10) // More than mesh_n .topics(vec![topic.clone()]) .to_subscribe(true) @@ -6445,12 +6319,11 @@ fn test_fanout_with_topic_config() { ); // Collect publish messages - let publishes = receivers + let publishes = queues .into_values() - .fold(vec![], |mut collected_publish, c| { - let priority = c.priority.get_ref(); - while !priority.is_empty() { - if let Ok(RpcOut::Publish { message, .. }) = priority.try_recv() { + .fold(vec![], |mut collected_publish, mut queue| { + while !queue.is_empty() { + if let Ok(RpcOut::Publish { message, .. 
}) = queue.try_pop() {
+                    collected_publish.push(message);
                 }
             }
diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs
index a2d05d8a3ff..1f6b8fe35dc 100644
--- a/protocols/gossipsub/src/handler.rs
+++ b/protocols/gossipsub/src/handler.rs
@@ -37,7 +37,7 @@ use web_time::Instant;
 
 use crate::{
     protocol::{GossipsubCodec, ProtocolConfig},
-    rpc::Receiver,
+    queue::Queue,
     rpc_proto::proto,
     types::{PeerKind, RawMessage, RpcIn, RpcOut},
     ValidationError,
@@ -60,7 +60,7 @@ pub enum HandlerEvent {
     /// which protocol. This message only occurs once per connection.
     PeerKind(PeerKind),
     /// A message to be published was dropped because it could not be sent in time.
-    MessageDropped(RpcOut),
+    MessagesDropped(RpcOut),
 }
 
 /// A message sent from the behaviour to the handler.
@@ -98,8 +98,8 @@ pub struct EnabledHandler {
     /// The single long-lived inbound substream.
     inbound_substream: Option<InboundSubstreamState>,
 
-    /// Queue of values that we want to send to the remote
-    send_queue: Receiver,
+    /// Queue of dispatched Rpc messages to send.
+    message_queue: Queue<RpcOut>,
 
     /// Flag indicating that an outbound substream is being established to prevent duplicate
     /// requests.
@@ -162,7 +162,7 @@ enum OutboundSubstreamState {
 
 impl Handler {
     /// Builds a new [`Handler`].
-    pub fn new(protocol_config: ProtocolConfig, message_queue: Receiver) -> Self {
+    pub(crate) fn new(protocol_config: ProtocolConfig, message_queue: Queue<RpcOut>) -> Self {
         Handler::Enabled(EnabledHandler {
             listen_protocol: protocol_config,
             inbound_substream: None,
@@ -170,7 +170,7 @@ impl Handler {
             outbound_substream_establishing: false,
             outbound_substream_attempts: 0,
             inbound_substream_attempts: 0,
-            send_queue: message_queue,
+            message_queue,
             peer_kind: None,
             peer_kind_sent: false,
             last_io_activity: Instant::now(),
@@ -234,7 +234,7 @@ impl EnabledHandler {
         }
 
         // determine if we need to create the outbound stream
-        if !self.send_queue.poll_is_empty(cx)
+        if !self.message_queue.is_empty()
             && self.outbound_substream.is_none()
             && !self.outbound_substream_establishing
         {
@@ -252,22 +252,25 @@
             {
                 // outbound idle state
                 Some(OutboundSubstreamState::WaitingOutput(substream)) => {
-                    if let Poll::Ready(Some(mut message)) = self.send_queue.poll_next_unpin(cx) {
+                    if let Poll::Ready(mut message) = Pin::new(&mut self.message_queue).poll_pop(cx)
+                    {
                         match message {
                             RpcOut::Publish {
                                 message: _,
                                 ref mut timeout,
+                                ..
                             }
                             | RpcOut::Forward {
                                 message: _,
                                 ref mut timeout,
+                                ..
                             } => {
                                 if Pin::new(timeout).poll(cx).is_ready() {
                                     // Inform the behaviour and end the poll.
                                     self.outbound_substream =
                                         Some(OutboundSubstreamState::WaitingOutput(substream));
                                     return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
-                                        HandlerEvent::MessageDropped(message),
+                                        HandlerEvent::MessagesDropped(message),
                                     ));
                                 }
                             }
@@ -407,13 +410,6 @@
             }
         }
 
-        // Drop the next message in queue if it's stale.
-        if let Poll::Ready(Some(rpc)) = self.send_queue.poll_stale(cx) {
-            return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
-                HandlerEvent::MessageDropped(rpc),
-            ));
-        }
-
         Poll::Pending
     }
 }
diff --git a/protocols/gossipsub/src/lib.rs b/protocols/gossipsub/src/lib.rs
index a116900be0e..f1d42d6cddb 100644
--- a/protocols/gossipsub/src/lib.rs
+++ b/protocols/gossipsub/src/lib.rs
@@ -105,7 +105,7 @@ mod mcache;
 mod metrics;
 mod peer_score;
 mod protocol;
-mod rpc;
+mod queue;
 mod rpc_proto;
 mod subscription_filter;
 mod time_cache;
diff --git a/protocols/gossipsub/src/metrics.rs b/protocols/gossipsub/src/metrics.rs
index 37fe5481689..ed504bafd45 100644
--- a/protocols/gossipsub/src/metrics.rs
+++ b/protocols/gossipsub/src/metrics.rs
@@ -133,10 +133,6 @@ pub(crate) struct Metrics {
     ignored_messages: Family<TopicHash, Counter>,
     /// The number of messages rejected by the application (validation result).
     rejected_messages: Family<TopicHash, Counter>,
-    /// The number of publish messages dropped by the sender.
-    publish_messages_dropped: Family<TopicHash, Counter>,
-    /// The number of forward messages dropped by the sender.
-    forward_messages_dropped: Family<TopicHash, Counter>,
     /// The number of messages that timed out and could not be sent.
     timedout_messages_dropped: Family<TopicHash, Counter>,
 
@@ -194,9 +190,7 @@ pub(crate) struct Metrics {
     idontwant_msgs_ids: Counter,
 
     /// The size of the priority queue.
-    priority_queue_size: Histogram,
-    /// The size of the non-priority queue.
-    non_priority_queue_size: Histogram,
+    queue_size: Histogram,
 }
 
 impl Metrics {
@@ -245,16 +239,6 @@
             "Number of rejected messages received for each topic"
         );
 
-        let publish_messages_dropped = register_family!(
-            "publish_messages_dropped_per_topic",
-            "Number of publish messages dropped per topic"
-        );
-
-        let forward_messages_dropped = register_family!(
-            "forward_messages_dropped_per_topic",
-            "Number of forward messages dropped per topic"
-        );
-
         let timedout_messages_dropped = register_family!(
             "timedout_messages_dropped_per_topic",
             "Number of timedout messages dropped per topic"
@@ -361,18 +345,11 @@
             metric
         };
 
-        let priority_queue_size = Histogram::new(linear_buckets(0.0, 25.0, 100));
+        let queue_size = Histogram::new(linear_buckets(0.0, 50.0, 100));
         registry.register(
             "priority_queue_size",
             "Histogram of observed priority queue sizes",
-            priority_queue_size.clone(),
-        );
-
-        let non_priority_queue_size = Histogram::new(linear_buckets(0.0, 25.0, 100));
-        registry.register(
-            "non_priority_queue_size",
-            "Histogram of observed non-priority queue sizes",
-            non_priority_queue_size.clone(),
+            queue_size.clone(),
         );
 
         Self {
@@ -385,8 +362,6 @@
             accepted_messages,
             ignored_messages,
             rejected_messages,
-            publish_messages_dropped,
-            forward_messages_dropped,
             timedout_messages_dropped,
             mesh_peer_counts,
             mesh_peer_inclusion_events,
@@ -405,8 +380,7 @@
             topic_iwant_msgs,
             idontwant_msgs,
             idontwant_msgs_ids,
-            priority_queue_size,
-            non_priority_queue_size,
+            queue_size,
         }
     }
 
@@ -537,20 +511,6 @@
         }
     }
 
-    /// Register dropping a Publish message over a topic.
-    pub(crate) fn publish_msg_dropped(&mut self, topic: &TopicHash) {
-        if self.register_topic(topic).is_ok() {
-            self.publish_messages_dropped.get_or_create(topic).inc();
-        }
-    }
-
-    /// Register dropping a Forward message over a topic.
-    pub(crate) fn forward_msg_dropped(&mut self, topic: &TopicHash) {
-        if self.register_topic(topic).is_ok() {
-            self.forward_messages_dropped.get_or_create(topic).inc();
-        }
-    }
-
     /// Register dropping a message that timedout over a topic.
    pub(crate) fn timeout_msg_dropped(&mut self, topic: &TopicHash) {
         if self.register_topic(topic).is_ok() {
@@ -616,12 +576,7 @@ impl Metrics {
 
     /// Observes a priority queue size.
     pub(crate) fn observe_priority_queue_size(&mut self, len: usize) {
-        self.priority_queue_size.observe(len as f64);
-    }
-
-    /// Observes a non-priority queue size.
-    pub(crate) fn observe_non_priority_queue_size(&mut self, len: usize) {
-        self.non_priority_queue_size.observe(len as f64);
+        self.queue_size.observe(len as f64);
     }
 
     /// Observe a score of a mesh peer.
diff --git a/protocols/gossipsub/src/protocol.rs b/protocols/gossipsub/src/protocol.rs
index 821c11d2132..74dcc669f55 100644
--- a/protocols/gossipsub/src/protocol.rs
+++ b/protocols/gossipsub/src/protocol.rs
@@ -659,6 +659,7 @@ mod tests {
         let rpc = RpcOut::Publish {
             message: message.clone(),
             timeout: Delay::new(Duration::from_secs(1)),
+            message_id: MessageId(vec![0, 0]),
         };
 
         let mut codec =
diff --git a/protocols/gossipsub/src/queue.rs b/protocols/gossipsub/src/queue.rs
new file mode 100644
index 00000000000..ae6048794cb
--- /dev/null
+++ b/protocols/gossipsub/src/queue.rs
@@ -0,0 +1,148 @@
+// Copyright 2020 Sigma Prime Pty Ltd.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the "Software"),
+// to deal in the Software without restriction, including without limitation
+// the rights to use, copy, modify, merge, publish, distribute, sublicense,
+// and/or sell copies of the Software, and to permit persons to whom the
+// Software is furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+// DEALINGS IN THE SOFTWARE.
+
+use std::{
+    collections::{BinaryHeap, HashMap},
+    sync::{
+        atomic::{AtomicUsize, Ordering},
+        Arc, Mutex,
+    },
+    task::{Context, Poll, Waker},
+};
+
+/// An async priority queue used to dispatch messages from the `NetworkBehaviour`
+/// to the `ConnectionHandler`. Inspired by loole and flume.
+#[derive(Debug)]
+pub(crate) struct Queue<T> {
+    shared: Arc<Mutex<Shared<T>>>,
+    capacity: usize,
+    id: usize,
+    count: Arc<AtomicUsize>,
+}
+
+/// The state shared by the `NetworkBehaviour`s and the `ConnectionHandler`s.
+#[derive(Debug)]
+pub(crate) struct Shared<T> {
+    queue: BinaryHeap<T>,
+    pending_pops: HashMap<usize, Waker>,
+}
+
+impl<T: Ord> Queue<T> {
+    /// Create a new `Queue` with the given `capacity`.
+    pub(crate) fn new(capacity: usize) -> Self {
+        Self {
+            shared: Arc::new(Mutex::new(Shared {
+                queue: BinaryHeap::with_capacity(capacity),
+                pending_pops: Default::default(),
+            })),
+            capacity,
+            // The root handle takes id 1; `count` holds the id handed out to
+            // the next clone, so ids stay unique across handles.
+            count: Arc::new(AtomicUsize::new(2)),
+            id: 1,
+        }
+    }
+
+    /// Push an item onto the queue, ignoring the capacity limit.
+    pub(crate) fn push(&mut self, item: T) {
+        let mut shared = self.shared.lock().expect("lock to not be poisoned");
+        shared.queue.push(item);
+
+        // Wake pending registered pops.
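+        // Draining wakes every parked consumer: each cloned handle registers
+        // its waker under its own id, and any one of them may pop this item.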
+        for (_, s) in shared.pending_pops.drain() {
+            s.wake();
+        }
+    }
+
+    /// Try to add an item to the queue, returning `Err(item)` if it is full.
+    pub(crate) fn try_push(&mut self, item: T) -> Result<(), T> {
+        let mut shared = self.shared.lock().expect("lock to not be poisoned");
+        if shared.queue.len() >= self.capacity {
+            return Err(item);
+        }
+        shared.queue.push(item);
+
+        // Wake pending registered pops.
+        for (_, s) in shared.pending_pops.drain() {
+            s.wake();
+        }
+        Ok(())
+    }
+
+    /// Pop the highest-priority element from the queue, registering the
+    /// current task for a wakeup if the queue is empty.
+    pub(crate) fn poll_pop(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
+        let mut shared = self.shared.lock().expect("lock to not be poisoned");
+        match shared.queue.pop() {
+            Some(t) => Poll::Ready(t),
+            None => {
+                // Always store the most recent waker for this handle.
+                shared.pending_pops.insert(self.id, cx.waker().clone());
+                Poll::Pending
+            }
+        }
+    }
+
+    /// Attempts to pop an item from the queue.
+    /// This method returns an error if the queue is empty.
+    #[cfg(test)]
+    pub(crate) fn try_pop(&mut self) -> Result<T, ()> {
+        let mut shared = self.shared.lock().expect("lock to not be poisoned");
+        shared.queue.pop().ok_or(())
+    }
+
+    /// Retain only the elements specified by the predicate.
+    /// In other words, remove all elements `e` for which `f(&e)` returns false.
+    /// The elements are visited in unsorted (and unspecified) order.
+    pub(crate) fn retain<F: FnMut(&T) -> bool>(&mut self, f: F) {
+        let mut shared = self.shared.lock().expect("lock to not be poisoned");
+        shared.queue.retain(f);
+    }
+
+    /// Returns the length of the queue.
+    #[cfg(feature = "metrics")]
+    pub(crate) fn len(&self) -> usize {
+        let shared = self.shared.lock().expect("lock to not be poisoned");
+        shared.queue.len()
+    }
+
+    /// Check if the queue is empty.
+    pub(crate) fn is_empty(&self) -> bool {
+        let shared = self.shared.lock().expect("lock to not be poisoned");
+        shared.queue.len() == 0
+    }
+}
+
+impl<T> Clone for Queue<T> {
+    fn clone(&self) -> Self {
+        Self {
+            shared: self.shared.clone(),
+            capacity: self.capacity,
+            count: self.count.clone(),
+            // `fetch_add` hands every clone a fresh, unique id.
+            id: self.count.fetch_add(1, Ordering::SeqCst),
+        }
+    }
+}
+
+impl<T> Drop for Queue<T> {
+    fn drop(&mut self) {
+        let mut shared = self.shared.lock().expect("lock to not be poisoned");
+        shared.pending_pops.remove(&self.id);
+    }
+}
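+
+// A minimal illustrative check of the queue contract (not exhaustive): `u8`
+// stands in for `RpcOut`, since any `T: Ord` works; greater items pop first.
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn pops_by_priority_and_rejects_overflow() {
+        let mut queue = Queue::new(2);
+        queue.try_push(1u8).unwrap();
+        queue.try_push(3).unwrap();
+        // At capacity, `try_push` hands the item back...
+        assert_eq!(queue.try_push(2), Err(2));
+        // ...while `push` ignores the capacity limit.
+        queue.push(2);
+        // The greatest (highest-priority) item is popped first.
+        assert_eq!(queue.try_pop(), Ok(3));
+        assert_eq!(queue.try_pop(), Ok(2));
+        assert_eq!(queue.try_pop(), Ok(1));
+        assert_eq!(queue.try_pop(), Err(()));
+    }
+}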
diff --git a/protocols/gossipsub/src/types.rs b/protocols/gossipsub/src/types.rs
index 8f8a4f38a88..b1279be7de2 100644
--- a/protocols/gossipsub/src/types.rs
+++ b/protocols/gossipsub/src/types.rs
@@ -19,7 +19,11 @@
 // DEALINGS IN THE SOFTWARE.
 
 //! A collection of types using the Gossipsub system.
-use std::{collections::BTreeSet, fmt, fmt::Debug};
+use std::{
+    cmp::Ordering,
+    collections::BTreeSet,
+    fmt::{self, Debug},
+};
 
 use futures_timer::Delay;
 use hashlink::LinkedHashMap;
@@ -30,36 +34,18 @@ use quick_protobuf::MessageWrite;
 use serde::{Deserialize, Serialize};
 use web_time::Instant;
 
-use crate::{rpc::Sender, rpc_proto::proto, TopicHash};
+use crate::{queue::Queue, rpc_proto::proto, TopicHash};
 
 /// Messages that have expired while attempting to be sent to a peer.
-#[derive(Clone, Debug, Default)]
+#[derive(Clone, Debug, Default, PartialEq, Eq)]
 pub struct FailedMessages {
-    /// The number of publish messages that failed to be published in a heartbeat.
-    pub publish: usize,
-    /// The number of forward messages that failed to be published in a heartbeat.
-    pub forward: usize,
-    /// The number of messages that were failed to be sent to the priority queue as it was full.
-    pub priority: usize,
-    /// The number of messages that were failed to be sent to the non-priority queue as it was
-    /// full.
-    pub non_priority: usize,
+    /// The number of messages that failed to be sent because the queue was full.
+    pub queue_full: usize,
     /// The number of messages that timed out and could not be sent.
     pub timeout: usize,
 }
 
-impl FailedMessages {
-    /// The total number of messages that failed due to the queue being full.
-    pub fn total_queue_full(&self) -> usize {
-        self.priority + self.non_priority
-    }
-
-    /// The total failed messages in a heartbeat.
-    pub fn total(&self) -> usize {
-        self.priority + self.non_priority
-    }
-}
-
 #[derive(Debug)]
 /// Validation kinds from the application for received messages.
 pub enum MessageAcceptance {
@@ -111,10 +97,10 @@ pub(crate) struct PeerDetails {
     pub(crate) connections: Vec<ConnectionId>,
     /// Subscribed topics.
     pub(crate) topics: BTreeSet<TopicHash>,
-    /// The rpc sender to the connection handler(s).
-    pub(crate) sender: Sender,
     /// Don't send messages.
     pub(crate) dont_send: LinkedHashMap<MessageId, Instant>,
+    /// Message queue consumed by the connection handler.
+    pub(crate) messages: Queue<RpcOut>,
 }
 
 /// Describes the types of peers that can exist in the gossipsub context.
@@ -319,10 +305,18 @@ pub enum RpcOut {
     /// Publish a Gossipsub message on network. `timeout` limits the duration the message
     /// can wait to be sent before it is abandoned.
-    Publish { message: RawMessage, timeout: Delay },
+    Publish {
+        message_id: MessageId,
+        message: RawMessage,
+        timeout: Delay,
+    },
     /// Forward a Gossipsub message on network. `timeout` limits the duration the message
     /// can wait to be sent before it is abandoned.
-    Forward { message: RawMessage, timeout: Delay },
+    Forward {
+        message_id: MessageId,
+        message: RawMessage,
+        timeout: Delay,
+    },
     /// Subscribe a topic.
     Subscribe(TopicHash),
     /// Unsubscribe a topic.
@@ -346,24 +340,90 @@ impl RpcOut {
     pub fn into_protobuf(self) -> proto::RPC {
         self.into()
     }
+
+    /// Returns true if the `RpcOut` is high priority.
+    pub(crate) fn high_priority(&self) -> bool {
+        matches!(
+            self,
+            RpcOut::Subscribe(_)
+                | RpcOut::Unsubscribe(_)
+                | RpcOut::Graft(_)
+                | RpcOut::Prune(_)
+                | RpcOut::IDontWant(_)
+        )
+    }
+}
+
+impl Eq for RpcOut {}
+impl PartialEq for RpcOut {
+    fn eq(&self, other: &Self) -> bool {
+        match (self, other) {
+            (
+                Self::Publish {
+                    message_id: l_message_id,
+                    ..
+                },
+                Self::Publish {
+                    message_id: r_message_id,
+                    ..
+                },
+            ) => l_message_id == r_message_id,
+            (
+                Self::Forward {
+                    message_id: l_message_id,
+                    ..
+                },
+                Self::Forward {
+                    message_id: r_message_id,
+                    ..
+                },
+            ) => l_message_id == r_message_id,
+            (Self::Subscribe(l0), Self::Subscribe(r0)) => l0 == r0,
+            (Self::Unsubscribe(l0), Self::Unsubscribe(r0)) => l0 == r0,
+            (Self::Graft(l0), Self::Graft(r0)) => l0 == r0,
+            (Self::Prune(l0), Self::Prune(r0)) => l0 == r0,
+            (Self::IHave(l0), Self::IHave(r0)) => l0 == r0,
+            (Self::IWant(l0), Self::IWant(r0)) => l0 == r0,
+            (Self::IDontWant(l0), Self::IDontWant(r0)) => l0 == r0,
+            _ => false,
+        }
+    }
+}
+
+impl Ord for RpcOut {
+    fn cmp(&self, other: &Self) -> Ordering {
+        match (self.high_priority(), other.high_priority()) {
+            (true, true) | (false, false) => {
+                // Within the same priority class, `RpcOut::Publish` ranks above the rest.
+                match (self, other) {
+                    (RpcOut::Publish { .. }, RpcOut::Publish { .. }) => Ordering::Equal,
+                    (RpcOut::Publish { .. }, _) => Ordering::Greater,
+                    (_, RpcOut::Publish { .. }) => Ordering::Less,
+                    _ => Ordering::Equal,
+                }
+            }
+            (true, false) => Ordering::Greater,
+            (false, true) => Ordering::Less,
+        }
+    }
+}
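+
+// Note: `Queue` feeds this ordering into a `BinaryHeap`, which is a max-heap
+// with no ordering guarantee between equal elements, so messages within the
+// same priority class are not necessarily sent in insertion order.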
+
+impl PartialOrd for RpcOut {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
 
 impl From<RpcOut> for proto::RPC {
     /// Converts the RPC into protobuf format.
     fn from(rpc: RpcOut) -> Self {
         match rpc {
-            RpcOut::Publish {
-                message,
-                timeout: _,
-            } => proto::RPC {
+            RpcOut::Publish { message, .. } => proto::RPC {
                 subscriptions: Vec::new(),
                 publish: vec![message.into()],
                 control: None,
             },
-            RpcOut::Forward {
-                message,
-                timeout: _,
-            } => proto::RPC {
+            RpcOut::Forward { message, .. } => proto::RPC {
                 publish: vec![message.into()],
                 subscriptions: Vec::new(),
                 control: None,