Skip to content

Commit 5a19c9a

Browse files
authored
From / To accepts hash (#195)
1 parent b09a42e commit 5a19c9a

File tree

9 files changed

+430
-121
lines changed

9 files changed

+430
-121
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,7 @@ Always handle all message types in your stream processing loop to ensure robust
252252

253253
- `examples/live_scanning` – minimal live-mode scanner using `EventScannerBuilder::live()`
254254
- `examples/historical_scanning` – demonstrates replaying historical data using `EventScannerBuilder::historic()`
255-
- `examples/sync_from_block_scanning` – demonstrates replaying from genesis (block 0) before continuing to stream the latest blocks using `EventScannerBuilder::sync().from_block(block_tag_or_number)`
255+
- `examples/sync_from_block_scanning` – demonstrates replaying from genesis (block 0) before continuing to stream the latest blocks using `EventScannerBuilder::sync().from_block(block_id)`
256256
- `examples/latest_events_scanning` – demonstrates scanning the latest events using `EventScannerBuilder::latest()`
257257
- `examples/sync_from_latest_scanning` – demonstrates scanning the latest events before switching to live mode using `EventScannerBuilder::sync().from_latest(count)`.
258258

src/block_range_scanner.rs

Lines changed: 49 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ use crate::{
7474
};
7575
use alloy::{
7676
consensus::BlockHeader,
77-
eips::BlockNumberOrTag,
77+
eips::{BlockId, BlockNumberOrTag},
7878
network::{BlockResponse, Network, primitives::HeaderResponse},
7979
primitives::{B256, BlockNumber},
8080
pubsub::Subscription,
@@ -196,20 +196,20 @@ pub enum Command {
196196
},
197197
StreamHistorical {
198198
sender: mpsc::Sender<Message>,
199-
start_height: BlockNumberOrTag,
200-
end_height: BlockNumberOrTag,
199+
start_id: BlockId,
200+
end_id: BlockId,
201201
response: oneshot::Sender<Result<(), ScannerError>>,
202202
},
203203
StreamFrom {
204204
sender: mpsc::Sender<Message>,
205-
start_height: BlockNumberOrTag,
205+
start_id: BlockId,
206206
block_confirmations: u64,
207207
response: oneshot::Sender<Result<(), ScannerError>>,
208208
},
209209
Rewind {
210210
sender: mpsc::Sender<Message>,
211-
start_height: BlockNumberOrTag,
212-
end_height: BlockNumberOrTag,
211+
start_id: BlockId,
212+
end_id: BlockId,
213213
response: oneshot::Sender<Result<(), ScannerError>>,
214214
},
215215
}
@@ -266,19 +266,19 @@ impl<N: Network> Service<N> {
266266
let result = self.handle_live(block_confirmations, sender).await;
267267
let _ = response.send(result);
268268
}
269-
Command::StreamHistorical { sender, start_height, end_height, response } => {
270-
info!(start_height = ?start_height, end_height = ?end_height, "Starting historical stream");
271-
let result = self.handle_historical(start_height, end_height, sender).await;
269+
Command::StreamHistorical { sender, start_id, end_id, response } => {
270+
info!(start_id = ?start_id, end_id = ?end_id, "Starting historical stream");
271+
let result = self.handle_historical(start_id, end_id, sender).await;
272272
let _ = response.send(result);
273273
}
274-
Command::StreamFrom { sender, start_height, block_confirmations, response } => {
275-
info!(start_height = ?start_height, "Starting streaming from");
276-
let result = self.handle_sync(start_height, block_confirmations, sender).await;
274+
Command::StreamFrom { sender, start_id, block_confirmations, response } => {
275+
info!(start_id = ?start_id, "Starting streaming from");
276+
let result = self.handle_sync(start_id, block_confirmations, sender).await;
277277
let _ = response.send(result);
278278
}
279-
Command::Rewind { sender, start_height, end_height, response } => {
280-
info!(start_height = ?start_height, end_height = ?end_height, "Starting rewind");
281-
let result = self.handle_rewind(start_height, end_height, sender).await;
279+
Command::Rewind { sender, start_id, end_id, response } => {
280+
info!(start_id = ?start_id, end_id = ?end_id, "Starting rewind");
281+
let result = self.handle_rewind(start_id, end_id, sender).await;
282282
let _ = response.send(result);
283283
}
284284
}
@@ -318,16 +318,14 @@ impl<N: Network> Service<N> {
318318

319319
async fn handle_historical(
320320
&mut self,
321-
start_height: BlockNumberOrTag,
322-
end_height: BlockNumberOrTag,
321+
start_id: BlockId,
322+
end_id: BlockId,
323323
sender: mpsc::Sender<Message>,
324324
) -> Result<(), ScannerError> {
325325
let max_block_range = self.max_block_range;
326326

327-
let (start_block, end_block) = tokio::try_join!(
328-
self.provider.get_block_by_number(start_height),
329-
self.provider.get_block_by_number(end_height)
330-
)?;
327+
let (start_block, end_block) =
328+
tokio::try_join!(self.provider.get_block(start_id), self.provider.get_block(end_id))?;
331329

332330
let start_block_num = start_block.header().number();
333331
let end_block_num = end_block.header().number();
@@ -354,17 +352,17 @@ impl<N: Network> Service<N> {
354352

355353
async fn handle_sync(
356354
&mut self,
357-
start_height: BlockNumberOrTag,
355+
start_id: BlockId,
358356
block_confirmations: u64,
359357
sender: mpsc::Sender<Message>,
360358
) -> Result<(), ScannerError> {
361359
let provider = self.provider.clone();
362360
let max_block_range = self.max_block_range;
363361

364362
let get_start_block = async || -> Result<BlockNumber, ScannerError> {
365-
let block = match start_height {
366-
BlockNumberOrTag::Number(num) => num,
367-
block_tag => provider.get_block_by_number(block_tag).await?.header().number(),
363+
let block = match start_id {
364+
BlockId::Number(BlockNumberOrTag::Number(num)) => num,
365+
_ => provider.get_block(start_id).await?.header().number(),
368366
};
369367
Ok(block)
370368
};
@@ -457,17 +455,15 @@ impl<N: Network> Service<N> {
457455

458456
async fn handle_rewind(
459457
&mut self,
460-
start_height: BlockNumberOrTag,
461-
end_height: BlockNumberOrTag,
458+
start_id: BlockId,
459+
end_id: BlockId,
462460
sender: mpsc::Sender<Message>,
463461
) -> Result<(), ScannerError> {
464462
let max_block_range = self.max_block_range;
465463
let provider = self.provider.clone();
466464

467-
let (start_block, end_block) = try_join!(
468-
self.provider.get_block_by_number(start_height),
469-
self.provider.get_block_by_number(end_height),
470-
)?;
465+
let (start_block, end_block) =
466+
try_join!(self.provider.get_block(start_id), self.provider.get_block(end_id),)?;
471467

472468
// normalize block range
473469
let (from, to) = match start_block.header().number().cmp(&end_block.header().number()) {
@@ -751,28 +747,28 @@ impl BlockRangeScannerClient {
751747
Ok(ReceiverStream::new(blocks_receiver))
752748
}
753749

754-
/// Streams a batch of historical blocks from `start_height` to `end_height`.
750+
/// Streams a batch of historical blocks from `start_id` to `end_id`.
755751
///
756752
/// # Arguments
757753
///
758-
/// * `start_height` - The starting block number or tag.
759-
/// * `end_height` - The ending block number or tag.
754+
/// * `start_id` - The starting block id
755+
/// * `end_id` - The ending block id
760756
///
761757
/// # Errors
762758
///
763759
/// * `ScannerError::ServiceShutdown` - if the service is already shutting down.
764760
pub async fn stream_historical(
765761
&self,
766-
start_height: impl Into<BlockNumberOrTag>,
767-
end_height: impl Into<BlockNumberOrTag>,
762+
start_id: impl Into<BlockId>,
763+
end_id: impl Into<BlockId>,
768764
) -> Result<ReceiverStream<Message>, ScannerError> {
769765
let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES);
770766
let (response_tx, response_rx) = oneshot::channel();
771767

772768
let command = Command::StreamHistorical {
773769
sender: blocks_sender,
774-
start_height: start_height.into(),
775-
end_height: end_height.into(),
770+
start_id: start_id.into(),
771+
end_id: end_id.into(),
776772
response: response_tx,
777773
};
778774

@@ -783,27 +779,27 @@ impl BlockRangeScannerClient {
783779
Ok(ReceiverStream::new(blocks_receiver))
784780
}
785781

786-
/// Streams blocks starting from `start_height` and transitions to live mode.
782+
/// Streams blocks starting from `start_id` and transitions to live mode.
787783
///
788784
/// # Arguments
789785
///
790-
/// * `start_height` - The starting block number or tag.
786+
/// * `start_id` - The starting block id.
791787
/// * `block_confirmations` - Number of confirmations to apply once in live mode.
792788
///
793789
/// # Errors
794790
///
795791
/// * `ScannerError::ServiceShutdown` - if the service is already shutting down.
796792
pub async fn stream_from(
797793
&self,
798-
start_height: impl Into<BlockNumberOrTag>,
794+
start_id: impl Into<BlockId>,
799795
block_confirmations: u64,
800796
) -> Result<ReceiverStream<Message>, ScannerError> {
801797
let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES);
802798
let (response_tx, response_rx) = oneshot::channel();
803799

804800
let command = Command::StreamFrom {
805801
sender: blocks_sender,
806-
start_height: start_height.into(),
802+
start_id: start_id.into(),
807803
block_confirmations,
808804
response: response_tx,
809805
};
@@ -815,28 +811,28 @@ impl BlockRangeScannerClient {
815811
Ok(ReceiverStream::new(blocks_receiver))
816812
}
817813

818-
/// Streams blocks in reverse order from `start_height` to `end_height`.
814+
/// Streams blocks in reverse order from `start_id` to `end_id`.
819815
///
820816
/// # Arguments
821817
///
822-
/// * `start_height` - The starting block number or tag (defaults to Latest if None).
823-
/// * `end_height` - The ending block number or tag (defaults to Earliest if None).
818+
/// * `start_id` - The starting block id (defaults to Latest if None).
819+
/// * `end_id` - The ending block id (defaults to Earliest if None).
824820
///
825821
/// # Errors
826822
///
827823
/// * `ScannerError::ServiceShutdown` - if the service is already shutting down.
828824
pub async fn rewind(
829825
&self,
830-
start_height: impl Into<BlockNumberOrTag>,
831-
end_height: impl Into<BlockNumberOrTag>,
826+
start_id: impl Into<BlockId>,
827+
end_id: impl Into<BlockId>,
832828
) -> Result<ReceiverStream<Message>, ScannerError> {
833829
let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES);
834830
let (response_tx, response_rx) = oneshot::channel();
835831

836832
let command = Command::Rewind {
837833
sender: blocks_sender,
838-
start_height: start_height.into(),
839-
end_height: end_height.into(),
834+
start_id: start_id.into(),
835+
end_id: end_id.into(),
840836
response: response_tx,
841837
};
842838

@@ -852,7 +848,10 @@ impl BlockRangeScannerClient {
852848
mod tests {
853849
use super::*;
854850
use crate::{assert_closed, assert_next};
855-
use alloy::{eips::BlockId, network::Ethereum};
851+
use alloy::{
852+
eips::{BlockId, BlockNumberOrTag},
853+
network::Ethereum,
854+
};
856855
use tokio::sync::mpsc;
857856

858857
#[test]

0 commit comments

Comments
 (0)