diff --git a/src/event_scanner/scanner/common.rs b/src/event_scanner/scanner/common.rs index b30cbc8..d3dc3a0 100644 --- a/src/event_scanner/scanner/common.rs +++ b/src/event_scanner/scanner/common.rs @@ -1,7 +1,7 @@ use std::ops::RangeInclusive; use crate::{ - ScannerMessage, + Notification, ScannerMessage, block_range_scanner::{BlockScannerResult, MAX_BUFFERED_MESSAGES}, event_scanner::{EventScannerResult, filter::EventFilter, listener::EventListener}, robust_provider::{Error as RobustProviderError, RobustProvider}, @@ -120,12 +120,16 @@ pub fn spawn_log_consumers( } if let ConsumerMode::CollectLatest { .. } = mode { - if !collected.is_empty() { + if collected.is_empty() { + info!("No logs found"); + _ = sender.try_stream(Notification::NoPastLogsFound).await; + } else { + info!(count = collected.len(), "Logs found"); collected.reverse(); // restore chronological order - } - info!("Sending collected logs to consumer"); - _ = sender.try_stream(collected).await; + info!("Sending collected logs to consumer"); + _ = sender.try_stream(collected).await; + } } }); diff --git a/src/event_scanner/scanner/historic.rs b/src/event_scanner/scanner/historic.rs index 4d35a8e..95db8b1 100644 --- a/src/event_scanner/scanner/historic.rs +++ b/src/event_scanner/scanner/historic.rs @@ -51,6 +51,10 @@ impl EventScannerBuilder { } }; + if from_num > latest_block { + Err(ScannerError::BlockExceedsLatest("from_block", from_num, latest_block))?; + } + let to_num = match scanner.config.to_block { BlockId::Number(to_block) => to_block.as_number().unwrap_or(0), BlockId::Hash(to_hash) => { @@ -58,10 +62,6 @@ impl EventScannerBuilder { } }; - if from_num > latest_block { - Err(ScannerError::BlockExceedsLatest("from_block", from_num, latest_block))?; - } - if to_num > latest_block { Err(ScannerError::BlockExceedsLatest("to_block", to_num, latest_block))?; } diff --git a/src/event_scanner/scanner/latest.rs b/src/event_scanner/scanner/latest.rs index 79994c0..b32e7f6 100644 --- a/src/event_scanner/scanner/latest.rs +++ b/src/event_scanner/scanner/latest.rs @@ -1,4 +1,8 @@ -use alloy::{eips::BlockId, network::Network}; +use alloy::{ + consensus::BlockHeader, + eips::BlockId, + network::{BlockResponse, Network}, +}; use super::common::{ConsumerMode, handle_stream}; use crate::{ @@ -45,13 +49,28 @@ impl EventScannerBuilder { let scanner = self.build(provider).await?; let provider = scanner.block_range_scanner.provider(); + let latest_block = provider.get_block_number().await?; + + let from_num = match scanner.config.from_block { + BlockId::Number(from_block) => from_block.as_number().unwrap_or(0), + BlockId::Hash(from_hash) => { + provider.get_block_by_hash(from_hash.into()).await?.header().number() + } + }; - if let BlockId::Hash(from_hash) = scanner.config.from_block { - provider.get_block_by_hash(from_hash.into()).await?; + if from_num > latest_block { + Err(ScannerError::BlockExceedsLatest("from_block", from_num, latest_block))?; } - if let BlockId::Hash(to_hash) = scanner.config.to_block { - provider.get_block_by_hash(to_hash.into()).await?; + let to_num = match scanner.config.to_block { + BlockId::Number(to_block) => to_block.as_number().unwrap_or(0), + BlockId::Hash(to_hash) => { + provider.get_block_by_hash(to_hash.into()).await?.header().number() + } + }; + + if to_num > latest_block { + Err(ScannerError::BlockExceedsLatest("to_block", to_num, latest_block))?; } Ok(scanner) @@ -281,4 +300,70 @@ mod tests { assert!(result.is_ok()); } + + #[tokio::test] + async fn test_from_block_above_latest_returns_error() { + let anvil = Anvil::new().try_spawn().unwrap(); + let provider = ProviderBuilder::new().connect_http(anvil.endpoint_url()); + + let latest_block = provider.get_block_number().await.unwrap(); + + let result = EventScannerBuilder::latest(1) + .from_block(latest_block + 100) + .to_block(latest_block) + .connect(provider) + .await; + + match result { + Err(ScannerError::BlockExceedsLatest("from_block", max, latest)) => { + assert_eq!(max, latest_block + 100); + assert_eq!(latest, latest_block); + } + _ => panic!("Expected BlockExceedsLatest error"), + } + } + + #[tokio::test] + async fn test_to_block_above_latest_returns_error() { + let anvil = Anvil::new().try_spawn().unwrap(); + let provider = ProviderBuilder::new().connect_http(anvil.endpoint_url()); + + let latest_block = provider.get_block_number().await.unwrap(); + + let result = EventScannerBuilder::latest(1) + .from_block(0) + .to_block(latest_block + 100) + .connect(provider) + .await; + + match result { + Err(ScannerError::BlockExceedsLatest("to_block", max, latest)) => { + assert_eq!(max, latest_block + 100); + assert_eq!(latest, latest_block); + } + _ => panic!("Expected BlockExceedsLatest error"), + } + } + + #[tokio::test] + async fn test_to_and_from_block_above_latest_returns_error() { + let anvil = Anvil::new().try_spawn().unwrap(); + let provider = ProviderBuilder::new().connect_http(anvil.endpoint_url()); + + let latest_block = provider.get_block_number().await.unwrap(); + + let result = EventScannerBuilder::latest(1) + .from_block(latest_block + 50) + .to_block(latest_block + 100) + .connect(provider) + .await; + + match result { + Err(ScannerError::BlockExceedsLatest("from_block", max, latest)) => { + assert_eq!(max, latest_block + 50); + assert_eq!(latest, latest_block); + } + _ => panic!("Expected BlockExceedsLatest error for 'from_block'"), + } + } } diff --git a/src/event_scanner/scanner/mod.rs b/src/event_scanner/scanner/mod.rs index 8b91bc5..a97e011 100644 --- a/src/event_scanner/scanner/mod.rs +++ b/src/event_scanner/scanner/mod.rs @@ -299,6 +299,13 @@ impl EventScannerBuilder { /// - **Default range**: By default, scans from `Earliest` to `Latest` block /// - **Reorg handling**: Periodically checks the tip to detect reorgs during the scan /// + /// # Notifications + /// + /// The scanner emits the following notification before delivering log data: + /// + /// - **[`Notification::NoPastLogsFound`][no_logs]**: Emitted when no matching logs are found in + /// the scanned range. + /// /// # Arguments /// /// * `count` - Maximum number of recent events to collect per listener (must be greater than 0) @@ -323,6 +330,7 @@ impl EventScannerBuilder { /// [start]: EventScanner::start /// [sync_from_latest]: EventScannerBuilder::from_latest /// [reorg]: crate::Notification::ReorgDetected + /// [no_logs]: crate::Notification::NoPastLogsFound #[must_use] pub fn latest(count: usize) -> EventScannerBuilder { EventScannerBuilder::::new(count) diff --git a/src/event_scanner/scanner/sync/mod.rs b/src/event_scanner/scanner/sync/mod.rs index 45ffb8f..dc2b407 100644 --- a/src/event_scanner/scanner/sync/mod.rs +++ b/src/event_scanner/scanner/sync/mod.rs @@ -79,6 +79,17 @@ impl EventScannerBuilder { /// block confirmations /// - **Continuous operation**: Live phase continues indefinitely until the scanner is dropped /// + /// # Notifications + /// + /// During the **latest events phase**, the scanner can emit the following notification + /// before transitioning to live mode: + /// + /// - **[`Notification::NoPastLogsFound`][no_logs]**: Emitted when no matching logs are found in + /// the scanned range + /// + /// After the latest events phase completes, [`Notification::SwitchingToLive`][switch_to_live] + /// is emitted before transitioning to the live streaming phase. + /// /// # Arguments /// /// * `count` - Maximum number of recent events to collect per listener before switching to live @@ -102,6 +113,7 @@ impl EventScannerBuilder { /// [start]: crate::event_scanner::EventScanner::start /// [reorg]: crate::types::Notification::ReorgDetected /// [switch_to_live]: crate::types::Notification::SwitchingToLive + /// [no_logs]: crate::types::Notification::NoPastLogsFound #[must_use] pub fn from_latest(self, count: usize) -> EventScannerBuilder { EventScannerBuilder::::new(count) diff --git a/src/types.rs b/src/types.rs index c21f037..aeec5f8 100644 --- a/src/types.rs +++ b/src/types.rs @@ -5,16 +5,32 @@ use tracing::{info, warn}; use crate::ScannerError; -#[derive(Debug, Clone)] +/// Messages streamed by the scanner to subscribers. +/// +/// Each message represents either data (logs), an error, or a notification about the scanner's +/// state or behavior. +#[derive(Copy, Debug, Clone)] pub enum ScannerMessage { + /// Data streamed to the subscriber. Data(T), + + /// Notification about scanner state changes or important events. Notification(Notification), } +/// Notifications emitted by the scanner to signal state changes or important events. #[derive(Copy, Debug, Clone, PartialEq)] pub enum Notification { + /// Emitted when transitioning from the latest events phase to live streaming mode + /// in sync scanners. SwitchingToLive, + + /// Emitted when a blockchain reorganization is detected during scanning. ReorgDetected, + + /// Emitted during the latest events phase when no matching logs are found in the + /// scanned range. + NoPastLogsFound, } impl From for ScannerMessage { diff --git a/tests/latest_events/basic.rs b/tests/latest_events/basic.rs index 20e4446..a09b158 100644 --- a/tests/latest_events/basic.rs +++ b/tests/latest_events/basic.rs @@ -3,12 +3,11 @@ use alloy::{ }; use crate::common::{TestCounter, deploy_counter, setup_common, setup_latest_scanner}; -use event_scanner::{EventFilter, EventScannerBuilder, assert_closed, assert_next}; +use event_scanner::{EventFilter, EventScannerBuilder, Notification, assert_closed, assert_next}; #[tokio::test] async fn exact_count_returns_last_events_in_order() -> anyhow::Result<()> { - let count = 5; - let setup = setup_latest_scanner(None, None, count, None, None).await?; + let setup = setup_latest_scanner(None, None, 5, None, None).await?; let contract = setup.contract; let scanner = setup.scanner; let mut stream = setup.stream; @@ -17,7 +16,6 @@ async fn exact_count_returns_last_events_in_order() -> anyhow::Result<()> { contract.increase().send().await?.watch().await?; } - // Ask for the latest 5 scanner.start().await?; assert_next!( @@ -72,9 +70,7 @@ async fn no_events_returns_empty() -> anyhow::Result<()> { scanner.start().await?; - let expected: &[TestCounter::CountIncreased] = &[]; - - assert_next!(stream, expected); + assert_next!(stream, Notification::NoPastLogsFound); assert_closed!(stream); Ok(()) diff --git a/tests/sync/from_latest.rs b/tests/sync/from_latest.rs index 622535d..a465002 100644 --- a/tests/sync/from_latest.rs +++ b/tests/sync/from_latest.rs @@ -132,8 +132,7 @@ async fn no_historical_only_live_streams() -> anyhow::Result<()> { scanner.start().await?; // Latest is empty - let expected: &[TestCounter::CountIncreased] = &[]; - assert_next!(stream, expected); + assert_next!(stream, Notification::NoPastLogsFound); assert_next!(stream, Notification::SwitchingToLive); let mut stream = assert_empty!(stream);