Skip to content

Commit e06a421

Browse files
rsky-relay: publisher
1 parent 32f73a1 commit e06a421

File tree

15 files changed

+759
-156
lines changed

15 files changed

+759
-156
lines changed

rsky-relay/Cargo.toml

+4
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ edition = "2024"
55

66
[dependencies]
77
# external
8+
bstr = "1"
89
bus = "2"
910
color-eyre = "0.6"
1011
derive_more = { version = "2", features = ["full"] }
@@ -17,7 +18,10 @@ rtrb = "0.3"
1718
rustls = "0.23"
1819
serde = { version = "1", features = ["derive"] }
1920
serde_json = "1"
21+
signal-hook = { version = "0.3", features = ["extended-siginfo"] }
2022
thingbuf = "0.1"
2123
thiserror = "2"
24+
tracing = { version = "0.1", features = ["release_max_level_debug"] }
25+
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
2226
tungstenite = { version = "0.26", features = ["rustls-tls-native-roots"] }
2327
url = "2"

rsky-relay/src/client/connection.rs renamed to rsky-relay/src/crawler/connection.rs

+5-3
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use thiserror::Error;
66
use tungstenite::Message;
77
use tungstenite::stream::MaybeTlsStream;
88

9-
use crate::client::types::{Client, Config, StatusSender};
9+
use crate::crawler::types::{Client, Config, StatusSender};
1010
use crate::types::MessageSender;
1111

1212
#[derive(Debug, Error)]
@@ -27,6 +27,7 @@ pub struct Connection {
2727
}
2828

2929
impl AsFd for Connection {
30+
#[inline]
3031
fn as_fd(&self) -> BorrowedFd<'_> {
3132
match self.client.get_ref() {
3233
MaybeTlsStream::Plain(stream) => stream.as_fd(),
@@ -60,11 +61,11 @@ impl Connection {
6061
}
6162

6263
pub fn poll(&mut self) -> Result<(), ConnectionError> {
63-
loop {
64+
for _ in 0..1024 {
6465
let msg = match self.client.read() {
6566
Ok(msg) => msg,
6667
Err(tungstenite::Error::Io(e)) if e.kind() == io::ErrorKind::WouldBlock => {
67-
return Ok(());
68+
break;
6869
}
6970
Err(err) => Err(err)?,
7071
};
@@ -79,5 +80,6 @@ impl Connection {
7980
slot.data = bytes.into();
8081
slot.uri = self.config.uri.clone();
8182
}
83+
Ok(())
8284
}
8385
}

rsky-relay/src/client/manager.rs renamed to rsky-relay/src/crawler/manager.rs

+22-14
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::sync::atomic::Ordering;
12
use std::thread;
23

34
use hashbrown::HashMap;
@@ -6,11 +7,12 @@ use magnetic::Consumer;
67
use magnetic::buffer::dynamic::DynamicBufferP2;
78
use thiserror::Error;
89

9-
use crate::client::types::{
10+
use crate::SHUTDOWN;
11+
use crate::crawler::types::{
1012
Command, CommandSender, Config, LocalId, Status, StatusReceiver, WorkerId,
1113
};
12-
use crate::client::worker::{Worker, WorkerError};
13-
use crate::types::{CrawlRequestReceiver, Cursor, MessageSender};
14+
use crate::crawler::worker::{Worker, WorkerError};
15+
use crate::types::{Cursor, MessageSender, RequestCrawlReceiver};
1416

1517
const CAPACITY: usize = 1024;
1618

@@ -28,20 +30,20 @@ pub enum ManagerError {
2830
struct WorkerHandle {
2931
pub configs: Vec<Config>,
3032
pub command_tx: CommandSender,
31-
pub thread_handle: thread::JoinHandle<()>,
33+
pub thread_handle: thread::JoinHandle<Result<(), WorkerError>>,
3234
}
3335

3436
pub struct Manager {
3537
workers: Box<[WorkerHandle]>,
3638
next_id: WorkerId,
3739
configs: HashMap<Uri, Config>,
3840
status_rx: StatusReceiver,
39-
crawl_request_rx: CrawlRequestReceiver,
41+
request_crawl_rx: RequestCrawlReceiver,
4042
}
4143

4244
impl Manager {
4345
pub fn new(
44-
n_workers: usize, message_tx: MessageSender, crawl_request_rx: CrawlRequestReceiver,
46+
n_workers: usize, message_tx: MessageSender, request_crawl_rx: RequestCrawlReceiver,
4547
) -> Result<Self, ManagerError> {
4648
let (status_tx, status_rx) =
4749
magnetic::mpsc::mpsc_queue(DynamicBufferP2::new(CAPACITY).unwrap());
@@ -51,7 +53,7 @@ impl Manager {
5153
let status_tx = status_tx.clone();
5254
let (command_tx, command_rx) = rtrb::RingBuffer::new(CAPACITY);
5355
let thread_handle = thread::spawn(move || {
54-
Worker::new(WorkerId(worker_id), message_tx, status_tx, command_rx).run();
56+
Worker::new(WorkerId(worker_id), message_tx, status_tx, command_rx).run()
5557
});
5658
WorkerHandle { configs: Vec::new(), command_tx, thread_handle }
5759
})
@@ -61,7 +63,7 @@ impl Manager {
6163
next_id: WorkerId(0),
6264
configs: HashMap::new(),
6365
status_rx,
64-
crawl_request_rx,
66+
request_crawl_rx,
6567
})
6668
}
6769

@@ -74,8 +76,10 @@ impl Manager {
7476
for worker in &mut self.workers {
7577
worker.command_tx.push(Command::Shutdown)?;
7678
}
77-
for worker in self.workers {
78-
worker.thread_handle.join().map_err(|_| ManagerError::JoinError)?;
79+
for (id, worker) in self.workers.into_iter().enumerate() {
80+
if let Err(err) = worker.thread_handle.join().map_err(|_| ManagerError::JoinError)? {
81+
tracing::warn!("crawler worker {id} error: {err}");
82+
}
7983
}
8084
Ok(())
8185
}
@@ -86,22 +90,26 @@ impl Manager {
8690
}
8791

8892
fn update(&mut self) -> Result<bool, ManagerError> {
93+
if SHUTDOWN.load(Ordering::Relaxed) {
94+
return Ok(false);
95+
}
96+
8997
if let Ok(status) = self.status_rx.try_pop() {
9098
if !self.handle_status(status)? {
9199
return Ok(false);
92100
}
93101
}
94102

95-
if let Ok(request) = self.crawl_request_rx.pop() {
96-
if !self.configs.contains_key(&request.uri) {
103+
if let Ok(request_crawl) = self.request_crawl_rx.pop() {
104+
if !self.configs.contains_key(&request_crawl.uri) {
97105
let config = Config {
98-
uri: request.uri.clone(),
106+
uri: request_crawl.uri.clone(),
99107
cursor: Cursor(0),
100108
worker_id: self.next_id,
101109
local_id: LocalId(self.workers[self.next_id.0].configs.len()),
102110
};
103111
self.next_id = WorkerId((self.next_id.0 + 1) % self.workers.len());
104-
self.configs.insert(request.uri.clone(), config.clone());
112+
self.configs.insert(request_crawl.uri, config.clone());
105113
self.workers[config.worker_id.0].command_tx.push(Command::Connect(config)).unwrap();
106114
}
107115
}

rsky-relay/src/client/mod.rs renamed to rsky-relay/src/crawler/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,4 @@ mod manager;
33
mod types;
44
mod worker;
55

6-
pub use manager::{Manager, ManagerError};
6+
pub use manager::Manager;

rsky-relay/src/client/types.rs renamed to rsky-relay/src/crawler/types.rs

-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ pub struct Config {
3131
#[derive(Debug, Clone)]
3232
pub enum Command {
3333
Connect(Config),
34-
Reconnect(LocalId),
3534
Shutdown,
3635
}
3736

rsky-relay/src/client/worker.rs renamed to rsky-relay/src/crawler/worker.rs

+52-26
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
1+
use std::thread;
2+
3+
#[cfg(target_os = "linux")]
14
use nix::sys::epoll::{Epoll, EpollCreateFlags, EpollEvent, EpollFlags};
25
use thiserror::Error;
36

4-
use crate::client::connection::{Connection, ConnectionError};
5-
use crate::client::types::{Command, CommandReceiver, Config, LocalId, StatusSender, WorkerId};
7+
use crate::crawler::connection::{Connection, ConnectionError};
8+
use crate::crawler::types::{Command, CommandReceiver, Config, LocalId, StatusSender, WorkerId};
69
use crate::types::MessageSender;
710

11+
#[cfg(target_os = "linux")]
812
const EPOLL_FLAGS: EpollFlags = EpollFlags::EPOLLIN;
913

1014
#[derive(Debug, Error)]
@@ -20,7 +24,9 @@ pub struct Worker {
2024
message_tx: MessageSender,
2125
status_tx: StatusSender,
2226
command_rx: CommandReceiver,
27+
#[cfg(target_os = "linux")]
2328
epoll: Epoll,
29+
#[cfg(target_os = "linux")]
2430
events: Vec<EpollEvent>,
2531
}
2632

@@ -36,14 +42,28 @@ impl Worker {
3642
message_tx,
3743
status_tx,
3844
command_rx,
45+
#[cfg(target_os = "linux")]
3946
#[expect(clippy::expect_used)]
4047
epoll: Epoll::new(EpollCreateFlags::empty()).expect("failed to create epoll"),
48+
#[cfg(target_os = "linux")]
4149
events: vec![EpollEvent::empty(); 1024],
4250
}
4351
}
4452

45-
pub fn run(mut self) {
46-
while self.update() {}
53+
pub fn run(mut self) -> Result<(), WorkerError> {
54+
while self.update() {
55+
thread::yield_now();
56+
}
57+
self.shutdown()
58+
}
59+
60+
pub fn shutdown(mut self) -> Result<(), WorkerError> {
61+
for conn in self.connections.iter_mut().filter_map(|x| x.as_mut()) {
62+
if let Err(err) = conn.close() {
63+
tracing::warn!("crawler conn close error: {err}");
64+
}
65+
}
66+
Ok(())
4767
}
4868

4969
fn handle_command(&mut self, command: Command) -> bool {
@@ -55,44 +75,30 @@ impl Worker {
5575
self.status_tx.clone(),
5676
) {
5777
Ok(conn) => {
78+
#[cfg(target_os = "linux")]
5879
#[expect(clippy::expect_used)]
5980
self.epoll
6081
.add(&conn, EpollEvent::new(EPOLL_FLAGS, config.local_id.0 as _))
6182
.expect("failed to add connection");
6283
self.connections.push(Some(conn));
6384
self.configs.push(config);
6485
}
65-
Err(_) => todo!(),
66-
}
67-
}
68-
Command::Reconnect(local_id) => {
69-
if let Some(conn) = &mut self.connections[local_id.0] {
70-
match conn.close() {
71-
Ok(()) => {
72-
self.connections[local_id.0] = None;
73-
}
74-
Err(_) => todo!(),
75-
}
76-
}
77-
match Connection::connect(
78-
self.configs[local_id.0].clone(),
79-
self.message_tx.clone(),
80-
self.status_tx.clone(),
81-
) {
82-
Ok(conn) => {
83-
self.connections[local_id.0] = Some(conn);
86+
Err(err) => {
87+
tracing::warn!("[{}] unable to requestCrawl: {err}", config.uri);
8488
}
85-
Err(_) => todo!(),
8689
}
8790
}
88-
Command::Shutdown => return false,
91+
Command::Shutdown => {
92+
tracing::debug!("shutting down crawler: {}", self.worker_id.0);
93+
return false;
94+
}
8995
}
9096
true
9197
}
9298

99+
#[cfg(target_os = "linux")]
93100
fn update(&mut self) -> bool {
94101
for _ in 0..32 {
95-
// returns false only upon global shutdown, true otherwise
96102
if let Ok(command) = self.command_rx.pop() {
97103
if !self.handle_command(command) {
98104
return false;
@@ -128,9 +134,29 @@ impl Worker {
128134
true
129135
}
130136

137+
#[cfg(not(target_os = "linux"))]
138+
fn update(&mut self) -> bool {
139+
if let Ok(command) = self.command_rx.pop() {
140+
if !self.handle_command(command) {
141+
return false;
142+
}
143+
}
144+
145+
for _ in 0..32 {
146+
for local_id in 0..self.connections.len() {
147+
if !self.poll(LocalId(local_id)) {
148+
return false;
149+
}
150+
}
151+
}
152+
153+
true
154+
}
155+
131156
fn poll(&mut self, local_id: LocalId) -> bool {
132157
if let Some(conn) = &mut self.connections[local_id.0] {
133158
if let Err(_) = conn.poll() {
159+
#[cfg(target_os = "linux")]
134160
#[expect(clippy::expect_used)]
135161
self.epoll.delete(conn).expect("failed to delete connection");
136162
self.connections[local_id.0] = None;

rsky-relay/src/lib.rs

+11-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,13 @@
1-
pub mod client;
1+
mod crawler;
2+
mod publisher;
3+
mod server;
24
mod types;
35

4-
pub use types::{CrawlRequest, MessageRecycle};
6+
use std::sync::atomic::AtomicBool;
7+
8+
pub static SHUTDOWN: AtomicBool = AtomicBool::new(false);
9+
10+
pub use crawler::Manager as CrawlerManager;
11+
pub use publisher::Manager as PublisherManager;
12+
pub use server::Server;
13+
pub use types::{MessageRecycle, RequestCrawl};

0 commit comments

Comments
 (0)