Skip to content

Commit 719c352

Browse files
committed
feat:refactor
Signed-off-by: Chen Kai <[email protected]>
1 parent 13c668d commit 719c352

File tree

10 files changed

+507
-85
lines changed

10 files changed

+507
-85
lines changed

.github/workflows/rust.yml

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,16 @@ jobs:
1616

1717
steps:
1818
- uses: actions/checkout@v4
19+
- name: Install Rust stable toolchain
20+
uses: dtolnay/rust-toolchain@nightly
21+
- uses: Swatinem/rust-cache@v2
22+
with:
23+
cache-on-failure: true
24+
- name: Lint
25+
run: cargo clippy --workspace --all-targets --all-features -- -D warnings
1926
- name: Build
20-
run: cargo build --verbose
27+
run: cargo build --workspace --all-targets --all-features
28+
- name: Format
29+
run: cargo fmt --all -- --check
2130
- name: Run tests
22-
run: cargo test --verbose
31+
run: cargo test --workspace --all-features --all-targets --locked

bin/archiver/src/archiver.rs

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
use eth2::{BeaconNodeHttpClient, Error};
1+
use blob_archiver_storage::{BlobData, BlobSidecars, Header};
22
use eth2::types::{BlockId, MainnetEthSpec};
3+
use eth2::{BeaconNodeHttpClient, Error};
34
use tracing::log::trace;
4-
use blob_archiver_storage::{BlobData, BlobSidecars, Header};
55

66
pub struct Archiver {
77
pub beacon_client: BeaconNodeHttpClient,
@@ -13,13 +13,25 @@ impl Archiver {
1313
}
1414

1515
pub async fn persist_blobs_for_block(&self, block_id: BlockId) -> Result<(), Error> {
16-
let header_resp_opt = self.beacon_client.get_beacon_headers_block_id(block_id).await?;
16+
let header_resp_opt = self
17+
.beacon_client
18+
.get_beacon_headers_block_id(block_id)
19+
.await?;
1720
if let Some(header) = header_resp_opt {
1821
let beacon_client = self.beacon_client.clone();
19-
let blobs_resp_opt = beacon_client.get_blobs::<MainnetEthSpec>(BlockId::Root(header.data.root), None).await?;
22+
let blobs_resp_opt = beacon_client
23+
.get_blobs::<MainnetEthSpec>(BlockId::Root(header.data.root), None)
24+
.await?;
2025
if let Some(blob_sidecars) = blobs_resp_opt {
2126
let blob_sidecar_list = blob_sidecars.data;
22-
let blob_data = BlobData::new(Header { beacon_block_hash: header.data.root }, BlobSidecars { data: blob_sidecar_list });
27+
let blob_data = BlobData::new(
28+
Header {
29+
beacon_block_hash: header.data.root,
30+
},
31+
BlobSidecars {
32+
data: blob_sidecar_list,
33+
},
34+
);
2335
trace!("Persisting blobs for block: {:?}", blob_data);
2436
return Ok(());
2537
}
@@ -41,10 +53,13 @@ mod tests {
4153

4254
#[tokio::test]
4355
async fn test_persist_blobs_for_block() {
44-
let beacon_client = BeaconNodeHttpClient::new(SensitiveUrl::from_str("https://ethereum-beacon-api.publicnode.com").unwrap(), Timeouts::set_all(Duration::from_secs(30)));
56+
let beacon_client = BeaconNodeHttpClient::new(
57+
SensitiveUrl::from_str("https://ethereum-beacon-api.publicnode.com").unwrap(),
58+
Timeouts::set_all(Duration::from_secs(30)),
59+
);
4560
let archiver = Archiver::new(beacon_client);
4661

4762
let block_id = BlockId::Head;
4863
archiver.persist_blobs_for_block(block_id).await.unwrap();
4964
}
50-
}
65+
}

bin/archiver/src/main.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,25 @@
11
use std::str::FromStr;
22
use std::time::Duration;
33

4-
use eth2::{BeaconNodeHttpClient, SensitiveUrl, Timeouts};
54
use eth2::types::BlockId;
5+
use eth2::{BeaconNodeHttpClient, SensitiveUrl, Timeouts};
66

77
use crate::archiver::Archiver;
88

99
mod archiver;
1010

1111
#[tokio::main]
1212
async fn main() {
13-
let beacon_client = BeaconNodeHttpClient::new(SensitiveUrl::from_str("https://ethereum-beacon-api.publicnode.com").unwrap(), Timeouts::set_all(Duration::from_secs(30)));
13+
let beacon_client = BeaconNodeHttpClient::new(
14+
SensitiveUrl::from_str("https://ethereum-beacon-api.publicnode.com").unwrap(),
15+
Timeouts::set_all(Duration::from_secs(30)),
16+
);
1417
let archiver = Archiver::new(beacon_client);
1518

1619
let block_id = BlockId::Head;
1720

18-
archiver.persist_blobs_for_block(block_id).await.expect("TODO: panic message");
21+
archiver
22+
.persist_blobs_for_block(block_id)
23+
.await
24+
.expect("TODO: panic message");
1925
}

crates/config/src/config.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
1-
use std::str::FromStr;
21
use std::time::Duration;
2+
33
use eth2::types::Hash256;
4-
use anyhow::Result;
54

65
#[derive(Debug, Clone, Default, PartialEq, Eq)]
76
pub struct BeaconConfig {
@@ -16,4 +15,4 @@ pub struct ArchiverConfig {
1615
pub beacon: BeaconConfig,
1716
pub poll_interval: Duration,
1817
pub origin_block: Hash256,
19-
}
18+
}

crates/config/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
pub mod config;
22

3-
pub use config::{ArchiverConfig};
3+
pub use config::ArchiverConfig;

crates/storage/src/fs.rs

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
use std::path::PathBuf;
2+
3+
use async_trait::async_trait;
4+
use eth2::types::Hash256;
5+
use eyre::Result;
6+
use tokio::io::{AsyncReadExt, AsyncWriteExt};
7+
8+
use crate::storage::{BackfillProcesses, BACKFILL_LOCK};
9+
use crate::{BlobData, LockFile, StorageReader, StorageWriter};
10+
11+
pub struct FSStorage {
12+
pub(crate) dir: PathBuf,
13+
}
14+
15+
impl FSStorage {
16+
pub async fn new(dir: PathBuf) -> Result<Self> {
17+
Ok(Self { dir })
18+
}
19+
}
20+
21+
#[async_trait]
22+
impl StorageReader for FSStorage {
23+
async fn read_blob_data(&self, hash: Hash256) -> Result<BlobData> {
24+
let path = self.dir.join(format!("{:x}", hash));
25+
let mut file = tokio::fs::File::open(path).await?;
26+
let mut data = Vec::new();
27+
file.read_to_end(&mut data).await?;
28+
Ok(serde_json::from_slice(&data)?)
29+
}
30+
31+
async fn exists(&self, hash: Hash256) -> bool {
32+
self.dir.join(format!("{:x}", hash)).exists()
33+
}
34+
35+
async fn read_lock_file(&self) -> Result<LockFile> {
36+
let path = self.dir.join("lockfile");
37+
let mut file = tokio::fs::File::open(path).await?;
38+
let mut data = Vec::new();
39+
file.read_to_end(&mut data).await?;
40+
Ok(serde_json::from_slice(&data)?)
41+
}
42+
43+
async fn read_backfill_processes(&self) -> Result<BackfillProcesses> {
44+
BACKFILL_LOCK.lock();
45+
let path = self.dir.join("backfill_processes");
46+
let mut file = tokio::fs::File::open(path).await?;
47+
let mut data = Vec::new();
48+
file.read_to_end(&mut data).await?;
49+
Ok(serde_json::from_slice(&data)?)
50+
}
51+
}
52+
53+
#[async_trait]
54+
impl StorageWriter for FSStorage {
55+
async fn write_blob_data(&self, blob_data: BlobData) -> Result<()> {
56+
let path = self
57+
.dir
58+
.join(format!("{:x}", blob_data.header.beacon_block_hash));
59+
tokio::fs::create_dir_all(path.parent().unwrap()).await?;
60+
let mut file = tokio::fs::File::create(path).await?;
61+
file.write_all(&serde_json::to_vec(&blob_data)?).await?;
62+
Ok(())
63+
}
64+
65+
async fn write_lock_file(&self, lock_file: LockFile) -> Result<()> {
66+
let path = self.dir.join("lockfile");
67+
tokio::fs::create_dir_all(path.parent().unwrap()).await?;
68+
let mut file = tokio::fs::File::create(path).await?;
69+
file.write_all(&serde_json::to_vec(&lock_file)?).await?;
70+
Ok(())
71+
}
72+
73+
async fn write_backfill_process(&self, backfill_process: BackfillProcesses) -> Result<()> {
74+
BACKFILL_LOCK.lock();
75+
let path = self.dir.join("backfill_processes");
76+
tokio::fs::create_dir_all(path.parent().unwrap()).await?;
77+
let mut file = tokio::fs::File::create(path).await?;
78+
file.write_all(&serde_json::to_vec(&backfill_process)?)
79+
.await?;
80+
Ok(())
81+
}
82+
}
83+
84+
#[cfg(test)]
85+
mod tests {
86+
use crate::storage::{
87+
create_test_blob_data, create_test_lock_file, create_test_test_backfill_processes,
88+
};
89+
use tokio::io;
90+
91+
use super::*;
92+
93+
#[tokio::test]
94+
async fn test_fs_storage() {
95+
let storage = FSStorage::new(PathBuf::from("test_dir")).await.unwrap();
96+
tokio::fs::create_dir_all(&storage.dir).await.unwrap();
97+
let blob_data = create_test_blob_data();
98+
assert!(storage
99+
.read_blob_data(blob_data.header.beacon_block_hash)
100+
.await
101+
.is_err_and(|e| e.downcast_ref::<io::Error>().is_some()));
102+
storage.write_blob_data(blob_data.clone()).await.unwrap();
103+
assert_eq!(
104+
storage
105+
.read_blob_data(blob_data.header.beacon_block_hash)
106+
.await
107+
.unwrap(),
108+
blob_data
109+
);
110+
let lock_file = create_test_lock_file();
111+
assert!(storage
112+
.read_lock_file()
113+
.await
114+
.is_err_and(|e| e.downcast_ref::<io::Error>().is_some()));
115+
storage.write_lock_file(lock_file.clone()).await.unwrap();
116+
assert_eq!(storage.read_lock_file().await.unwrap(), lock_file);
117+
let test_backfill_processes = create_test_test_backfill_processes();
118+
assert!(storage
119+
.read_backfill_processes()
120+
.await
121+
.is_err_and(|e| e.downcast_ref::<io::Error>().is_some()));
122+
storage
123+
.write_backfill_process(test_backfill_processes.clone())
124+
.await
125+
.unwrap();
126+
assert_eq!(
127+
storage.read_backfill_processes().await.unwrap(),
128+
test_backfill_processes
129+
);
130+
clean_dir(&storage.dir);
131+
}
132+
133+
#[tokio::test]
134+
async fn test_fs_storage_exists() {
135+
let storage = FSStorage::new(PathBuf::from("test_dir")).await.unwrap();
136+
tokio::fs::create_dir_all(&storage.dir).await.unwrap();
137+
let blob_data = create_test_blob_data();
138+
assert!(!storage.exists(blob_data.header.beacon_block_hash).await);
139+
storage.write_blob_data(blob_data.clone()).await.unwrap();
140+
assert!(storage.exists(blob_data.header.beacon_block_hash).await);
141+
clean_dir(&storage.dir);
142+
}
143+
144+
fn clean_dir(dir: &PathBuf) {
145+
if dir.exists() {
146+
std::fs::remove_dir_all(dir).unwrap();
147+
}
148+
}
149+
}

crates/storage/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
extern crate core;
22

3-
pub mod storage;
3+
mod fs;
44
mod s3;
5+
pub mod storage;
56

67
pub use storage::*;
78

0 commit comments

Comments
 (0)