1
1
use again:: RetryPolicy ;
2
+ use blob_archiver_beacon:: beacon_client:: BeaconClient ;
2
3
use blob_archiver_storage:: {
3
4
BackfillProcess , BackfillProcesses , BlobData , BlobSidecars , Header , LockFile , Storage ,
4
5
} ;
6
+ use eth2:: types:: Slot ;
5
7
use eth2:: types:: { BlockHeaderData , BlockId , Hash256 } ;
6
- use eyre:: Result ;
8
+ use eth2:: Error ;
9
+ use eyre:: { eyre, Result } ;
7
10
use serde:: { Deserialize , Serialize } ;
8
11
use std:: sync:: atomic:: { AtomicU64 , Ordering } ;
9
12
use std:: sync:: Arc ;
10
13
use std:: time:: Duration ;
11
14
use tokio:: sync:: watch:: Receiver ;
12
15
use tokio:: time:: { interval, sleep} ;
13
16
use tracing:: log:: { debug, error, info, trace} ;
14
- use blob_archiver_beacon:: beacon_client:: BeaconClient ;
15
17
16
18
/// Maximum retry attempts when fetching blobs for the live head block.
#[allow(dead_code)]
const LIVE_FETCH_BLOB_MAXIMUM_RETRIES: usize = 10;

/// Maximum retry attempts when fetching blobs during startup.
#[allow(dead_code)]
const STARTUP_FETCH_BLOB_MAXIMUM_RETRIES: usize = 3;

/// Maximum retry attempts when re-archiving a single slot.
#[allow(dead_code)]
const REARCHIVE_MAXIMUM_RETRIES: usize = 3;

/// Delay between attempts after an error during backfill.
#[allow(dead_code)]
const BACKFILL_ERROR_RETRY_INTERVAL: Duration = Duration::from_secs(5);
24
26
#[ allow( dead_code) ]
@@ -30,6 +32,13 @@ const OBTAIN_LOCK_RETRY_INTERVAL_SECS: u64 = 10;
30
32
#[ allow( dead_code) ]
31
33
static OBTAIN_LOCK_RETRY_INTERVAL : AtomicU64 = AtomicU64 :: new ( OBTAIN_LOCK_RETRY_INTERVAL_SECS ) ;
32
34
35
+ #[ derive( Debug , Serialize , Deserialize ) ]
36
+ pub struct RearchiveResp {
37
+ pub from : u64 ,
38
+ pub to : u64 ,
39
+ pub error : Option < String > ,
40
+ }
41
+
33
42
#[ derive( Debug , PartialEq , Eq , Clone , Default , Serialize , Deserialize ) ]
34
43
pub struct Config {
35
44
pub poll_interval : Duration ,
@@ -43,9 +52,9 @@ pub struct Archiver {
43
52
pub beacon_client : Arc < dyn BeaconClient > ,
44
53
45
54
storage : Arc < dyn Storage > ,
46
- # [ allow ( dead_code ) ]
55
+
47
56
id : String ,
48
- # [ allow ( dead_code ) ]
57
+
49
58
pub config : Config ,
50
59
51
60
shutdown_rx : Receiver < bool > ,
@@ -70,20 +79,20 @@ impl Archiver {
70
79
& self ,
71
80
block_id : BlockId ,
72
81
overwrite : bool ,
73
- ) -> Result < ( BlockHeaderData , bool ) > {
82
+ ) -> Result < Option < ( BlockHeaderData , bool ) > > {
74
83
let header_resp_opt = self
75
84
. beacon_client
76
85
. get_beacon_headers_block_id ( block_id)
77
86
. await
78
87
. map_err ( |e| eyre:: eyre!( e) ) ?;
79
88
80
89
match header_resp_opt {
81
- None => Err ( eyre :: eyre! ( "No header response" ) ) ,
90
+ None => Ok ( None ) ,
82
91
Some ( header) => {
83
92
let exists = self . storage . exists ( & header. data . root ) . await ;
84
93
85
94
if exists && !overwrite {
86
- return Ok ( ( header. data , true ) ) ;
95
+ return Ok ( Some ( ( header. data , true ) ) ) ;
87
96
}
88
97
89
98
let blobs_resp_opt = self
@@ -103,9 +112,9 @@ impl Archiver {
103
112
) ;
104
113
self . storage . write_blob_data ( blob_data) . await ?;
105
114
trace ! ( "Persisting blobs for block: {:?}" , blob_data) ;
106
- return Ok ( ( header. data , exists) ) ;
115
+ return Ok ( Some ( ( header. data , exists) ) ) ;
107
116
}
108
- Ok ( ( header. data , exists) )
117
+ Ok ( Some ( ( header. data , exists) ) )
109
118
}
110
119
}
111
120
}
@@ -228,7 +237,7 @@ impl Archiver {
228
237
& process. current_block ,
229
238
& mut processes,
230
239
)
231
- . await ;
240
+ . await ;
232
241
}
233
242
}
234
243
Err ( e) => {
@@ -248,7 +257,7 @@ impl Archiver {
248
257
let mut curr = current. clone ( ) ;
249
258
let mut already_exists = false ;
250
259
let mut count = 0 ;
251
- let mut res: Result < ( BlockHeaderData , bool ) > ;
260
+ let mut res: Result < Option < ( BlockHeaderData , bool ) > > ;
252
261
let shutdown_rx = self . shutdown_rx . clone ( ) ;
253
262
info ! ( "backfill process initiated, curr_hash: {:#?}, curr_slot: {:#?}, start_hash: {:#?},start_slot: {:#?}" , curr. root, curr. header. message. slot. clone( ) , start. root, start. header. message. slot. clone( ) ) ;
254
263
@@ -276,7 +285,14 @@ impl Archiver {
276
285
continue ;
277
286
} ;
278
287
279
- let ( parent, parent_exists) = res. unwrap ( ) ;
288
+ let Some ( ( parent, parent_exists) ) = res. unwrap ( ) else {
289
+ error ! (
290
+ "failed to persist blobs for block, will retry, hash: {:#?}" ,
291
+ curr. header. message. parent_root
292
+ ) ;
293
+ sleep ( BACKFILL_ERROR_RETRY_INTERVAL ) . await ;
294
+ continue ;
295
+ } ;
280
296
curr = parent;
281
297
already_exists = parent_exists;
282
298
@@ -322,7 +338,7 @@ impl Archiver {
322
338
let mut current_block_id = BlockId :: Head ;
323
339
324
340
loop {
325
- let retry_policy = RetryPolicy :: exponential ( Duration :: from_secs ( 1 ) )
341
+ let retry_policy = RetryPolicy :: exponential ( Duration :: from_millis ( 250 ) )
326
342
. with_jitter ( true )
327
343
. with_max_delay ( Duration :: from_secs ( 10 ) )
328
344
. with_max_retries ( LIVE_FETCH_BLOB_MAXIMUM_RETRIES ) ;
@@ -335,7 +351,11 @@ impl Archiver {
335
351
return ;
336
352
}
337
353
338
- let ( curr, already_exists) = res. unwrap ( ) ;
354
+ let Some ( ( curr, already_exists) ) = res. unwrap ( ) else {
355
+ error ! ( "Error fetching blobs for block" ) ;
356
+ return ;
357
+ } ;
358
+
339
359
if start. is_none ( ) {
340
360
start = Some ( curr. clone ( ) ) ;
341
361
}
@@ -376,6 +396,52 @@ impl Archiver {
376
396
377
397
#[ allow( dead_code) ]
378
398
async fn start ( & self ) { }
399
+
400
+ #[ allow( dead_code) ]
401
+ async fn rearchive_range ( & self , from : u64 , to : u64 ) -> RearchiveResp {
402
+ for i in from..=to {
403
+ info ! ( "rearchiving block: {}" , i) ;
404
+ let retry_policy = RetryPolicy :: exponential ( Duration :: from_millis ( 250 ) )
405
+ . with_jitter ( true )
406
+ . with_max_delay ( Duration :: from_secs ( 10 ) )
407
+ . with_max_retries ( REARCHIVE_MAXIMUM_RETRIES ) ;
408
+ let r = retry_policy. retry ( || self . rearchive ( i) ) . await ;
409
+
410
+ match r {
411
+ Err ( e) => {
412
+ error ! ( "Error fetching blobs for block: {:#?}" , e) ;
413
+ return RearchiveResp {
414
+ from,
415
+ to,
416
+ error : Some ( e. downcast :: < Error > ( ) . unwrap ( ) . to_string ( ) ) ,
417
+ } ;
418
+ }
419
+ Ok ( false ) => {
420
+ info ! ( "block not found, skipping" ) ;
421
+ }
422
+ Ok ( true ) => {
423
+ info ! ( "block rearchived successfully" )
424
+ }
425
+ }
426
+ }
427
+ RearchiveResp {
428
+ from,
429
+ to,
430
+ error : None ,
431
+ }
432
+ }
433
+
434
+ async fn rearchive ( & self , i : u64 ) -> Result < bool > {
435
+ let res = self
436
+ . persist_blobs_for_block ( BlockId :: Slot ( Slot :: new ( i) ) , true )
437
+ . await ;
438
+
439
+ match res {
440
+ Err ( e) => Err ( eyre ! ( e) ) ,
441
+ Ok ( None ) => Ok ( false ) ,
442
+ Ok ( Some ( _) ) => Ok ( true ) ,
443
+ }
444
+ }
379
445
}
380
446
381
447
#[ cfg( test) ]
@@ -385,9 +451,9 @@ mod tests {
385
451
use std:: time:: Duration ;
386
452
387
453
use super :: * ;
454
+ use blob_archiver_beacon:: beacon_client:: BeaconClientEth2 ;
388
455
use blob_archiver_storage:: fs:: FSStorage ;
389
456
use eth2:: { BeaconNodeHttpClient , SensitiveUrl , Timeouts } ;
390
- use blob_archiver_beacon:: beacon_client:: BeaconClientEth2 ;
391
457
392
458
#[ tokio:: test]
393
459
async fn test_persist_blobs_for_block ( ) {
@@ -399,9 +465,7 @@ mod tests {
399
465
let storage = FSStorage :: new ( dir. clone ( ) ) . await . unwrap ( ) ;
400
466
tokio:: fs:: create_dir_all ( dir) . await . unwrap ( ) ;
401
467
let ( _, rx) = tokio:: sync:: watch:: channel ( false ) ;
402
- let beacon_client_eth2 = BeaconClientEth2 {
403
- beacon_client,
404
- } ;
468
+ let beacon_client_eth2 = BeaconClientEth2 { beacon_client } ;
405
469
let archiver = Archiver :: new ( Arc :: new ( beacon_client_eth2) , Arc :: new ( storage) , rx) ;
406
470
407
471
let block_id = BlockId :: Head ;
0 commit comments