|
1 | 1 | //! Delays and deduplicates [`Stream`](futures::stream::Stream) items
|
2 | 2 |
|
3 | 3 | use futures::{stream::Fuse, Stream, StreamExt};
|
4 |
| -use hashbrown::{hash_map::Entry, HashMap}; |
| 4 | +use hashbrown::{hash_map::RawEntryMut, HashMap}; |
5 | 5 | use pin_project::pin_project;
|
6 | 6 | use std::{
|
7 | 7 | collections::HashSet,
|
@@ -78,24 +78,24 @@ impl<T: Hash + Eq + Clone, R> SchedulerProj<'_, T, R> {
|
78 | 78 | .run_at
|
79 | 79 | .checked_add(*self.debounce)
|
80 | 80 | .unwrap_or_else(far_future);
|
81 |
| - match self.scheduled.entry(request.message) { |
| 81 | + match self.scheduled.raw_entry_mut().from_key(&request.message) { |
82 | 82 | // If new request is supposed to be earlier than the current entry's scheduled
|
83 | 83 | // time (for eg: the new request is user triggered and the current entry is the
|
84 | 84 | // reconciler's usual retry), then give priority to the new request.
|
85 |
| - Entry::Occupied(mut old_entry) if old_entry.get().run_at >= request.run_at => { |
| 85 | + RawEntryMut::Occupied(mut old_entry) if old_entry.get().run_at >= request.run_at => { |
86 | 86 | // Old entry will run after the new request, so replace it..
|
87 | 87 | let entry = old_entry.get_mut();
|
88 | 88 | self.queue.reset_at(&entry.queue_key, next_time);
|
89 | 89 | entry.run_at = next_time;
|
90 |
| - old_entry.replace_key(); |
| 90 | + old_entry.insert_key(request.message); |
91 | 91 | }
|
92 |
| - Entry::Occupied(_old_entry) => { |
| 92 | + RawEntryMut::Occupied(_old_entry) => { |
93 | 93 | // Old entry will run before the new request, so ignore the new request..
|
94 | 94 | }
|
95 |
| - Entry::Vacant(entry) => { |
| 95 | + RawEntryMut::Vacant(entry) => { |
96 | 96 | // No old entry, we're free to go!
|
97 |
| - let message = entry.key().clone(); |
98 |
| - entry.insert(ScheduledEntry { |
| 97 | + let message = request.message.clone(); |
| 98 | + entry.insert(request.message, ScheduledEntry { |
99 | 99 | run_at: next_time,
|
100 | 100 | queue_key: self.queue.insert_at(message, next_time),
|
101 | 101 | });
|
|
0 commit comments