Skip to content

Commit e1a0dfd

Browse files
committed
feat: enhance archiver
Signed-off-by: Chen Kai <[email protected]>
1 parent 640844e commit e1a0dfd

File tree

8 files changed

+334
-91
lines changed

8 files changed

+334
-91
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bin/archiver/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,6 @@ serde_json.workspace = true
1818
serde.workspace = true
1919
tracing.workspace = true
2020
tokio.workspace = true
21+
eyre.workspace = true
2122
blob-archiver-storage = { path = "../../crates/storage" }
2223

bin/archiver/src/archiver.rs

Lines changed: 260 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,54 +1,282 @@
1-
use blob_archiver_storage::{BlobData, BlobSidecars, Header};
2-
use eth2::types::{BlockId, MainnetEthSpec};
3-
use eth2::{BeaconNodeHttpClient, Error};
4-
use tracing::log::trace;
1+
#![allow(incomplete_features)]
2+
3+
use std::sync::Arc;
4+
use std::sync::atomic::{AtomicU64, Ordering};
5+
use std::time::Duration;
6+
use blob_archiver_storage::{BackfillProcess, BackfillProcesses, BlobData, BlobSidecars, Header, LockFile, Storage};
7+
use eth2::types::{BlockHeaderData, BlockId, Hash256, MainnetEthSpec};
8+
use eth2::{BeaconNodeHttpClient};
9+
use eyre::Result;
10+
use serde::{Deserialize, Serialize};
11+
use tokio::sync::watch::Receiver;
12+
use tokio::time::{interval, sleep};
13+
use tracing::log::{error, info, trace};
14+
15+
#[allow(dead_code)]
16+
const LIVE_FETCH_BLOB_MAXIMUM_RETRIES: i32 = 10;
17+
#[allow(dead_code)]
18+
const STARTUP_FETCH_BLOB_MAXIMUM_RETRIES: i32 = 3;
19+
#[allow(dead_code)]
20+
const REARCHIVE_MAXIMUM_RETRIES: i32 = 3;
21+
#[allow(dead_code)]
22+
const BACKFILL_ERROR_RETRY_INTERVAL: Duration = Duration::from_secs(5);
23+
#[allow(dead_code)]
24+
const LOCK_UPDATE_INTERVAL: Duration = Duration::from_secs(10);
25+
#[allow(dead_code)]
26+
const LOCK_TIMEOUT: Duration = Duration::from_secs(20);
27+
#[allow(dead_code)]
28+
const OBTAIN_LOCK_RETRY_INTERVAL_SECS: u64 = 10;
29+
#[allow(dead_code)]
30+
static OBTAIN_LOCK_RETRY_INTERVAL: AtomicU64 = AtomicU64::new(OBTAIN_LOCK_RETRY_INTERVAL_SECS);
31+
32+
#[derive(Debug, PartialEq, Eq, Clone, Default, Serialize, Deserialize)]
33+
pub struct Config {
34+
pub poll_interval: Duration,
35+
36+
pub listen_addr: String,
37+
38+
pub origin_block: Hash256,
39+
}
540

641
pub struct Archiver {
742
pub beacon_client: BeaconNodeHttpClient,
43+
44+
storage: Arc<dyn Storage>,
45+
#[allow(dead_code)]
46+
id: String,
47+
#[allow(dead_code)]
48+
pub config: Config,
49+
50+
shutdown_rx: Receiver<bool>,
851
}
952

1053
impl Archiver {
11-
pub fn new(beacon_client: BeaconNodeHttpClient) -> Self {
12-
Self { beacon_client }
54+
pub fn new(beacon_client: BeaconNodeHttpClient, storage: Arc<dyn Storage>, shutdown_rx: Receiver<bool>) -> Self {
55+
Self { beacon_client, storage, id: "".to_string(), config: Default::default(), shutdown_rx }
1356
}
1457

15-
pub async fn persist_blobs_for_block(&self, block_id: BlockId) -> Result<(), Error> {
58+
pub async fn persist_blobs_for_block(&self, block_id: BlockId, overwrite: bool) -> Result<(BlockHeaderData, bool)> {
1659
let header_resp_opt = self
1760
.beacon_client
1861
.get_beacon_headers_block_id(block_id)
19-
.await?;
20-
if let Some(header) = header_resp_opt {
21-
let beacon_client = self.beacon_client.clone();
22-
let blobs_resp_opt = beacon_client
23-
.get_blobs::<MainnetEthSpec>(BlockId::Root(header.data.root), None)
24-
.await?;
25-
if let Some(blob_sidecars) = blobs_resp_opt {
26-
let blob_sidecar_list = blob_sidecars.data;
27-
let blob_data = BlobData::new(
28-
Header {
29-
beacon_block_hash: header.data.root,
30-
},
31-
BlobSidecars {
32-
data: blob_sidecar_list,
33-
},
34-
);
35-
trace!("Persisting blobs for block: {:?}", blob_data);
36-
return Ok(());
62+
.await.map_err(|e| eyre::eyre!(e))?;
63+
64+
match header_resp_opt {
65+
None => { Err(eyre::eyre!("No header response")) }
66+
Some(header) => {
67+
let exists = self.storage.exists(&header.data.root).await;
68+
69+
if exists && !overwrite {
70+
return Ok((header.data, true));
71+
}
72+
73+
let blobs_resp_opt = self.beacon_client
74+
.get_blobs::<MainnetEthSpec>(BlockId::Root(header.data.root), None)
75+
.await.map_err(|e| eyre::eyre!(e))?;
76+
if let Some(blob_sidecars) = blobs_resp_opt {
77+
let blob_sidecar_list = blob_sidecars.data;
78+
let blob_data = &BlobData::new(
79+
Header {
80+
beacon_block_hash: header.data.root,
81+
},
82+
BlobSidecars {
83+
data: blob_sidecar_list,
84+
},
85+
);
86+
self.storage.write_blob_data(blob_data).await?;
87+
trace!("Persisting blobs for block: {:?}", blob_data);
88+
return Ok((header.data, exists));
89+
}
90+
Ok((header.data, exists))
91+
}
92+
}
93+
}
94+
95+
#[allow(dead_code)]
96+
async fn wait_obtain_storage_lock(&self) {
97+
let mut lock_file_res = self.storage.read_lock_file().await;
98+
let mut now = std::time::SystemTime::now()
99+
.duration_since(std::time::UNIX_EPOCH)
100+
.unwrap()
101+
.as_secs();
102+
let mut shutdown = self.shutdown_rx.clone();
103+
match lock_file_res {
104+
Ok(mut lock_file) => {
105+
trace!("Lock file: {:#?}", lock_file);
106+
if lock_file == LockFile::default() {
107+
while lock_file.archiver_id != self.id && lock_file.timestamp + LOCK_TIMEOUT.as_secs() > now {
108+
tokio::select! {
109+
_ = shutdown.changed() => {
110+
info!("Received shutdown signal, exiting wait_obtain_storage_lock");
111+
return;
112+
}
113+
_ = sleep(Duration::from_secs(OBTAIN_LOCK_RETRY_INTERVAL.load(Ordering::Relaxed))) => {
114+
lock_file_res = self.storage.read_lock_file().await;
115+
match lock_file_res {
116+
Ok(new_lock_file) => {
117+
lock_file = new_lock_file;
118+
now = std::time::SystemTime::now()
119+
.duration_since(std::time::UNIX_EPOCH)
120+
.unwrap()
121+
.as_secs();
122+
}
123+
Err(e) => {
124+
error!("Error reading lock file: {:#?}", e);
125+
panic!("Error reading lock file: {:#?}", e);
126+
}
127+
}
128+
}
129+
}
130+
}
131+
}
132+
133+
let written_res = self.storage.write_lock_file(&LockFile {
134+
archiver_id: lock_file.archiver_id.clone(),
135+
timestamp: now,
136+
}).await;
137+
138+
match written_res {
139+
Ok(_) => {
140+
info!("Obtained storage lock");
141+
}
142+
Err(e) => {
143+
error!("Error writing lock file: {:#?}", e);
144+
panic!("Error writing lock file: {:#?}", e);
145+
}
146+
}
147+
148+
let storage = self.storage.clone();
149+
let archiver_id = self.id.clone();
150+
let mut shutdown_clone = shutdown.clone();
151+
152+
tokio::spawn(async move {
153+
let mut ticket = interval(LOCK_UPDATE_INTERVAL);
154+
loop {
155+
tokio::select! {
156+
_ = shutdown_clone.changed() => {
157+
info!("Received shutdown signal, exiting lock update loop");
158+
break;
159+
}
160+
_ = ticket.tick() => {
161+
let now = std::time::SystemTime::now()
162+
.duration_since(std::time::UNIX_EPOCH)
163+
.unwrap()
164+
.as_secs();
165+
let written_res = storage.write_lock_file(&LockFile {
166+
archiver_id: archiver_id.clone(),
167+
timestamp: now,
168+
}).await;
169+
170+
if let Err(e) = written_res {
171+
error!("Error update lockfile timestamp: {:#?}", e);
172+
}
173+
}
174+
}
175+
}
176+
});
177+
}
178+
Err(e) => {
179+
error!("Error reading lock file: {:#?}", e);
180+
panic!("Error reading lock file: {:#?}", e);
37181
}
38-
return Ok(());
39182
}
183+
}
184+
185+
#[allow(dead_code)]
186+
async fn backfill_blobs(&self, latest: &BlockHeaderData) {
187+
let backfill_processes_res = self.storage.read_backfill_processes().await;
188+
189+
match backfill_processes_res {
190+
Ok(mut backfill_processes) => {
191+
let backfill_process = BackfillProcess {
192+
start_block: latest.clone(),
193+
current_block: latest.clone(),
194+
};
195+
backfill_processes.insert(latest.root, backfill_process);
196+
let _ = self.storage.write_backfill_processes(&backfill_processes).await;
40197

41-
Ok(())
198+
let mut processes = backfill_processes.clone();
199+
for (_, process) in backfill_processes.iter() {
200+
self.backfill_loop(&process.start_block, &process.current_block, &mut processes).await;
201+
}
202+
}
203+
Err(e) => {
204+
error!("Error reading backfill processes: {:#?}", e);
205+
panic!("Error reading backfill processes: {:#?}", e);
206+
}
207+
}
42208
}
209+
210+
#[allow(dead_code)]
211+
async fn backfill_loop(&self, start: &BlockHeaderData, current: &BlockHeaderData, backfill_processes: &mut BackfillProcesses) {
212+
let mut curr = current.clone();
213+
let mut already_exists = false;
214+
let mut count = 0;
215+
let mut res: Result<(BlockHeaderData, bool)>;
216+
info!("backfill process initiated, curr_hash: {:#?}, curr_slot: {:#?}, start_hash: {:#?},start_slot: {:#?}", curr.root, curr.header.message.slot.clone(), start.root, start.header.message.slot.clone());
217+
218+
while !already_exists {
219+
if curr.root == self.config.origin_block {
220+
info!("reached origin block, hash: {:#?}", curr.root);
221+
self.defer_fn(start, &curr, backfill_processes).await;
222+
return;
223+
}
224+
225+
res = self.persist_blobs_for_block(BlockId::Root(curr.header.message.parent_root), false).await;
226+
if let Err(e) = res {
227+
error!("failed to persist blobs for block, will retry: {:#?}, hash: {:#?}", e, curr.header.message.parent_root);
228+
sleep(BACKFILL_ERROR_RETRY_INTERVAL).await;
229+
continue;
230+
};
231+
232+
let (parent, parent_exists) = res.unwrap();
233+
curr = parent;
234+
already_exists = parent_exists;
235+
236+
if !already_exists {
237+
// todo: metrics
238+
}
239+
240+
count += 1;
241+
if count % 10 == 0 {
242+
let backfill_process = BackfillProcess {
243+
start_block: start.to_owned(),
244+
current_block: curr.clone(),
245+
};
246+
backfill_processes.insert(start.root, backfill_process);
247+
let _ = self.storage.write_backfill_processes(backfill_processes).await;
248+
}
249+
}
250+
self.defer_fn(start, &curr, backfill_processes).await;
251+
}
252+
253+
#[allow(dead_code)]
254+
async fn defer_fn(&self, start: &BlockHeaderData, current: &BlockHeaderData, backfill_processes: &mut BackfillProcesses) {
255+
info!("backfill process complete, end_hash: {:#?}, end_slot: {:#?}, start_hash: {:#?},start_slot: {:#?}", current.root, current.header.message.slot.clone(), start.root, start.header.message.slot.clone());
256+
backfill_processes.remove(&start.root);
257+
let _ = self.storage.write_backfill_processes(backfill_processes).await;
258+
}
259+
260+
// async fn process_blocks_until_known_block() {
261+
// debug!("refreshing live data");
262+
// let mut start: &BlockHeaderData;
263+
// let mut current_block_id = "head";
264+
//
265+
// loop {
266+
//
267+
// }
268+
//
269+
// }
43270
}
44271

45272
#[cfg(test)]
46273
mod tests {
274+
use std::path::PathBuf;
47275
use std::str::FromStr;
48276
use std::time::Duration;
49277

50278
use eth2::{SensitiveUrl, Timeouts};
51-
279+
use blob_archiver_storage::fs::FSStorage;
52280
use super::*;
53281

54282
#[tokio::test]
@@ -57,9 +285,11 @@ mod tests {
57285
SensitiveUrl::from_str("https://ethereum-beacon-api.publicnode.com").unwrap(),
58286
Timeouts::set_all(Duration::from_secs(30)),
59287
);
60-
let archiver = Archiver::new(beacon_client);
288+
let storage = FSStorage::new(PathBuf::from("test_dir")).await.unwrap();
289+
let (_, rx) = tokio::sync::watch::channel(false);
290+
let archiver = Archiver::new(beacon_client, Arc::new(storage), rx);
61291

62292
let block_id = BlockId::Head;
63-
archiver.persist_blobs_for_block(block_id).await.unwrap();
293+
archiver.persist_blobs_for_block(block_id, false).await.unwrap();
64294
}
65295
}

bin/archiver/src/main.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
1+
use std::path::PathBuf;
12
use std::str::FromStr;
3+
use std::sync::Arc;
24
use std::time::Duration;
35

46
use eth2::types::BlockId;
57
use eth2::{BeaconNodeHttpClient, SensitiveUrl, Timeouts};
6-
8+
use blob_archiver_storage::fs::FSStorage;
79
use crate::archiver::Archiver;
810

911
mod archiver;
@@ -14,12 +16,14 @@ async fn main() {
1416
SensitiveUrl::from_str("https://ethereum-beacon-api.publicnode.com").unwrap(),
1517
Timeouts::set_all(Duration::from_secs(30)),
1618
);
17-
let archiver = Archiver::new(beacon_client);
19+
let storage = FSStorage::new(PathBuf::from("test_dir")).await.unwrap();
20+
let (_, shutdown_rx) = tokio::sync::watch::channel(false);
21+
let archiver = Archiver::new(beacon_client, Arc::new(storage), shutdown_rx);
1822

1923
let block_id = BlockId::Head;
2024

2125
archiver
22-
.persist_blobs_for_block(block_id)
26+
.persist_blobs_for_block(block_id, false)
2327
.await
2428
.expect("TODO: panic message");
2529
}

0 commit comments

Comments
 (0)