From 5017a3078358eceff9fb30c131dc09d49ca66f7b Mon Sep 17 00:00:00 2001 From: Marc Nijdam Date: Thu, 31 Aug 2023 16:34:17 -0700 Subject: [PATCH] Use an lru cache to drop duplicate beacons (#453) * Use an lru cache to drop duplicate beacons - default --- lorawan/src/lib.rs | 1 + src/beaconer.rs | 86 ++++++++++++++++++++++++++++++++++---------- src/message_cache.rs | 86 ++++++++++++++++++++++++++++++++++++-------- src/packet.rs | 8 ++--- 4 files changed, 142 insertions(+), 39 deletions(-) diff --git a/lorawan/src/lib.rs b/lorawan/src/lib.rs index f5d6063a..08ee9fba 100644 --- a/lorawan/src/lib.rs +++ b/lorawan/src/lib.rs @@ -3,6 +3,7 @@ use bytes::{Buf, BufMut, Bytes}; use std::{convert::From, fmt, mem::size_of, result}; pub mod error; +pub use bytes; pub use error::LoraWanError; #[derive(Debug, Clone, Copy, PartialEq, Eq)] diff --git a/src/beaconer.rs b/src/beaconer.rs index 8250e69b..30b82edb 100644 --- a/src/beaconer.rs +++ b/src/beaconer.rs @@ -1,7 +1,9 @@ //! This module provides proof-of-coverage (PoC) beaconing support. use crate::{ + error::DecodeError, gateway::{self, BeaconResp}, + message_cache::MessageCache, region_watcher, service::{entropy::EntropyService, poc::PocIotService}, settings::Settings, @@ -55,8 +57,8 @@ pub struct Beaconer { interval: Duration, // Time next beacon attempt is o be made next_beacon_time: Instant, - /// The last beacon that was transitted - last_beacon: Option, + /// Last seen beacons + last_seen: MessageCache>, /// Use for channel plan and FR parameters region_params: RegionParams, poc_ingest_uri: Uri, @@ -83,7 +85,7 @@ impl Beaconer { messages, region_watch, interval, - last_beacon: None, + last_seen: MessageCache::new(15), // Set a beacon at least an interval out... arrival of region_params // will recalculate this time and no arrival of region_params will // cause the beacon to not occur @@ -157,7 +159,11 @@ impl Beaconer { /// /// See [`gateway::MessageSender::transmit_beacon`] pub async fn send_beacon(&self, beacon: beacon::Beacon) -> Result { - let beacon_id = beacon.beacon_id(); + let beacon_id = beacon + .beacon_data() + .map(|data| data.to_b64()) + .ok_or_else(DecodeError::not_beacon)?; + info!(beacon_id, "transmitting beacon"); let (powe, tmst) = self @@ -205,8 +211,10 @@ impl Beaconer { async fn mk_witness_report( &self, packet: PacketUp, + payload: Vec, ) -> Result { let mut report = poc_lora::LoraWitnessReportReqV1::try_from(packet)?; + report.data = payload; report.pub_key = self.keypair.public_key().to_vec(); report.signature = sign(self.keypair.clone(), report.encode_to_vec()).await?; Ok(report) @@ -230,28 +238,39 @@ impl Beaconer { .await; self.next_beacon_time = next_beacon_time; - self.last_beacon = last_beacon; + + if let Some(data) = last_beacon.beacon_data() { + self.last_seen.tag_now(data); + } } async fn handle_received_beacon(&mut self, packet: PacketUp) { + // Check if poc reporting is disabled if self.disabled { return; } - if let Some(last_beacon) = &self.last_beacon { - if packet.payload() == last_beacon.data { - info!("ignoring last self beacon witness"); - return; - } + + // Check that there is beacon data present + let Some(beacon_data) = packet.beacon_data() else { + warn!("ignoring invalid received beacon"); + return; + }; + + let beacon_id = beacon_data.to_b64(); + + // Check if we've seen this beacon before + if self.last_seen.tag_now(beacon_data.clone()) { + info!(%beacon_id, "ignoring duplicate or self beacon witness"); + return; } // Construct concurrent futures for connecting to the poc ingester and // signing the report - let report_fut = self.mk_witness_report(packet); + let report_fut = self.mk_witness_report(packet, beacon_data); let service_fut = PocIotService::connect(self.poc_ingest_uri.clone()); match tokio::try_join!(report_fut, service_fut) { Ok((report, mut poc_service)) => { - let beacon_id = report.data.to_b64(); let _ = poc_service .submit_witness(report) .inspect_err(|err| warn!(beacon_id, %err, "submit poc witness report")) @@ -279,12 +298,41 @@ impl Beaconer { } } -#[test] -fn test_beacon_roundtrip() { - use lorawan::PHYPayload; +trait BeaconData { + fn beacon_data(&self) -> Option>; +} - let phy_payload_a = PHYPayload::proprietary(b"poc_beacon_data"); - let payload: Vec = phy_payload_a.clone().try_into().expect("beacon packet"); - let phy_payload_b = PHYPayload::read(lorawan::Direction::Uplink, &mut &payload[..]).unwrap(); - assert_eq!(phy_payload_a, phy_payload_b); +impl BeaconData for PacketUp { + fn beacon_data(&self) -> Option> { + match PacketUp::parse_frame(lorawan::Direction::Uplink, self.payload()) { + Ok(lorawan::PHYPayloadFrame::Proprietary(payload)) => Some(payload.into()), + _ => None, + } + } +} + +impl BeaconData for beacon::Beacon { + fn beacon_data(&self) -> Option> { + Some(self.data.clone()) + } +} + +impl BeaconData for Option { + fn beacon_data(&self) -> Option> { + self.as_ref().and_then(|beacon| beacon.beacon_data()) + } +} + +#[cfg(test)] +mod test { + #[test] + fn test_beacon_roundtrip() { + use lorawan::PHYPayload; + + let phy_payload_a = PHYPayload::proprietary(b"poc_beacon_data"); + let payload: Vec = phy_payload_a.clone().try_into().expect("beacon packet"); + let phy_payload_b = + PHYPayload::read(lorawan::Direction::Uplink, &mut &payload[..]).unwrap(); + assert_eq!(phy_payload_a, phy_payload_b); + } } diff --git a/src/message_cache.rs b/src/message_cache.rs index 0e875027..075bae3f 100644 --- a/src/message_cache.rs +++ b/src/message_cache.rs @@ -4,18 +4,19 @@ use std::{ time::{Duration, Instant}, }; -pub struct MessageCache { - waiting: VecDeque>, +#[derive(Debug)] +pub struct MessageCache { + cache: VecDeque>, max_messages: u16, } #[derive(Debug, Clone)] -pub struct CacheMessage { +pub struct CacheMessage { received: Instant, message: T, } -impl CacheMessage { +impl CacheMessage { pub fn new(message: T, received: Instant) -> Self { Self { message, received } } @@ -25,7 +26,7 @@ impl CacheMessage { } } -impl Deref for CacheMessage { +impl Deref for CacheMessage { type Target = T; fn deref(&self) -> &Self::Target { @@ -33,11 +34,11 @@ impl Deref for CacheMessage { } } -impl MessageCache { +impl MessageCache { pub fn new(max_messages: u16) -> Self { let waiting = VecDeque::new(); Self { - waiting, + cache: waiting, max_messages, } } @@ -48,13 +49,36 @@ impl MessageCache { /// /// Pushing a packet onto the back of a full cache will cause the oldest /// (first) message in the cache to be dropped. - pub fn push_back(&mut self, message: T, received: Instant) { - self.waiting.push_back(CacheMessage::new(message, received)); + pub fn push_back(&mut self, message: T, received: Instant) -> Option> { + self.cache.push_back(CacheMessage::new(message, received)); if self.len() > self.max_messages as usize { - self.waiting.pop_front(); + self.cache.pop_front() + } else { + None } } + /// Returns the index of the first matching message in the cache or None if + /// not present + pub fn index_of(&self, message: &T) -> Option { + self.cache.iter().position(|m| m.message == *message) + } + + /// Promotes the given message to the back of the queue, effectively + /// recreating an LRU cache. Returns true if a cache hit was found + pub fn tag(&mut self, message: T, received: Instant) -> bool { + let result = self + .index_of(&message) + .and_then(|index| self.cache.remove(index)) + .is_some(); + self.push_back(message, received); + result + } + + pub fn tag_now(&mut self, message: T) -> bool { + self.tag(message, Instant::now()) + } + /// Pushes a CacheMessage back on the front of the queue. This is useful to /// push a packet back at the front after a failed delivery attempt. /// @@ -64,13 +88,13 @@ impl MessageCache { if self.len() > self.max_messages as usize { return; } - self.waiting.push_front(cache_message); + self.cache.push_front(cache_message); } pub fn pop_front(&mut self, duration: Duration) -> (usize, Option>) { let mut dropped = 0; let mut front = None; - while let Some(msg) = self.waiting.pop_front() { + while let Some(msg) = self.cache.pop_front() { if msg.hold_time() <= duration { front = Some(msg); break; @@ -81,11 +105,45 @@ impl MessageCache { (dropped, front) } + /// Returns a reference to the first (and oldest/first to be removed) + /// message in the cache + pub fn peek_front(&self) -> Option<&CacheMessage> { + self.cache.front() + } + pub fn len(&self) -> usize { - self.waiting.len() + self.cache.len() } pub fn is_empty(&self) -> bool { - self.waiting.is_empty() + self.cache.is_empty() + } +} + +#[cfg(test)] +mod test { + use super::MessageCache; + + #[test] + fn test_cache_tagging() { + let mut cache = MessageCache::>::new(2); + + // First should trigger a "not in cache" + assert!(!cache.tag_now(vec![1])); + // Second should trigger a "not in cache" and make the first least + // recently used + assert!(!cache.tag_now(vec![2])); + // Second tag should promote the old entry but remove none + assert!(cache.tag_now(vec![1])); + assert_eq!( + cache.peek_front().map(|entry| entry.message.as_ref()), + Some([2u8].as_ref()) + ); + + // Third tag should evict the least recently used entry (2) + assert!(!cache.tag_now(vec![3])); + assert_eq!(Some(0), cache.index_of(&vec![1u8])); + assert_eq!(Some(1), cache.index_of(&vec![3u8])); + assert!(cache.index_of(&vec![2u8]).is_none()); } } diff --git a/src/packet.rs b/src/packet.rs index f2dd3b50..a6ddf776 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -17,7 +17,7 @@ use std::{ time::{SystemTime, UNIX_EPOCH}, }; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct PacketUp(PacketRouterPacketUpV1); #[derive(Debug, Clone)] @@ -65,12 +65,8 @@ impl fmt::Display for PacketUp { impl TryFrom for poc_lora::LoraWitnessReportReqV1 { type Error = Error; fn try_from(value: PacketUp) -> Result { - let payload = match PacketUp::parse_frame(Direction::Uplink, value.payload()) { - Ok(PHYPayloadFrame::Proprietary(payload)) => payload, - _ => return Err(DecodeError::not_beacon()), - }; let report = poc_lora::LoraWitnessReportReqV1 { - data: payload.to_vec(), + data: vec![], tmst: value.0.timestamp as u32, timestamp: SystemTime::now() .duration_since(UNIX_EPOCH)