From 608f35a36110924b938e6f690ebc3f99852dc1eb Mon Sep 17 00:00:00 2001 From: Leo Date: Wed, 19 Nov 2025 23:33:13 +0900 Subject: [PATCH 01/26] feat: remove error in type.rs --- src/types.rs | 41 ++++++++++++++++++++++++++++++----------- 1 file changed, 30 insertions(+), 11 deletions(-) diff --git a/src/types.rs b/src/types.rs index d657920..48e4f01 100644 --- a/src/types.rs +++ b/src/types.rs @@ -3,10 +3,11 @@ use std::{error::Error, fmt::Debug}; use tokio::sync::mpsc; use tracing::{info, warn}; +use crate::ScannerError; + #[derive(Copy, Debug, Clone)] -pub enum ScannerMessage { +pub enum ScannerMessage { Data(T), - Error(E), Notification(Notification), } @@ -16,13 +17,13 @@ pub enum Notification { ReorgDetected, } -impl From for ScannerMessage { +impl From for ScannerMessage { fn from(value: Notification) -> Self { ScannerMessage::Notification(value) } } -impl PartialEq for ScannerMessage { +impl PartialEq for ScannerMessage { fn eq(&self, other: &Notification) -> bool { if let ScannerMessage::Notification(notification) = self { notification == other @@ -32,15 +33,33 @@ impl PartialEq for ScannerMessage { - async fn try_stream>>(&self, msg: M) -> bool; +pub(crate) trait TryStream { + async fn try_stream(&self, item: M) -> bool; +} + +impl TryStream for mpsc::Sender> +where + M: Into>, +{ + async fn try_stream(&self, item: M) -> bool { + let item = item.into(); + info!(item = ?item, "Sending message"); + if let Err(err) = self.send(item).await { + warn!(error = %err, "Downstream channel closed, stopping stream"); + return false; + } + true + } } -impl TryStream for mpsc::Sender> { - async fn try_stream>>(&self, msg: M) -> bool { - let msg = msg.into(); - info!(msg = ?msg, "Sending message"); - if let Err(err) = self.send(msg).await { +impl TryStream for mpsc::Sender +where + E: Error + Clone + Into, +{ + async fn try_stream(&self, item: E) -> bool { + let item = item.into(); + info!(item = ?item, "Sending error"); + if let Err(err) = self.send(item).await { warn!(error = %err, "Downstream channel closed, stopping stream"); return false; } From f0a8371deeadac8d5b7e4198e1ab4aede9acded6 Mon Sep 17 00:00:00 2001 From: Leo Date: Thu, 20 Nov 2025 00:06:59 +0900 Subject: [PATCH 02/26] feat: remove error type - start fixing block range scanner e --- src/block_range_scanner.rs | 80 ++++++++++++++---------------------- src/event_scanner/error.rs | 27 ------------ src/event_scanner/message.rs | 11 +---- src/types.rs | 15 +++++++ 4 files changed, 48 insertions(+), 85 deletions(-) delete mode 100644 src/event_scanner/error.rs diff --git a/src/block_range_scanner.rs b/src/block_range_scanner.rs index 211a411..74b1c74 100644 --- a/src/block_range_scanner.rs +++ b/src/block_range_scanner.rs @@ -67,8 +67,7 @@ use tokio::{ use tokio_stream::{StreamExt, wrappers::ReceiverStream}; use crate::{ - ScannerMessage, - error::ScannerError, + ScannerError, ScannerMessage, robust_provider::{Error as RobustProviderError, IntoRobustProvider, RobustProvider}, types::{Notification, TryStream}, }; @@ -78,7 +77,6 @@ use alloy::{ network::{BlockResponse, Network, primitives::HeaderResponse}, primitives::{B256, BlockNumber}, pubsub::Subscription, - transports::{RpcError, TransportErrorKind}, }; use tracing::{debug, error, info, warn}; @@ -92,7 +90,7 @@ pub const MAX_BUFFERED_MESSAGES: usize = 50000; // is considered final) pub const DEFAULT_REORG_REWIND_DEPTH: u64 = 64; -pub type Message = ScannerMessage, ScannerError>; +pub type Message = ScannerMessage>; impl From> for Message { fn from(logs: RangeInclusive) -> Self { @@ -106,24 +104,6 @@ impl PartialEq> for Message { } } -impl From for Message { - fn from(error: RobustProviderError) -> Self { - Message::Error(error.into()) - } -} - -impl From> for Message { - fn from(error: RpcError) -> Self { - Message::Error(error.into()) - } -} - -impl From for Message { - fn from(error: ScannerError) -> Self { - Message::Error(error) - } -} - #[derive(Clone)] pub struct BlockRangeScanner { pub max_block_range: u64, @@ -190,24 +170,24 @@ impl ConnectedBlockRangeScanner { #[derive(Debug)] pub enum Command { StreamLive { - sender: mpsc::Sender, + sender: mpsc::Sender>, block_confirmations: u64, response: oneshot::Sender>, }, StreamHistorical { - sender: mpsc::Sender, + sender: mpsc::Sender>, start_height: BlockNumberOrTag, end_height: BlockNumberOrTag, response: oneshot::Sender>, }, StreamFrom { - sender: mpsc::Sender, + sender: mpsc::Sender>, start_height: BlockNumberOrTag, block_confirmations: u64, response: oneshot::Sender>, }, Rewind { - sender: mpsc::Sender, + sender: mpsc::Sender>, start_height: BlockNumberOrTag, end_height: BlockNumberOrTag, response: oneshot::Sender>, @@ -288,7 +268,7 @@ impl Service { async fn handle_live( &mut self, block_confirmations: u64, - sender: mpsc::Sender, + sender: mpsc::Sender>, ) -> Result<(), ScannerError> { let max_block_range = self.max_block_range; let latest = self.provider.get_block_number().await?; @@ -320,7 +300,7 @@ impl Service { &mut self, start_height: BlockNumberOrTag, end_height: BlockNumberOrTag, - sender: mpsc::Sender, + sender: mpsc::Sender>, ) -> Result<(), ScannerError> { let max_block_range = self.max_block_range; @@ -356,7 +336,7 @@ impl Service { &mut self, start_height: BlockNumberOrTag, block_confirmations: u64, - sender: mpsc::Sender, + sender: mpsc::Sender>, ) -> Result<(), ScannerError> { let provider = self.provider.clone(); let max_block_range = self.max_block_range; @@ -409,7 +389,7 @@ impl Service { // Step 2: Setup the live streaming buffer // This channel will accumulate while historical sync is running let (live_block_buffer_sender, live_block_buffer_receiver) = - mpsc::channel::(MAX_BUFFERED_MESSAGES); + mpsc::channel::>(MAX_BUFFERED_MESSAGES); // The cutoff is the last block we have synced historically // Any block > cutoff will come from the live stream @@ -459,7 +439,7 @@ impl Service { &mut self, start_height: BlockNumberOrTag, end_height: BlockNumberOrTag, - sender: mpsc::Sender, + sender: mpsc::Sender>, ) -> Result<(), ScannerError> { let max_block_range = self.max_block_range; let provider = self.provider.clone(); @@ -493,7 +473,7 @@ impl Service { from: N::BlockResponse, to: N::BlockResponse, max_block_range: u64, - sender: &mpsc::Sender, + sender: &mpsc::Sender>, provider: &RobustProvider, ) { let mut batch_count = 0; @@ -555,7 +535,7 @@ impl Service { } Err(e) => { error!(error = %e, "Terminal RPC call error, shutting down"); - _ = sender.try_stream(e); + _ = sender.try_stream(e.into()); return; } }; @@ -573,7 +553,7 @@ impl Service { start: BlockNumber, end: BlockNumber, max_block_range: u64, - sender: &mpsc::Sender, + sender: &mpsc::Sender>, ) { let mut batch_count = 0; @@ -610,7 +590,7 @@ impl Service { async fn stream_live_blocks( mut range_start: BlockNumber, subscription: Subscription, - sender: mpsc::Sender, + sender: mpsc::Sender>, block_confirmations: u64, max_block_range: u64, ) { @@ -654,8 +634,8 @@ impl Service { } async fn process_live_block_buffer( - mut buffer_rx: mpsc::Receiver, - sender: mpsc::Sender, + mut buffer_rx: mpsc::Receiver>, + sender: mpsc::Sender>, cutoff: BlockNumber, ) { let mut processed = 0; @@ -664,7 +644,7 @@ impl Service { // Process all buffered messages while let Some(data) = buffer_rx.recv().await { match data { - Message::Data(range) => { + Ok(Message::Data(range)) => { let (start, end) = (*range.start(), *range.end()); if start >= cutoff { if !sender.try_stream(range).await { @@ -683,9 +663,13 @@ impl Service { discarded += end - start; } } - other => { - // Could be error or notification - if !sender.try_stream(other).await { + Ok(Message::Notification(notif)) => { + if !sender.try_stream(notif).await { + break; + } + } + Err(e) => { + if !sender.try_stream(e).await { break; } } @@ -734,7 +718,7 @@ impl BlockRangeScannerClient { pub async fn stream_live( &self, block_confirmations: u64, - ) -> Result, ScannerError> { + ) -> Result>, ScannerError> { let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES); let (response_tx, response_rx) = oneshot::channel(); @@ -765,7 +749,7 @@ impl BlockRangeScannerClient { &self, start_height: impl Into, end_height: impl Into, - ) -> Result, ScannerError> { + ) -> Result>, ScannerError> { let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES); let (response_tx, response_rx) = oneshot::channel(); @@ -797,7 +781,7 @@ impl BlockRangeScannerClient { &self, start_height: impl Into, block_confirmations: u64, - ) -> Result, ScannerError> { + ) -> Result>, ScannerError> { let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES); let (response_tx, response_rx) = oneshot::channel(); @@ -829,7 +813,7 @@ impl BlockRangeScannerClient { &self, start_height: impl Into, end_height: impl Into, - ) -> Result, ScannerError> { + ) -> Result>, ScannerError> { let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES); let (response_tx, response_rx) = oneshot::channel(); @@ -955,15 +939,13 @@ mod tests { #[tokio::test] async fn try_send_forwards_errors_to_subscribers() { - let (tx, mut rx) = mpsc::channel::(1); + let (tx, mut rx) = mpsc::channel::(1); _ = tx.try_stream(ScannerError::BlockNotFound(4.into())).await; assert!(matches!( rx.recv().await, - Some(ScannerMessage::Error(ScannerError::BlockNotFound(BlockId::Number( - BlockNumberOrTag::Number(4) - )))) + Some(ScannerError::BlockNotFound(BlockId::Number(BlockNumberOrTag::Number(4)))) )); } } diff --git a/src/event_scanner/error.rs b/src/event_scanner/error.rs deleted file mode 100644 index b689950..0000000 --- a/src/event_scanner/error.rs +++ /dev/null @@ -1,27 +0,0 @@ -use alloy::{ - rpc::types::Log, - transports::{RpcError, TransportErrorKind}, -}; - -use crate::{Message, ScannerError}; - -impl From> for Message { - fn from(e: RpcError) -> Self { - Message::Error(e.into()) - } -} - -impl From for Message { - fn from(error: ScannerError) -> Self { - Message::Error(error) - } -} - -impl From, RpcError>> for Message { - fn from(logs: Result, RpcError>) -> Self { - match logs { - Ok(logs) => Message::Data(logs), - Err(e) => Message::Error(e.into()), - } - } -} diff --git a/src/event_scanner/message.rs b/src/event_scanner/message.rs index ebd1081..252f5fc 100644 --- a/src/event_scanner/message.rs +++ b/src/event_scanner/message.rs @@ -1,8 +1,8 @@ use alloy::{rpc::types::Log, sol_types::SolEvent}; -use crate::{ScannerError, ScannerMessage, robust_provider::Error as RobustProviderError}; +use crate::ScannerMessage; -pub type Message = ScannerMessage, ScannerError>; +pub type Message = ScannerMessage>; impl From> for Message { fn from(logs: Vec) -> Self { @@ -10,13 +10,6 @@ impl From> for Message { } } -impl From for Message { - fn from(error: RobustProviderError) -> Message { - let scanner_error: ScannerError = error.into(); - scanner_error.into() - } -} - impl PartialEq> for Message { fn eq(&self, other: &Vec) -> bool { self.eq(&other.as_slice()) diff --git a/src/types.rs b/src/types.rs index 48e4f01..61086f1 100644 --- a/src/types.rs +++ b/src/types.rs @@ -37,6 +37,21 @@ pub(crate) trait TryStream { async fn try_stream(&self, item: M) -> bool; } +impl TryStream for mpsc::Sender, ScannerError>> +where + M: Into>, +{ + async fn try_stream(&self, item: M) -> bool { + let item = item.into(); + info!(item = ?item, "Sending message"); + if let Err(err) = self.send(Ok(item)).await { + warn!(error = %err, "Downstream channel closed, stopping stream"); + return false; + } + true + } +} + impl TryStream for mpsc::Sender> where M: Into>, From ccd8a652eea6418394d4f58c34c30c8dbea211fc Mon Sep 17 00:00:00 2001 From: Leo Date: Thu, 20 Nov 2025 00:31:21 +0900 Subject: [PATCH 03/26] feat: update event_scanner to use Result stream --- src/event_scanner/mod.rs | 1 - src/event_scanner/scanner/common.rs | 81 ++++++++++--------- src/event_scanner/scanner/sync/from_latest.rs | 4 +- src/types.rs | 15 ---- 4 files changed, 46 insertions(+), 55 deletions(-) diff --git a/src/event_scanner/mod.rs b/src/event_scanner/mod.rs index 2e031b7..2da1657 100644 --- a/src/event_scanner/mod.rs +++ b/src/event_scanner/mod.rs @@ -1,4 +1,3 @@ -mod error; mod filter; mod listener; mod message; diff --git a/src/event_scanner/scanner/common.rs b/src/event_scanner/scanner/common.rs index af736a7..35c204b 100644 --- a/src/event_scanner/scanner/common.rs +++ b/src/event_scanner/scanner/common.rs @@ -1,6 +1,7 @@ use std::ops::RangeInclusive; use crate::{ + ScannerError, block_range_scanner::{MAX_BUFFERED_MESSAGES, Message as BlockRangeMessage}, event_scanner::{filter::EventFilter, listener::EventListener}, robust_provider::{Error as RobustProviderError, RobustProvider}, @@ -45,13 +46,17 @@ pub enum ConsumerMode { /// # Note /// /// Assumes it is running in a separate tokio task, so as to be non-blocking. -pub async fn handle_stream + Unpin>( +pub async fn handle_stream< + N: Network, + S: Stream> + Unpin, +>( mut stream: S, provider: &RobustProvider, listeners: &[EventListener], mode: ConsumerMode, ) { - let (range_tx, _) = broadcast::channel::(MAX_BUFFERED_MESSAGES); + let (range_tx, _) = + broadcast::channel::>(MAX_BUFFERED_MESSAGES); let consumers = spawn_log_consumers(provider, listeners, &range_tx, mode); @@ -73,7 +78,7 @@ pub async fn handle_stream + Unp pub fn spawn_log_consumers( provider: &RobustProvider, listeners: &[EventListener], - range_tx: &Sender, + range_tx: &Sender>, mode: ConsumerMode, ) -> JoinSet<()> { listeners.iter().cloned().fold(JoinSet::new(), |mut set, listener| { @@ -92,51 +97,53 @@ pub fn spawn_log_consumers( loop { match range_rx.recv().await { - Ok(BlockRangeMessage::Data(range)) => { - match get_logs(range, &filter, &base_filter, &provider).await { - Ok(logs) => { - if logs.is_empty() { - continue; - } + Ok(message) => { + match message { + Ok(BlockRangeMessage::Data(range)) => { + match get_logs(range, &filter, &base_filter, &provider).await { + Ok(logs) => { + if logs.is_empty() { + continue; + } - match mode { - ConsumerMode::Stream => { - if !sender.try_stream(logs).await { - break; + match mode { + ConsumerMode::Stream => { + if !sender.try_stream(logs).await { + break; + } + } + ConsumerMode::CollectLatest { count } => { + let take = count.saturating_sub(collected.len()); + // if we have enough logs, break + if take == 0 { + break; + } + // take latest within this range + collected.extend(logs.into_iter().rev().take(take)); + // if we have enough logs, break + if collected.len() == count { + break; + } + } } } - ConsumerMode::CollectLatest { count } => { - let take = count.saturating_sub(collected.len()); - // if we have enough logs, break - if take == 0 { - break; - } - // take latest within this range - collected.extend(logs.into_iter().rev().take(take)); - // if we have enough logs, break - if collected.len() == count { + Err(e) => { + if !sender.try_stream(e).await { break; } } } } - Err(e) => { - if !sender.try_stream(e).await { + Ok(BlockRangeMessage::Notification(notification)) => { + info!(notification = ?notification, "Received notification"); + if !sender.try_stream(notification).await { break; } } - } - } - Ok(BlockRangeMessage::Error(e)) => { - error!(error = ?e, "Received error message"); - if !sender.try_stream(e).await { - break; - } - } - Ok(BlockRangeMessage::Notification(notification)) => { - info!(notification = ?notification, "Received notification"); - if !sender.try_stream(notification).await { - break; + Err(e) => { + error!(error = ?e, "Received error message"); + sender.try_stream(e).await + } } } Err(RecvError::Closed) => { diff --git a/src/event_scanner/scanner/sync/from_latest.rs b/src/event_scanner/scanner/sync/from_latest.rs index 7be468d..25086b9 100644 --- a/src/event_scanner/scanner/sync/from_latest.rs +++ b/src/event_scanner/scanner/sync/from_latest.rs @@ -104,9 +104,9 @@ impl EventScanner { info!("Switching to live stream"); // Use a one-off channel for the notification. - let (tx, rx) = mpsc::channel::(1); + let (tx, rx) = mpsc::channel::>(1); let stream = ReceiverStream::new(rx); - tx.send(BlockRangeMessage::Notification(Notification::SwitchingToLive)) + tx.send(Ok(BlockRangeMessage::Notification(Notification::SwitchingToLive))) .await .expect("receiver exists"); diff --git a/src/types.rs b/src/types.rs index 61086f1..48e4f01 100644 --- a/src/types.rs +++ b/src/types.rs @@ -37,21 +37,6 @@ pub(crate) trait TryStream { async fn try_stream(&self, item: M) -> bool; } -impl TryStream for mpsc::Sender, ScannerError>> -where - M: Into>, -{ - async fn try_stream(&self, item: M) -> bool { - let item = item.into(); - info!(item = ?item, "Sending message"); - if let Err(err) = self.send(Ok(item)).await { - warn!(error = %err, "Downstream channel closed, stopping stream"); - return false; - } - true - } -} - impl TryStream for mpsc::Sender> where M: Into>, From 4f01e3ef24543782dcdb2412dd95d4a5c120676c Mon Sep 17 00:00:00 2001 From: Leo Date: Thu, 20 Nov 2025 16:45:11 +0900 Subject: [PATCH 04/26] feat: update macro and add try_stream_err --- src/block_range_scanner.rs | 38 +++++++++++++++++----------------- src/test_utils/macros.rs | 36 ++++++++++++++++++++++++++++---- src/types.rs | 42 +++++++++++++++++++++++++++++++++++--- 3 files changed, 90 insertions(+), 26 deletions(-) diff --git a/src/block_range_scanner.rs b/src/block_range_scanner.rs index 74b1c74..c2205e4 100644 --- a/src/block_range_scanner.rs +++ b/src/block_range_scanner.rs @@ -69,7 +69,7 @@ use tokio_stream::{StreamExt, wrappers::ReceiverStream}; use crate::{ ScannerError, ScannerMessage, robust_provider::{Error as RobustProviderError, IntoRobustProvider, RobustProvider}, - types::{Notification, TryStream}, + types::{Notification, TryStream, TryStreamError}, }; use alloy::{ consensus::BlockHeader, @@ -513,7 +513,7 @@ impl Service { } Err(e) => { error!(error = %e, "Terminal RPC call error, shutting down"); - _ = sender.try_stream(e); + _ = sender.try_stream_err(e); return; } }; @@ -535,7 +535,7 @@ impl Service { } Err(e) => { error!(error = %e, "Terminal RPC call error, shutting down"); - _ = sender.try_stream(e.into()); + _ = sender.try_stream_err(e); return; } }; @@ -669,7 +669,7 @@ impl Service { } } Err(e) => { - if !sender.try_stream(e).await { + if !sender.try_stream_err(e).await { break; } } @@ -859,9 +859,9 @@ mod tests { async fn buffered_messages_after_cutoff_are_all_passed() { let cutoff = 50; let (buffer_tx, buffer_rx) = mpsc::channel(8); - buffer_tx.send(Message::Data(51..=55)).await.unwrap(); - buffer_tx.send(Message::Data(56..=60)).await.unwrap(); - buffer_tx.send(Message::Data(61..=70)).await.unwrap(); + buffer_tx.send(Ok(Message::Data(51..=55))).await.unwrap(); + buffer_tx.send(Ok(Message::Data(56..=60))).await.unwrap(); + buffer_tx.send(Ok(Message::Data(61..=70))).await.unwrap(); drop(buffer_tx); let (out_tx, out_rx) = mpsc::channel(8); @@ -880,9 +880,9 @@ mod tests { let cutoff = 100; let (buffer_tx, buffer_rx) = mpsc::channel(8); - buffer_tx.send(Message::Data(40..=50)).await.unwrap(); - buffer_tx.send(Message::Data(51..=60)).await.unwrap(); - buffer_tx.send(Message::Data(61..=70)).await.unwrap(); + buffer_tx.send(Ok(Message::Data(40..=50))).await.unwrap(); + buffer_tx.send(Ok(Message::Data(51..=60))).await.unwrap(); + buffer_tx.send(Ok(Message::Data(61..=70))).await.unwrap(); drop(buffer_tx); let (out_tx, out_rx) = mpsc::channel(8); @@ -898,9 +898,9 @@ mod tests { let cutoff = 75; let (buffer_tx, buffer_rx) = mpsc::channel(8); - buffer_tx.send(Message::Data(60..=70)).await.unwrap(); - buffer_tx.send(Message::Data(71..=80)).await.unwrap(); - buffer_tx.send(Message::Data(81..=86)).await.unwrap(); + buffer_tx.send(Ok(Message::Data(60..=70))).await.unwrap(); + buffer_tx.send(Ok(Message::Data(71..=80))).await.unwrap(); + buffer_tx.send(Ok(Message::Data(81..=86))).await.unwrap(); drop(buffer_tx); let (out_tx, out_rx) = mpsc::channel(8); @@ -918,11 +918,11 @@ mod tests { let cutoff = 100; let (buffer_tx, buffer_rx) = mpsc::channel(8); - buffer_tx.send(Message::Data(98..=98)).await.unwrap(); // Just before: discard - buffer_tx.send(Message::Data(99..=100)).await.unwrap(); // Includes cutoff: trim to 100..=100 - buffer_tx.send(Message::Data(100..=100)).await.unwrap(); // Exactly at: forward - buffer_tx.send(Message::Data(100..=101)).await.unwrap(); // Starts at cutoff: forward - buffer_tx.send(Message::Data(102..=102)).await.unwrap(); // After cutoff: forward + buffer_tx.send(Ok(Message::Data(98..=98))).await.unwrap(); // Just before: discard + buffer_tx.send(Ok(Message::Data(99..=100))).await.unwrap(); // Includes cutoff: trim to 100..=100 + buffer_tx.send(Ok(Message::Data(100..=100))).await.unwrap(); // Exactly at: forward + buffer_tx.send(Ok(Message::Data(100..=101))).await.unwrap(); // Starts at cutoff: forward + buffer_tx.send(Ok(Message::Data(102..=102))).await.unwrap(); // After cutoff: forward drop(buffer_tx); let (out_tx, out_rx) = mpsc::channel(8); @@ -941,7 +941,7 @@ mod tests { async fn try_send_forwards_errors_to_subscribers() { let (tx, mut rx) = mpsc::channel::(1); - _ = tx.try_stream(ScannerError::BlockNotFound(4.into())).await; + _ = tx.try_stream_err(ScannerError::BlockNotFound(4.into())).await; assert!(matches!( rx.recv().await, diff --git a/src/test_utils/macros.rs b/src/test_utils/macros.rs index 78e6032..adcb835 100644 --- a/src/test_utils/macros.rs +++ b/src/test_utils/macros.rs @@ -5,10 +5,15 @@ use crate::Message; #[macro_export] macro_rules! assert_next { + // Convenience form with default timeout + ($stream: expr, Err($expected_err:pat)) => { + $crate::assert_next!($stream, Err($expected_err), timeout = 5) + }; ($stream: expr, $expected: expr) => { - assert_next!($stream, $expected, timeout = 5) + $crate::assert_next!($stream, $expected, timeout = 5) }; - ($stream: expr, $expected: expr, timeout = $secs: expr) => { + // Result::Err expectation – assert the next item is an Err matching the pattern + ($stream: expr, Err($expected_err:pat), timeout = $secs: expr) => { let message = tokio::time::timeout( std::time::Duration::from_secs($secs), tokio_stream::StreamExt::next(&mut $stream), @@ -16,9 +21,32 @@ macro_rules! assert_next { .await .expect("timed out"); if let Some(msg) = message { - assert_eq!(msg, $expected) + assert!(matches!(msg, Err($expected_err))); } else { - panic!("Expected {:?}, but channel was closed", $expected) + panic!("Expected Err(..), but channel was closed"); + } + }; + ($stream: expr, $expected: expr, timeout = $secs: expr) => { + let message = tokio::time::timeout( + std::time::Duration::from_secs($secs), + tokio_stream::StreamExt::next(&mut $stream), + ) + .await + .expect("timed out"); + let expected = $expected; + match message { + // Some(Ok($crate::ScannerMessage::Data(data))) => { + // assert_eq!(data, expected, "Expected Data({:?}), got Data({:?})", expected, + // data); } + Some(Ok(msg)) => { + assert_eq!(msg, expected, "Expected {:?}, got {:?}", expected, msg); + } + Some(Err(e)) => { + panic!("Expected Ok({:?}), got Err({:?})", expected, e); + } + None => { + panic!("Expected Ok({:?}), but channel was closed", expected); + } } }; } diff --git a/src/types.rs b/src/types.rs index 48e4f01..e4cc43d 100644 --- a/src/types.rs +++ b/src/types.rs @@ -5,7 +5,7 @@ use tracing::{info, warn}; use crate::ScannerError; -#[derive(Copy, Debug, Clone)] +#[derive(Debug, Clone)] pub enum ScannerMessage { Data(T), Notification(Notification), @@ -37,6 +37,10 @@ pub(crate) trait TryStream { async fn try_stream(&self, item: M) -> bool; } +pub(crate) trait TryStreamError { + async fn try_stream_err(&self, error: E) -> bool; +} + impl TryStream for mpsc::Sender> where M: Into>, @@ -52,11 +56,27 @@ where } } -impl TryStream for mpsc::Sender +impl TryStream for mpsc::Sender, ScannerError>> +where + M: Into>, +{ + async fn try_stream(&self, item: M) -> bool { + let item = item.into(); + let item = Ok(item); + info!(item = ?item, "Sending message"); + if let Err(err) = self.send(item).await { + warn!(error = %err, "Downstream channel closed, stopping stream"); + return false; + } + true + } +} + +impl TryStreamError for mpsc::Sender where E: Error + Clone + Into, { - async fn try_stream(&self, item: E) -> bool { + async fn try_stream_err(&self, item: E) -> bool { let item = item.into(); info!(item = ?item, "Sending error"); if let Err(err) = self.send(item).await { @@ -66,3 +86,19 @@ where true } } + +impl TryStreamError + for mpsc::Sender, ScannerError>> +where + E: Error + Clone + Into, +{ + async fn try_stream_err(&self, item: E) -> bool { + let item = item.into(); + info!(item = ?item, "Sending error"); + if let Err(err) = self.send(Err(item)).await { + warn!(error = %err, "Downstream channel closed, stopping stream"); + return false; + } + true + } +} From ee05db9d9d074b8027e36e0eb0345ad2a39ee9ee Mon Sep 17 00:00:00 2001 From: Leo Date: Thu, 20 Nov 2025 16:47:15 +0900 Subject: [PATCH 05/26] feat: make event scanner return result stream --- src/event_scanner/listener.rs | 7 +++++-- src/event_scanner/scanner/mod.rs | 8 ++++++-- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/src/event_scanner/listener.rs b/src/event_scanner/listener.rs index 2e57b77..f76d4b3 100644 --- a/src/event_scanner/listener.rs +++ b/src/event_scanner/listener.rs @@ -1,8 +1,11 @@ -use crate::event_scanner::{filter::EventFilter, message::Message}; +use crate::{ + ScannerError, + event_scanner::{filter::EventFilter, message::Message}, +}; use tokio::sync::mpsc::Sender; #[derive(Clone)] pub(crate) struct EventListener { pub filter: EventFilter, - pub sender: Sender, + pub sender: Sender>, } diff --git a/src/event_scanner/scanner/mod.rs b/src/event_scanner/scanner/mod.rs index b97ee2a..a32ce52 100644 --- a/src/event_scanner/scanner/mod.rs +++ b/src/event_scanner/scanner/mod.rs @@ -405,8 +405,12 @@ impl EventScannerBuilder { impl EventScanner { #[must_use] - pub fn subscribe(&mut self, filter: EventFilter) -> ReceiverStream { - let (sender, receiver) = mpsc::channel::(MAX_BUFFERED_MESSAGES); + pub fn subscribe( + &mut self, + filter: EventFilter, + ) -> ReceiverStream> { + let (sender, receiver) = + mpsc::channel::>(MAX_BUFFERED_MESSAGES); self.listeners.push(EventListener { filter, sender }); ReceiverStream::new(receiver) } From 89b449f7203190edb2418cc6615e6ec046ea5eb4 Mon Sep 17 00:00:00 2001 From: Leo Date: Thu, 20 Nov 2025 16:49:38 +0900 Subject: [PATCH 06/26] feat: update common fn --- src/event_scanner/scanner/common.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/event_scanner/scanner/common.rs b/src/event_scanner/scanner/common.rs index 35c204b..86d0740 100644 --- a/src/event_scanner/scanner/common.rs +++ b/src/event_scanner/scanner/common.rs @@ -5,7 +5,7 @@ use crate::{ block_range_scanner::{MAX_BUFFERED_MESSAGES, Message as BlockRangeMessage}, event_scanner::{filter::EventFilter, listener::EventListener}, robust_provider::{Error as RobustProviderError, RobustProvider}, - types::TryStream, + types::{TryStream, TryStreamError}, }; use alloy::{ network::Network, @@ -128,7 +128,8 @@ pub fn spawn_log_consumers( } } Err(e) => { - if !sender.try_stream(e).await { + error!(error = ?e, "Received error message"); + if !sender.try_stream_err(e).await { break; } } @@ -142,7 +143,9 @@ pub fn spawn_log_consumers( } Err(e) => { error!(error = ?e, "Received error message"); - sender.try_stream(e).await + if !sender.try_stream_err(e).await { + break; + } } } } From e3bd76df220085655f391ba05baf029f97245145 Mon Sep 17 00:00:00 2001 From: Leo Date: Thu, 20 Nov 2025 17:27:09 +0900 Subject: [PATCH 07/26] feat: update examples to use result --- examples/historical_scanning/main.rs | 10 +++++----- examples/latest_events_scanning/main.rs | 10 +++++----- examples/live_scanning/main.rs | 10 +++++----- examples/sync_from_block_scanning/main.rs | 6 +++--- examples/sync_from_latest_scanning/main.rs | 10 +++++----- 5 files changed, 23 insertions(+), 23 deletions(-) diff --git a/examples/historical_scanning/main.rs b/examples/historical_scanning/main.rs index 2bfcdfb..b7224bf 100644 --- a/examples/historical_scanning/main.rs +++ b/examples/historical_scanning/main.rs @@ -69,17 +69,17 @@ async fn main() -> anyhow::Result<()> { while let Some(message) = stream.next().await { match message { - Message::Data(logs) => { + Ok(Message::Data(logs)) => { for log in logs { info!("Callback successfully executed with event {:?}", log.inner.data); } } - Message::Error(e) => { - error!("Received error: {}", e); - } - Message::Notification(info) => { + Ok(Message::Notification(info)) => { info!("Received info: {:?}", info); } + Err(e) => { + error!("Received error: {}", e); + } } } diff --git a/examples/latest_events_scanning/main.rs b/examples/latest_events_scanning/main.rs index 3dfbdaa..2c3d3b7 100644 --- a/examples/latest_events_scanning/main.rs +++ b/examples/latest_events_scanning/main.rs @@ -70,17 +70,17 @@ async fn main() -> anyhow::Result<()> { while let Some(message) = stream.next().await { match message { - Message::Data(logs) => { + Ok(Message::Data(logs)) => { for log in logs { info!("Received event: {:?}", log.inner.data); } } - Message::Error(e) => { - error!("Received error: {}", e); - } - Message::Notification(info) => { + Ok(Message::Notification(info)) => { info!("Received notification: {:?}", info); } + Err(e) => { + error!("Received error: {}", e); + } } } diff --git a/examples/live_scanning/main.rs b/examples/live_scanning/main.rs index 5864b48..7eb8d9c 100644 --- a/examples/live_scanning/main.rs +++ b/examples/live_scanning/main.rs @@ -69,17 +69,17 @@ async fn main() -> anyhow::Result<()> { while let Some(message) = stream.next().await { match message { - Message::Data(logs) => { + Ok(Message::Data(logs)) => { for log in logs { info!("Callback successfully executed with event {:?}", log.inner.data); } } - Message::Error(e) => { - error!("Received error: {}", e); - } - Message::Notification(info) => { + Ok(Message::Notification(info)) => { info!("Received info: {:?}", info); } + Err(e) => { + error!("Received error: {}", e); + } } } diff --git a/examples/sync_from_block_scanning/main.rs b/examples/sync_from_block_scanning/main.rs index 38cf60f..84e94b4 100644 --- a/examples/sync_from_block_scanning/main.rs +++ b/examples/sync_from_block_scanning/main.rs @@ -86,7 +86,7 @@ async fn main() -> anyhow::Result<()> { while let Some(message) = stream.next().await { match message { - Message::Data(logs) => { + Ok(Message::Data(logs)) => { for log in logs { let Counter::CountIncreased { newCount } = log.log_decode().unwrap().inner.data; if newCount <= 3 { @@ -98,10 +98,10 @@ async fn main() -> anyhow::Result<()> { } } } - Message::Error(e) => { + Err(e) => { error!("Received error: {}", e); } - Message::Notification(info) => { + Ok(Message::Notification(info)) => { info!("Received notification: {:?}", info); } } diff --git a/examples/sync_from_latest_scanning/main.rs b/examples/sync_from_latest_scanning/main.rs index 476e84a..ed0216a 100644 --- a/examples/sync_from_latest_scanning/main.rs +++ b/examples/sync_from_latest_scanning/main.rs @@ -77,17 +77,17 @@ async fn main() -> anyhow::Result<()> { // only the last 5 events will be streamed before switching to live mode while let Some(message) = stream.next().await { match message { - Message::Data(logs) => { + Ok(Message::Data(logs)) => { for log in logs { info!("Callback successfully executed with event {:?}", log.inner.data); } } - Message::Error(e) => { - error!("Received error: {}", e); - } - Message::Notification(info) => { + Ok(Message::Notification(info)) => { info!("Received info: {:?}", info); } + Err(e) => { + error!("Received error: {}", e); + } } } From e5871a70655ec8e33cc1d6ff3c9df9e90b5e5a7b Mon Sep 17 00:00:00 2001 From: Leo Date: Thu, 20 Nov 2025 17:27:59 +0900 Subject: [PATCH 08/26] doc: update to use result --- src/event_scanner/scanner/sync/mod.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/event_scanner/scanner/sync/mod.rs b/src/event_scanner/scanner/sync/mod.rs index 6f1ec69..f88f49b 100644 --- a/src/event_scanner/scanner/sync/mod.rs +++ b/src/event_scanner/scanner/sync/mod.rs @@ -45,14 +45,14 @@ impl EventScannerBuilder { /// /// while let Some(msg) = stream.next().await { /// match msg { - /// Message::Data(logs) => { + /// Ok(Message::Data(logs)) => { /// println!("Received {} events", logs.len()); /// } - /// Message::Notification(notification) => { + /// Ok(Message::Notification(notification)) => { /// println!("Notification received: {:?}", notification); /// // You'll see Notification::SwitchingToLive when transitioning /// } - /// Message::Error(e) => { + /// Err(e) => { /// eprintln!("Error: {}", e); /// } /// } @@ -143,14 +143,14 @@ impl EventScannerBuilder { /// /// while let Some(msg) = stream.next().await { /// match msg { - /// Message::Data(logs) => { + /// Ok(Message::Data(logs)) => { /// println!("Received {} events", logs.len()); /// } - /// Message::Notification(notification) => { + /// Ok(Message::Notification(notification)) => { /// println!("Notification received: {:?}", notification); /// // You'll see Notification::SwitchingToLive when transitioning /// } - /// Message::Error(e) => { + /// Err(e) => { /// eprintln!("Error: {}", e); /// } /// } From c538cb3c09582b536fc5b46a3e98d3c2cb23a6fa Mon Sep 17 00:00:00 2001 From: Leo Date: Thu, 20 Nov 2025 17:28:06 +0900 Subject: [PATCH 09/26] doc: update to use result --- src/event_scanner/scanner/mod.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/event_scanner/scanner/mod.rs b/src/event_scanner/scanner/mod.rs index a32ce52..a67ea18 100644 --- a/src/event_scanner/scanner/mod.rs +++ b/src/event_scanner/scanner/mod.rs @@ -93,7 +93,7 @@ impl EventScannerBuilder { /// /// scanner.start().await?; /// - /// while let Some(Message::Data(logs)) = stream.next().await { + /// while let Some(Ok(Message::Data(logs))) = stream.next().await { /// println!("Received {} logs", logs.len()); /// } /// # Ok(()) @@ -164,13 +164,13 @@ impl EventScannerBuilder { /// /// while let Some(msg) = stream.next().await { /// match msg { - /// Message::Data(logs) => { + /// Ok(Message::Data(logs)) => { /// println!("Received {} new events", logs.len()); /// } - /// Message::Notification(notification) => { + /// Ok(Message::Notification(notification)) => { /// println!("Notification received: {:?}", notification); /// } - /// Message::Error(e) => { + /// Err(e) => { /// eprintln!("Error: {}", e); /// } /// } @@ -248,7 +248,7 @@ impl EventScannerBuilder { /// scanner.start().await?; /// /// // Expect a single message with up to 10 logs, then the stream ends - /// while let Some(Message::Data(logs)) = stream.next().await { + /// while let Some(Ok(Message::Data(logs))) = stream.next().await { /// println!("Latest logs: {}", logs.len()); /// } /// # Ok(()) From 7e8299f3ae33283ce987d5693fa545af8f65925d Mon Sep 17 00:00:00 2001 From: Leo Date: Thu, 20 Nov 2025 17:28:18 +0900 Subject: [PATCH 10/26] test: update to use result message stream --- tests/common/setup_scanner.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/common/setup_scanner.rs b/tests/common/setup_scanner.rs index 7e9fdf9..8280d34 100644 --- a/tests/common/setup_scanner.rs +++ b/tests/common/setup_scanner.rs @@ -7,7 +7,7 @@ use alloy::{ use alloy_node_bindings::AnvilInstance; use event_scanner::{ EventFilter, EventScanner, EventScannerBuilder, Historic, LatestEvents, Live, Message, - SyncFromBlock, SyncFromLatestEvents, robust_provider::RobustProvider, + ScannerError, SyncFromBlock, SyncFromLatestEvents, robust_provider::RobustProvider, }; use tokio_stream::wrappers::ReceiverStream; @@ -24,7 +24,7 @@ where pub provider: RobustProvider, pub contract: TestCounter::TestCounterInstance

, pub scanner: S, - pub stream: ReceiverStream, + pub stream: ReceiverStream>, #[allow(dead_code)] pub anvil: AnvilInstance, } From 1ac2c736bccfa3b8a2f69d3a6cfb2e89237a89a2 Mon Sep 17 00:00:00 2001 From: Leo Date: Thu, 20 Nov 2025 17:28:37 +0900 Subject: [PATCH 11/26] ref: doc --- src/block_range_scanner.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/block_range_scanner.rs b/src/block_range_scanner.rs index c2205e4..a6b769d 100644 --- a/src/block_range_scanner.rs +++ b/src/block_range_scanner.rs @@ -35,10 +35,13 @@ //! //! while let Some(message) = stream.next().await { //! match message { -//! Message::Data(range) => { +//! Ok(Message::Data(range)) => { //! // process range //! } -//! Message::Error(e) => { +//! Ok(Message::Notification(notification)) => { +//! info!("Received notification: {:?}", notification); +//! } +//! Err(e) => { //! error!("Received error from subscription: {e}"); //! match e { //! ScannerError::ServiceShutdown => break, @@ -47,9 +50,6 @@ //! } //! } //! } -//! Message::Notification(notification) => { -//! info!("Received notification: {:?}", notification); -//! } //! } //! } //! From e57955fdf26d3c0e809027d83034bb61afa51051 Mon Sep 17 00:00:00 2001 From: Leo Date: Thu, 20 Nov 2025 17:31:03 +0900 Subject: [PATCH 12/26] feat: update macro to use result --- src/test_utils/macros.rs | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/src/test_utils/macros.rs b/src/test_utils/macros.rs index adcb835..38a96af 100644 --- a/src/test_utils/macros.rs +++ b/src/test_utils/macros.rs @@ -1,7 +1,7 @@ use alloy::primitives::LogData; use tokio_stream::Stream; -use crate::Message; +use crate::{Message, ScannerError}; #[macro_export] macro_rules! assert_next { @@ -35,16 +35,13 @@ macro_rules! assert_next { .expect("timed out"); let expected = $expected; match message { - // Some(Ok($crate::ScannerMessage::Data(data))) => { - // assert_eq!(data, expected, "Expected Data({:?}), got Data({:?})", expected, - // data); } - Some(Ok(msg)) => { + std::option::Option::Some(std::result::Result::Ok(msg)) => { assert_eq!(msg, expected, "Expected {:?}, got {:?}", expected, msg); } - Some(Err(e)) => { + std::option::Option::Some(std::result::Result::Err(e)) => { panic!("Expected Ok({:?}), got Err({:?})", expected, e); } - None => { + std::option::Option::None => { panic!("Expected Ok({:?}), but channel was closed", expected); } } @@ -190,7 +187,7 @@ macro_rules! assert_event_sequence_final { } #[allow(clippy::missing_panics_doc)] -pub async fn assert_event_sequence + Unpin>( +pub async fn assert_event_sequence> + Unpin>( stream: &mut S, expected_options: impl IntoIterator, timeout_secs: u64, @@ -214,7 +211,7 @@ pub async fn assert_event_sequence + Unpin>( .expect("timed out waiting for next batch"); match message { - Some(Message::Data(batch)) => { + Some(Ok(Message::Data(batch))) => { let mut batch = batch.iter(); let event = batch.next().expect("Streamed batch should not be empty"); assert_eq!( @@ -233,9 +230,12 @@ pub async fn assert_event_sequence + Unpin>( ); } } - Some(other) => { + Some(Ok(other)) => { panic!("Expected Message::Data, got: {other:#?}"); } + Some(Err(e)) => { + panic!("Expected Ok(Message::Data), got Err: {e:#?}"); + } None => { panic!("Stream closed while still expecting: {:#?}", remaining.collect::>()); } @@ -332,7 +332,7 @@ macro_rules! assert_range_coverage { .expect("Timed out waiting for the next block range"); match message { - Some( $crate::block_range_scanner::Message::Data(range)) => { + std::option::Option::Some(std::result::Result::Ok($crate::block_range_scanner::Message::Data(range))) => { let (streamed_start, streamed_end) = bounds(&range); streamed_ranges.push(range.clone()); assert!( @@ -344,10 +344,13 @@ macro_rules! assert_range_coverage { ); start = streamed_end + 1; } - Some(other) => { + std::option::Option::Some(std::result::Result::Ok(other)) => { panic!("Expected a block range, got: {other:#?}"); } - None => { + std::option::Option::Some(std::result::Result::Err(e)) => { + panic!("Expected Ok(Message::Data), got Err: {e:#?}"); + } + std::option::Option::None => { panic!("Stream closed without covering range: {:#?}", start..=end); } } From e09f138dc2aa7303871432ac1893724a8981f8c9 Mon Sep 17 00:00:00 2001 From: Leo Date: Thu, 20 Nov 2025 17:41:29 +0900 Subject: [PATCH 13/26] ref: use more internal fn in common.rs --- src/event_scanner/scanner/common.rs | 149 ++++++++++++++++++---------- 1 file changed, 98 insertions(+), 51 deletions(-) diff --git a/src/event_scanner/scanner/common.rs b/src/event_scanner/scanner/common.rs index 86d0740..be40963 100644 --- a/src/event_scanner/scanner/common.rs +++ b/src/event_scanner/scanner/common.rs @@ -1,7 +1,7 @@ use std::ops::RangeInclusive; use crate::{ - ScannerError, + Message, ScannerError, block_range_scanner::{MAX_BUFFERED_MESSAGES, Message as BlockRangeMessage}, event_scanner::{filter::EventFilter, listener::EventListener}, robust_provider::{Error as RobustProviderError, RobustProvider}, @@ -12,7 +12,10 @@ use alloy::{ rpc::types::{Filter, Log}, }; use tokio::{ - sync::broadcast::{self, Sender, error::RecvError}, + sync::{ + broadcast::{self, Sender, error::RecvError}, + mpsc, + }, task::JoinSet, }; use tokio_stream::{Stream, StreamExt}; @@ -98,55 +101,18 @@ pub fn spawn_log_consumers( loop { match range_rx.recv().await { Ok(message) => { - match message { - Ok(BlockRangeMessage::Data(range)) => { - match get_logs(range, &filter, &base_filter, &provider).await { - Ok(logs) => { - if logs.is_empty() { - continue; - } - - match mode { - ConsumerMode::Stream => { - if !sender.try_stream(logs).await { - break; - } - } - ConsumerMode::CollectLatest { count } => { - let take = count.saturating_sub(collected.len()); - // if we have enough logs, break - if take == 0 { - break; - } - // take latest within this range - collected.extend(logs.into_iter().rev().take(take)); - // if we have enough logs, break - if collected.len() == count { - break; - } - } - } - } - Err(e) => { - error!(error = ?e, "Received error message"); - if !sender.try_stream_err(e).await { - break; - } - } - } - } - Ok(BlockRangeMessage::Notification(notification)) => { - info!(notification = ?notification, "Received notification"); - if !sender.try_stream(notification).await { - break; - } - } - Err(e) => { - error!(error = ?e, "Received error message"); - if !sender.try_stream_err(e).await { - break; - } - } + if !handle_block_range_message( + message, + &filter, + &base_filter, + &provider, + &sender, + mode, + &mut collected, + ) + .await + { + break; } } Err(RecvError::Closed) => { @@ -206,3 +172,84 @@ async fn get_logs( } } } + +#[must_use] +async fn handle_block_range_message( + message: Result, + filter: &EventFilter, + base_filter: &Filter, + provider: &RobustProvider, + sender: &mpsc::Sender>, + mode: ConsumerMode, + collected: &mut Vec, +) -> bool { + match message { + Ok(BlockRangeMessage::Data(range)) => { + if !handle_block_range(range, filter, base_filter, provider, sender, mode, collected) + .await + { + return false; + } + } + Ok(BlockRangeMessage::Notification(notification)) => { + info!(notification = ?notification, "Received notification"); + if !sender.try_stream(notification).await { + return false; + } + } + Err(e) => { + error!(error = ?e, "Received error message"); + if !sender.try_stream_err(e).await { + return false; + } + } + } + true +} + +#[must_use] +async fn handle_block_range( + range: RangeInclusive, + filter: &EventFilter, + base_filter: &Filter, + provider: &RobustProvider, + sender: &mpsc::Sender>, + mode: ConsumerMode, + collected: &mut Vec, +) -> bool { + match get_logs(range, filter, base_filter, provider).await { + Ok(logs) => { + if logs.is_empty() { + return true; + } + + match mode { + ConsumerMode::Stream => { + if !sender.try_stream(logs).await { + return false; + } + } + ConsumerMode::CollectLatest { count } => { + let take = count.saturating_sub(collected.len()); + // if we have enough logs, break + if take == 0 { + return false; + } + // take latest within this range + collected.extend(logs.into_iter().rev().take(take)); + // if we have enough logs, break + if collected.len() == count { + return false; + } + } + } + } + Err(e) => { + error!(error = ?e, "Received error message"); + if !sender.try_stream_err(e).await { + return false; + } + } + } + true +} From 873a33c795bffaf40f001222e4662d724c0f1471 Mon Sep 17 00:00:00 2001 From: Leo Date: Thu, 20 Nov 2025 20:54:41 +0900 Subject: [PATCH 14/26] ref: reorder match in example --- examples/sync_from_block_scanning/main.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/sync_from_block_scanning/main.rs b/examples/sync_from_block_scanning/main.rs index 84e94b4..48ac359 100644 --- a/examples/sync_from_block_scanning/main.rs +++ b/examples/sync_from_block_scanning/main.rs @@ -98,12 +98,12 @@ async fn main() -> anyhow::Result<()> { } } } - Err(e) => { - error!("Received error: {}", e); - } Ok(Message::Notification(info)) => { info!("Received notification: {:?}", info); } + Err(e) => { + error!("Received error: {}", e); + } } if historical_processed && live_processed { From 54f226893ddb455e9d9c1c66bd4223e3a0def0ab Mon Sep 17 00:00:00 2001 From: Leo Date: Thu, 20 Nov 2025 21:20:29 +0900 Subject: [PATCH 15/26] feat: delete stream err --- src/block_range_scanner.rs | 28 +++++------ src/event_scanner/scanner/common.rs | 10 ++-- src/types.rs | 75 ++++++++--------------------- 3 files changed, 40 insertions(+), 73 deletions(-) diff --git a/src/block_range_scanner.rs b/src/block_range_scanner.rs index d838243..9d6b0b7 100644 --- a/src/block_range_scanner.rs +++ b/src/block_range_scanner.rs @@ -69,7 +69,7 @@ use tokio_stream::{StreamExt, wrappers::ReceiverStream}; use crate::{ ScannerError, ScannerMessage, robust_provider::{Error as RobustProviderError, IntoRobustProvider, RobustProvider}, - types::{Notification, TryStream, TryStreamError}, + types::{Notification, TryStream}, }; use alloy::{ consensus::BlockHeader, @@ -93,8 +93,8 @@ pub const DEFAULT_REORG_REWIND_DEPTH: u64 = 64; pub type Message = ScannerMessage>; impl From> for Message { - fn from(logs: RangeInclusive) -> Self { - Message::Data(logs) + fn from(range: RangeInclusive) -> Self { + Message::Data(range) } } @@ -491,7 +491,7 @@ impl Service { let batch_to = batch_from.saturating_sub(max_block_range - 1).max(to); // stream the range regularly, i.e. from smaller block number to greater - if !sender.try_stream(batch_to..=batch_from).await { + if !sender.try_stream(Message::Data(batch_to..=batch_from)).await { break; } @@ -513,7 +513,7 @@ impl Service { } Err(e) => { error!(error = %e, "Terminal RPC call error, shutting down"); - _ = sender.try_stream_err(e).await; + _ = sender.try_stream(e).await; return; } }; @@ -535,7 +535,7 @@ impl Service { } Err(e) => { error!(error = %e, "Terminal RPC call error, shutting down"); - _ = sender.try_stream_err(e).await; + _ = sender.try_stream(ScannerError::from(e)).await; return; } }; @@ -565,7 +565,7 @@ impl Service { let batch_end_block_number = next_start_block.saturating_add(max_block_range - 1).min(end); - if !sender.try_stream(next_start_block..=batch_end_block_number).await { + if !sender.try_stream(Message::Data(next_start_block..=batch_end_block_number)).await { break; } @@ -623,7 +623,7 @@ impl Service { info!(range_start = range_start, range_end = range_end, "Sending live block range"); - if !sender.try_stream(range_start..=range_end).await { + if !sender.try_stream(Message::Data(range_start..=range_end)).await { return; } @@ -647,7 +647,7 @@ impl Service { Ok(Message::Data(range)) => { let (start, end) = (*range.start(), *range.end()); if start >= cutoff { - if !sender.try_stream(range).await { + if !sender.try_stream(Message::Data(range)).await { break; } processed += end - start; @@ -655,7 +655,7 @@ impl Service { discarded += cutoff - start; let start = cutoff; - if !sender.try_stream(start..=end).await { + if !sender.try_stream(Message::Data(start..=end)).await { break; } processed += end - start; @@ -669,7 +669,7 @@ impl Service { } } Err(e) => { - if !sender.try_stream_err(e).await { + if !sender.try_stream(e).await { break; } } @@ -939,13 +939,13 @@ mod tests { #[tokio::test] async fn try_send_forwards_errors_to_subscribers() { - let (tx, mut rx) = mpsc::channel::(1); + let (tx, mut rx) = mpsc::channel::>(1); - _ = tx.try_stream_err(ScannerError::BlockNotFound(4.into())).await; + _ = tx.try_stream(ScannerError::BlockNotFound(4.into())).await; assert!(matches!( rx.recv().await, - Some(ScannerError::BlockNotFound(BlockId::Number(BlockNumberOrTag::Number(4)))) + Some(Err(ScannerError::BlockNotFound(BlockId::Number(BlockNumberOrTag::Number(4))))) )); } } diff --git a/src/event_scanner/scanner/common.rs b/src/event_scanner/scanner/common.rs index be40963..25aa1c3 100644 --- a/src/event_scanner/scanner/common.rs +++ b/src/event_scanner/scanner/common.rs @@ -5,7 +5,7 @@ use crate::{ block_range_scanner::{MAX_BUFFERED_MESSAGES, Message as BlockRangeMessage}, event_scanner::{filter::EventFilter, listener::EventListener}, robust_provider::{Error as RobustProviderError, RobustProvider}, - types::{TryStream, TryStreamError}, + types::TryStream, }; use alloy::{ network::Network, @@ -129,7 +129,7 @@ pub fn spawn_log_consumers( } info!("Sending collected logs to consumer"); - _ = sender.try_stream(collected).await; + _ = sender.try_stream(Message::Data(collected)).await; } }); @@ -199,7 +199,7 @@ async fn handle_block_range_message( } Err(e) => { error!(error = ?e, "Received error message"); - if !sender.try_stream_err(e).await { + if !sender.try_stream(e).await { return false; } } @@ -225,7 +225,7 @@ async fn handle_block_range( match mode { ConsumerMode::Stream => { - if !sender.try_stream(logs).await { + if !sender.try_stream(Message::Data(logs)).await { return false; } } @@ -246,7 +246,7 @@ async fn handle_block_range( } Err(e) => { error!(error = ?e, "Received error message"); - if !sender.try_stream_err(e).await { + if !sender.try_stream(ScannerError::from(e)).await { return false; } } diff --git a/src/types.rs b/src/types.rs index e4cc43d..4b5cf0e 100644 --- a/src/types.rs +++ b/src/types.rs @@ -1,4 +1,4 @@ -use std::{error::Error, fmt::Debug}; +use std::fmt::Debug; use tokio::sync::mpsc; use tracing::{info, warn}; @@ -33,69 +33,36 @@ impl PartialEq for ScannerMessage { } } -pub(crate) trait TryStream { - async fn try_stream(&self, item: M) -> bool; -} - -pub(crate) trait TryStreamError { - async fn try_stream_err(&self, error: E) -> bool; +impl From> for Result, ScannerError> { + fn from(value: ScannerMessage) -> Self { + Ok(value) + } } -impl TryStream for mpsc::Sender> -where - M: Into>, -{ - async fn try_stream(&self, item: M) -> bool { - let item = item.into(); - info!(item = ?item, "Sending message"); - if let Err(err) = self.send(item).await { - warn!(error = %err, "Downstream channel closed, stopping stream"); - return false; - } - true +impl From for Result, ScannerError> { + fn from(value: ScannerError) -> Self { + Err(value) } } -impl TryStream for mpsc::Sender, ScannerError>> -where - M: Into>, -{ - async fn try_stream(&self, item: M) -> bool { - let item = item.into(); - let item = Ok(item); - info!(item = ?item, "Sending message"); - if let Err(err) = self.send(item).await { - warn!(error = %err, "Downstream channel closed, stopping stream"); - return false; - } - true +impl From for Result, ScannerError> { + fn from(value: Notification) -> Self { + Ok(value.into()) } } -impl TryStreamError for mpsc::Sender -where - E: Error + Clone + Into, -{ - async fn try_stream_err(&self, item: E) -> bool { - let item = item.into(); - info!(item = ?item, "Sending error"); - if let Err(err) = self.send(item).await { - warn!(error = %err, "Downstream channel closed, stopping stream"); - return false; - } - true - } +pub(crate) trait TryStream { + async fn try_stream, ScannerError>>>(&self, msg: M) -> bool; } -impl TryStreamError - for mpsc::Sender, ScannerError>> -where - E: Error + Clone + Into, -{ - async fn try_stream_err(&self, item: E) -> bool { - let item = item.into(); - info!(item = ?item, "Sending error"); - if let Err(err) = self.send(Err(item)).await { +impl TryStream for mpsc::Sender, ScannerError>> { + async fn try_stream, ScannerError>>>(&self, msg: M) -> bool { + let item = msg.into(); + match &item { + Ok(msg) => info!(item = ?msg, "Sending message"), + Err(err) => info!(error = ?err, "Sending error"), + } + if let Err(err) = self.send(item).await { warn!(error = %err, "Downstream channel closed, stopping stream"); return false; } From 93b323677e5f042cfef363491156a048d30d70cb Mon Sep 17 00:00:00 2001 From: Leo Date: Thu, 20 Nov 2025 21:49:41 +0900 Subject: [PATCH 16/26] feat: remove need for Message wrapping --- src/block_range_scanner.rs | 19 +++++++++++----- src/event_scanner/message.rs | 11 +++++++++- src/event_scanner/scanner/common.rs | 4 ++-- src/types.rs | 34 +++++++++++++++++++---------- 4 files changed, 47 insertions(+), 21 deletions(-) diff --git a/src/block_range_scanner.rs b/src/block_range_scanner.rs index 9d6b0b7..4ca0b31 100644 --- a/src/block_range_scanner.rs +++ b/src/block_range_scanner.rs @@ -69,8 +69,9 @@ use tokio_stream::{StreamExt, wrappers::ReceiverStream}; use crate::{ ScannerError, ScannerMessage, robust_provider::{Error as RobustProviderError, IntoRobustProvider, RobustProvider}, - types::{Notification, TryStream}, + types::{IntoScannerMessageResult, Notification, TryStream}, }; + use alloy::{ consensus::BlockHeader, eips::BlockNumberOrTag, @@ -104,6 +105,12 @@ impl PartialEq> for Message { } } +impl IntoScannerMessageResult> for RangeInclusive { + fn into_scanner_message_result(self) -> Result { + Ok(Message::Data(self)) + } +} + #[derive(Clone)] pub struct BlockRangeScanner { pub max_block_range: u64, @@ -491,7 +498,7 @@ impl Service { let batch_to = batch_from.saturating_sub(max_block_range - 1).max(to); // stream the range regularly, i.e. from smaller block number to greater - if !sender.try_stream(Message::Data(batch_to..=batch_from)).await { + if !sender.try_stream(batch_to..=batch_from).await { break; } @@ -565,7 +572,7 @@ impl Service { let batch_end_block_number = next_start_block.saturating_add(max_block_range - 1).min(end); - if !sender.try_stream(Message::Data(next_start_block..=batch_end_block_number)).await { + if !sender.try_stream(next_start_block..=batch_end_block_number).await { break; } @@ -623,7 +630,7 @@ impl Service { info!(range_start = range_start, range_end = range_end, "Sending live block range"); - if !sender.try_stream(Message::Data(range_start..=range_end)).await { + if !sender.try_stream(range_start..=range_end).await { return; } @@ -647,7 +654,7 @@ impl Service { Ok(Message::Data(range)) => { let (start, end) = (*range.start(), *range.end()); if start >= cutoff { - if !sender.try_stream(Message::Data(range)).await { + if !sender.try_stream(range).await { break; } processed += end - start; @@ -655,7 +662,7 @@ impl Service { discarded += cutoff - start; let start = cutoff; - if !sender.try_stream(Message::Data(start..=end)).await { + if !sender.try_stream(start..=end).await { break; } processed += end - start; diff --git a/src/event_scanner/message.rs b/src/event_scanner/message.rs index 252f5fc..d54e9fa 100644 --- a/src/event_scanner/message.rs +++ b/src/event_scanner/message.rs @@ -1,6 +1,9 @@ use alloy::{rpc::types::Log, sol_types::SolEvent}; -use crate::ScannerMessage; +use crate::{ + ScannerError, ScannerMessage, + types::IntoScannerMessageResult, +}; pub type Message = ScannerMessage>; @@ -10,6 +13,12 @@ impl From> for Message { } } +impl IntoScannerMessageResult> for Vec { + fn into_scanner_message_result(self) -> Result { + Ok(Message::Data(self)) + } +} + impl PartialEq> for Message { fn eq(&self, other: &Vec) -> bool { self.eq(&other.as_slice()) diff --git a/src/event_scanner/scanner/common.rs b/src/event_scanner/scanner/common.rs index 25aa1c3..66a4c75 100644 --- a/src/event_scanner/scanner/common.rs +++ b/src/event_scanner/scanner/common.rs @@ -129,7 +129,7 @@ pub fn spawn_log_consumers( } info!("Sending collected logs to consumer"); - _ = sender.try_stream(Message::Data(collected)).await; + _ = sender.try_stream(collected).await; } }); @@ -225,7 +225,7 @@ async fn handle_block_range( match mode { ConsumerMode::Stream => { - if !sender.try_stream(Message::Data(logs)).await { + if !sender.try_stream(logs).await { return false; } } diff --git a/src/types.rs b/src/types.rs index 4b5cf0e..0821db3 100644 --- a/src/types.rs +++ b/src/types.rs @@ -33,31 +33,41 @@ impl PartialEq for ScannerMessage { } } -impl From> for Result, ScannerError> { - fn from(value: ScannerMessage) -> Self { - Ok(value) +pub trait IntoScannerMessageResult { + fn into_scanner_message_result(self) -> Result, ScannerError>; +} + +impl IntoScannerMessageResult for Result, ScannerError> { + fn into_scanner_message_result(self) -> Result, ScannerError> { + self } } -impl From for Result, ScannerError> { - fn from(value: ScannerError) -> Self { - Err(value) +impl IntoScannerMessageResult for ScannerMessage { + fn into_scanner_message_result(self) -> Result, ScannerError> { + Ok(self) } } -impl From for Result, ScannerError> { - fn from(value: Notification) -> Self { - Ok(value.into()) +impl IntoScannerMessageResult for ScannerError { + fn into_scanner_message_result(self) -> Result, ScannerError> { + Err(self) + } +} + +impl IntoScannerMessageResult for Notification { + fn into_scanner_message_result(self) -> Result, ScannerError> { + Ok(ScannerMessage::Notification(self)) } } pub(crate) trait TryStream { - async fn try_stream, ScannerError>>>(&self, msg: M) -> bool; + async fn try_stream>(&self, msg: M) -> bool; } impl TryStream for mpsc::Sender, ScannerError>> { - async fn try_stream, ScannerError>>>(&self, msg: M) -> bool { - let item = msg.into(); + async fn try_stream>(&self, msg: M) -> bool { + let item = msg.into_scanner_message_result(); match &item { Ok(msg) => info!(item = ?msg, "Sending message"), Err(err) => info!(error = ?err, "Sending error"), From fe6098402aa271ca6aa1452217b8174bf138dd55 Mon Sep 17 00:00:00 2001 From: Leo Date: Thu, 20 Nov 2025 21:55:51 +0900 Subject: [PATCH 17/26] feat: remove need for scanner from --- src/block_range_scanner.rs | 2 +- src/event_scanner/scanner/common.rs | 2 +- src/types.rs | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/block_range_scanner.rs b/src/block_range_scanner.rs index 4ca0b31..a5c0a18 100644 --- a/src/block_range_scanner.rs +++ b/src/block_range_scanner.rs @@ -542,7 +542,7 @@ impl Service { } Err(e) => { error!(error = %e, "Terminal RPC call error, shutting down"); - _ = sender.try_stream(ScannerError::from(e)).await; + _ = sender.try_stream(e).await; return; } }; diff --git a/src/event_scanner/scanner/common.rs b/src/event_scanner/scanner/common.rs index 66a4c75..ace7f66 100644 --- a/src/event_scanner/scanner/common.rs +++ b/src/event_scanner/scanner/common.rs @@ -246,7 +246,7 @@ async fn handle_block_range( } Err(e) => { error!(error = ?e, "Received error message"); - if !sender.try_stream(ScannerError::from(e)).await { + if !sender.try_stream(e).await { return false; } } diff --git a/src/types.rs b/src/types.rs index 0821db3..ac6d90e 100644 --- a/src/types.rs +++ b/src/types.rs @@ -49,9 +49,9 @@ impl IntoScannerMessageResult for ScannerMessage { } } -impl IntoScannerMessageResult for ScannerError { +impl> IntoScannerMessageResult for E { fn into_scanner_message_result(self) -> Result, ScannerError> { - Err(self) + Err(self.into()) } } From 0d3ceaa309b035fd00d6ddac3492c965ca4a218e Mon Sep 17 00:00:00 2001 From: Leo Date: Thu, 20 Nov 2025 22:03:08 +0900 Subject: [PATCH 18/26] fix: merge --- src/block_range_scanner.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/block_range_scanner.rs b/src/block_range_scanner.rs index 53e3259..5234bd0 100644 --- a/src/block_range_scanner.rs +++ b/src/block_range_scanner.rs @@ -182,19 +182,19 @@ pub enum Command { response: oneshot::Sender>, }, StreamHistorical { - sender: mpsc::Sender, + sender: mpsc::Sender>, start_id: BlockId, end_id: BlockId, response: oneshot::Sender>, }, StreamFrom { - sender: mpsc::Sender, + sender: mpsc::Sender>, start_id: BlockId, block_confirmations: u64, response: oneshot::Sender>, }, Rewind { - sender: mpsc::Sender, + sender: mpsc::Sender>, start_id: BlockId, end_id: BlockId, response: oneshot::Sender>, @@ -307,7 +307,7 @@ impl Service { &mut self, start_id: BlockId, end_id: BlockId, - sender: mpsc::Sender, + sender: mpsc::Sender>, ) -> Result<(), ScannerError> { let max_block_range = self.max_block_range; @@ -444,7 +444,7 @@ impl Service { &mut self, start_id: BlockId, end_id: BlockId, - sender: mpsc::Sender, + sender: mpsc::Sender>, ) -> Result<(), ScannerError> { let max_block_range = self.max_block_range; let provider = self.provider.clone(); @@ -752,7 +752,7 @@ impl BlockRangeScannerClient { &self, start_id: impl Into, end_id: impl Into, - ) -> Result, ScannerError> { + ) -> Result>, ScannerError> { let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES); let (response_tx, response_rx) = oneshot::channel(); @@ -816,7 +816,7 @@ impl BlockRangeScannerClient { &self, start_id: impl Into, end_id: impl Into, - ) -> Result, ScannerError> { + ) -> Result>, ScannerError> { let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES); let (response_tx, response_rx) = oneshot::channel(); From 1eb52c2a9cac93e48178e0d8670bc05856aa04c6 Mon Sep 17 00:00:00 2001 From: Leo Date: Thu, 20 Nov 2025 22:04:29 +0900 Subject: [PATCH 19/26] fix: format --- src/event_scanner/message.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/event_scanner/message.rs b/src/event_scanner/message.rs index d54e9fa..aeed903 100644 --- a/src/event_scanner/message.rs +++ b/src/event_scanner/message.rs @@ -1,9 +1,6 @@ use alloy::{rpc::types::Log, sol_types::SolEvent}; -use crate::{ - ScannerError, ScannerMessage, - types::IntoScannerMessageResult, -}; +use crate::{ScannerError, ScannerMessage, types::IntoScannerMessageResult}; pub type Message = ScannerMessage>; From b76bacad46c378647729bea60f83e2dbbe3bab00 Mon Sep 17 00:00:00 2001 From: Leo Date: Fri, 21 Nov 2025 20:48:10 +0900 Subject: [PATCH 20/26] ref: revert match --- src/block_range_scanner.rs | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/src/block_range_scanner.rs b/src/block_range_scanner.rs index 5234bd0..86a7f11 100644 --- a/src/block_range_scanner.rs +++ b/src/block_range_scanner.rs @@ -666,13 +666,9 @@ impl Service { discarded += end - start; } } - Ok(Message::Notification(notif)) => { - if !sender.try_stream(notif).await { - break; - } - } - Err(e) => { - if !sender.try_stream(e).await { + other => { + // Could be error or notification + if !sender.try_stream(other).await { break; } } From cfa0b1aa603a92ca1938278bf81b78013869114a Mon Sep 17 00:00:00 2001 From: Leo Date: Fri, 21 Nov 2025 20:48:52 +0900 Subject: [PATCH 21/26] ref: change name --- src/block_range_scanner.rs | 4 ++-- src/event_scanner/message.rs | 4 ++-- src/types.rs | 14 +++++++------- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/src/block_range_scanner.rs b/src/block_range_scanner.rs index 86a7f11..9d1443e 100644 --- a/src/block_range_scanner.rs +++ b/src/block_range_scanner.rs @@ -69,7 +69,7 @@ use tokio_stream::{StreamExt, wrappers::ReceiverStream}; use crate::{ ScannerError, ScannerMessage, robust_provider::{Error as RobustProviderError, IntoRobustProvider, RobustProvider}, - types::{IntoScannerMessageResult, Notification, TryStream}, + types::{IntoScannerResult, Notification, TryStream}, }; use alloy::{ @@ -105,7 +105,7 @@ impl PartialEq> for Message { } } -impl IntoScannerMessageResult> for RangeInclusive { +impl IntoScannerResult> for RangeInclusive { fn into_scanner_message_result(self) -> Result { Ok(Message::Data(self)) } diff --git a/src/event_scanner/message.rs b/src/event_scanner/message.rs index aeed903..63556cb 100644 --- a/src/event_scanner/message.rs +++ b/src/event_scanner/message.rs @@ -1,6 +1,6 @@ use alloy::{rpc::types::Log, sol_types::SolEvent}; -use crate::{ScannerError, ScannerMessage, types::IntoScannerMessageResult}; +use crate::{ScannerError, ScannerMessage, types::IntoScannerResult}; pub type Message = ScannerMessage>; @@ -10,7 +10,7 @@ impl From> for Message { } } -impl IntoScannerMessageResult> for Vec { +impl IntoScannerResult> for Vec { fn into_scanner_message_result(self) -> Result { Ok(Message::Data(self)) } diff --git a/src/types.rs b/src/types.rs index ac6d90e..b315a13 100644 --- a/src/types.rs +++ b/src/types.rs @@ -33,40 +33,40 @@ impl PartialEq for ScannerMessage { } } -pub trait IntoScannerMessageResult { +pub trait IntoScannerResult { fn into_scanner_message_result(self) -> Result, ScannerError>; } -impl IntoScannerMessageResult for Result, ScannerError> { +impl IntoScannerResult for Result, ScannerError> { fn into_scanner_message_result(self) -> Result, ScannerError> { self } } -impl IntoScannerMessageResult for ScannerMessage { +impl IntoScannerResult for ScannerMessage { fn into_scanner_message_result(self) -> Result, ScannerError> { Ok(self) } } -impl> IntoScannerMessageResult for E { +impl> IntoScannerResult for E { fn into_scanner_message_result(self) -> Result, ScannerError> { Err(self.into()) } } -impl IntoScannerMessageResult for Notification { +impl IntoScannerResult for Notification { fn into_scanner_message_result(self) -> Result, ScannerError> { Ok(ScannerMessage::Notification(self)) } } pub(crate) trait TryStream { - async fn try_stream>(&self, msg: M) -> bool; + async fn try_stream>(&self, msg: M) -> bool; } impl TryStream for mpsc::Sender, ScannerError>> { - async fn try_stream>(&self, msg: M) -> bool { + async fn try_stream>(&self, msg: M) -> bool { let item = msg.into_scanner_message_result(); match &item { Ok(msg) => info!(item = ?msg, "Sending message"), From d1924a32bd06eb46ab58d4fc12ca9c4f68975f2c Mon Sep 17 00:00:00 2001 From: Leo Date: Fri, 21 Nov 2025 20:58:31 +0900 Subject: [PATCH 22/26] feat: use type alias for scanner result --- src/types.rs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/src/types.rs b/src/types.rs index b315a13..251f3ae 100644 --- a/src/types.rs +++ b/src/types.rs @@ -33,30 +33,32 @@ impl PartialEq for ScannerMessage { } } +pub type ScannerResult = Result, ScannerError>; + pub trait IntoScannerResult { - fn into_scanner_message_result(self) -> Result, ScannerError>; + fn into_scanner_message_result(self) -> ScannerResult; } -impl IntoScannerResult for Result, ScannerError> { - fn into_scanner_message_result(self) -> Result, ScannerError> { +impl IntoScannerResult for ScannerResult { + fn into_scanner_message_result(self) -> ScannerResult { self } } impl IntoScannerResult for ScannerMessage { - fn into_scanner_message_result(self) -> Result, ScannerError> { + fn into_scanner_message_result(self) -> ScannerResult { Ok(self) } } impl> IntoScannerResult for E { - fn into_scanner_message_result(self) -> Result, ScannerError> { + fn into_scanner_message_result(self) -> ScannerResult { Err(self.into()) } } impl IntoScannerResult for Notification { - fn into_scanner_message_result(self) -> Result, ScannerError> { + fn into_scanner_message_result(self) -> ScannerResult { Ok(ScannerMessage::Notification(self)) } } @@ -65,7 +67,7 @@ pub(crate) trait TryStream { async fn try_stream>(&self, msg: M) -> bool; } -impl TryStream for mpsc::Sender, ScannerError>> { +impl TryStream for mpsc::Sender> { async fn try_stream>(&self, msg: M) -> bool { let item = msg.into_scanner_message_result(); match &item { From a62ba569d8fcb4305a66a43baa38b6ea0ab7114a Mon Sep 17 00:00:00 2001 From: Leo Date: Fri, 21 Nov 2025 21:16:10 +0900 Subject: [PATCH 23/26] ref: add better type alias --- src/block_range_scanner.rs | 46 ++++++++++--------- src/event_scanner/listener.rs | 7 +-- src/event_scanner/message.rs | 8 +++- src/event_scanner/mod.rs | 2 +- src/event_scanner/scanner/common.rs | 26 +++++------ src/event_scanner/scanner/mod.rs | 12 ++--- src/event_scanner/scanner/sync/from_latest.rs | 8 ++-- src/lib.rs | 4 +- src/test_utils/macros.rs | 6 +-- src/types.rs | 2 +- tests/common/setup_scanner.rs | 6 +-- 11 files changed, 61 insertions(+), 66 deletions(-) diff --git a/src/block_range_scanner.rs b/src/block_range_scanner.rs index 9d1443e..10f9831 100644 --- a/src/block_range_scanner.rs +++ b/src/block_range_scanner.rs @@ -69,7 +69,7 @@ use tokio_stream::{StreamExt, wrappers::ReceiverStream}; use crate::{ ScannerError, ScannerMessage, robust_provider::{Error as RobustProviderError, IntoRobustProvider, RobustProvider}, - types::{IntoScannerResult, Notification, TryStream}, + types::{IntoScannerResult, Notification, ScannerResult, TryStream}, }; use alloy::{ @@ -91,7 +91,9 @@ pub const MAX_BUFFERED_MESSAGES: usize = 50000; // is considered final) pub const DEFAULT_REORG_REWIND_DEPTH: u64 = 64; -pub type Message = ScannerMessage>; +pub type BlockScannerResult = ScannerResult>; + +type Message = ScannerMessage>; impl From> for Message { fn from(range: RangeInclusive) -> Self { @@ -106,7 +108,7 @@ impl PartialEq> for Message { } impl IntoScannerResult> for RangeInclusive { - fn into_scanner_message_result(self) -> Result { + fn into_scanner_message_result(self) -> BlockScannerResult { Ok(Message::Data(self)) } } @@ -177,24 +179,24 @@ impl ConnectedBlockRangeScanner { #[derive(Debug)] pub enum Command { StreamLive { - sender: mpsc::Sender>, + sender: mpsc::Sender, block_confirmations: u64, response: oneshot::Sender>, }, StreamHistorical { - sender: mpsc::Sender>, + sender: mpsc::Sender, start_id: BlockId, end_id: BlockId, response: oneshot::Sender>, }, StreamFrom { - sender: mpsc::Sender>, + sender: mpsc::Sender, start_id: BlockId, block_confirmations: u64, response: oneshot::Sender>, }, Rewind { - sender: mpsc::Sender>, + sender: mpsc::Sender, start_id: BlockId, end_id: BlockId, response: oneshot::Sender>, @@ -275,7 +277,7 @@ impl Service { async fn handle_live( &mut self, block_confirmations: u64, - sender: mpsc::Sender>, + sender: mpsc::Sender, ) -> Result<(), ScannerError> { let max_block_range = self.max_block_range; let latest = self.provider.get_block_number().await?; @@ -307,7 +309,7 @@ impl Service { &mut self, start_id: BlockId, end_id: BlockId, - sender: mpsc::Sender>, + sender: mpsc::Sender, ) -> Result<(), ScannerError> { let max_block_range = self.max_block_range; @@ -341,7 +343,7 @@ impl Service { &mut self, start_id: BlockId, block_confirmations: u64, - sender: mpsc::Sender>, + sender: mpsc::Sender, ) -> Result<(), ScannerError> { let provider = self.provider.clone(); let max_block_range = self.max_block_range; @@ -394,7 +396,7 @@ impl Service { // Step 2: Setup the live streaming buffer // This channel will accumulate while historical sync is running let (live_block_buffer_sender, live_block_buffer_receiver) = - mpsc::channel::>(MAX_BUFFERED_MESSAGES); + mpsc::channel::(MAX_BUFFERED_MESSAGES); // The cutoff is the last block we have synced historically // Any block > cutoff will come from the live stream @@ -444,7 +446,7 @@ impl Service { &mut self, start_id: BlockId, end_id: BlockId, - sender: mpsc::Sender>, + sender: mpsc::Sender, ) -> Result<(), ScannerError> { let max_block_range = self.max_block_range; let provider = self.provider.clone(); @@ -476,7 +478,7 @@ impl Service { from: N::BlockResponse, to: N::BlockResponse, max_block_range: u64, - sender: &mpsc::Sender>, + sender: &mpsc::Sender, provider: &RobustProvider, ) { let mut batch_count = 0; @@ -556,7 +558,7 @@ impl Service { start: BlockNumber, end: BlockNumber, max_block_range: u64, - sender: &mpsc::Sender>, + sender: &mpsc::Sender, ) { let mut batch_count = 0; @@ -593,7 +595,7 @@ impl Service { async fn stream_live_blocks( mut range_start: BlockNumber, subscription: Subscription, - sender: mpsc::Sender>, + sender: mpsc::Sender, block_confirmations: u64, max_block_range: u64, ) { @@ -637,8 +639,8 @@ impl Service { } async fn process_live_block_buffer( - mut buffer_rx: mpsc::Receiver>, - sender: mpsc::Sender>, + mut buffer_rx: mpsc::Receiver, + sender: mpsc::Sender, cutoff: BlockNumber, ) { let mut processed = 0; @@ -717,7 +719,7 @@ impl BlockRangeScannerClient { pub async fn stream_live( &self, block_confirmations: u64, - ) -> Result>, ScannerError> { + ) -> Result, ScannerError> { let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES); let (response_tx, response_rx) = oneshot::channel(); @@ -748,7 +750,7 @@ impl BlockRangeScannerClient { &self, start_id: impl Into, end_id: impl Into, - ) -> Result>, ScannerError> { + ) -> Result, ScannerError> { let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES); let (response_tx, response_rx) = oneshot::channel(); @@ -780,7 +782,7 @@ impl BlockRangeScannerClient { &self, start_id: impl Into, block_confirmations: u64, - ) -> Result>, ScannerError> { + ) -> Result, ScannerError> { let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES); let (response_tx, response_rx) = oneshot::channel(); @@ -812,7 +814,7 @@ impl BlockRangeScannerClient { &self, start_id: impl Into, end_id: impl Into, - ) -> Result>, ScannerError> { + ) -> Result, ScannerError> { let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES); let (response_tx, response_rx) = oneshot::channel(); @@ -941,7 +943,7 @@ mod tests { #[tokio::test] async fn try_send_forwards_errors_to_subscribers() { - let (tx, mut rx) = mpsc::channel::>(1); + let (tx, mut rx) = mpsc::channel::(1); _ = tx.try_stream(ScannerError::BlockNotFound(4.into())).await; diff --git a/src/event_scanner/listener.rs b/src/event_scanner/listener.rs index f76d4b3..10562b7 100644 --- a/src/event_scanner/listener.rs +++ b/src/event_scanner/listener.rs @@ -1,11 +1,8 @@ -use crate::{ - ScannerError, - event_scanner::{filter::EventFilter, message::Message}, -}; +use crate::event_scanner::{EventScannerResult, filter::EventFilter}; use tokio::sync::mpsc::Sender; #[derive(Clone)] pub(crate) struct EventListener { pub filter: EventFilter, - pub sender: Sender>, + pub sender: Sender, } diff --git a/src/event_scanner/message.rs b/src/event_scanner/message.rs index 63556cb..df61a21 100644 --- a/src/event_scanner/message.rs +++ b/src/event_scanner/message.rs @@ -1,8 +1,12 @@ use alloy::{rpc::types::Log, sol_types::SolEvent}; -use crate::{ScannerError, ScannerMessage, types::IntoScannerResult}; +use crate::{ + ScannerMessage, + types::{IntoScannerResult, ScannerResult}, +}; pub type Message = ScannerMessage>; +pub type EventScannerResult = ScannerResult>; impl From> for Message { fn from(logs: Vec) -> Self { @@ -11,7 +15,7 @@ impl From> for Message { } impl IntoScannerResult> for Vec { - fn into_scanner_message_result(self) -> Result { + fn into_scanner_message_result(self) -> EventScannerResult { Ok(Message::Data(self)) } } diff --git a/src/event_scanner/mod.rs b/src/event_scanner/mod.rs index 2da1657..74cd78a 100644 --- a/src/event_scanner/mod.rs +++ b/src/event_scanner/mod.rs @@ -4,7 +4,7 @@ mod message; mod scanner; pub use filter::EventFilter; -pub use message::Message; +pub use message::{EventScannerResult, Message}; pub use scanner::{ EventScanner, EventScannerBuilder, Historic, LatestEvents, Live, SyncFromBlock, SyncFromLatestEvents, diff --git a/src/event_scanner/scanner/common.rs b/src/event_scanner/scanner/common.rs index ace7f66..b30cbc8 100644 --- a/src/event_scanner/scanner/common.rs +++ b/src/event_scanner/scanner/common.rs @@ -1,9 +1,9 @@ use std::ops::RangeInclusive; use crate::{ - Message, ScannerError, - block_range_scanner::{MAX_BUFFERED_MESSAGES, Message as BlockRangeMessage}, - event_scanner::{filter::EventFilter, listener::EventListener}, + ScannerMessage, + block_range_scanner::{BlockScannerResult, MAX_BUFFERED_MESSAGES}, + event_scanner::{EventScannerResult, filter::EventFilter, listener::EventListener}, robust_provider::{Error as RobustProviderError, RobustProvider}, types::TryStream, }; @@ -49,17 +49,13 @@ pub enum ConsumerMode { /// # Note /// /// Assumes it is running in a separate tokio task, so as to be non-blocking. -pub async fn handle_stream< - N: Network, - S: Stream> + Unpin, ->( +pub async fn handle_stream + Unpin>( mut stream: S, provider: &RobustProvider, listeners: &[EventListener], mode: ConsumerMode, ) { - let (range_tx, _) = - broadcast::channel::>(MAX_BUFFERED_MESSAGES); + let (range_tx, _) = broadcast::channel::(MAX_BUFFERED_MESSAGES); let consumers = spawn_log_consumers(provider, listeners, &range_tx, mode); @@ -81,7 +77,7 @@ pub async fn handle_stream< pub fn spawn_log_consumers( provider: &RobustProvider, listeners: &[EventListener], - range_tx: &Sender>, + range_tx: &Sender, mode: ConsumerMode, ) -> JoinSet<()> { listeners.iter().cloned().fold(JoinSet::new(), |mut set, listener| { @@ -175,23 +171,23 @@ async fn get_logs( #[must_use] async fn handle_block_range_message( - message: Result, + message: BlockScannerResult, filter: &EventFilter, base_filter: &Filter, provider: &RobustProvider, - sender: &mpsc::Sender>, + sender: &mpsc::Sender, mode: ConsumerMode, collected: &mut Vec, ) -> bool { match message { - Ok(BlockRangeMessage::Data(range)) => { + Ok(ScannerMessage::Data(range)) => { if !handle_block_range(range, filter, base_filter, provider, sender, mode, collected) .await { return false; } } - Ok(BlockRangeMessage::Notification(notification)) => { + Ok(ScannerMessage::Notification(notification)) => { info!(notification = ?notification, "Received notification"); if !sender.try_stream(notification).await { return false; @@ -213,7 +209,7 @@ async fn handle_block_range( filter: &EventFilter, base_filter: &Filter, provider: &RobustProvider, - sender: &mpsc::Sender>, + sender: &mpsc::Sender, mode: ConsumerMode, collected: &mut Vec, ) -> bool { diff --git a/src/event_scanner/scanner/mod.rs b/src/event_scanner/scanner/mod.rs index 75c8bb5..8b91bc5 100644 --- a/src/event_scanner/scanner/mod.rs +++ b/src/event_scanner/scanner/mod.rs @@ -6,12 +6,12 @@ use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use crate::{ - EventFilter, Message, ScannerError, + EventFilter, ScannerError, block_range_scanner::{ BlockRangeScanner, ConnectedBlockRangeScanner, DEFAULT_BLOCK_CONFIRMATIONS, MAX_BUFFERED_MESSAGES, }, - event_scanner::listener::EventListener, + event_scanner::{EventScannerResult, listener::EventListener}, robust_provider::IntoRobustProvider, }; @@ -411,12 +411,8 @@ impl EventScannerBuilder { impl EventScanner { #[must_use] - pub fn subscribe( - &mut self, - filter: EventFilter, - ) -> ReceiverStream> { - let (sender, receiver) = - mpsc::channel::>(MAX_BUFFERED_MESSAGES); + pub fn subscribe(&mut self, filter: EventFilter) -> ReceiverStream { + let (sender, receiver) = mpsc::channel::(MAX_BUFFERED_MESSAGES); self.listeners.push(EventListener { filter, sender }); ReceiverStream::new(receiver) } diff --git a/src/event_scanner/scanner/sync/from_latest.rs b/src/event_scanner/scanner/sync/from_latest.rs index 25086b9..f34e1b8 100644 --- a/src/event_scanner/scanner/sync/from_latest.rs +++ b/src/event_scanner/scanner/sync/from_latest.rs @@ -9,8 +9,8 @@ use tokio_stream::{StreamExt, wrappers::ReceiverStream}; use tracing::info; use crate::{ - EventScannerBuilder, Notification, ScannerError, - block_range_scanner::Message as BlockRangeMessage, + EventScannerBuilder, Notification, ScannerError, ScannerMessage, + block_range_scanner::BlockScannerResult, event_scanner::{ EventScanner, scanner::{ @@ -104,9 +104,9 @@ impl EventScanner { info!("Switching to live stream"); // Use a one-off channel for the notification. - let (tx, rx) = mpsc::channel::>(1); + let (tx, rx) = mpsc::channel::(1); let stream = ReceiverStream::new(rx); - tx.send(Ok(BlockRangeMessage::Notification(Notification::SwitchingToLive))) + tx.send(Ok(ScannerMessage::Notification(Notification::SwitchingToLive))) .await .expect("receiver exists"); diff --git a/src/lib.rs b/src/lib.rs index 4af7fa6..ff37a8d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,6 +12,6 @@ pub use error::ScannerError; pub use types::{Notification, ScannerMessage}; pub use event_scanner::{ - EventFilter, EventScanner, EventScannerBuilder, Historic, LatestEvents, Live, Message, - SyncFromBlock, SyncFromLatestEvents, + EventFilter, EventScanner, EventScannerBuilder, EventScannerResult, Historic, LatestEvents, + Live, Message, SyncFromBlock, SyncFromLatestEvents, }; diff --git a/src/test_utils/macros.rs b/src/test_utils/macros.rs index 38a96af..d081eb2 100644 --- a/src/test_utils/macros.rs +++ b/src/test_utils/macros.rs @@ -1,7 +1,7 @@ use alloy::primitives::LogData; use tokio_stream::Stream; -use crate::{Message, ScannerError}; +use crate::{Message, event_scanner::EventScannerResult}; #[macro_export] macro_rules! assert_next { @@ -187,7 +187,7 @@ macro_rules! assert_event_sequence_final { } #[allow(clippy::missing_panics_doc)] -pub async fn assert_event_sequence> + Unpin>( +pub async fn assert_event_sequence + Unpin>( stream: &mut S, expected_options: impl IntoIterator, timeout_secs: u64, @@ -332,7 +332,7 @@ macro_rules! assert_range_coverage { .expect("Timed out waiting for the next block range"); match message { - std::option::Option::Some(std::result::Result::Ok($crate::block_range_scanner::Message::Data(range))) => { + std::option::Option::Some(std::result::Result::Ok(event_scanner::ScannerMessage::Data(range))) => { let (streamed_start, streamed_end) = bounds(&range); streamed_ranges.push(range.clone()); assert!( diff --git a/src/types.rs b/src/types.rs index 251f3ae..c21f037 100644 --- a/src/types.rs +++ b/src/types.rs @@ -33,7 +33,7 @@ impl PartialEq for ScannerMessage { } } -pub type ScannerResult = Result, ScannerError>; +pub type ScannerResult = Result, ScannerError>; pub trait IntoScannerResult { fn into_scanner_message_result(self) -> ScannerResult; diff --git a/tests/common/setup_scanner.rs b/tests/common/setup_scanner.rs index 6f7bccf..9471f63 100644 --- a/tests/common/setup_scanner.rs +++ b/tests/common/setup_scanner.rs @@ -6,8 +6,8 @@ use alloy::{ }; use alloy_node_bindings::AnvilInstance; use event_scanner::{ - EventFilter, EventScanner, EventScannerBuilder, Historic, LatestEvents, Live, Message, - ScannerError, SyncFromBlock, SyncFromLatestEvents, robust_provider::RobustProvider, + EventFilter, EventScanner, EventScannerBuilder, EventScannerResult, Historic, LatestEvents, + Live, SyncFromBlock, SyncFromLatestEvents, robust_provider::RobustProvider, }; use tokio_stream::wrappers::ReceiverStream; @@ -24,7 +24,7 @@ where pub provider: RobustProvider, pub contract: TestCounter::TestCounterInstance

, pub scanner: S, - pub stream: ReceiverStream>, + pub stream: ReceiverStream, #[allow(dead_code)] pub anvil: AnvilInstance, } From 29779981467fb2a85990a50dd37037572d178673 Mon Sep 17 00:00:00 2001 From: Leo Date: Fri, 21 Nov 2025 21:20:26 +0900 Subject: [PATCH 24/26] fix: doc --- src/block_range_scanner.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/block_range_scanner.rs b/src/block_range_scanner.rs index 10f9831..4f1d2e8 100644 --- a/src/block_range_scanner.rs +++ b/src/block_range_scanner.rs @@ -7,10 +7,10 @@ //! //! use alloy::providers::{Provider, ProviderBuilder}; //! use event_scanner::{ -//! ScannerError, +//! ScannerError, ScannerMessage, //! block_range_scanner::{ //! BlockRangeScanner, BlockRangeScannerClient, DEFAULT_BLOCK_CONFIRMATIONS, -//! DEFAULT_MAX_BLOCK_RANGE, Message, +//! DEFAULT_MAX_BLOCK_RANGE, //! }, //! robust_provider::RobustProviderBuilder, //! }; @@ -35,10 +35,10 @@ //! //! while let Some(message) = stream.next().await { //! match message { -//! Ok(Message::Data(range)) => { +//! Ok(ScannerMessage::Data(range)) => { //! // process range //! } -//! Ok(Message::Notification(notification)) => { +//! Ok(ScannerMessage::Notification(notification)) => { //! info!("Received notification: {:?}", notification); //! } //! Err(e) => { From ae5bede53985a4c0336bd8f48dd4925ea3bda51f Mon Sep 17 00:00:00 2001 From: Leo Date: Fri, 21 Nov 2025 21:49:33 +0900 Subject: [PATCH 25/26] feat: udpate macro and fix doc --- src/error.rs | 12 ++++++++++-- src/test_utils/macros.rs | 32 +++++++++++++++++--------------- 2 files changed, 27 insertions(+), 17 deletions(-) diff --git a/src/error.rs b/src/error.rs index c9a83af..11277fa 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{mem::discriminant, sync::Arc}; use alloy::{ eips::BlockId, @@ -6,7 +6,7 @@ use alloy::{ }; use thiserror::Error; -use crate::robust_provider::Error as RobustProviderError; +use crate::{robust_provider::Error as RobustProviderError, types::ScannerResult}; #[derive(Error, Debug, Clone)] pub enum ScannerError { @@ -47,3 +47,11 @@ impl From> for ScannerError { ScannerError::RpcError(Arc::new(error)) } } +impl PartialEq for ScannerResult { + fn eq(&self, other: &ScannerError) -> bool { + match self { + Ok(_) => false, + Err(err) => discriminant(err) == discriminant(other), + } + } +} diff --git a/src/test_utils/macros.rs b/src/test_utils/macros.rs index d081eb2..8a82889 100644 --- a/src/test_utils/macros.rs +++ b/src/test_utils/macros.rs @@ -1,19 +1,15 @@ use alloy::primitives::LogData; use tokio_stream::Stream; -use crate::{Message, event_scanner::EventScannerResult}; +use crate::{ScannerMessage, event_scanner::EventScannerResult}; #[macro_export] macro_rules! assert_next { - // Convenience form with default timeout - ($stream: expr, Err($expected_err:pat)) => { + // 1. Explicit Error Matching (Value based) - uses the new PartialEq implementation + ($stream: expr, Err($expected_err:expr)) => { $crate::assert_next!($stream, Err($expected_err), timeout = 5) }; - ($stream: expr, $expected: expr) => { - $crate::assert_next!($stream, $expected, timeout = 5) - }; - // Result::Err expectation – assert the next item is an Err matching the pattern - ($stream: expr, Err($expected_err:pat), timeout = $secs: expr) => { + ($stream: expr, Err($expected_err:expr), timeout = $secs: expr) => { let message = tokio::time::timeout( std::time::Duration::from_secs($secs), tokio_stream::StreamExt::next(&mut $stream), @@ -21,11 +17,17 @@ macro_rules! assert_next { .await .expect("timed out"); if let Some(msg) = message { - assert!(matches!(msg, Err($expected_err))); + let expected = &$expected_err; + assert_eq!(&msg, expected, "Expected error {:?}, got {:?}", expected, msg); } else { - panic!("Expected Err(..), but channel was closed"); + panic!("Expected error {:?}, but channel was closed", $expected_err); } }; + + // 2. Success Matching (Implicit unwrapping) - existing behavior + ($stream: expr, $expected: expr) => { + $crate::assert_next!($stream, $expected, timeout = 5) + }; ($stream: expr, $expected: expr, timeout = $secs: expr) => { let message = tokio::time::timeout( std::time::Duration::from_secs($secs), @@ -211,7 +213,7 @@ pub async fn assert_event_sequence + Unpin> .expect("timed out waiting for next batch"); match message { - Some(Ok(Message::Data(batch))) => { + Some(Ok(ScannerMessage::Data(batch))) => { let mut batch = batch.iter(); let event = batch.next().expect("Streamed batch should not be empty"); assert_eq!( @@ -250,7 +252,7 @@ pub async fn assert_event_sequence + Unpin> /// range must start exactly where the previous one ended, and all ranges must fit within /// the expected bounds. /// -/// The macro expects the stream to yield `Message::Data(range)` variants containing +/// The macro expects the stream to yield `ScannerMessage::Data(range)` variants containing /// `RangeInclusive` values representing block ranges. It tracks coverage by ensuring /// each new range starts at the next expected block number and doesn't exceed the end of /// the expected range. Once the entire range is covered, the assertion succeeds. @@ -258,7 +260,7 @@ pub async fn assert_event_sequence + Unpin> /// # Example /// /// ```rust -/// use event_scanner::{assert_range_coverage, block_range_scanner::Message}; +/// use event_scanner::{ScannerMessage, assert_range_coverage}; /// use tokio::sync::mpsc; /// use tokio_stream::wrappers::ReceiverStream; /// @@ -269,8 +271,8 @@ pub async fn assert_event_sequence + Unpin> /// /// // Simulate a scanner that splits blocks 100-199 into chunks /// tokio::spawn(async move { -/// tx.send(Message::Data(100..=149)).await.unwrap(); -/// tx.send(Message::Data(150..=199)).await.unwrap(); +/// tx.send(ScannerMessage::Data(100..=149)).await.unwrap(); +/// tx.send(ScannerMessage::Data(150..=199)).await.unwrap(); /// }); /// /// // Assert that the stream covers blocks 100-199 From 30b933ded018baaf8444ccb44c9b275b147b7109 Mon Sep 17 00:00:00 2001 From: Nenad Date: Fri, 21 Nov 2025 14:16:05 +0100 Subject: [PATCH 26/26] Update src/block_range_scanner.rs --- src/block_range_scanner.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/block_range_scanner.rs b/src/block_range_scanner.rs index 4f1d2e8..dab8054 100644 --- a/src/block_range_scanner.rs +++ b/src/block_range_scanner.rs @@ -93,7 +93,7 @@ pub const DEFAULT_REORG_REWIND_DEPTH: u64 = 64; pub type BlockScannerResult = ScannerResult>; -type Message = ScannerMessage>; +pub type Message = ScannerMessage>; impl From> for Message { fn from(range: RangeInclusive) -> Self {