@@ -7,34 +7,49 @@ use std::fs::File;
 use std::{
     io::{self, BufRead},
     path::Path,
+    time::Duration,
 };
 
 use anyhow::anyhow;
 use ethportal_api::{
-    utils::bytes::hex_decode, BlockBodyKey, BlockReceiptsKey, ContentValue, HistoryContentKey,
-    HistoryContentValue,
+    jsonrpsee::http_client::{HttpClient, HttpClientBuilder},
+    types::{cli::DEFAULT_WEB3_HTTP_ADDRESS, network::Subnetwork, query_trace::QueryTrace},
+    utils::bytes::hex_decode,
+    BlockBodyKey, BlockReceiptsKey, ContentValue, HistoryContentKey, HistoryContentValue,
 };
 use futures::{channel::oneshot, future::join_all};
-use portalnet::overlay::command::OverlayCommand;
+use portal_bridge::census::Census;
+use portalnet::overlay::{command::OverlayCommand, config::FindContentConfig};
 use tokio::sync::mpsc::UnboundedSender;
 use tracing::{error, info, warn};
 
 /// The number of blocks to download in a single batch.
-const BATCH_SIZE: usize = 40;
+const BATCH_SIZE: usize = 3;
 /// The path to the CSV file with block numbers and block hashes.
 const CSV_PATH: &str = "ethereum_blocks_14000000_merge.csv";
 
 #[derive(Clone)]
 pub struct Downloader {
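+    /// Census handle created in `new()` and initialized for the History subnetwork in `start()`.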
+    pub census: Census,
     pub overlay_tx: UnboundedSender<OverlayCommand<HistoryContentKey>>,
 }
 
 impl Downloader {
     pub fn new(overlay_tx: UnboundedSender<OverlayCommand<HistoryContentKey>>) -> Self {
-        Self { overlay_tx }
+        let http_client: HttpClient = HttpClientBuilder::default()
+            // increase default timeout to allow for trace_gossip requests that can take a long
+            // time
+            .request_timeout(Duration::from_secs(120))
+            .build(DEFAULT_WEB3_HTTP_ADDRESS)
+            .map_err(|e| e.to_string())
+            .expect("Failed to build http client");
+
+        // Build HTTP client bound to the current node's web3 RPC
+        let census = Census::new(http_client, 0, vec![]);
+        Self { overlay_tx, census }
     }
 
-    pub async fn start(self) -> io::Result<()> {
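+    // `mut self` is presumably needed because `self.census.init(..)` in the body requires mutable access.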
+    pub async fn start(mut self) -> io::Result<()> {
         // set the csv path to a file in the root trin-history directory
         info!("Opening CSV file");
         let csv_path = Path::new(CSV_PATH);
@@ -47,6 +62,14 @@ impl Downloader {
         // skip the header of the csv file
         let lines = &lines[1..];
         let blocks: Vec<(u64, String)> = lines.iter().map(|line| parse_line(line)).collect();
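+        // Census initialization is awaited to completion before any block queries are issued;
+        // the wrapped return value is discarded.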
+        // Initialize the census with the history subnetwork
+        let _ = Some(
+            self.census
+                .init([Subnetwork::History])
+                .await
+                .expect("Failed to initialize Census"),
+        );
+
         info!("Processing blocks");
         let batches = blocks.chunks(BATCH_SIZE);
 
@@ -91,7 +114,10 @@ impl Downloader {
         let overlay_command = OverlayCommand::FindContentQuery {
             target: content_key.clone(),
             callback: tx,
-            config: Default::default(),
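+            // is_trace presumably makes the query callback return a QueryTrace alongside the
+            // content, which is used for timing further below.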
+            config: FindContentConfig {
+                is_trace: true,
+                ..Default::default()
+            },
         };
 
         if let Err(err) = self.overlay_tx.send(overlay_command) {
@@ -104,7 +130,14 @@ impl Downloader {
             Ok(result) => match result {
                 Ok(result) => {
                     HistoryContentValue::decode(&content_key, &result.0)?;
-                    info!(block_number = block_number, "Downloaded content for block");
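+                    // Assuming timestamp_millis_u64 returns the milliseconds elapsed since the
+                    // trace's start time, this yields the query duration.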
+                    let duration_ms = QueryTrace::timestamp_millis_u64(
+                        result.2.expect("QueryTrace not found").started_at_ms,
+                    );
+                    info!(
+                        block_number = block_number,
+                        query_duration = duration_ms,
+                        "Downloaded content for block"
+                    );
                     Ok(())
                 }
                 Err(err) => {