Skip to content

Commit d0b3697

Browse files
rsky-relay: publisher state/mio
1 parent b4fa73b commit d0b3697

19 files changed

+662
-599
lines changed

rsky-relay/Cargo.toml

+3-5
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ edition = "2024"
66
[dependencies]
77
# external
88
bstr = "1"
9-
bus = "2"
109
chrono = { version = "0.4", features = ["serde"] }
1110
ciborium = "0.2"
1211
cid = { version = "0.11", features = ["serde-codec"] }
@@ -19,8 +18,8 @@ httparse = "1.10"
1918
k256 = "0.13"
2019
magnetic = "2"
2120
mimalloc = "0.1"
21+
mio = { version = "1", features = ["os-ext", "os-poll"] }
2222
multibase = "0.9"
23-
nix = { version = "0.29", features = ["event", "socket"] }
2423
p256 = "0.13"
2524
reqwest = { version = "0.12", default-features = false, features = ["gzip", "hickory-dns", "http2", "json", "rustls-tls-webpki-roots-no-provider"] }
2625
rs-car-sync = "0.4"
@@ -38,12 +37,11 @@ thiserror = "2"
3837
tokio = { version = "1", features = ["rt-multi-thread", "macros", "time"] }
3938
tracing = { version = "0.1", features = ["release_max_level_debug"] }
4039
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
41-
tungstenite = { version = "0.26", features = ["rustls-tls-webpki-roots"] }
40+
tungstenite = { version = "0.26", features = ["rustls-tls-webpki-roots", "url"] }
4241
url = "2"
42+
urlencoding = "2"
4343
zerocopy = { version = "0.8", features = ["derive"] }
4444

4545
# internal
4646
rsky-common = { workspace = true }
47-
rsky-crypto = { workspace = true }
4847
rsky-identity = { workspace = true }
49-
rsky-lexicon = { workspace = true }

rsky-relay/bootstrap.sh

+2
Original file line numberDiff line numberDiff line change
@@ -35,3 +35,5 @@ curl -X POST -H 'Content-Type: application/json' -d '{"hostname":"verpa.us-west.
3535
curl -X POST -H 'Content-Type: application/json' -d '{"hostname":"witchesbutter.us-west.host.bsky.network"}' 'http://localhost:9000/xrpc/com.atproto.sync.requestCrawl'
3636
curl -X POST -H 'Content-Type: application/json' -d '{"hostname":"woodear.us-west.host.bsky.network"}' 'http://localhost:9000/xrpc/com.atproto.sync.requestCrawl'
3737
curl -X POST -H 'Content-Type: application/json' -d '{"hostname":"atproto.brid.gy"}' 'http://localhost:9000/xrpc/com.atproto.sync.requestCrawl'
38+
curl -X POST -H 'Content-Type: application/json' -d '{"hostname":"esnoticia.online"}' 'http://localhost:9000/xrpc/com.atproto.sync.requestCrawl'
39+
curl -X POST -H 'Content-Type: application/json' -d '{"hostname":"at.app.wafrn.net"}' 'http://localhost:9000/xrpc/com.atproto.sync.requestCrawl'

rsky-relay/src/crawler/connection.rs

+30-18
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
use std::io;
2-
use std::os::fd::{AsFd, BorrowedFd};
2+
use std::net::TcpStream;
3+
use std::os::fd::{AsRawFd, RawFd};
34

45
use thingbuf::mpsc;
56
use thiserror::Error;
6-
use tungstenite::Message;
77
use tungstenite::stream::MaybeTlsStream;
8+
use tungstenite::{Message, WebSocket};
9+
use url::Url;
810

9-
use crate::crawler::types::{Client, Config, StatusSender};
10-
use crate::types::MessageSender;
11+
use crate::types::{Cursor, MessageSender};
1112

1213
#[derive(Debug, Error)]
1314
pub enum ConnectionError {
@@ -20,28 +21,33 @@ pub enum ConnectionError {
2021
}
2122

2223
pub struct Connection {
23-
client: Client,
24-
config: Config,
24+
pub(crate) hostname: String,
25+
client: WebSocket<MaybeTlsStream<TcpStream>>,
2526
message_tx: MessageSender,
26-
_status_tx: StatusSender,
2727
}
2828

29-
impl AsFd for Connection {
29+
impl AsRawFd for Connection {
3030
#[inline]
31-
fn as_fd(&self) -> BorrowedFd<'_> {
31+
fn as_raw_fd(&self) -> RawFd {
3232
match self.client.get_ref() {
33-
MaybeTlsStream::Plain(stream) => stream.as_fd(),
34-
MaybeTlsStream::Rustls(stream) => stream.get_ref().as_fd(),
33+
MaybeTlsStream::Plain(stream) => stream.as_raw_fd(),
34+
MaybeTlsStream::Rustls(stream) => stream.get_ref().as_raw_fd(),
3535
_ => todo!(),
3636
}
3737
}
3838
}
3939

4040
impl Connection {
4141
pub fn connect(
42-
config: Config, message_tx: MessageSender, status_tx: StatusSender,
42+
hostname: String, cursor: Option<Cursor>, message_tx: MessageSender,
4343
) -> Result<Self, ConnectionError> {
44-
let (client, _) = tungstenite::connect(&config.uri)?;
44+
#[expect(clippy::unwrap_used)]
45+
let mut url =
46+
Url::parse(&format!("wss://{hostname}/xrpc/com.atproto.sync.subscribeRepos")).unwrap();
47+
if let Some(cursor) = cursor {
48+
url.query_pairs_mut().append_pair("cursor", &cursor.to_string());
49+
}
50+
let (client, _) = tungstenite::connect(url)?;
4551
match client.get_ref() {
4652
MaybeTlsStream::Rustls(stream) => {
4753
stream.get_ref().set_nonblocking(true)?;
@@ -51,7 +57,7 @@ impl Connection {
5157
}
5258
_ => {}
5359
}
54-
Ok(Self { client, config, message_tx, _status_tx: status_tx })
60+
Ok(Self { hostname, client, message_tx })
5561
}
5662

5763
pub fn close(&mut self) -> Result<(), ConnectionError> {
@@ -72,16 +78,22 @@ impl Connection {
7278

7379
let bytes = match msg {
7480
Message::Binary(bytes) => bytes,
75-
Message::Close(_) => todo!(),
81+
Message::Ping(_) | Message::Pong(_) => {
82+
continue;
83+
}
84+
Message::Close(close) => {
85+
tracing::debug!("[{}] received close: {close:?}", self.hostname);
86+
continue;
87+
}
7688
_ => {
77-
tracing::debug!("[{}] unknown message: {msg}", self.config.hostname);
89+
tracing::debug!("[{}] unknown message: {msg:?}", self.hostname);
7890
continue;
7991
}
8092
};
8193

8294
let mut slot = self.message_tx.try_send_ref()?;
83-
slot.data = bytes.into();
84-
slot.hostname = self.config.hostname.clone();
95+
slot.data = bytes;
96+
slot.hostname.clone_from(&self.hostname);
8597
}
8698
Ok(())
8799
}

rsky-relay/src/crawler/manager.rs

+20-53
Original file line numberDiff line numberDiff line change
@@ -2,75 +2,61 @@ use std::sync::atomic::Ordering;
22
use std::time::Duration;
33
use std::{io, thread};
44

5-
use hashbrown::HashMap;
6-
use http::Uri;
7-
use magnetic::Consumer;
8-
use magnetic::buffer::dynamic::DynamicBufferP2;
95
use thiserror::Error;
106

117
use crate::SHUTDOWN;
12-
use crate::crawler::types::{
13-
Command, CommandSender, Config, LocalId, Status, StatusReceiver, WorkerId,
14-
};
8+
use crate::crawler::types::{Command, CommandSender, RequestCrawlReceiver};
159
use crate::crawler::worker::{Worker, WorkerError};
16-
use crate::types::{MessageSender, RequestCrawlReceiver};
10+
use crate::types::MessageSender;
1711

1812
const CAPACITY: usize = 1024;
1913
const SLEEP: Duration = Duration::from_millis(10);
2014

2115
#[derive(Debug, Error)]
2216
pub enum ManagerError {
2317
#[error("spawn error: {0}")]
24-
SpawnError(#[from] io::Error),
18+
Spawn(#[from] io::Error),
2519
#[error("worker error: {0}")]
26-
WorkerError(#[from] WorkerError),
20+
Worker(#[from] WorkerError),
2721
#[error("rtrb error: {0}")]
28-
PushError(#[from] rtrb::PushError<Command>),
22+
Push(#[from] Box<rtrb::PushError<Command>>),
2923
#[error("join error")]
30-
JoinError,
24+
Join,
25+
}
26+
27+
impl From<rtrb::PushError<Command>> for ManagerError {
28+
fn from(value: rtrb::PushError<Command>) -> Self {
29+
Box::new(value).into()
30+
}
3131
}
3232

3333
#[derive(Debug)]
3434
struct WorkerHandle {
35-
pub configs: Vec<Config>,
3635
pub command_tx: CommandSender,
3736
pub thread_handle: thread::JoinHandle<Result<(), WorkerError>>,
3837
}
3938

4039
pub struct Manager {
4140
workers: Box<[WorkerHandle]>,
42-
next_id: WorkerId,
43-
configs: HashMap<Uri, Config>,
44-
status_rx: StatusReceiver,
41+
next_id: usize,
4542
request_crawl_rx: RequestCrawlReceiver,
4643
}
4744

4845
impl Manager {
4946
pub fn new(
50-
n_workers: usize, message_tx: MessageSender, request_crawl_rx: RequestCrawlReceiver,
47+
n_workers: usize, message_tx: &MessageSender, request_crawl_rx: RequestCrawlReceiver,
5148
) -> Result<Self, ManagerError> {
52-
let (status_tx, status_rx) =
53-
magnetic::mpsc::mpsc_queue(DynamicBufferP2::new(CAPACITY).unwrap());
5449
let workers = (0..n_workers)
5550
.map(|worker_id| -> Result<_, ManagerError> {
5651
let message_tx = message_tx.clone();
57-
let status_tx = status_tx.clone();
5852
let (command_tx, command_rx) = rtrb::RingBuffer::new(CAPACITY);
5953
let thread_handle = thread::Builder::new()
6054
.name(format!("rsky-crawl-{worker_id}"))
61-
.spawn(move || {
62-
Worker::new(WorkerId(worker_id), message_tx, status_tx, command_rx).run()
63-
})?;
64-
Ok(WorkerHandle { configs: Vec::new(), command_tx, thread_handle })
55+
.spawn(move || Worker::new(worker_id, message_tx, command_rx)?.run())?;
56+
Ok(WorkerHandle { command_tx, thread_handle })
6557
})
6658
.collect::<Result<Vec<_>, _>>()?;
67-
Ok(Self {
68-
workers: workers.into_boxed_slice(),
69-
next_id: WorkerId(0),
70-
configs: HashMap::new(),
71-
status_rx,
72-
request_crawl_rx,
73-
})
59+
Ok(Self { workers: workers.into_boxed_slice(), next_id: 0, request_crawl_rx })
7460
}
7561

7662
pub fn run(mut self) -> Result<(), ManagerError> {
@@ -87,40 +73,21 @@ impl Manager {
8773
worker.command_tx.push(Command::Shutdown)?;
8874
}
8975
for (id, worker) in self.workers.into_iter().enumerate() {
90-
if let Err(err) = worker.thread_handle.join().map_err(|_| ManagerError::JoinError)? {
76+
if let Err(err) = worker.thread_handle.join().map_err(|_| ManagerError::Join)? {
9177
tracing::warn!("crawler worker {id} error: {err}");
9278
}
9379
}
9480
Ok(())
9581
}
9682

97-
fn handle_status(&mut self, _status: Status) -> Result<bool, ManagerError> {
98-
Ok(true)
99-
}
100-
10183
fn update(&mut self) -> Result<bool, ManagerError> {
10284
if SHUTDOWN.load(Ordering::Relaxed) {
10385
return Ok(false);
10486
}
10587

106-
if let Ok(status) = self.status_rx.try_pop() {
107-
if !self.handle_status(status)? {
108-
return Ok(false);
109-
}
110-
}
111-
11288
if let Ok(request_crawl) = self.request_crawl_rx.pop() {
113-
if !self.configs.contains_key(&request_crawl.uri) {
114-
let config = Config {
115-
uri: request_crawl.uri.clone(),
116-
hostname: request_crawl.hostname.clone(),
117-
worker_id: self.next_id,
118-
local_id: LocalId(self.workers[self.next_id.0].configs.len()),
119-
};
120-
self.next_id = WorkerId((self.next_id.0 + 1) % self.workers.len());
121-
self.configs.insert(request_crawl.uri, config.clone());
122-
self.workers[config.worker_id.0].command_tx.push(Command::Connect(config)).unwrap();
123-
}
89+
self.workers[self.next_id].command_tx.push(Command::Connect(request_crawl))?;
90+
self.next_id = (self.next_id + 1) % self.workers.len();
12491
}
12592

12693
Ok(true)

rsky-relay/src/crawler/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@ mod types;
44
mod worker;
55

66
pub use manager::Manager;
7+
pub use types::{RequestCrawl, RequestCrawlSender};

rsky-relay/src/crawler/types.rs

+11-25
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,22 @@
1-
use std::net::TcpStream;
2-
3-
use http::Uri;
4-
use magnetic::buffer::dynamic::DynamicBufferP2;
5-
use magnetic::mpsc::{MPSCConsumer, MPSCProducer};
61
use rtrb::{Consumer, Producer};
7-
use tungstenite::WebSocket;
8-
use tungstenite::stream::MaybeTlsStream;
2+
use serde::Deserialize;
3+
4+
use crate::types::Cursor;
95

10-
pub type Client = WebSocket<MaybeTlsStream<TcpStream>>;
116
pub type CommandSender = Producer<Command>;
127
pub type CommandReceiver = Consumer<Command>;
13-
pub type StatusSender = MPSCProducer<Status, DynamicBufferP2<Status>>;
14-
pub type StatusReceiver = MPSCConsumer<Status, DynamicBufferP2<Status>>;
8+
pub type RequestCrawlSender = Producer<RequestCrawl>;
9+
pub type RequestCrawlReceiver = Consumer<RequestCrawl>;
1510

16-
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
17-
pub struct WorkerId(pub usize);
18-
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
19-
pub struct LocalId(pub usize);
20-
21-
#[derive(Debug, Clone)]
22-
pub struct Config {
23-
pub uri: Uri,
11+
#[derive(Debug, Deserialize)]
12+
pub struct RequestCrawl {
2413
pub hostname: String,
25-
pub worker_id: WorkerId,
26-
pub local_id: LocalId,
14+
#[serde(skip)]
15+
pub cursor: Option<Cursor>,
2716
}
2817

29-
#[derive(Debug, Clone)]
18+
#[derive(Debug)]
3019
pub enum Command {
31-
Connect(Config),
20+
Connect(RequestCrawl),
3221
Shutdown,
3322
}
34-
35-
#[derive(Debug, Clone)]
36-
pub enum Status {}

0 commit comments

Comments
 (0)