Skip to content

Commit b4fa73b

Browse files
rsky-relay: crawler state
1 parent 65616b0 · commit b4fa73b

File tree

13 files changed: +224 -55 lines changed

rsky-relay/bootstrap.sh

+37
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,37 @@
1+
curl -X POST -H 'Content-Type: application/json' -d '{"hostname":"agaric.us-west.host.bsky.network"}' 'http://localhost:9000/xrpc/com.atproto.sync.requestCrawl'
2+
curl -X POST -H 'Content-Type: application/json' -d '{"hostname":"amanita.us-east.host.bsky.network"}' 'http://localhost:9000/xrpc/com.atproto.sync.requestCrawl'
3+
curl -X POST -H 'Content-Type: application/json' -d '{"hostname":"blewit.us-west.host.bsky.network"}' 'http://localhost:9000/xrpc/com.atproto.sync.requestCrawl'
4+
curl -X POST -H 'Content-Type: application/json' -d '{"hostname":"boletus.us-west.host.bsky.network"}' 'http://localhost:9000/xrpc/com.atproto.sync.requestCrawl'
5+
curl -X POST -H 'Content-Type: application/json' -d '{"hostname":"bracket.us-west.host.bsky.network"}' 'http://localhost:9000/xrpc/com.atproto.sync.requestCrawl'
6+
curl -X POST -H 'Content-Type: application/json' -d '{"hostname":"button.us-west.host.bsky.network"}' 'http://localhost:9000/xrpc/com.atproto.sync.requestCrawl'
7+
curl -X POST -H 'Content-Type: application/json' -d '{"hostname":"chaga.us-west.host.bsky.network"}' 'http://localhost:9000/xrpc/com.atproto.sync.requestCrawl'
8+
curl -X POST -H 'Content-Type: application/json' -d '{"hostname":"chanterelle.us-west.host.bsky.network"}' 'http://localhost:9000/xrpc/com.atproto.sync.requestCrawl'
9+
curl -X POST -H 'Content-Type: application/json' -d '{"hostname":"conocybe.us-west.host.bsky.network"}' 'http://localhost:9000/xrpc/com.atproto.sync.requestCrawl'
10+
curl -X POST -H 'Content-Type: application/json' -d '{"hostname":"coral.us-east.host.bsky.network"}' 'http://localhost:9000/xrpc/com.atproto.sync.requestCrawl'
11+
curl -X POST -H 'Content-Type: application/json' -d '{"hostname":"cordyceps.us-west.host.bsky.network"}' 'http://localhost:9000/xrpc/com.atproto.sync.requestCrawl'
12+
curl -X POST -H 'Content-Type: application/json' -d '{"hostname":"cremini.us-west.host.bsky.network"}' 'http://localhost:9000/xrpc/com.atproto.sync.requestCrawl'
13+
curl -X POST -H 'Content-Type: application/json' -d '{"hostname":"enoki.us-east.host.bsky.network"}' 'http://localhost:9000/xrpc/com.atproto.sync.requestCrawl'
14+
curl -X POST -H 'Content-Type: application/json' -d '{"hostname":"ganoderma.us-west.host.bsky.network"}' 'http://localhost:9000/xrpc/com.atproto.sync.requestCrawl'
15+
curl -X POST -H 'Content-Type: application/json' -d '{"hostname":"gomphus.us-west.host.bsky.network"}' 'http://localhost:9000/xrpc/com.atproto.sync.requestCrawl'
16+
curl -X POST -H 'Content-Type: application/json' -d '{"hostname":"hydnum.us-west.host.bsky.network"}' 'http://localhost:9000/xrpc/com.atproto.sync.requestCrawl'
17+
curl -X POST -H 'Content-Type: application/json' -d '{"hostname":"inkcap.us-east.host.bsky.network"}' 'http://localhost:9000/xrpc/com.atproto.sync.requestCrawl'
18+
curl -X POST -H 'Content-Type: application/json' -d '{"hostname":"lepista.us-west.host.bsky.network"}' 'http://localhost:9000/xrpc/com.atproto.sync.requestCrawl'
19+
curl -X POST -H 'Content-Type: application/json' -d '{"hostname":"lionsmane.us-east.host.bsky.network"}' 'http://localhost:9000/xrpc/com.atproto.sync.requestCrawl'
20+
curl -X POST -H 'Content-Type: application/json' -d '{"hostname":"lobster.us-east.host.bsky.network"}' 'http://localhost:9000/xrpc/com.atproto.sync.requestCrawl'
21+
curl -X POST -H 'Content-Type: application/json' -d '{"hostname":"magic.us-west.host.bsky.network"}' 'http://localhost:9000/xrpc/com.atproto.sync.requestCrawl'
22+
curl -X POST -H 'Content-Type: application/json' -d '{"hostname":"maitake.us-west.host.bsky.network"}' 'http://localhost:9000/xrpc/com.atproto.sync.requestCrawl'
23+
curl -X POST -H 'Content-Type: application/json' -d '{"hostname":"matsutake.us-west.host.bsky.network"}' 'http://localhost:9000/xrpc/com.atproto.sync.requestCrawl'
24+
curl -X POST -H 'Content-Type: application/json' -d '{"hostname":"milkcap.us-west.host.bsky.network"}' 'http://localhost:9000/xrpc/com.atproto.sync.requestCrawl'
25+
curl -X POST -H 'Content-Type: application/json' -d '{"hostname":"morel.us-east.host.bsky.network"}' 'http://localhost:9000/xrpc/com.atproto.sync.requestCrawl'
26+
curl -X POST -H 'Content-Type: application/json' -d '{"hostname":"oyster.us-east.host.bsky.network"}' 'http://localhost:9000/xrpc/com.atproto.sync.requestCrawl'
27+
curl -X POST -H 'Content-Type: application/json' -d '{"hostname":"porcini.us-east.host.bsky.network"}' 'http://localhost:9000/xrpc/com.atproto.sync.requestCrawl'
28+
curl -X POST -H 'Content-Type: application/json' -d '{"hostname":"puffball.us-east.host.bsky.network"}' 'http://localhost:9000/xrpc/com.atproto.sync.requestCrawl'
29+
curl -X POST -H 'Content-Type: application/json' -d '{"hostname":"russula.us-west.host.bsky.network"}' 'http://localhost:9000/xrpc/com.atproto.sync.requestCrawl'
30+
curl -X POST -H 'Content-Type: application/json' -d '{"hostname":"shaggymane.us-west.host.bsky.network"}' 'http://localhost:9000/xrpc/com.atproto.sync.requestCrawl'
31+
curl -X POST -H 'Content-Type: application/json' -d '{"hostname":"shiitake.us-east.host.bsky.network"}' 'http://localhost:9000/xrpc/com.atproto.sync.requestCrawl'
32+
curl -X POST -H 'Content-Type: application/json' -d '{"hostname":"shimeji.us-east.host.bsky.network"}' 'http://localhost:9000/xrpc/com.atproto.sync.requestCrawl'
33+
curl -X POST -H 'Content-Type: application/json' -d '{"hostname":"stinkhorn.us-west.host.bsky.network"}' 'http://localhost:9000/xrpc/com.atproto.sync.requestCrawl'
34+
curl -X POST -H 'Content-Type: application/json' -d '{"hostname":"verpa.us-west.host.bsky.network"}' 'http://localhost:9000/xrpc/com.atproto.sync.requestCrawl'
35+
curl -X POST -H 'Content-Type: application/json' -d '{"hostname":"witchesbutter.us-west.host.bsky.network"}' 'http://localhost:9000/xrpc/com.atproto.sync.requestCrawl'
36+
curl -X POST -H 'Content-Type: application/json' -d '{"hostname":"woodear.us-west.host.bsky.network"}' 'http://localhost:9000/xrpc/com.atproto.sync.requestCrawl'
37+
curl -X POST -H 'Content-Type: application/json' -d '{"hostname":"atproto.brid.gy"}' 'http://localhost:9000/xrpc/com.atproto.sync.requestCrawl'

rsky-relay/src/crawler/connection.rs

+5 -5
Original file line number | Diff line number | Diff line change
@@ -23,7 +23,7 @@ pub struct Connection {
2323
client: Client,
2424
config: Config,
2525
message_tx: MessageSender,
26-
status_tx: StatusSender,
26+
_status_tx: StatusSender,
2727
}
2828

2929
impl AsFd for Connection {
@@ -51,7 +51,7 @@ impl Connection {
5151
}
5252
_ => {}
5353
}
54-
Ok(Self { client, config, message_tx, status_tx })
54+
Ok(Self { client, config, message_tx, _status_tx: status_tx })
5555
}
5656

5757
pub fn close(&mut self) -> Result<(), ConnectionError> {
@@ -74,14 +74,14 @@ impl Connection {
7474
Message::Binary(bytes) => bytes,
7575
Message::Close(_) => todo!(),
7676
_ => {
77-
tracing::debug!("unknown message: {msg}");
77+
tracing::debug!("[{}] unknown message: {msg}", self.config.hostname);
7878
continue;
79-
},
79+
}
8080
};
8181

8282
let mut slot = self.message_tx.try_send_ref()?;
8383
slot.data = bytes.into();
84-
slot.uri = self.config.uri.clone();
84+
slot.hostname = self.config.hostname.clone();
8585
}
8686
Ok(())
8787
}

rsky-relay/src/crawler/manager.rs

+16 -11
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,6 @@
11
use std::sync::atomic::Ordering;
2-
use std::thread;
32
use std::time::Duration;
3+
use std::{io, thread};
44

55
use hashbrown::HashMap;
66
use http::Uri;
@@ -13,13 +13,15 @@ use crate::crawler::types::{
1313
Command, CommandSender, Config, LocalId, Status, StatusReceiver, WorkerId,
1414
};
1515
use crate::crawler::worker::{Worker, WorkerError};
16-
use crate::types::{Cursor, MessageSender, RequestCrawlReceiver};
16+
use crate::types::{MessageSender, RequestCrawlReceiver};
1717

1818
const CAPACITY: usize = 1024;
1919
const SLEEP: Duration = Duration::from_millis(10);
2020

2121
#[derive(Debug, Error)]
2222
pub enum ManagerError {
23+
#[error("spawn error: {0}")]
24+
SpawnError(#[from] io::Error),
2325
#[error("worker error: {0}")]
2426
WorkerError(#[from] WorkerError),
2527
#[error("rtrb error: {0}")]
@@ -50,16 +52,18 @@ impl Manager {
5052
let (status_tx, status_rx) =
5153
magnetic::mpsc::mpsc_queue(DynamicBufferP2::new(CAPACITY).unwrap());
5254
let workers = (0..n_workers)
53-
.map(|worker_id| {
55+
.map(|worker_id| -> Result<_, ManagerError> {
5456
let message_tx = message_tx.clone();
5557
let status_tx = status_tx.clone();
5658
let (command_tx, command_rx) = rtrb::RingBuffer::new(CAPACITY);
57-
let thread_handle = thread::spawn(move || {
58-
Worker::new(WorkerId(worker_id), message_tx, status_tx, command_rx).run()
59-
});
60-
WorkerHandle { configs: Vec::new(), command_tx, thread_handle }
59+
let thread_handle = thread::Builder::new()
60+
.name(format!("rsky-crawl-{worker_id}"))
61+
.spawn(move || {
62+
Worker::new(WorkerId(worker_id), message_tx, status_tx, command_rx).run()
63+
})?;
64+
Ok(WorkerHandle { configs: Vec::new(), command_tx, thread_handle })
6165
})
62-
.collect::<Vec<_>>();
66+
.collect::<Result<Vec<_>, _>>()?;
6367
Ok(Self {
6468
workers: workers.into_boxed_slice(),
6569
next_id: WorkerId(0),
@@ -73,6 +77,8 @@ impl Manager {
7377
while self.update()? {
7478
thread::sleep(SLEEP);
7579
}
80+
tracing::info!("shutting down crawler");
81+
SHUTDOWN.store(true, Ordering::Relaxed);
7682
self.shutdown()
7783
}
7884

@@ -88,8 +94,7 @@ impl Manager {
8894
Ok(())
8995
}
9096

91-
fn handle_status(&mut self, status: Status) -> Result<bool, ManagerError> {
92-
match status {}
97+
fn handle_status(&mut self, _status: Status) -> Result<bool, ManagerError> {
9398
Ok(true)
9499
}
95100

@@ -108,7 +113,7 @@ impl Manager {
108113
if !self.configs.contains_key(&request_crawl.uri) {
109114
let config = Config {
110115
uri: request_crawl.uri.clone(),
111-
cursor: Cursor(0),
116+
hostname: request_crawl.hostname.clone(),
112117
worker_id: self.next_id,
113118
local_id: LocalId(self.workers[self.next_id.0].configs.len()),
114119
};

rsky-relay/src/crawler/types.rs

+1 -3
Original file line number | Diff line number | Diff line change
@@ -7,8 +7,6 @@ use rtrb::{Consumer, Producer};
77
use tungstenite::WebSocket;
88
use tungstenite::stream::MaybeTlsStream;
99

10-
use crate::types::Cursor;
11-
1210
pub type Client = WebSocket<MaybeTlsStream<TcpStream>>;
1311
pub type CommandSender = Producer<Command>;
1412
pub type CommandReceiver = Consumer<Command>;
@@ -23,7 +21,7 @@ pub struct LocalId(pub usize);
2321
#[derive(Debug, Clone)]
2422
pub struct Config {
2523
pub uri: Uri,
26-
pub cursor: Cursor,
24+
pub hostname: String,
2725
pub worker_id: WorkerId,
2826
pub local_id: LocalId,
2927
}

rsky-relay/src/lib.rs

+31
Original file line number | Diff line number | Diff line change
@@ -1,3 +1,34 @@
1+
#![deny(
2+
deprecated_safe,
3+
future_incompatible,
4+
let_underscore,
5+
keyword_idents,
6+
nonstandard_style,
7+
refining_impl_trait,
8+
rust_2018_compatibility,
9+
rust_2018_idioms,
10+
rust_2021_compatibility,
11+
rust_2024_compatibility,
12+
unused,
13+
warnings,
14+
clippy::all,
15+
clippy::cargo,
16+
clippy::dbg_macro,
17+
clippy::expect_used,
18+
clippy::iter_over_hash_type,
19+
clippy::nursery,
20+
clippy::pathbuf_init_then_push,
21+
clippy::pedantic,
22+
clippy::print_stderr,
23+
clippy::print_stdout,
24+
clippy::renamed_function_params,
25+
clippy::str_to_string,
26+
clippy::string_to_string,
27+
clippy::unused_result_ok,
28+
clippy::unwrap_used
29+
)]
30+
#![allow(clippy::missing_errors_doc, clippy::missing_panics_doc, clippy::missing_safety_doc)]
31+
132
mod crawler;
233
mod publisher;
334
mod server;

rsky-relay/src/main.rs

+4 -4
Original file line number | Diff line number | Diff line change
@@ -15,7 +15,7 @@ use rsky_relay::{
1515
CrawlerManager, MessageRecycle, PublisherManager, SHUTDOWN, Server, ValidatorManager,
1616
};
1717

18-
const CAPACITY1: usize = 1 << 20;
18+
const CAPACITY1: usize = 1 << 16;
1919
const CAPACITY2: usize = 1 << 10;
2020
const WORKERS: usize = 4;
2121

@@ -44,9 +44,9 @@ pub async fn main() -> Result<()> {
4444
let server = Server::new(request_crawl_tx, subscribe_repos_tx)?;
4545
thread::scope(move |s| {
4646
tokio::spawn(validator.run());
47-
s.spawn(move || crawler.run());
48-
s.spawn(move || publisher.run());
49-
s.spawn(move || server.run());
47+
thread::Builder::new().name("rsky-crawl".into()).spawn_scoped(s, move || crawler.run())?;
48+
thread::Builder::new().name("rsky-pub".into()).spawn_scoped(s, move || publisher.run())?;
49+
thread::Builder::new().name("rsky-server".into()).spawn_scoped(s, move || server.run())?;
5050
let mut signals =
5151
SignalsInfo::<WithOrigin>::new(TERM_SIGNALS).expect("failed to init signals");
5252
for signal_info in &mut signals {

rsky-relay/src/publisher/connection.rs

+4 -4
Original file line number | Diff line number | Diff line change
@@ -30,7 +30,7 @@ pub enum ConnectionError {
3030
pub struct Connection {
3131
client: Client,
3232
queue: VecDeque<(Instant, Message)>,
33-
status_tx: StatusSender,
33+
_status_tx: StatusSender,
3434
}
3535

3636
impl AsFd for Connection {
@@ -58,7 +58,7 @@ impl Connection {
5858
}
5959
_ => {}
6060
}
61-
Ok(Self { client, queue: VecDeque::new(), status_tx })
61+
Ok(Self { client, queue: VecDeque::new(), _status_tx: status_tx })
6262
}
6363

6464
pub fn close(&mut self) -> Result<(), ConnectionError> {
@@ -69,7 +69,7 @@ impl Connection {
6969

7070
pub fn send(&mut self, input: &[u8]) -> Result<(), ConnectionError> {
7171
match self.client.send(Message::binary(input.to_vec())) {
72-
Ok(()) => {},
72+
Ok(()) => {}
7373
Err(tungstenite::Error::WriteBufferFull(msg)) => {
7474
self.queue.push_back((Instant::now(), msg));
7575
if self.queue.len() > MAX_LEN {
@@ -91,7 +91,7 @@ impl Connection {
9191
pub fn poll(&mut self) -> Result<(), ConnectionError> {
9292
while let Some((instant, msg)) = self.queue.pop_front() {
9393
match self.client.send(msg) {
94-
Ok(()) => {},
94+
Ok(()) => {}
9595
Err(tungstenite::Error::WriteBufferFull(msg)) => {
9696
self.queue.push_back((instant, msg));
9797
break;

rsky-relay/src/publisher/manager.rs

+14 -9
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,6 @@
11
use std::sync::atomic::Ordering;
2-
use std::thread;
32
use std::time::Duration;
3+
use std::{io, thread};
44

55
use magnetic::Consumer;
66
use magnetic::buffer::dynamic::DynamicBufferP2;
@@ -18,6 +18,8 @@ const SLEEP: Duration = Duration::from_millis(10);
1818

1919
#[derive(Debug, Error)]
2020
pub enum ManagerError {
21+
#[error("spawn error: {0}")]
22+
SpawnError(#[from] io::Error),
2123
#[error("worker error: {0}")]
2224
WorkerError(#[from] WorkerError),
2325
#[error("rtrb error: {0}")]
@@ -48,16 +50,18 @@ impl Manager {
4850
let (status_tx, status_rx) =
4951
magnetic::mpsc::mpsc_queue(DynamicBufferP2::new(CAPACITY).unwrap());
5052
let workers = (0..n_workers)
51-
.map(|worker_id| {
53+
.map(|worker_id| -> Result<_, ManagerError> {
5254
let message_rx = validator.subscribe();
5355
let status_tx = status_tx.clone();
5456
let (command_tx, command_rx) = rtrb::RingBuffer::new(CAPACITY);
55-
let thread_handle = thread::spawn(move || {
56-
Worker::new(WorkerId(worker_id), message_rx, status_tx, command_rx).run()
57-
});
58-
WorkerHandle { configs: Vec::new(), command_tx, thread_handle }
57+
let thread_handle = thread::Builder::new()
58+
.name(format!("rsky-pub-{worker_id}"))
59+
.spawn(move || {
60+
Worker::new(WorkerId(worker_id), message_rx, status_tx, command_rx).run()
61+
})?;
62+
Ok(WorkerHandle { configs: Vec::new(), command_tx, thread_handle })
5963
})
60-
.collect::<Vec<_>>();
64+
.collect::<Result<Vec<_>, _>>()?;
6165
Ok(Self {
6266
workers: workers.into_boxed_slice(),
6367
next_id: WorkerId(0),
@@ -70,6 +74,8 @@ impl Manager {
7074
while self.update()? {
7175
thread::sleep(SLEEP);
7276
}
77+
tracing::info!("shutting down publisher");
78+
SHUTDOWN.store(true, Ordering::Relaxed);
7379
self.shutdown()
7480
}
7581

@@ -85,8 +91,7 @@ impl Manager {
8591
Ok(())
8692
}
8793

88-
fn handle_status(&mut self, status: Status) -> Result<bool, ManagerError> {
89-
match status {}
94+
fn handle_status(&mut self, _status: Status) -> Result<bool, ManagerError> {
9095
Ok(true)
9196
}
9297

0 commit comments

Comments
 (0)