Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions src/event_scanner/scanner/common.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::ops::RangeInclusive;

use crate::{
ScannerMessage,
Notification, ScannerMessage,
block_range_scanner::{BlockScannerResult, MAX_BUFFERED_MESSAGES},
event_scanner::{EventScannerResult, filter::EventFilter, listener::EventListener},
robust_provider::{Error as RobustProviderError, RobustProvider},
Expand Down Expand Up @@ -120,12 +120,16 @@ pub fn spawn_log_consumers<N: Network>(
}

if let ConsumerMode::CollectLatest { .. } = mode {
if !collected.is_empty() {
if collected.is_empty() {
info!("No logs found");
_ = sender.try_stream(Notification::NoPastLogsFound).await;
} else {
info!(count = collected.len(), "Logs found");
collected.reverse(); // restore chronological order
}

info!("Sending collected logs to consumer");
_ = sender.try_stream(collected).await;
info!("Sending collected logs to consumer");
_ = sender.try_stream(collected).await;
}
Comment on lines +123 to +132
Copy link
Collaborator

@LeoPatOZ LeoPatOZ Nov 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
if collected.is_empty() {
info!("No logs found");
_ = sender.try_stream(Notification::NoPastLogsFound).await;
} else {
info!(count = collected.len(), "Logs found");
collected.reverse(); // restore chronological order
}
info!("Sending collected logs to consumer");
_ = sender.try_stream(collected).await;
info!("Sending collected logs to consumer");
_ = sender.try_stream(collected).await;
}
if collected.is_empty() {
info!("No logs found");
_ = sender.try_stream(Notification::NoPastLogsFound).await;
break;
}
info!(count = collected.len(), "Logs found");
collected.reverse(); // restore chronological order
info!("Sending collected logs to consumer");
_ = sender.try_stream(collected).await;
}

}
});

Expand Down
8 changes: 4 additions & 4 deletions src/event_scanner/scanner/historic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,17 @@ impl EventScannerBuilder<Historic> {
}
};

if from_num > latest_block {
Err(ScannerError::BlockExceedsLatest("from_block", from_num, latest_block))?;
}

let to_num = match scanner.config.to_block {
BlockId::Number(to_block) => to_block.as_number().unwrap_or(0),
BlockId::Hash(to_hash) => {
provider.get_block_by_hash(to_hash.into()).await?.header().number()
}
};

if from_num > latest_block {
Err(ScannerError::BlockExceedsLatest("from_block", from_num, latest_block))?;
}

if to_num > latest_block {
Err(ScannerError::BlockExceedsLatest("to_block", to_num, latest_block))?;
}
Expand Down
95 changes: 90 additions & 5 deletions src/event_scanner/scanner/latest.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use alloy::{eips::BlockId, network::Network};
use alloy::{
consensus::BlockHeader,
eips::BlockId,
network::{BlockResponse, Network},
};

use super::common::{ConsumerMode, handle_stream};
use crate::{
Expand Down Expand Up @@ -45,13 +49,28 @@ impl EventScannerBuilder<LatestEvents> {
let scanner = self.build(provider).await?;

let provider = scanner.block_range_scanner.provider();
let latest_block = provider.get_block_number().await?;

let from_num = match scanner.config.from_block {
BlockId::Number(from_block) => from_block.as_number().unwrap_or(0),
BlockId::Hash(from_hash) => {
provider.get_block_by_hash(from_hash.into()).await?.header().number()
}
};

if let BlockId::Hash(from_hash) = scanner.config.from_block {
provider.get_block_by_hash(from_hash.into()).await?;
if from_num > latest_block {
Err(ScannerError::BlockExceedsLatest("from_block", from_num, latest_block))?;
}

if let BlockId::Hash(to_hash) = scanner.config.to_block {
provider.get_block_by_hash(to_hash.into()).await?;
let to_num = match scanner.config.to_block {
BlockId::Number(to_block) => to_block.as_number().unwrap_or(0),
BlockId::Hash(to_hash) => {
provider.get_block_by_hash(to_hash.into()).await?.header().number()
}
};

if to_num > latest_block {
Err(ScannerError::BlockExceedsLatest("to_block", to_num, latest_block))?;
}

Ok(scanner)
Expand Down Expand Up @@ -281,4 +300,70 @@ mod tests {

assert!(result.is_ok());
}

#[tokio::test]
async fn test_from_block_above_latest_returns_error() {
let anvil = Anvil::new().try_spawn().unwrap();
let provider = ProviderBuilder::new().connect_http(anvil.endpoint_url());

let latest_block = provider.get_block_number().await.unwrap();

let result = EventScannerBuilder::latest(1)
.from_block(latest_block + 100)
.to_block(latest_block)
.connect(provider)
.await;

match result {
Err(ScannerError::BlockExceedsLatest("from_block", max, latest)) => {
assert_eq!(max, latest_block + 100);
assert_eq!(latest, latest_block);
}
_ => panic!("Expected BlockExceedsLatest error"),
}
}

#[tokio::test]
async fn test_to_block_above_latest_returns_error() {
let anvil = Anvil::new().try_spawn().unwrap();
let provider = ProviderBuilder::new().connect_http(anvil.endpoint_url());

let latest_block = provider.get_block_number().await.unwrap();

let result = EventScannerBuilder::latest(1)
.from_block(0)
.to_block(latest_block + 100)
.connect(provider)
.await;

match result {
Err(ScannerError::BlockExceedsLatest("to_block", max, latest)) => {
assert_eq!(max, latest_block + 100);
assert_eq!(latest, latest_block);
}
_ => panic!("Expected BlockExceedsLatest error"),
}
}

#[tokio::test]
async fn test_to_and_from_block_above_latest_returns_error() {
let anvil = Anvil::new().try_spawn().unwrap();
let provider = ProviderBuilder::new().connect_http(anvil.endpoint_url());

let latest_block = provider.get_block_number().await.unwrap();

let result = EventScannerBuilder::latest(1)
.from_block(latest_block + 50)
.to_block(latest_block + 100)
.connect(provider)
.await;

match result {
Err(ScannerError::BlockExceedsLatest("from_block", max, latest)) => {
assert_eq!(max, latest_block + 50);
assert_eq!(latest, latest_block);
}
_ => panic!("Expected BlockExceedsLatest error for 'from_block'"),
}
}
}
8 changes: 8 additions & 0 deletions src/event_scanner/scanner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,13 @@ impl EventScannerBuilder<Unspecified> {
/// - **Default range**: By default, scans from `Earliest` to `Latest` block
/// - **Reorg handling**: Periodically checks the tip to detect reorgs during the scan
///
/// # Notifications
///
/// The scanner emits the following notification before delivering log data:
///
/// - **[`Notification::NoPastLogsFound`][no_logs]**: Emitted when no matching logs are found in
/// the scanned range.
///
/// # Arguments
///
/// * `count` - Maximum number of recent events to collect per listener (must be greater than 0)
Expand All @@ -323,6 +330,7 @@ impl EventScannerBuilder<Unspecified> {
/// [start]: EventScanner::start
/// [sync_from_latest]: EventScannerBuilder::from_latest
/// [reorg]: crate::Notification::ReorgDetected
/// [no_logs]: crate::Notification::NoPastLogsFound
#[must_use]
pub fn latest(count: usize) -> EventScannerBuilder<LatestEvents> {
EventScannerBuilder::<LatestEvents>::new(count)
Expand Down
12 changes: 12 additions & 0 deletions src/event_scanner/scanner/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,17 @@ impl EventScannerBuilder<Synchronize> {
/// block confirmations
/// - **Continuous operation**: Live phase continues indefinitely until the scanner is dropped
///
/// # Notifications
///
/// During the **latest events phase**, the scanner can emit the following notification
/// before transitioning to live mode:
///
/// - **[`Notification::NoPastLogsFound`][no_logs]**: Emitted when no matching logs are found in
/// the scanned range
///
/// After the latest events phase completes, [`Notification::SwitchingToLive`][switch_to_live]
/// is emitted before transitioning to the live streaming phase.
///
/// # Arguments
///
/// * `count` - Maximum number of recent events to collect per listener before switching to live
Expand All @@ -102,6 +113,7 @@ impl EventScannerBuilder<Synchronize> {
/// [start]: crate::event_scanner::EventScanner::start
/// [reorg]: crate::types::Notification::ReorgDetected
/// [switch_to_live]: crate::types::Notification::SwitchingToLive
/// [no_logs]: crate::types::Notification::NoPastLogsFound
#[must_use]
pub fn from_latest(self, count: usize) -> EventScannerBuilder<SyncFromLatestEvents> {
EventScannerBuilder::<SyncFromLatestEvents>::new(count)
Expand Down
18 changes: 17 additions & 1 deletion src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,32 @@ use tracing::{info, warn};

use crate::ScannerError;

#[derive(Debug, Clone)]
/// Messages streamed by the scanner to subscribers.
///
/// Each message represents either data (logs), an error, or a notification about the scanner's
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
/// Each message represents either data (logs), an error, or a notification about the scanner's
/// Each message represents either data or a notification about the scanner's

No more error :D

Also not always logs?

/// state or behavior.
#[derive(Copy, Debug, Clone)]
pub enum ScannerMessage<T: Clone> {
/// Data streamed to the subscriber.
Data(T),

/// Notification about scanner state changes or important events.
Comment on lines +14 to +17
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Also wonder how useful these are

Data is self explanatory and Notification comment is the same above the notification enum

Notification(Notification),
}

/// Notifications emitted by the scanner to signal state changes or important events.
#[derive(Copy, Debug, Clone, PartialEq)]
pub enum Notification {
/// Emitted when transitioning from the latest events phase to live streaming mode
/// in sync scanners.
SwitchingToLive,

/// Emitted when a blockchain reorganization is detected during scanning.
ReorgDetected,

/// Emitted during the latest events phase when no matching logs are found in the
/// scanned range.
NoPastLogsFound,
}

impl<T: Clone> From<Notification> for ScannerMessage<T> {
Expand Down
10 changes: 3 additions & 7 deletions tests/latest_events/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@ use alloy::{
};

use crate::common::{TestCounter, deploy_counter, setup_common, setup_latest_scanner};
use event_scanner::{EventFilter, EventScannerBuilder, assert_closed, assert_next};
use event_scanner::{EventFilter, EventScannerBuilder, Notification, assert_closed, assert_next};

#[tokio::test]
async fn exact_count_returns_last_events_in_order() -> anyhow::Result<()> {
let count = 5;
let setup = setup_latest_scanner(None, None, count, None, None).await?;
let setup = setup_latest_scanner(None, None, 5, None, None).await?;
let contract = setup.contract;
let scanner = setup.scanner;
let mut stream = setup.stream;
Expand All @@ -17,7 +16,6 @@ async fn exact_count_returns_last_events_in_order() -> anyhow::Result<()> {
contract.increase().send().await?.watch().await?;
}

// Ask for the latest 5
scanner.start().await?;

assert_next!(
Expand Down Expand Up @@ -72,9 +70,7 @@ async fn no_events_returns_empty() -> anyhow::Result<()> {

scanner.start().await?;

let expected: &[TestCounter::CountIncreased] = &[];

assert_next!(stream, expected);
assert_next!(stream, Notification::NoPastLogsFound);
assert_closed!(stream);

Ok(())
Expand Down
3 changes: 1 addition & 2 deletions tests/sync/from_latest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,7 @@ async fn no_historical_only_live_streams() -> anyhow::Result<()> {
scanner.start().await?;

// Latest is empty
let expected: &[TestCounter::CountIncreased] = &[];
assert_next!(stream, expected);
assert_next!(stream, Notification::NoPastLogsFound);
assert_next!(stream, Notification::SwitchingToLive);
let mut stream = assert_empty!(stream);

Expand Down