Skip to content

Commit 4857c89

Browse files
rsky-relay: fixes/perf
1 parent d0b3697 commit 4857c89

File tree

9 files changed

+132 -66 lines changed

rsky-relay/Cargo.toml

+2
Original file line number | Diff line number | Diff line change
@@ -6,6 +6,7 @@ edition = "2024"
66
[dependencies]
77
# external
88
bstr = "1"
9+
bytes = "1"
910
chrono = { version = "0.4", features = ["serde"] }
1011
ciborium = "0.2"
1112
cid = { version = "0.11", features = ["serde-codec"] }
@@ -16,6 +17,7 @@ hashbrown = "0.15"
1617
http = "1"
1718
httparse = "1.10"
1819
k256 = "0.13"
20+
lru = "0.14"
1921
magnetic = "2"
2022
mimalloc = "0.1"
2123
mio = { version = "1", features = ["os-ext", "os-poll"] }

rsky-relay/src/crawler/connection.rs

+10 -4
Original file line number | Diff line number | Diff line change
@@ -17,7 +17,7 @@ pub enum ConnectionError {
1717
#[error("tungstenite error: {0}")]
1818
Tungstenite(#[from] tungstenite::Error),
1919
#[error("thingbuf error: {0}")]
20-
Thingbuf(#[from] mpsc::errors::TrySendError),
20+
Thingbuf(#[from] mpsc::errors::Closed),
2121
}
2222

2323
pub struct Connection {
@@ -66,8 +66,14 @@ impl Connection {
6666
Ok(())
6767
}
6868

69-
pub fn poll(&mut self) -> Result<(), ConnectionError> {
69+
// false: not polled
70+
// true: polled
71+
pub fn poll(&mut self) -> Result<bool, ConnectionError> {
7072
for _ in 0..1024 {
73+
if self.message_tx.remaining() < 16 {
74+
return Ok(false);
75+
}
76+
7177
let msg = match self.client.read() {
7278
Ok(msg) => msg,
7379
Err(tungstenite::Error::Io(e)) if e.kind() == io::ErrorKind::WouldBlock => {
@@ -91,10 +97,10 @@ impl Connection {
9197
}
9298
};
9399

94-
let mut slot = self.message_tx.try_send_ref()?;
100+
let mut slot = self.message_tx.send_ref()?;
95101
slot.data = bytes;
96102
slot.hostname.clone_from(&self.hostname);
97103
}
98-
Ok(())
104+
Ok(true)
99105
}
100106
}

rsky-relay/src/crawler/worker.rs

+25 -14
Original file line number | Diff line number | Diff line change
@@ -23,6 +23,7 @@ pub enum WorkerError {
2323
pub struct Worker {
2424
id: usize,
2525
connections: Vec<Option<Connection>>,
26+
next_idx: usize,
2627
message_tx: MessageSender,
2728
command_rx: CommandReceiver,
2829
poll: Poll,
@@ -35,7 +36,7 @@ impl Worker {
3536
) -> Result<Self, WorkerError> {
3637
let poll = Poll::new()?;
3738
let events = Events::with_capacity(1024);
38-
Ok(Self { id, connections: Vec::new(), message_tx, command_rx, poll, events })
39+
Ok(Self { id, connections: Vec::new(), next_idx: 0, message_tx, command_rx, poll, events })
3940
}
4041

4142
#[expect(clippy::unnecessary_wraps)]
@@ -101,24 +102,29 @@ impl Worker {
101102
}
102103
}
103104

105+
if self.message_tx.remaining() < 16 {
106+
break;
107+
}
108+
104109
let mut events = std::mem::replace(&mut self.events, Events::with_capacity(0));
105-
for _ in 0..32 {
110+
'outer: for _ in 0..32 {
106111
#[expect(clippy::expect_used)]
107112
self.poll
108113
.poll(&mut events, Some(Duration::from_millis(1)))
109114
.expect("failed to poll");
110115
for ev in &events {
111116
if !self.poll(ev.token().0) {
112-
return false;
117+
break 'outer;
113118
}
114119
}
115120
}
116121
self.events = events;
117122
}
118123

119-
for idx in 0..self.connections.len() {
120-
if !self.poll(idx) {
121-
return false;
124+
for _ in 0..self.connections.len() {
125+
self.next_idx = (self.next_idx + 1) % self.connections.len();
126+
if !self.poll(self.next_idx) {
127+
break;
122128
}
123129
}
124130

@@ -127,16 +133,21 @@ impl Worker {
127133

128134
fn poll(&mut self, idx: usize) -> bool {
129135
if let Some(conn) = &mut self.connections[idx] {
130-
if let Err(err) = conn.poll() {
131-
tracing::info!("[{}] disconnected: {err}", conn.hostname);
132-
#[expect(clippy::expect_used)]
133-
self.poll
134-
.registry()
135-
.deregister(&mut SourceFd(&conn.as_raw_fd()))
136-
.expect("failed to deregister");
137-
self.connections[idx] = None;
136+
match conn.poll() {
137+
Ok(true) => {}
138+
Ok(false) => return false,
139+
Err(err) => {
140+
tracing::info!("[{}] disconnected: {err}", conn.hostname);
141+
#[expect(clippy::expect_used)]
142+
self.poll
143+
.registry()
144+
.deregister(&mut SourceFd(&conn.as_raw_fd()))
145+
.expect("failed to deregister");
146+
self.connections[idx] = None;
147+
}
138148
}
139149
}
150+
140151
true
141152
}
142153
}

rsky-relay/src/main.rs

+1 -1
Original file line number | Diff line number | Diff line change
@@ -39,11 +39,11 @@ pub async fn main() -> Result<()> {
3939
let (request_crawl_tx, request_crawl_rx) = rtrb::RingBuffer::new(CAPACITY2);
4040
let (subscribe_repos_tx, subscribe_repos_rx) = rtrb::RingBuffer::new(CAPACITY2);
4141
let validator = ValidatorManager::new(message_rx)?;
42+
tokio::spawn(validator.run());
4243
let crawler = CrawlerManager::new(WORKERS, &message_tx, request_crawl_rx)?;
4344
let publisher = PublisherManager::new(WORKERS, subscribe_repos_rx)?;
4445
let server = Server::new(request_crawl_tx, subscribe_repos_tx)?;
4546
thread::scope(move |s| {
46-
tokio::spawn(validator.run());
4747
thread::Builder::new().name("rsky-crawl".into()).spawn_scoped(s, move || crawler.run())?;
4848
thread::Builder::new().name("rsky-pub".into()).spawn_scoped(s, move || publisher.run())?;
4949
thread::Builder::new().name("rsky-server".into()).spawn_scoped(s, move || server.run())?;

rsky-relay/src/publisher/connection.rs

+22 -11
Original file line number | Diff line number | Diff line change
@@ -5,13 +5,17 @@ use std::os::fd::{AsRawFd, RawFd};
55
use sled::Tree;
66
use thiserror::Error;
77
use tungstenite::handshake::server::NoCallback;
8+
use tungstenite::protocol::CloseFrame;
9+
use tungstenite::protocol::frame::coding::CloseCode;
810
use tungstenite::stream::MaybeTlsStream;
9-
use tungstenite::{Bytes, HandshakeError, Message, ServerHandshake, WebSocket};
11+
use tungstenite::{Bytes, HandshakeError, Message, ServerHandshake, Utf8Bytes, WebSocket};
1012

1113
use crate::types::Cursor;
1214

13-
const OUTDATED: &[u8] = b"\xa2ate#infobop\x01\xa2dnamenOutdatedCursorgmessagex8Requested cursor exceeded limit. Possibly missing events.";
14-
const FUTURE: &[u8] = b"\xa1bop \xa2eerrorlFutureCursorgmessageuCursor in the future.";
15+
const OUTDATED_MSG: &[u8] = b"\xa2ate#infobop\x01\xa2dnamenOutdatedCursorgmessagex8Requested cursor exceeded limit. Possibly missing events.";
16+
const FUTURE_MSG: &[u8] = b"\xa1bop \xa2eerrorlFutureCursorgmessageuCursor in the future.";
17+
const FUTURE_CLOSE: CloseFrame =
18+
CloseFrame { code: CloseCode::Policy, reason: Utf8Bytes::from_static("FutureCursor") };
1519

1620
#[derive(Debug, Error)]
1721
pub enum ConnectionError {
@@ -59,12 +63,14 @@ impl Connection {
5963
Ok(Self { addr, client, cursor })
6064
}
6165

62-
pub fn close(&mut self) -> Result<(), ConnectionError> {
63-
self.client.close(None)?;
66+
pub fn close(&mut self, code: CloseFrame) -> Result<(), ConnectionError> {
67+
self.client.close(Some(code))?;
6468
self.client.flush()?;
6569
Ok(())
6670
}
6771

72+
/// false: not sent
73+
/// true: sent
6874
pub fn send(&mut self, mut seq: Cursor, data: Bytes) -> Result<bool, ConnectionError> {
6975
if self.cursor != seq {
7076
return Ok(false);
@@ -83,22 +89,27 @@ impl Connection {
8389
}
8490
}
8591

86-
pub fn poll(&mut self, mut seq: Cursor, firehose: &Tree) -> Result<(), ConnectionError> {
87-
if self.cursor.get() > seq.get() {
88-
self.send(self.cursor, Bytes::from_static(FUTURE))?;
89-
return self.close();
92+
/// false: closed
93+
/// true: not closed
94+
pub fn poll(&mut self, mut seq: Cursor, firehose: &Tree) -> Result<bool, ConnectionError> {
95+
if self.cursor.get() != 0 && self.cursor.get() > seq.get() + 1 {
96+
self.send(self.cursor, Bytes::from_static(FUTURE_MSG))?;
97+
self.close(FUTURE_CLOSE)?;
98+
return Ok(false);
9099
}
91100
for msg in firehose.range(self.cursor..=seq) {
92101
let (k, v) = msg?;
93102
seq = k.into();
94103
if self.cursor != seq {
95-
self.send(self.cursor, Bytes::from_static(OUTDATED))?;
104+
if self.cursor.get() != 0 {
105+
self.send(self.cursor, Bytes::from_static(OUTDATED_MSG))?;
106+
}
96107
self.cursor = seq;
97108
}
98109
if !self.send(seq, Bytes::from_owner(v).slice(8..))? {
99110
break;
100111
}
101112
}
102-
Ok(())
113+
Ok(true)
103114
}
104115
}

rsky-relay/src/publisher/worker.rs

+30 -16
Original file line number | Diff line number | Diff line change
@@ -2,17 +2,22 @@ use std::os::fd::AsRawFd;
22
use std::time::Duration;
33
use std::{io, thread};
44

5+
use bytes::Bytes;
56
use mio::unix::SourceFd;
67
use mio::{Events, Interest, Poll, Token};
78
use sled::Tree;
89
use thiserror::Error;
9-
use tungstenite::Bytes;
10+
use tungstenite::Utf8Bytes;
11+
use tungstenite::protocol::CloseFrame;
12+
use tungstenite::protocol::frame::coding::CloseCode;
1013

1114
use crate::publisher::connection::{Connection, ConnectionError};
1215
use crate::publisher::types::{Command, CommandReceiver};
1316
use crate::types::{Cursor, DB};
1417

1518
const INTEREST: Interest = Interest::WRITABLE;
19+
const SHUTDOWN_FRAME: CloseFrame =
20+
CloseFrame { code: CloseCode::Restart, reason: Utf8Bytes::from_static("RelayRestart") };
1621

1722
#[derive(Debug, Error)]
1823
pub enum WorkerError {
@@ -27,6 +32,7 @@ pub enum WorkerError {
2732
pub struct Worker {
2833
id: usize,
2934
connections: Vec<Option<Connection>>,
35+
next_idx: usize,
3036
command_rx: CommandReceiver,
3137
firehose: Tree,
3238
poll: Poll,
@@ -38,7 +44,7 @@ impl Worker {
3844
let firehose = DB.open_tree("firehose")?;
3945
let poll = Poll::new()?;
4046
let events = Events::with_capacity(1024);
41-
Ok(Self { id, connections: Vec::new(), command_rx, firehose, poll, events })
47+
Ok(Self { id, connections: Vec::new(), next_idx: 0, command_rx, firehose, poll, events })
4248
}
4349

4450
pub fn run(mut self) -> Result<(), WorkerError> {
@@ -53,7 +59,7 @@ impl Worker {
5359

5460
pub fn shutdown(mut self) {
5561
for conn in self.connections.iter_mut().filter_map(|x| x.as_mut()) {
56-
if let Err(err) = conn.close() {
62+
if let Err(err) = conn.close(SHUTDOWN_FRAME) {
5763
tracing::warn!("publisher conn close error: {err}");
5864
}
5965
}
@@ -112,23 +118,24 @@ impl Worker {
112118
}
113119

114120
let mut events = std::mem::replace(&mut self.events, Events::with_capacity(0));
115-
for _ in 0..32 {
121+
'outer: for _ in 0..32 {
116122
#[expect(clippy::expect_used)]
117123
self.poll
118124
.poll(&mut events, Some(Duration::from_millis(1)))
119125
.expect("failed to poll");
120126
for ev in &events {
121127
if !self.poll(*seq, ev.token().0) {
122-
return Ok(false);
128+
break 'outer;
123129
}
124130
}
125131
}
126132
self.events = events;
127133
}
128134

129-
for idx in 0..self.connections.len() {
130-
if !self.poll(*seq, idx) {
131-
return Ok(false);
135+
for _ in 0..self.connections.len() {
136+
self.next_idx = (self.next_idx + 1) % self.connections.len();
137+
if !self.poll(*seq, self.next_idx) {
138+
break;
132139
}
133140
}
134141

@@ -154,16 +161,23 @@ impl Worker {
154161

155162
fn poll(&mut self, seq: Cursor, idx: usize) -> bool {
156163
if let Some(conn) = &mut self.connections[idx] {
157-
if let Err(err) = conn.poll(seq, &self.firehose) {
158-
tracing::info!("[{}] disconnected: {err}", conn.addr);
159-
#[expect(clippy::expect_used)]
160-
self.poll
161-
.registry()
162-
.deregister(&mut SourceFd(&conn.as_raw_fd()))
163-
.expect("failed to deregister");
164-
self.connections[idx] = None;
164+
match conn.poll(seq, &self.firehose) {
165+
Ok(true) => return true,
166+
Ok(false) => {
167+
tracing::info!("[{}] closed due to invalid cursor", conn.addr);
168+
}
169+
Err(err) => {
170+
tracing::info!("[{}] disconnected: {err}", conn.addr);
171+
}
165172
}
173+
#[expect(clippy::expect_used)]
174+
self.poll
175+
.registry()
176+
.deregister(&mut SourceFd(&conn.as_raw_fd()))
177+
.expect("failed to deregister");
178+
self.connections[idx] = None;
166179
}
180+
167181
true
168182
}
169183
}

rsky-relay/src/types.rs

+2 -1
Original file line number | Diff line number | Diff line change
@@ -2,9 +2,9 @@ use std::fmt;
22
use std::ops::{Add, Sub};
33
use std::sync::LazyLock;
44

5+
use bytes::Bytes;
56
use sled::{Db, IVec, Mode};
67
use thingbuf::{Recycle, mpsc};
7-
use tungstenite::Bytes;
88
use zerocopy::big_endian::U64;
99
use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout, Unaligned};
1010

@@ -32,6 +32,7 @@ impl Recycle<Message> for MessageRecycle {
3232

3333
fn recycle(&self, element: &mut Message) {
3434
element.data.clear();
35+
element.hostname.clear();
3536
}
3637
}
3738

rsky-relay/src/validator/manager.rs

+4 -3
Original file line number | Diff line number | Diff line change
@@ -124,7 +124,7 @@ impl Manager {
124124
}
125125
continue;
126126
} else if prev + 1 != curr {
127-
tracing::debug!(
127+
tracing::trace!(
128128
"[{}] seq gap: {prev} -> {curr} ({})",
129129
msg.hostname,
130130
curr - prev - 1
@@ -181,7 +181,7 @@ impl Manager {
181181
let Some((did, key)) = self.resolver.poll().await? else {
182182
continue;
183183
};
184-
let Some(msgs) = self.queue.remove(&did)? else {
184+
let Some(msgs) = self.queue.remove(did)? else {
185185
tracing::debug!("missing queue for did: {did}");
186186
continue;
187187
};
@@ -195,7 +195,7 @@ impl Manager {
195195
};
196196
#[expect(clippy::unwrap_used)]
197197
let commit = event.commit().unwrap().unwrap(); // Already tried parsing
198-
match utils::verify_commit_sig(&commit, key) {
198+
match utils::verify_commit_sig(&commit, *key) {
199199
Ok(res) => {
200200
if res {
201201
let data = event.serialize(data.len(), seq.next())?;
@@ -230,5 +230,6 @@ impl Drop for Manager {
230230
if let Err(err) = DB.flush() {
231231
tracing::warn!("unable to persist cursors: {err}\n{:#?}", self.cursors);
232232
}
233+
tracing::info!("persisted {} cursors", self.cursors.len());
233234
}
234235
}

0 commit comments

Comments
 (0)