Skip to content

Commit 0181d45

Browse files
apollo_network: stress-test: broadcast in sqmr mode
1 parent e49d986 commit 0181d45

File tree

8 files changed: +150 additions, -35 deletions

crates/apollo_network/src/bin/broadcast_network_stress_test_node/message_handling.rs

Lines changed: 16 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -94,19 +94,22 @@ impl MessageSender {
9494
client.broadcast_message(message).await.unwrap();
9595
}
9696
MessageSender::Sqmr(client) => {
97-
// Send query and properly handle the response manager to avoid session warnings
98-
match client.send_new_query(message).await {
99-
Ok(mut response_manager) => {
100-
// Consume the response manager to properly close the session
101-
// This prevents the "finished with no messages" warning
102-
tokio::spawn(async move {
103-
while let Some(_response) = response_manager.next().await {
104-
// Process any responses if they come, but don't block the sender
105-
}
106-
});
107-
}
108-
Err(e) => {
109-
error!("Failed to send SQMR query: {:?}", e);
97+
// Send query to all specified peers
98+
for peer_id in _peers {
99+
match client.send_new_query_to_peer(message.clone(), *peer_id).await {
100+
Ok(mut response_manager) => {
101+
// Consume the response manager to properly close the session
102+
// This prevents the "finished with no messages" warning
103+
tokio::spawn(async move {
104+
while let Some(_response) = response_manager.next().await {
105+
// Process any responses if they come, but don't block the
106+
// sender
107+
}
108+
});
109+
}
110+
Err(e) => {
111+
error!("Failed to send SQMR query to peer {:?}: {:?}", peer_id, e);
112+
}
110113
}
111114
}
112115
}

crates/apollo_network/src/network_manager/mod.rs

Lines changed: 24 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -715,8 +715,10 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
715715
protocol: StreamProtocol,
716716
client_payload: SqmrClientPayload,
717717
) {
718-
let SqmrClientPayload { query, report_receiver, responses_sender } = client_payload;
719-
let outbound_session_id = self.swarm.send_query(query, protocol.clone());
718+
let SqmrClientPayload { query, report_receiver, responses_sender, peer_id } =
719+
client_payload;
720+
// Use the peer_id from the payload (None for round-robin, Some for specific peer)
721+
let outbound_session_id = self.swarm.send_query(query, protocol.clone(), peer_id);
720722
if let Some(sqmr_metrics) =
721723
self.metrics.as_ref().and_then(|metrics| metrics.sqmr_metrics.as_ref())
722724
{
@@ -952,7 +954,25 @@ where
952954
let query = Bytes::from(query);
953955
let responses_sender =
954956
Box::new(responses_sender.with(|response| ready(Ok(Response::try_from(response)))));
955-
let payload = SqmrClientPayload { query, report_receiver, responses_sender };
957+
let payload = SqmrClientPayload { query, report_receiver, responses_sender, peer_id: None };
958+
self.sender.send(payload).await?;
959+
Ok(ClientResponsesManager { report_sender, responses_receiver })
960+
}
961+
962+
pub async fn send_new_query_to_peer(
963+
&mut self,
964+
query: Query,
965+
peer_id: PeerId,
966+
) -> Result<ClientResponsesManager<Response>, SendError> {
967+
let (report_sender, report_receiver) = oneshot::channel::<()>();
968+
let (responses_sender, responses_receiver) =
969+
futures::channel::mpsc::channel(self.buffer_size);
970+
let responses_receiver = Box::new(responses_receiver);
971+
let query = Bytes::from(query);
972+
let responses_sender =
973+
Box::new(responses_sender.with(|response| ready(Ok(Response::try_from(response)))));
974+
let payload =
975+
SqmrClientPayload { query, report_receiver, responses_sender, peer_id: Some(peer_id) };
956976
self.sender.send(payload).await?;
957977
Ok(ClientResponsesManager { report_sender, responses_receiver })
958978
}
@@ -987,6 +1007,7 @@ pub struct SqmrClientPayload {
9871007
query: Bytes,
9881008
report_receiver: ReportReceiver,
9891009
responses_sender: ResponsesSender,
1010+
peer_id: Option<PeerId>,
9901011
}
9911012

9921013
pub struct SqmrServerReceiver<Query, Response>

crates/apollo_network/src/network_manager/swarm_trait.rs

Lines changed: 13 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -21,7 +21,12 @@ pub trait SwarmTrait: Stream<Item = Event> + Unpin {
2121
inbound_session_id: InboundSessionId,
2222
) -> Result<(), SessionIdNotFoundError>;
2323

24-
fn send_query(&mut self, query: Vec<u8>, protocol: StreamProtocol) -> OutboundSessionId;
24+
fn send_query(
25+
&mut self,
26+
query: Vec<u8>,
27+
protocol: StreamProtocol,
28+
peer_id: Option<PeerId>,
29+
) -> OutboundSessionId;
2530

2631
fn dial(&mut self, peer_multiaddr: Multiaddr) -> Result<(), DialError>;
2732

@@ -74,8 +79,13 @@ impl SwarmTrait for Swarm<mixed_behaviour::MixedBehaviour> {
7479
self.behaviour_mut().sqmr.send_response(response, inbound_session_id)
7580
}
7681

77-
fn send_query(&mut self, query: Vec<u8>, protocol: StreamProtocol) -> OutboundSessionId {
78-
self.behaviour_mut().sqmr.start_query(query, protocol)
82+
fn send_query(
83+
&mut self,
84+
query: Vec<u8>,
85+
protocol: StreamProtocol,
86+
peer_id: Option<PeerId>,
87+
) -> OutboundSessionId {
88+
self.behaviour_mut().sqmr.start_query(query, protocol, peer_id)
7989
}
8090

8191
fn dial(&mut self, peer_multiaddr: Multiaddr) -> Result<(), DialError> {

crates/apollo_network/src/network_manager/test.rs

Lines changed: 6 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -136,7 +136,12 @@ impl SwarmTrait for MockSwarm {
136136
Ok(())
137137
}
138138

139-
fn send_query(&mut self, query: Vec<u8>, _protocol: StreamProtocol) -> OutboundSessionId {
139+
fn send_query(
140+
&mut self,
141+
query: Vec<u8>,
142+
_protocol: StreamProtocol,
143+
_peer_id: Option<PeerId>,
144+
) -> OutboundSessionId {
140145
let outbound_session_id = OutboundSessionId { value: self.next_outbound_session_id };
141146
self.create_response_events_for_query_each_num_becomes_response(
142147
query,

crates/apollo_network/src/network_manager/test_utils.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -247,7 +247,7 @@ where
247247
Bytes: From<Response>,
248248
{
249249
fn from(payload: SqmrClientPayload) -> Self {
250-
let SqmrClientPayload { query, report_receiver, responses_sender } = payload;
250+
let SqmrClientPayload { query, report_receiver, responses_sender, peer_id: _ } = payload;
251251
let query = Query::try_from(query);
252252
let responses_sender =
253253
Box::new(responses_sender.with(|response: Response| ready(Ok(Bytes::from(response)))));

crates/apollo_network/src/sqmr/behaviour.rs

Lines changed: 41 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -129,24 +129,55 @@ impl Behaviour {
129129
}
130130
}
131131

132-
/// Assign some peer and start a query. Return the id of the new session.
132+
/// Start a query and return the id of the new session.
133+
///
134+
/// If `peer_id` is `Some`, the query will be sent to that specific peer using any available
135+
/// connection. If `peer_id` is `None`, a peer will be assigned via the peer manager (typically
136+
/// using round-robin selection).
133137
pub fn start_query(
134138
&mut self,
135139
query: Bytes,
136140
protocol_name: StreamProtocol,
141+
peer_id: Option<PeerId>,
137142
) -> OutboundSessionId {
138143
let outbound_session_id = self.next_outbound_session_id;
139144
self.next_outbound_session_id.value += 1;
140145

141-
self.outbound_sessions_pending_peer_assignment
142-
.insert(outbound_session_id, (query, protocol_name));
143-
debug!(
144-
"Network received new outbound query. Requesting peer assignment for {:?}.",
145-
outbound_session_id
146-
);
147-
self.add_event_to_queue(ToSwarm::GenerateEvent(Event::ToOtherBehaviourEvent(
148-
ToOtherBehaviourEvent::RequestPeerAssignment { outbound_session_id },
149-
)));
146+
if let Some(peer_id) = peer_id {
147+
// Peer specified directly - send the query immediately without peer assignment
148+
debug!(
149+
"Network received new outbound query for peer {:?}. Sending directly. {:?}",
150+
peer_id, outbound_session_id
151+
);
152+
153+
// We don't have a specific connection ID, so we use NotifyHandler::Any to send to
154+
// any connection with this peer
155+
self.add_event_to_queue(ToSwarm::NotifyHandler {
156+
peer_id,
157+
handler: NotifyHandler::Any,
158+
event: RequestFromBehaviourEvent::CreateOutboundSession {
159+
query,
160+
outbound_session_id,
161+
protocol_name,
162+
},
163+
});
164+
165+
// Store the mapping without a specific connection_id (using a dummy value)
166+
// The actual connection will be determined by NotifyHandler::Any
167+
self.session_id_to_peer_id_and_connection_id
168+
.insert(outbound_session_id.into(), (peer_id, ConnectionId::new_unchecked(0)));
169+
} else {
170+
// No peer specified - use peer assignment (round-robin)
171+
self.outbound_sessions_pending_peer_assignment
172+
.insert(outbound_session_id, (query, protocol_name));
173+
debug!(
174+
"Network received new outbound query. Requesting peer assignment for {:?}.",
175+
outbound_session_id
176+
);
177+
self.add_event_to_queue(ToSwarm::GenerateEvent(Event::ToOtherBehaviourEvent(
178+
ToOtherBehaviourEvent::RequestPeerAssignment { outbound_session_id },
179+
)));
180+
}
150181

151182
outbound_session_id
152183
}

crates/apollo_network/src/sqmr/behaviour_test.rs

Lines changed: 48 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -7,7 +7,14 @@ use futures::{FutureExt, Stream, StreamExt};
77
use lazy_static::lazy_static;
88
use libp2p::core::transport::PortUse;
99
use libp2p::core::{ConnectedPoint, Endpoint};
10-
use libp2p::swarm::{ConnectionClosed, ConnectionId, FromSwarm, NetworkBehaviour, ToSwarm};
10+
use libp2p::swarm::{
11+
ConnectionClosed,
12+
ConnectionId,
13+
FromSwarm,
14+
NetworkBehaviour,
15+
NotifyHandler,
16+
ToSwarm,
17+
};
1118
use libp2p::{Multiaddr, PeerId, StreamProtocol};
1219

1320
use super::super::handler::{RequestFromBehaviourEvent, RequestToBehaviourEvent};
@@ -319,7 +326,7 @@ async fn create_and_process_outbound_session() {
319326

320327
let peer_id = *DUMMY_PEER_ID;
321328

322-
let outbound_session_id = behaviour.start_query(QUERY.clone(), PROTOCOL_NAME.clone());
329+
let outbound_session_id = behaviour.start_query(QUERY.clone(), PROTOCOL_NAME.clone(), None);
323330

324331
validate_request_peer_assignment_event(&mut behaviour, outbound_session_id).await;
325332
validate_no_events(&mut behaviour);
@@ -354,7 +361,7 @@ async fn connection_closed() {
354361
let peer_id = *DUMMY_PEER_ID;
355362

356363
// Add an outbound session on the connection.
357-
let outbound_session_id = behaviour.start_query(QUERY.clone(), PROTOCOL_NAME.clone());
364+
let outbound_session_id = behaviour.start_query(QUERY.clone(), PROTOCOL_NAME.clone(), None);
358365
// Consume the event to request peer assignment.
359366
behaviour.next().await.unwrap();
360367
simulate_peer_assigned(&mut behaviour, peer_id, outbound_session_id);
@@ -400,7 +407,7 @@ async fn drop_outbound_session() {
400407

401408
let peer_id = *DUMMY_PEER_ID;
402409

403-
let outbound_session_id = behaviour.start_query(QUERY.clone(), PROTOCOL_NAME.clone());
410+
let outbound_session_id = behaviour.start_query(QUERY.clone(), PROTOCOL_NAME.clone(), None);
404411
// Consume the event to request peer assignment.
405412
behaviour.next().await.unwrap();
406413
simulate_peer_assigned(&mut behaviour, peer_id, outbound_session_id);
@@ -464,3 +471,40 @@ fn send_response_non_existing_session_fails() {
464471
behaviour.send_response(response, InboundSessionId::default()).unwrap_err();
465472
}
466473
}
474+
475+
#[tokio::test]
476+
async fn create_outbound_session_with_specified_peer() {
477+
let mut behaviour = Behaviour::new(Config::get_test_config());
478+
479+
let peer_id = *DUMMY_PEER_ID;
480+
481+
// Start a query with a specific peer ID
482+
let outbound_session_id =
483+
behaviour.start_query(QUERY.clone(), PROTOCOL_NAME.clone(), Some(peer_id));
484+
485+
// Should not generate a RequestPeerAssignment event since peer is specified
486+
// Instead, should directly generate a NotifyHandler event
487+
let event = behaviour.next().await.unwrap();
488+
assert_matches!(
489+
event,
490+
ToSwarm::NotifyHandler {
491+
peer_id: event_peer_id,
492+
handler: NotifyHandler::Any,
493+
event: RequestFromBehaviourEvent::CreateOutboundSession {
494+
query: event_query,
495+
outbound_session_id: event_outbound_session_id,
496+
protocol_name,
497+
},
498+
} if event_peer_id == peer_id
499+
&& event_outbound_session_id == outbound_session_id
500+
&& event_query == QUERY.clone()
501+
&& protocol_name == PROTOCOL_NAME.clone()
502+
);
503+
validate_no_events(&mut behaviour);
504+
505+
// Verify the session is tracked
506+
let (tracked_peer_id, _connection_id) = behaviour
507+
.get_peer_id_and_connection_id_from_session_id(outbound_session_id.into())
508+
.unwrap();
509+
assert_eq!(tracked_peer_id, peer_id);
510+
}

crates/apollo_network/src/sqmr/flow_test.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -70,6 +70,7 @@ fn start_query_and_update_map(
7070
let outbound_session_id = outbound_swarm.behaviour_mut().start_query(
7171
get_bytes_from_query_indices(outbound_peer_id, inbound_peer_id),
7272
PROTOCOL_NAME,
73+
None,
7374
);
7475
outbound_session_id_to_peer_id.insert((outbound_peer_id, outbound_session_id), inbound_peer_id);
7576
}

0 commit comments