Skip to content

Commit 32f73a1

Browse files
rsky-relay: crawler
1 parent 90128bf commit 32f73a1

File tree

9 files changed

+567
-0
lines changed

9 files changed

+567
-0
lines changed

rsky-relay/Cargo.toml

+23
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
[package]
2+
name = "rsky-relay"
3+
version = "0.1.0"
4+
edition = "2024"
5+
6+
[dependencies]
7+
# external
8+
bus = "2"
9+
color-eyre = "0.6"
10+
derive_more = { version = "2", features = ["full"] }
11+
hashbrown = "0.15"
12+
http = "1"
13+
httparse = "1.10"
14+
magnetic = "2"
15+
nix = { version = "0.29", features = ["event", "socket"] }
16+
rtrb = "0.3"
17+
rustls = "0.23"
18+
serde = { version = "1", features = ["derive"] }
19+
serde_json = "1"
20+
thingbuf = "0.1"
21+
thiserror = "2"
22+
tungstenite = { version = "0.26", features = ["rustls-tls-native-roots"] }
23+
url = "2"

rsky-relay/src/client/connection.rs

+83
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
use std::io;
2+
use std::os::fd::{AsFd, BorrowedFd};
3+
4+
use thingbuf::mpsc;
5+
use thiserror::Error;
6+
use tungstenite::Message;
7+
use tungstenite::stream::MaybeTlsStream;
8+
9+
use crate::client::types::{Client, Config, StatusSender};
10+
use crate::types::MessageSender;
11+
12+
#[derive(Debug, Error)]
13+
pub enum ConnectionError {
14+
#[error("io error: {0}")]
15+
Io(#[from] io::Error),
16+
#[error("tungstenite error: {0}")]
17+
Tungstenite(#[from] tungstenite::Error),
18+
#[error("thingbuf error: {0}")]
19+
Thingbuf(#[from] mpsc::errors::TrySendError),
20+
}
21+
22+
pub struct Connection {
23+
client: Client,
24+
config: Config,
25+
message_tx: MessageSender,
26+
status_tx: StatusSender,
27+
}
28+
29+
impl AsFd for Connection {
30+
fn as_fd(&self) -> BorrowedFd<'_> {
31+
match self.client.get_ref() {
32+
MaybeTlsStream::Plain(stream) => stream.as_fd(),
33+
MaybeTlsStream::Rustls(stream) => stream.get_ref().as_fd(),
34+
_ => todo!(),
35+
}
36+
}
37+
}
38+
39+
impl Connection {
40+
pub fn connect(
41+
config: Config, message_tx: MessageSender, status_tx: StatusSender,
42+
) -> Result<Self, ConnectionError> {
43+
let (client, _) = tungstenite::connect(&config.uri)?;
44+
match client.get_ref() {
45+
MaybeTlsStream::Rustls(stream) => {
46+
stream.get_ref().set_nonblocking(true)?;
47+
}
48+
MaybeTlsStream::Plain(stream) => {
49+
stream.set_nonblocking(true)?;
50+
}
51+
_ => {}
52+
}
53+
Ok(Self { client, config, message_tx, status_tx })
54+
}
55+
56+
pub fn close(&mut self) -> Result<(), ConnectionError> {
57+
self.client.close(None)?;
58+
self.client.flush()?;
59+
Ok(())
60+
}
61+
62+
pub fn poll(&mut self) -> Result<(), ConnectionError> {
63+
loop {
64+
let msg = match self.client.read() {
65+
Ok(msg) => msg,
66+
Err(tungstenite::Error::Io(e)) if e.kind() == io::ErrorKind::WouldBlock => {
67+
return Ok(());
68+
}
69+
Err(err) => Err(err)?,
70+
};
71+
72+
let bytes = match msg {
73+
Message::Binary(bytes) => bytes,
74+
Message::Close(_) => todo!(),
75+
_ => unreachable!("{msg}"),
76+
};
77+
78+
let mut slot = self.message_tx.try_send_ref()?;
79+
slot.data = bytes.into();
80+
slot.uri = self.config.uri.clone();
81+
}
82+
}
83+
}

rsky-relay/src/client/manager.rs

+111
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
use std::thread;
2+
3+
use hashbrown::HashMap;
4+
use http::Uri;
5+
use magnetic::Consumer;
6+
use magnetic::buffer::dynamic::DynamicBufferP2;
7+
use thiserror::Error;
8+
9+
use crate::client::types::{
10+
Command, CommandSender, Config, LocalId, Status, StatusReceiver, WorkerId,
11+
};
12+
use crate::client::worker::{Worker, WorkerError};
13+
use crate::types::{CrawlRequestReceiver, Cursor, MessageSender};
14+
15+
const CAPACITY: usize = 1024;
16+
17+
#[derive(Debug, Error)]
18+
pub enum ManagerError {
19+
#[error("worker error: {0}")]
20+
WorkerError(#[from] WorkerError),
21+
#[error("rtrb error: {0}")]
22+
PushError(#[from] rtrb::PushError<Command>),
23+
#[error("join error")]
24+
JoinError,
25+
}
26+
27+
#[derive(Debug)]
28+
struct WorkerHandle {
29+
pub configs: Vec<Config>,
30+
pub command_tx: CommandSender,
31+
pub thread_handle: thread::JoinHandle<()>,
32+
}
33+
34+
pub struct Manager {
35+
workers: Box<[WorkerHandle]>,
36+
next_id: WorkerId,
37+
configs: HashMap<Uri, Config>,
38+
status_rx: StatusReceiver,
39+
crawl_request_rx: CrawlRequestReceiver,
40+
}
41+
42+
impl Manager {
43+
pub fn new(
44+
n_workers: usize, message_tx: MessageSender, crawl_request_rx: CrawlRequestReceiver,
45+
) -> Result<Self, ManagerError> {
46+
let (status_tx, status_rx) =
47+
magnetic::mpsc::mpsc_queue(DynamicBufferP2::new(CAPACITY).unwrap());
48+
let workers = (0..n_workers)
49+
.map(|worker_id| {
50+
let message_tx = message_tx.clone();
51+
let status_tx = status_tx.clone();
52+
let (command_tx, command_rx) = rtrb::RingBuffer::new(CAPACITY);
53+
let thread_handle = thread::spawn(move || {
54+
Worker::new(WorkerId(worker_id), message_tx, status_tx, command_rx).run();
55+
});
56+
WorkerHandle { configs: Vec::new(), command_tx, thread_handle }
57+
})
58+
.collect::<Vec<_>>();
59+
Ok(Self {
60+
workers: workers.into_boxed_slice(),
61+
next_id: WorkerId(0),
62+
configs: HashMap::new(),
63+
status_rx,
64+
crawl_request_rx,
65+
})
66+
}
67+
68+
pub fn run(mut self) -> Result<(), ManagerError> {
69+
while self.update()? {}
70+
self.shutdown()
71+
}
72+
73+
pub fn shutdown(mut self) -> Result<(), ManagerError> {
74+
for worker in &mut self.workers {
75+
worker.command_tx.push(Command::Shutdown)?;
76+
}
77+
for worker in self.workers {
78+
worker.thread_handle.join().map_err(|_| ManagerError::JoinError)?;
79+
}
80+
Ok(())
81+
}
82+
83+
fn handle_status(&mut self, status: Status) -> Result<bool, ManagerError> {
84+
match status {}
85+
Ok(true)
86+
}
87+
88+
fn update(&mut self) -> Result<bool, ManagerError> {
89+
if let Ok(status) = self.status_rx.try_pop() {
90+
if !self.handle_status(status)? {
91+
return Ok(false);
92+
}
93+
}
94+
95+
if let Ok(request) = self.crawl_request_rx.pop() {
96+
if !self.configs.contains_key(&request.uri) {
97+
let config = Config {
98+
uri: request.uri.clone(),
99+
cursor: Cursor(0),
100+
worker_id: self.next_id,
101+
local_id: LocalId(self.workers[self.next_id.0].configs.len()),
102+
};
103+
self.next_id = WorkerId((self.next_id.0 + 1) % self.workers.len());
104+
self.configs.insert(request.uri.clone(), config.clone());
105+
self.workers[config.worker_id.0].command_tx.push(Command::Connect(config)).unwrap();
106+
}
107+
}
108+
109+
Ok(true)
110+
}
111+
}

rsky-relay/src/client/mod.rs

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
mod connection;
2+
mod manager;
3+
mod types;
4+
mod worker;
5+
6+
pub use manager::{Manager, ManagerError};

rsky-relay/src/client/types.rs

+39
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
use std::net::TcpStream;
2+
3+
use http::Uri;
4+
use magnetic::buffer::dynamic::DynamicBufferP2;
5+
use magnetic::mpsc::{MPSCConsumer, MPSCProducer};
6+
use rtrb::{Consumer, Producer};
7+
use tungstenite::WebSocket;
8+
use tungstenite::stream::MaybeTlsStream;
9+
10+
use crate::types::Cursor;
11+
12+
pub type Client = WebSocket<MaybeTlsStream<TcpStream>>;
13+
pub type CommandSender = Producer<Command>;
14+
pub type CommandReceiver = Consumer<Command>;
15+
pub type StatusSender = MPSCProducer<Status, DynamicBufferP2<Status>>;
16+
pub type StatusReceiver = MPSCConsumer<Status, DynamicBufferP2<Status>>;
17+
18+
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
19+
pub struct WorkerId(pub usize);
20+
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
21+
pub struct LocalId(pub usize);
22+
23+
#[derive(Debug, Clone)]
24+
pub struct Config {
25+
pub uri: Uri,
26+
pub cursor: Cursor,
27+
pub worker_id: WorkerId,
28+
pub local_id: LocalId,
29+
}
30+
31+
#[derive(Debug, Clone)]
32+
pub enum Command {
33+
Connect(Config),
34+
Reconnect(LocalId),
35+
Shutdown,
36+
}
37+
38+
#[derive(Debug, Clone)]
39+
pub enum Status {}

0 commit comments

Comments
 (0)