Skip to content

Commit 85fb1fb

Browse files
apollo_network: fixed memory hoarding in streams
1 parent 5158058 commit 85fb1fb

File tree

3 files changed

+12
-12
lines changed

3 files changed

+12
-12
lines changed

crates/apollo_network/src/sqmr/behaviour.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ pub struct Behaviour {
113113
next_outbound_session_id: OutboundSessionId,
114114
next_inbound_session_id: Arc<AtomicUsize>,
115115
dropped_sessions: HashSet<SessionId>,
116-
wakers_waiting_for_event: Vec<Waker>,
116+
last_waker_waiting_for_event: Option<Waker>,
117117
outbound_sessions_pending_peer_assignment: HashMap<OutboundSessionId, (Bytes, StreamProtocol)>,
118118
supported_inbound_protocols: HashSet<StreamProtocol>,
119119
}
@@ -127,7 +127,7 @@ impl Behaviour {
127127
next_outbound_session_id: Default::default(),
128128
next_inbound_session_id: Arc::new(Default::default()),
129129
dropped_sessions: Default::default(),
130-
wakers_waiting_for_event: Default::default(),
130+
last_waker_waiting_for_event: None,
131131
outbound_sessions_pending_peer_assignment: Default::default(),
132132
supported_inbound_protocols: Default::default(),
133133
}
@@ -214,7 +214,7 @@ impl Behaviour {
214214

215215
fn add_event_to_queue(&mut self, event: ToSwarm<Event, RequestFromBehaviourEvent>) {
216216
self.pending_events.push_back(event);
217-
for waker in self.wakers_waiting_for_event.drain(..) {
217+
if let Some(waker) = self.last_waker_waiting_for_event.take() {
218218
waker.wake();
219219
}
220220
}
@@ -332,7 +332,7 @@ impl NetworkBehaviour for Behaviour {
332332
if let Some(event) = self.pending_events.pop_front() {
333333
return Poll::Ready(event);
334334
}
335-
self.wakers_waiting_for_event.push(cx.waker().clone());
335+
self.last_waker_waiting_for_event = Some(cx.waker().clone());
336336
Poll::Pending
337337
}
338338
}

crates/apollo_network/src/sqmr/handler/inbound_session.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use crate::Bytes;
1616
pub(super) struct InboundSession {
1717
pending_messages: VecDeque<Bytes>,
1818
current_task: WriteMessageTask,
19-
wakers_waiting_for_new_message: Vec<Waker>,
19+
last_waker_waiting_for_new_message: Option<Waker>,
2020
}
2121

2222
enum FinishReason {
@@ -35,13 +35,13 @@ impl InboundSession {
3535
Self {
3636
pending_messages: Default::default(),
3737
current_task: WriteMessageTask::Waiting(write_stream),
38-
wakers_waiting_for_new_message: Default::default(),
38+
last_waker_waiting_for_new_message: None,
3939
}
4040
}
4141

4242
pub fn add_message_to_queue(&mut self, data: Bytes) {
4343
self.pending_messages.push_back(data);
44-
for waker in self.wakers_waiting_for_new_message.drain(..) {
44+
if let Some(waker) = self.last_waker_waiting_for_new_message.take() {
4545
waker.wake();
4646
}
4747
}
@@ -76,7 +76,7 @@ impl InboundSession {
7676
});
7777
Poll::Ready(())
7878
} else {
79-
self.wakers_waiting_for_new_message.push(cx.waker().clone());
79+
self.last_waker_waiting_for_new_message = Some(cx.waker().clone());
8080
Poll::Pending
8181
}
8282
}

crates/apollo_network/src/utils.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,14 @@ use libp2p::{Multiaddr, PeerId};
1313
// Stream trait from tokio_stream and not from futures.
1414
pub struct StreamMap<K: Unpin + Clone + Ord, V: Stream + Unpin> {
1515
map: BTreeMap<K, V>,
16-
wakers_waiting_for_new_stream: Vec<Waker>,
16+
last_waker_waiting_for_new_stream: Option<Waker>,
1717
next_index_to_poll: Option<usize>,
1818
}
1919

2020
impl<K: Unpin + Clone + Ord, V: Stream + Unpin> StreamMap<K, V> {
2121
#[allow(dead_code)]
2222
pub fn new(map: BTreeMap<K, V>) -> Self {
23-
Self { map, wakers_waiting_for_new_stream: Default::default(), next_index_to_poll: None }
23+
Self { map, last_waker_waiting_for_new_stream: None, next_index_to_poll: None }
2424
}
2525

2626
#[allow(dead_code)]
@@ -40,7 +40,7 @@ impl<K: Unpin + Clone + Ord, V: Stream + Unpin> StreamMap<K, V> {
4040

4141
pub fn insert(&mut self, key: K, value: V) -> Option<V> {
4242
let res = self.map.insert(key, value);
43-
for waker in self.wakers_waiting_for_new_stream.drain(..) {
43+
if let Some(waker) = self.last_waker_waiting_for_new_stream.take() {
4444
waker.wake();
4545
}
4646
res
@@ -89,7 +89,7 @@ impl<K: Unpin + Clone + Ord, V: Stream + Unpin> Stream for StreamMap<K, V> {
8989
unpinned_self.map.remove(&finished_stream_key);
9090
return Poll::Ready(Some((finished_stream_key, None)));
9191
}
92-
unpinned_self.wakers_waiting_for_new_stream.push(cx.waker().clone());
92+
unpinned_self.last_waker_waiting_for_new_stream = Some(cx.waker().clone());
9393
Poll::Pending
9494
}
9595
}

0 commit comments

Comments
 (0)