Skip to content

Commit 559d7cb

Browse files
committed
api server
1 parent 7eb4ccd commit 559d7cb

File tree

2 files changed

+247
-8
lines changed

2 files changed

+247
-8
lines changed

bin/api/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ exclude.workspace = true
1212
eth2.workspace = true
1313

1414
clap.workspace = true
15+
ctrlc.workspace = true
1516
futures-util.workspace = true
1617
reqwest.workspace = true
1718
serde_json.workspace = true
@@ -21,6 +22,7 @@ tokio.workspace = true
2122
eyre.workspace = true
2223
again.workspace = true
2324
hex.workspace = true
25+
tracing-appender.workspace = true
2426
tracing-subscriber.workspace = true
2527
uuid.workspace = true
2628
warp.workspace = true

bin/api/src/main.rs

Lines changed: 245 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,260 @@
1+
use std::fs;
2+
use std::path::PathBuf;
3+
use std::str::FromStr;
4+
use std::sync::Arc;
5+
use std::time::Duration;
6+
7+
use clap::Parser;
8+
use ctrlc::set_handler;
9+
use eth2::types::Hash256;
10+
use eth2::{BeaconNodeHttpClient, SensitiveUrl, Timeouts};
11+
use serde::{Deserialize, Serialize};
12+
use tokio::sync::Mutex;
13+
use tracing_appender::rolling::{RollingFileAppender, Rotation};
14+
use tracing_subscriber::layer::SubscriberExt;
15+
use tracing_subscriber::util::SubscriberInitExt;
16+
use tracing_subscriber::{fmt, EnvFilter};
17+
18+
use blob_archiver_beacon::beacon_client;
19+
use blob_archiver_beacon::beacon_client::BeaconClientEth2;
20+
use blob_archiver_storage::fs::FSStorage;
21+
use blob_archiver_storage::s3::{S3Config, S3Storage};
22+
use blob_archiver_storage::storage;
23+
use blob_archiver_storage::storage::{Storage, StorageType};
24+
125
mod api;
226

327
#[allow(dead_code)]
428
static INIT: std::sync::Once = std::sync::Once::new();
529

6-
use clap::Parser;
30+
#[tokio::main]
31+
async fn main() {
32+
let args = CliArgs::parse();
33+
34+
let config: Config = args.to_config();
35+
init_logging(
36+
config.log_config.verbose,
37+
config.log_config.log_dir.clone(),
38+
config
39+
.log_config
40+
.log_rotation
41+
.clone()
42+
.map(|s| to_rotation(s.as_str())),
43+
);
44+
let beacon_client = BeaconNodeHttpClient::new(
45+
SensitiveUrl::from_str(config.beacon_config.beacon_endpoint.as_str()).unwrap(),
46+
Timeouts::set_all(config.beacon_config.beacon_client_timeout),
47+
);
48+
let storage: Arc<Mutex<dyn Storage>> = if config.storage_config.storage_type == StorageType::FS
49+
{
50+
Arc::new(Mutex::new(
51+
FSStorage::new(config.storage_config.fs_dir.clone().unwrap())
52+
.await
53+
.unwrap(),
54+
))
55+
} else {
56+
Arc::new(Mutex::new(
57+
S3Storage::new(config.storage_config.s3_config.clone().unwrap())
58+
.await
59+
.unwrap(),
60+
))
61+
};
62+
let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
63+
set_handler(move || {
64+
tracing::info!("shutting down");
65+
shutdown_tx
66+
.send(true)
67+
.expect("could not send shutdown signal");
68+
})
69+
.expect("could not register shutdown handler");
70+
71+
let beacon_client_eth2 = BeaconClientEth2 { beacon_client };
72+
let api = api::Api::new(Arc::new(Mutex::new(beacon_client_eth2)), storage.clone());
73+
let addr: std::net::SocketAddr = config.listen_addr.parse().expect("Invalid listen address");
74+
75+
let (_, server) = warp::serve(api.routes())
76+
.bind_with_graceful_shutdown(addr, async move {
77+
shutdown_rx.clone().changed().await.ok();
78+
});
79+
server.await;
80+
81+
}
82+
83+
fn init_logging(verbose: u8, log_dir: Option<PathBuf>, rotation: Option<Rotation>) {
84+
INIT.call_once(|| {
85+
setup_tracing(verbose, log_dir, rotation).expect("Failed to setup tracing");
86+
});
87+
}
88+
89+
#[allow(dead_code)]
90+
pub fn setup_tracing(
91+
verbose: u8,
92+
log_dir: Option<PathBuf>,
93+
rotation: Option<Rotation>,
94+
) -> eyre::Result<()> {
95+
let filter = match verbose {
96+
0 => EnvFilter::new("error"),
97+
1 => EnvFilter::new("warn"),
98+
2 => EnvFilter::new("info"),
99+
3 => EnvFilter::new("debug"),
100+
_ => EnvFilter::new("trace"),
101+
};
102+
103+
let subscriber = tracing_subscriber::registry()
104+
.with(EnvFilter::from_default_env())
105+
.with(filter);
106+
107+
if let Some(log_dir) = log_dir {
108+
fs::create_dir_all(&log_dir)
109+
.map_err(|e| eyre::eyre!("Failed to create log directory: {}", e))?;
7110

8-
fn main() {
9-
println!("Hello, world!");
111+
let file_appender = RollingFileAppender::new(
112+
rotation.unwrap_or(Rotation::DAILY),
113+
log_dir,
114+
"blob-archiver.log",
115+
);
116+
let (non_blocking, _guard) = tracing_appender::non_blocking(file_appender);
117+
let file_layer = fmt::layer().with_writer(non_blocking).with_ansi(false);
118+
119+
subscriber
120+
.with(file_layer)
121+
.with(fmt::layer().with_writer(std::io::stdout))
122+
.try_init()?;
123+
} else {
124+
subscriber
125+
.with(fmt::layer().with_writer(std::io::stdout))
126+
.try_init()?;
127+
}
128+
129+
Ok(())
10130
}
11131

12132
#[derive(Parser, Debug)]
13133
#[clap(author, version, about, long_about = None)]
14134
struct CliArgs {
15-
#[clap(short, long, value_parser, default_value = "config.toml")]
16-
config: String,
135+
#[clap(short = 'v', long, action = clap::ArgAction::Count, default_value = "4")]
136+
verbose: u8,
137+
138+
#[clap(long)]
139+
log_dir: Option<String>,
140+
141+
#[clap(long, help = "Log rotation values: DAILY, HOURLY, MINUTELY, NEVER")]
142+
log_rotation: Option<String>,
143+
144+
#[clap(long, required = true)]
145+
beacon_endpoint: String,
146+
147+
#[clap(long, default_value = "10")]
148+
beacon_client_timeout: u64,
149+
150+
#[clap(long, default_value = "6")]
151+
poll_interval: u64,
152+
153+
#[clap(long, default_value = "0.0.0.0:8000")]
154+
listen_addr: String,
17155

18-
#[clap(short, long, action = clap::ArgAction::Count)]
156+
#[clap(long, required = true)]
157+
origin_block: String,
158+
159+
#[clap(long, default_value = "s3")]
160+
storage_type: String,
161+
162+
#[clap(long)]
163+
s3_endpoint: Option<String>,
164+
165+
#[clap(long)]
166+
s3_bucket: Option<String>,
167+
168+
#[clap(long)]
169+
s3_path: Option<String>,
170+
171+
#[clap(long, default_value = "false")]
172+
s3_compress: Option<bool>,
173+
#[clap(long)]
174+
fs_dir: Option<String>,
175+
}
176+
177+
impl CliArgs {
178+
fn to_config(&self) -> Config {
179+
let log_config = LogConfig {
180+
log_dir: self.log_dir.as_ref().map(PathBuf::from),
181+
log_rotation: self.log_rotation.clone(),
182+
verbose: self.verbose,
183+
};
184+
185+
let beacon_config = beacon_client::Config {
186+
beacon_endpoint: self.beacon_endpoint.clone(),
187+
beacon_client_timeout: Duration::from_secs(self.beacon_client_timeout),
188+
};
189+
190+
let storage_type = StorageType::from_str(self.storage_type.as_str()).unwrap();
191+
192+
let s3_config = if storage_type == StorageType::S3 {
193+
Some(S3Config {
194+
endpoint: self.s3_endpoint.clone().unwrap(),
195+
bucket: self.s3_bucket.clone().unwrap(),
196+
path: self.s3_path.clone().unwrap(),
197+
compression: self.s3_compress.unwrap(),
198+
})
199+
} else {
200+
None
201+
};
202+
203+
let fs_dir = self.fs_dir.as_ref().map(PathBuf::from);
204+
205+
let storage_config = storage::Config {
206+
storage_type,
207+
s3_config,
208+
fs_dir,
209+
};
210+
211+
let mut padded_hex = self
212+
.origin_block
213+
.strip_prefix("0x")
214+
.unwrap_or(&self.origin_block)
215+
.to_string();
216+
padded_hex = format!("{:0<64}", padded_hex);
217+
let origin_block = Hash256::from_slice(&hex::decode(padded_hex).expect("Invalid hex"));
218+
219+
Config {
220+
poll_interval: Duration::from_secs(self.poll_interval),
221+
listen_addr: self.listen_addr.clone(),
222+
origin_block,
223+
beacon_config,
224+
storage_config,
225+
log_config,
226+
}
227+
}
228+
}
229+
230+
#[allow(dead_code)]
231+
fn to_rotation(s: &str) -> Rotation {
232+
match s {
233+
"DAILY" => Rotation::DAILY,
234+
"HOURLY" => Rotation::HOURLY,
235+
"MINUTELY" => Rotation::MINUTELY,
236+
_ => Rotation::NEVER,
237+
}
238+
}
239+
240+
#[derive(Debug, PartialEq, Eq, Clone, Default, Serialize, Deserialize)]
241+
pub struct LogConfig {
242+
log_dir: Option<PathBuf>,
243+
log_rotation: Option<String>,
19244
verbose: u8,
245+
}
246+
247+
#[derive(Debug, PartialEq, Eq, Clone, Default, Serialize, Deserialize)]
248+
pub struct Config {
249+
pub poll_interval: Duration,
250+
251+
pub listen_addr: String,
252+
253+
pub origin_block: Hash256,
254+
255+
pub beacon_config: beacon_client::Config,
256+
257+
pub storage_config: storage::Config,
20258

21-
#[clap(short, long)]
22-
dry_run: bool,
259+
pub log_config: LogConfig,
23260
}

0 commit comments

Comments
 (0)