Skip to content

Commit 02cb2f3

Browse files
apollo_network: two buffer sizes for mpsc channels in register topic
1 parent 8f59fad commit 02cb2f3

File tree

7 files changed

+15
-11
lines changed

7 files changed

+15
-11
lines changed

crates/apollo_consensus_manager/src/consensus_manager.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,13 +106,15 @@ impl ConsensusManager {
106106
.register_broadcast_topic::<StreamMessage<ProposalPart, HeightAndRound>>(
107107
Topic::new(self.config.proposals_topic.clone()),
108108
self.config.broadcast_buffer_size,
109+
self.config.broadcast_buffer_size,
109110
)
110111
.expect("Failed to register broadcast topic");
111112

112113
let votes_broadcast_channels = network_manager
113114
.register_broadcast_topic::<Vote>(
114115
Topic::new(self.config.votes_topic.clone()),
115116
self.config.broadcast_buffer_size,
117+
self.config.broadcast_buffer_size,
116118
)
117119
.expect("Failed to register broadcast topic");
118120

crates/apollo_mempool_p2p/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ pub fn create_p2p_propagator_and_runner(
6262
.register_broadcast_topic(
6363
Topic::new(MEMPOOL_TOPIC),
6464
mempool_p2p_config.network_buffer_size,
65+
mempool_p2p_config.network_buffer_size,
6566
)
6667
.expect("Failed to register broadcast topic");
6768
let network_future = network_manager.run().instrument(info_span!("[Mempool network]"));

crates/apollo_network/src/bin/broadcast_network_stress_test_node/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ async fn main() {
200200
info!("My PeerId: {peer_id}");
201201

202202
let network_channels = network_manager
203-
.register_broadcast_topic::<StressTestMessage>(TOPIC.clone(), args.buffer_size)
203+
.register_broadcast_topic::<StressTestMessage>(TOPIC.clone(), args.buffer_size, args.buffer_size)
204204
.unwrap();
205205
let BroadcastTopicChannels { broadcasted_messages_receiver, broadcast_topic_client } =
206206
network_channels;

crates/apollo_network/src/e2e_broadcast_test.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -96,14 +96,14 @@ async fn broadcast_subscriber_end_to_end_test() {
9696
create_network_manager(create_swarm(Some(bootstrap_peer_multiaddr)).await);
9797

9898
let mut subscriber_channels1_1 =
99-
network_manager1.register_broadcast_topic::<Number>(topic1.clone(), BUFFER_SIZE).unwrap();
99+
network_manager1.register_broadcast_topic::<Number>(topic1.clone(), BUFFER_SIZE, BUFFER_SIZE).unwrap();
100100
let mut subscriber_channels1_2 =
101-
network_manager1.register_broadcast_topic::<Number>(topic2.clone(), BUFFER_SIZE).unwrap();
101+
network_manager1.register_broadcast_topic::<Number>(topic2.clone(), BUFFER_SIZE, BUFFER_SIZE).unwrap();
102102

103103
let subscriber_channels2_1 =
104-
network_manager2.register_broadcast_topic::<Number>(topic1.clone(), BUFFER_SIZE).unwrap();
104+
network_manager2.register_broadcast_topic::<Number>(topic1.clone(), BUFFER_SIZE, BUFFER_SIZE).unwrap();
105105
let subscriber_channels2_2 =
106-
network_manager2.register_broadcast_topic::<Number>(topic2.clone(), BUFFER_SIZE).unwrap();
106+
network_manager2.register_broadcast_topic::<Number>(topic2.clone(), BUFFER_SIZE, BUFFER_SIZE).unwrap();
107107

108108
tokio::select! {
109109
_ = network_manager1.run() => panic!("network manager ended"),

crates/apollo_network/src/network_manager/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,8 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
204204
pub fn register_broadcast_topic<T>(
205205
&mut self,
206206
topic: Topic,
207-
buffer_size: usize,
207+
sending_buffer_size: usize,
208+
receiving_buffer_size: usize,
208209
) -> Result<BroadcastTopicChannels<T>, SubscriptionError>
209210
where
210211
T: TryFrom<Bytes> + 'static,
@@ -215,9 +216,9 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
215216
let topic_hash = topic.hash();
216217

217218
let (messages_to_broadcast_sender, messages_to_broadcast_receiver) =
218-
futures::channel::mpsc::channel(buffer_size);
219+
futures::channel::mpsc::channel(sending_buffer_size);
219220
let (broadcasted_messages_sender, broadcasted_messages_receiver) =
220-
futures::channel::mpsc::channel(buffer_size);
221+
futures::channel::mpsc::channel(receiving_buffer_size);
221222

222223
let insert_result = self
223224
.messages_to_broadcast_receivers

crates/apollo_network/src/network_manager/test.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,7 @@ async fn broadcast_message() {
338338
);
339339

340340
let mut broadcast_topic_client = network_manager
341-
.register_broadcast_topic(topic.clone(), BUFFER_SIZE)
341+
.register_broadcast_topic(topic.clone(), BUFFER_SIZE, BUFFER_SIZE)
342342
.unwrap()
343343
.broadcast_topic_client;
344344
broadcast_topic_client.broadcast_message(message.clone()).await.unwrap();
@@ -383,7 +383,7 @@ async fn receive_broadcasted_message_and_report_it() {
383383
mut broadcast_topic_client,
384384
mut broadcasted_messages_receiver,
385385
..
386-
} = network_manager.register_broadcast_topic::<Bytes>(topic.clone(), BUFFER_SIZE).unwrap();
386+
} = network_manager.register_broadcast_topic::<Bytes>(topic.clone(), BUFFER_SIZE, BUFFER_SIZE).unwrap();
387387

388388
tokio::select! {
389389
_ = network_manager.run() => panic!("network manager ended"),

crates/apollo_network/src/network_manager/test_utils.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ where
202202

203203
let mut network_manager = NetworkManager::new(network_config, None, None);
204204
let broadcast_channels =
205-
network_manager.register_broadcast_topic(topic.clone(), BUFFER_SIZE).unwrap();
205+
network_manager.register_broadcast_topic(topic.clone(), BUFFER_SIZE, BUFFER_SIZE).unwrap();
206206

207207
tokio::task::spawn(async move {
208208
let result = network_manager.run().await;

0 commit comments

Comments
 (0)