Skip to content

Commit 82ea105

Browse files
committed
feat:add beacon test stub
Signed-off-by: Chen Kai <[email protected]>
1 parent 5862b17 commit 82ea105

File tree

9 files changed

+285
-11
lines changed

9 files changed

+285
-11
lines changed

Cargo.lock

Lines changed: 11 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[workspace]
2-
members = ["bin/archiver", "bin/api", "crates/config", "crates/storage"]
2+
members = ["bin/archiver", "bin/api", "crates/config", "crates/storage", "crates/beacon"]
33
default-members = ["bin/archiver", "bin/api"]
44
resolver = "2"
55

@@ -28,4 +28,6 @@ spin = { version = "0.9.8", features = ["mutex"] }
2828
aws-config = { version = "1.5.4", features = ["behavior-version-latest"] }
2929
aws-sdk-s3 = "1.41.0"
3030
tokio = { version = "1.38.1", features = ["full"] }
31-
again = "0.1.2"
31+
again = "0.1.2"
32+
rand = "0.8.5"
33+
once_cell = "1.19.0"

bin/archiver/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,5 @@ tokio.workspace = true
2121
eyre.workspace = true
2222
again.workspace = true
2323
blob-archiver-storage = { path = "../../crates/storage" }
24+
blob-archiver-beacon = { path = "../../crates/beacon" }
2425

bin/archiver/src/archiver.rs

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,7 @@ use again::RetryPolicy;
22
use blob_archiver_storage::{
33
BackfillProcess, BackfillProcesses, BlobData, BlobSidecars, Header, LockFile, Storage,
44
};
5-
use eth2::types::{BlockHeaderData, BlockId, Hash256, MainnetEthSpec};
6-
use eth2::BeaconNodeHttpClient;
5+
use eth2::types::{BlockHeaderData, BlockId, Hash256};
76
use eyre::Result;
87
use serde::{Deserialize, Serialize};
98
use std::sync::atomic::{AtomicU64, Ordering};
@@ -12,6 +11,7 @@ use std::time::Duration;
1211
use tokio::sync::watch::Receiver;
1312
use tokio::time::{interval, sleep};
1413
use tracing::log::{debug, error, info, trace};
14+
use blob_archiver_beacon::beacon_client::BeaconClient;
1515

1616
#[allow(dead_code)]
1717
const LIVE_FETCH_BLOB_MAXIMUM_RETRIES: usize = 10;
@@ -40,7 +40,7 @@ pub struct Config {
4040
}
4141

4242
pub struct Archiver {
43-
pub beacon_client: BeaconNodeHttpClient,
43+
pub beacon_client: Arc<dyn BeaconClient>,
4444

4545
storage: Arc<dyn Storage>,
4646
#[allow(dead_code)]
@@ -53,7 +53,7 @@ pub struct Archiver {
5353

5454
impl Archiver {
5555
pub fn new(
56-
beacon_client: BeaconNodeHttpClient,
56+
beacon_client: Arc<dyn BeaconClient>,
5757
storage: Arc<dyn Storage>,
5858
shutdown_rx: Receiver<bool>,
5959
) -> Self {
@@ -88,7 +88,7 @@ impl Archiver {
8888

8989
let blobs_resp_opt = self
9090
.beacon_client
91-
.get_blobs::<MainnetEthSpec>(BlockId::Root(header.data.root), None)
91+
.get_blobs(BlockId::Root(header.data.root), None)
9292
.await
9393
.map_err(|e| eyre::eyre!(e))?;
9494
if let Some(blob_sidecars) = blobs_resp_opt {
@@ -228,7 +228,7 @@ impl Archiver {
228228
&process.current_block,
229229
&mut processes,
230230
)
231-
.await;
231+
.await;
232232
}
233233
}
234234
Err(e) => {
@@ -249,9 +249,15 @@ impl Archiver {
249249
let mut already_exists = false;
250250
let mut count = 0;
251251
let mut res: Result<(BlockHeaderData, bool)>;
252+
let shutdown_rx = self.shutdown_rx.clone();
252253
info!("backfill process initiated, curr_hash: {:#?}, curr_slot: {:#?}, start_hash: {:#?},start_slot: {:#?}", curr.root, curr.header.message.slot.clone(), start.root, start.header.message.slot.clone());
253254

254255
while !already_exists {
256+
if *shutdown_rx.borrow() {
257+
info!("Shutdown signal received, breaking backfill loop");
258+
return;
259+
}
260+
255261
if curr.root == self.config.origin_block {
256262
info!("reached origin block, hash: {:#?}", curr.root);
257263
self.defer_fn(start, &curr, backfill_processes).await;
@@ -380,7 +386,8 @@ mod tests {
380386

381387
use super::*;
382388
use blob_archiver_storage::fs::FSStorage;
383-
use eth2::{SensitiveUrl, Timeouts};
389+
use eth2::{BeaconNodeHttpClient, SensitiveUrl, Timeouts};
390+
use blob_archiver_beacon::beacon_client::BeaconClientEth2;
384391

385392
#[tokio::test]
386393
async fn test_persist_blobs_for_block() {
@@ -392,7 +399,10 @@ mod tests {
392399
let storage = FSStorage::new(dir.clone()).await.unwrap();
393400
tokio::fs::create_dir_all(dir).await.unwrap();
394401
let (_, rx) = tokio::sync::watch::channel(false);
395-
let archiver = Archiver::new(beacon_client, Arc::new(storage), rx);
402+
let beacon_client_eth2 = BeaconClientEth2 {
403+
beacon_client,
404+
};
405+
let archiver = Archiver::new(Arc::new(beacon_client_eth2), Arc::new(storage), rx);
396406

397407
let block_id = BlockId::Head;
398408
archiver

bin/archiver/src/main.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use crate::archiver::Archiver;
77
use blob_archiver_storage::fs::FSStorage;
88
use eth2::types::BlockId;
99
use eth2::{BeaconNodeHttpClient, SensitiveUrl, Timeouts};
10+
use blob_archiver_beacon::beacon_client::BeaconClientEth2;
1011

1112
mod archiver;
1213

@@ -18,7 +19,10 @@ async fn main() {
1819
);
1920
let storage = FSStorage::new(PathBuf::from("test_dir")).await.unwrap();
2021
let (_, shutdown_rx) = tokio::sync::watch::channel(false);
21-
let archiver = Archiver::new(beacon_client, Arc::new(storage), shutdown_rx);
22+
let beacon_client_eth2 = BeaconClientEth2 {
23+
beacon_client,
24+
};
25+
let archiver = Archiver::new(Arc::new(beacon_client_eth2), Arc::new(storage), shutdown_rx);
2226

2327
let block_id = BlockId::Head;
2428

crates/beacon/Cargo.toml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
[package]
2+
name = "blob-archiver-beacon"
3+
version = "0.1.0"
4+
edition.workspace = true
5+
license.workspace = true
6+
authors.workspace = true
7+
repository.workspace = true
8+
homepage.workspace = true
9+
exclude.workspace = true
10+
11+
[dependencies]
12+
async-trait.workspace = true
13+
eth2.workspace = true
14+
rand.workspace = true
15+
once_cell.workspace = true

crates/beacon/src/beacon_client.rs

Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
use std::collections::HashMap;
2+
3+
use async_trait::async_trait;
4+
use eth2::{BeaconNodeHttpClient, Error};
5+
use eth2::types::{BeaconBlockHeader, BlobSidecarList, BlockHeaderAndSignature, BlockHeaderData, BlockId, EthSpec, ExecutionOptimisticFinalizedResponse, GenericResponse, Hash256, MainnetEthSpec, SignatureBytes, Slot};
6+
7+
use crate::blob_test_helper::{FIVE, FOUR, new_blob_sidecars, ONE, ORIGIN_BLOCK, START_SLOT, THREE, TWO};
8+
9+
#[async_trait]
10+
pub trait BeaconClient {
11+
async fn get_beacon_headers_block_id(
12+
&self,
13+
block_id: BlockId,
14+
) -> Result<Option<ExecutionOptimisticFinalizedResponse<BlockHeaderData>>, Error>;
15+
16+
async fn get_blobs(
17+
&self,
18+
block_id: BlockId,
19+
indices: Option<&[u64]>,
20+
) -> Result<Option<GenericResponse<BlobSidecarList<MainnetEthSpec>>>, Error>;
21+
}
22+
23+
pub struct BeaconClientStub<E: EthSpec> {
24+
pub headers: HashMap<String, BlockHeaderData>,
25+
pub blobs: HashMap<String, BlobSidecarList<E>>,
26+
}
27+
28+
#[async_trait]
29+
impl BeaconClient for BeaconClientStub<MainnetEthSpec> {
30+
async fn get_beacon_headers_block_id(&self, block_id: BlockId) -> Result<Option<ExecutionOptimisticFinalizedResponse<BlockHeaderData>>, Error> {
31+
let header = self.headers.get(block_id.to_string().as_str());
32+
33+
Ok(header.map(|h| ExecutionOptimisticFinalizedResponse {
34+
execution_optimistic: Some(true),
35+
finalized: Some(true),
36+
data: h.clone(),
37+
}))
38+
}
39+
40+
async fn get_blobs(&self, block_id: BlockId, _indices: Option<&[u64]>) -> Result<Option<GenericResponse<BlobSidecarList<MainnetEthSpec>>>, Error> {
41+
let blobs = self.blobs.get(block_id.to_string().as_str());
42+
43+
Ok(blobs.map(|b| GenericResponse {
44+
data: b.clone(),
45+
}))
46+
}
47+
}
48+
49+
impl Default for BeaconClientStub<MainnetEthSpec> {
50+
fn default() -> Self {
51+
let make_header = |slot: Slot, hash: Hash256, parent: Hash256| -> BlockHeaderData {
52+
BlockHeaderData {
53+
root: hash,
54+
canonical: false,
55+
header: BlockHeaderAndSignature {
56+
message: BeaconBlockHeader {
57+
slot,
58+
proposer_index: 0,
59+
parent_root: parent,
60+
state_root: Hash256::default(),
61+
body_root: Hash256::default(),
62+
},
63+
signature: SignatureBytes::empty(),
64+
},
65+
}
66+
};
67+
68+
let start_slot: Slot = START_SLOT;
69+
let origin_blobs = new_blob_sidecars(1);
70+
let one_blobs = new_blob_sidecars(2);
71+
let two_blobs = new_blob_sidecars(0);
72+
let three_blobs = new_blob_sidecars(4);
73+
let four_blobs = new_blob_sidecars(5);
74+
let five_blobs = new_blob_sidecars(6);
75+
76+
BeaconClientStub {
77+
headers: HashMap::from([
78+
(
79+
ORIGIN_BLOCK.to_string(),
80+
make_header(start_slot, *ORIGIN_BLOCK, Hash256::from_slice(&[9, 9, 9])),
81+
),
82+
(
83+
ONE.to_string(),
84+
make_header(start_slot + 1, *ONE, *ORIGIN_BLOCK),
85+
),
86+
(
87+
TWO.to_string(),
88+
make_header(start_slot + 2, *TWO, *ONE),
89+
),
90+
(
91+
THREE.to_string(),
92+
make_header(start_slot + 3, *THREE, *TWO),
93+
),
94+
(
95+
FOUR.to_string(),
96+
make_header(start_slot + 4, *FOUR, *THREE),
97+
),
98+
(
99+
FIVE.to_string(),
100+
make_header(start_slot + 5, *FIVE, *FOUR),
101+
),
102+
(
103+
"head".to_string(),
104+
make_header(start_slot + 5, *FIVE, *FOUR),
105+
),
106+
(
107+
"finalized".to_string(),
108+
make_header(start_slot + 3, *THREE, *TWO),
109+
),
110+
(
111+
start_slot.as_u64().to_string(),
112+
make_header(start_slot, *ORIGIN_BLOCK, Hash256::from_slice(&[9, 9, 9])),
113+
),
114+
(
115+
(start_slot + 1).as_u64().to_string(),
116+
make_header(start_slot + 1, *ONE, *ORIGIN_BLOCK),
117+
),
118+
(
119+
(start_slot + 2).as_u64().to_string(),
120+
make_header(start_slot + 2, *TWO, *ONE),
121+
),
122+
(
123+
(start_slot + 3).as_u64().to_string(),
124+
make_header(start_slot + 3, *THREE, *TWO),
125+
),
126+
(
127+
(start_slot + 4).as_u64().to_string(),
128+
make_header(start_slot + 4, *FOUR, *THREE),
129+
),
130+
(
131+
(start_slot + 5).as_u64().to_string(),
132+
make_header(start_slot + 5, *FIVE, *FOUR),
133+
),
134+
]),
135+
136+
blobs: HashMap::from([
137+
(ORIGIN_BLOCK.to_string(),
138+
origin_blobs.clone()),
139+
(ONE.to_string(),
140+
one_blobs.clone()),
141+
(TWO.to_string(),
142+
two_blobs.clone()),
143+
(THREE.to_string(),
144+
three_blobs.clone()),
145+
(FOUR.to_string(),
146+
four_blobs.clone()),
147+
(FIVE.to_string(),
148+
five_blobs.clone()),
149+
("head".to_string(),
150+
five_blobs.clone()),
151+
("finalized".to_string(),
152+
three_blobs.clone()),
153+
(start_slot.as_u64().to_string(),
154+
origin_blobs.clone()),
155+
((start_slot + 1).as_u64().to_string(),
156+
one_blobs.clone()),
157+
((start_slot + 2).as_u64().to_string(),
158+
two_blobs.clone()),
159+
((start_slot + 3).as_u64().to_string(),
160+
three_blobs.clone()),
161+
((start_slot + 4).as_u64().to_string(),
162+
four_blobs.clone()),
163+
((start_slot + 5).as_u64().to_string(),
164+
five_blobs.clone()),
165+
]),
166+
}
167+
}
168+
}
169+
170+
impl BeaconClientStub<MainnetEthSpec> {
171+
pub fn new() -> Self {
172+
Self {
173+
headers: HashMap::new(),
174+
blobs: HashMap::new(),
175+
}
176+
}
177+
}
178+
179+
pub struct BeaconClientEth2 {
180+
pub beacon_client: BeaconNodeHttpClient,
181+
}
182+
183+
#[async_trait]
184+
impl BeaconClient for BeaconClientEth2 {
185+
async fn get_beacon_headers_block_id(&self, block_id: BlockId) -> Result<Option<ExecutionOptimisticFinalizedResponse<BlockHeaderData>>, Error> {
186+
return self.beacon_client.get_beacon_headers_block_id(block_id).await;
187+
}
188+
189+
async fn get_blobs(&self, block_id: BlockId, indices: Option<&[u64]>) -> Result<Option<GenericResponse<BlobSidecarList<MainnetEthSpec>>>, Error> {
190+
return self.beacon_client.get_blobs(block_id, indices).await;
191+
}
192+
}

0 commit comments

Comments
 (0)