Skip to content

Commit 65616b0

Browse files
rsky-relay: resolver/verify
1 parent e06a421 commit 65616b0

File tree

19 files changed

+695
-36
lines changed

19 files changed

+695
-36
lines changed

Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[workspace]
2-
members = [ "rsky-common", "rsky-crypto","rsky-feedgen", "rsky-firehose", "rsky-identity", "rsky-labeler", "rsky-lexicon", "rsky-pds", "rsky-syntax", "rsky-jetstream-subscriber", "rsky-repo", "cypher/backend", "cypher/frontend", "rsky-satnav"]
2+
members = ["rsky-common", "rsky-crypto", "rsky-identity", "rsky-lexicon", "rsky-relay", "rsky-satnav"]
33
resolver = "2"
44

55
[workspace.dependencies]

rsky-common/src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ pub fn get_handle(doc: &DidDocument) -> Option<String> {
9191

9292
pub fn get_verification_material(
9393
doc: &DidDocument,
94-
key_id: &String,
94+
key_id: &str,
9595
) -> Option<VerificationMaterial> {
9696
let did = get_did(doc);
9797
let keys = &doc.verification_method;

rsky-identity/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ documentation = "https://docs.rs/rsky-identity"
1212

1313
[dependencies]
1414
anyhow = "1.0.82"
15-
reqwest = { version = "0.12.3", features = ["json","blocking"] }
15+
reqwest = { version = "0.12", default-features = false, features = ["gzip", "hickory-dns", "http2", "json", "rustls-tls-webpki-roots-no-provider"] }
1616
serde_json = { version = "1.0.115",features = ["preserve_order"] }
1717
urlencoding = "2.1.3"
1818
thiserror = "1.0.58"

rsky-relay/Cargo.toml

+23-1
Original file line numberDiff line numberDiff line change
@@ -7,21 +7,43 @@ edition = "2024"
77
# external
88
bstr = "1"
99
bus = "2"
10+
chrono = { version = "0.4", features = ["serde"] }
11+
ciborium = "0.2"
12+
cid = { version = "0.11", features = ["serde-codec"] }
1013
color-eyre = "0.6"
1114
derive_more = { version = "2", features = ["full"] }
15+
futures = { version = "0.3", default-features = false, features = ["std"] }
1216
hashbrown = "0.15"
1317
http = "1"
1418
httparse = "1.10"
19+
k256 = "0.13"
1520
magnetic = "2"
21+
mimalloc = "0.1"
22+
multibase = "0.9"
1623
nix = { version = "0.29", features = ["event", "socket"] }
24+
p256 = "0.13"
25+
reqwest = { version = "0.12", default-features = false, features = ["gzip", "hickory-dns", "http2", "json", "rustls-tls-webpki-roots-no-provider"] }
26+
rs-car-sync = "0.4"
1727
rtrb = "0.3"
1828
rustls = "0.23"
1929
serde = { version = "1", features = ["derive"] }
30+
serde_bytes = "0.11"
31+
serde_cbor = "0.11"
32+
serde_ipld_dagcbor = "0.6"
2033
serde_json = "1"
2134
signal-hook = { version = "0.3", features = ["extended-siginfo"] }
35+
sled = { version = "0.34", features = ["compression"] }
2236
thingbuf = "0.1"
2337
thiserror = "2"
38+
tokio = { version = "1", features = ["rt-multi-thread", "macros", "time"] }
2439
tracing = { version = "0.1", features = ["release_max_level_debug"] }
2540
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
26-
tungstenite = { version = "0.26", features = ["rustls-tls-native-roots"] }
41+
tungstenite = { version = "0.26", features = ["rustls-tls-webpki-roots"] }
2742
url = "2"
43+
zerocopy = { version = "0.8", features = ["derive"] }
44+
45+
# internal
46+
rsky-common = { workspace = true }
47+
rsky-crypto = { workspace = true }
48+
rsky-identity = { workspace = true }
49+
rsky-lexicon = { workspace = true }

rsky-relay/src/crawler/connection.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,10 @@ impl Connection {
7373
let bytes = match msg {
7474
Message::Binary(bytes) => bytes,
7575
Message::Close(_) => todo!(),
76-
_ => unreachable!("{msg}"),
76+
_ => {
77+
tracing::debug!("unknown message: {msg}");
78+
continue;
79+
},
7780
};
7881

7982
let mut slot = self.message_tx.try_send_ref()?;

rsky-relay/src/crawler/manager.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::sync::atomic::Ordering;
22
use std::thread;
3+
use std::time::Duration;
34

45
use hashbrown::HashMap;
56
use http::Uri;
@@ -15,6 +16,7 @@ use crate::crawler::worker::{Worker, WorkerError};
1516
use crate::types::{Cursor, MessageSender, RequestCrawlReceiver};
1617

1718
const CAPACITY: usize = 1024;
19+
const SLEEP: Duration = Duration::from_millis(10);
1820

1921
#[derive(Debug, Error)]
2022
pub enum ManagerError {
@@ -68,7 +70,9 @@ impl Manager {
6870
}
6971

7072
pub fn run(mut self) -> Result<(), ManagerError> {
71-
while self.update()? {}
73+
while self.update()? {
74+
thread::sleep(SLEEP);
75+
}
7276
self.shutdown()
7377
}
7478

rsky-relay/src/crawler/worker.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ impl Worker {
5454
while self.update() {
5555
thread::yield_now();
5656
}
57+
tracing::info!("shutting down crawler: {}", self.worker_id.0);
5758
self.shutdown()
5859
}
5960

@@ -89,7 +90,6 @@ impl Worker {
8990
}
9091
}
9192
Command::Shutdown => {
92-
tracing::debug!("shutting down crawler: {}", self.worker_id.0);
9393
return false;
9494
}
9595
}

rsky-relay/src/lib.rs

+2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ mod crawler;
22
mod publisher;
33
mod server;
44
mod types;
5+
mod validator;
56

67
use std::sync::atomic::AtomicBool;
78

@@ -11,3 +12,4 @@ pub use crawler::Manager as CrawlerManager;
1112
pub use publisher::Manager as PublisherManager;
1213
pub use server::Server;
1314
pub use types::{MessageRecycle, RequestCrawl};
15+
pub use validator::Manager as ValidatorManager;

rsky-relay/src/main.rs

+16-3
Original file line numberDiff line numberDiff line change
@@ -4,21 +4,32 @@ use std::thread;
44

55
use color_eyre::Result;
66

7-
use rsky_relay::{CrawlerManager, MessageRecycle, PublisherManager, SHUTDOWN, Server};
7+
use mimalloc::MiMalloc;
8+
use rustls::crypto::aws_lc_rs::default_provider;
89
use signal_hook::consts::{SIGINT, TERM_SIGNALS};
910
use signal_hook::flag;
1011
use signal_hook::iterator::SignalsInfo;
1112
use signal_hook::iterator::exfiltrator::WithOrigin;
1213

14+
use rsky_relay::{
15+
CrawlerManager, MessageRecycle, PublisherManager, SHUTDOWN, Server, ValidatorManager,
16+
};
17+
1318
const CAPACITY1: usize = 1 << 20;
1419
const CAPACITY2: usize = 1 << 10;
1520
const WORKERS: usize = 4;
1621

17-
pub fn main() -> Result<()> {
22+
#[global_allocator]
23+
static GLOBAL: MiMalloc = MiMalloc;
24+
25+
#[tokio::main]
26+
pub async fn main() -> Result<()> {
1827
tracing_subscriber::fmt()
1928
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
2029
.init();
2130

31+
default_provider().install_default().unwrap();
32+
2233
let terminate_now = Arc::new(AtomicBool::new(false));
2334
flag::register_conditional_shutdown(SIGINT, 1, Arc::clone(&terminate_now))?;
2435
flag::register(SIGINT, Arc::clone(&terminate_now))?;
@@ -27,10 +38,12 @@ pub fn main() -> Result<()> {
2738
thingbuf::mpsc::blocking::with_recycle(CAPACITY1, MessageRecycle);
2839
let (request_crawl_tx, request_crawl_rx) = rtrb::RingBuffer::new(CAPACITY2);
2940
let (subscribe_repos_tx, subscribe_repos_rx) = rtrb::RingBuffer::new(CAPACITY2);
41+
let mut validator = ValidatorManager::new(message_rx).await?;
3042
let crawler = CrawlerManager::new(WORKERS, message_tx, request_crawl_rx)?;
31-
let publisher = PublisherManager::new(WORKERS, message_rx, subscribe_repos_rx)?;
43+
let publisher = PublisherManager::new(WORKERS, &mut validator, subscribe_repos_rx)?;
3244
let server = Server::new(request_crawl_tx, subscribe_repos_tx)?;
3345
thread::scope(move |s| {
46+
tokio::spawn(validator.run());
3447
s.spawn(move || crawler.run());
3548
s.spawn(move || publisher.run());
3649
s.spawn(move || server.run());

rsky-relay/src/publisher/connection.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ impl Connection {
6969

7070
pub fn send(&mut self, input: &[u8]) -> Result<(), ConnectionError> {
7171
match self.client.send(Message::binary(input.to_vec())) {
72-
Ok(()) => (),
72+
Ok(()) => {},
7373
Err(tungstenite::Error::WriteBufferFull(msg)) => {
7474
self.queue.push_back((Instant::now(), msg));
7575
if self.queue.len() > MAX_LEN {
@@ -91,7 +91,7 @@ impl Connection {
9191
pub fn poll(&mut self) -> Result<(), ConnectionError> {
9292
while let Some((instant, msg)) = self.queue.pop_front() {
9393
match self.client.send(msg) {
94-
Ok(()) => (),
94+
Ok(()) => {},
9595
Err(tungstenite::Error::WriteBufferFull(msg)) => {
9696
self.queue.push_back((instant, msg));
9797
break;

rsky-relay/src/publisher/manager.rs

+10-23
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,20 @@
1-
use std::sync::Arc;
21
use std::sync::atomic::Ordering;
32
use std::thread;
3+
use std::time::Duration;
44

5-
use bus::Bus;
65
use magnetic::Consumer;
76
use magnetic::buffer::dynamic::DynamicBufferP2;
87
use thiserror::Error;
98

10-
use crate::SHUTDOWN;
119
use crate::publisher::types::{
1210
Command, CommandSender, Config, LocalId, Status, StatusReceiver, WorkerId,
1311
};
1412
use crate::publisher::worker::{Worker, WorkerError};
15-
use crate::types::{MessageReceiver, SubscribeReposReceiver};
13+
use crate::types::SubscribeReposReceiver;
14+
use crate::{SHUTDOWN, ValidatorManager};
1615

1716
const CAPACITY: usize = 1024;
17+
const SLEEP: Duration = Duration::from_millis(10);
1818

1919
#[derive(Debug, Error)]
2020
pub enum ManagerError {
@@ -36,22 +36,20 @@ struct WorkerHandle {
3636
pub struct Manager {
3737
workers: Box<[WorkerHandle]>,
3838
next_id: WorkerId,
39-
message_rx: MessageReceiver,
40-
bus: Bus<Arc<Vec<u8>>>,
4139
status_rx: StatusReceiver,
4240
subscribe_repos_rx: SubscribeReposReceiver,
4341
}
4442

4543
impl Manager {
4644
pub fn new(
47-
n_workers: usize, message_rx: MessageReceiver, subscribe_repos_rx: SubscribeReposReceiver,
45+
n_workers: usize, validator: &mut ValidatorManager,
46+
subscribe_repos_rx: SubscribeReposReceiver,
4847
) -> Result<Self, ManagerError> {
49-
let mut bus = Bus::new(CAPACITY);
5048
let (status_tx, status_rx) =
5149
magnetic::mpsc::mpsc_queue(DynamicBufferP2::new(CAPACITY).unwrap());
5250
let workers = (0..n_workers)
5351
.map(|worker_id| {
54-
let message_rx = bus.add_rx();
52+
let message_rx = validator.subscribe();
5553
let status_tx = status_tx.clone();
5654
let (command_tx, command_rx) = rtrb::RingBuffer::new(CAPACITY);
5755
let thread_handle = thread::spawn(move || {
@@ -63,15 +61,15 @@ impl Manager {
6361
Ok(Self {
6462
workers: workers.into_boxed_slice(),
6563
next_id: WorkerId(0),
66-
message_rx,
67-
bus,
6864
status_rx,
6965
subscribe_repos_rx,
7066
})
7167
}
7268

7369
pub fn run(mut self) -> Result<(), ManagerError> {
74-
while self.update()? {}
70+
while self.update()? {
71+
thread::sleep(SLEEP);
72+
}
7573
self.shutdown()
7674
}
7775

@@ -103,17 +101,6 @@ impl Manager {
103101
}
104102
}
105103

106-
for _ in 0..32 {
107-
match self.message_rx.try_recv_ref() {
108-
Ok(msg) => {
109-
self.bus.try_broadcast(Arc::new(msg.data.clone().into())).unwrap();
110-
}
111-
Err(thingbuf::mpsc::errors::TryRecvError::Empty) => break,
112-
Err(thingbuf::mpsc::errors::TryRecvError::Closed) => return Ok(false),
113-
Err(_) => unreachable!(),
114-
}
115-
}
116-
117104
if let Ok(subscribe_repos) = self.subscribe_repos_rx.pop() {
118105
let config = Config {
119106
stream: subscribe_repos.stream,

rsky-relay/src/publisher/worker.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ impl Worker {
5353
while self.update() {
5454
thread::yield_now();
5555
}
56+
tracing::info!("shutting down publisher: {}", self.worker_id.0);
5657
self.shutdown()
5758
}
5859

@@ -84,7 +85,6 @@ impl Worker {
8485
}
8586
}
8687
Command::Shutdown => {
87-
tracing::debug!("shutting down publisher: {}", self.worker_id.0);
8888
return false;
8989
}
9090
}

rsky-relay/src/types.rs

+5
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,15 @@
11
use std::net::TcpStream;
2+
use std::sync::LazyLock;
23

34
use http::Uri;
45
use rtrb::{Consumer, Producer};
6+
use sled::Db;
57
use thingbuf::{Recycle, mpsc};
68
use tungstenite::stream::MaybeTlsStream;
79

10+
pub static DB: LazyLock<Db> =
11+
LazyLock::new(|| sled::Config::new().path("db").use_compression(true).open().unwrap());
12+
813
pub type RequestCrawlSender = Producer<RequestCrawl>;
914
pub type RequestCrawlReceiver = Consumer<RequestCrawl>;
1015
pub type SubscribeReposSender = Producer<SubscribeRepos>;

0 commit comments

Comments
 (0)