Skip to content

Commit 8ad9e02

Browse files
committed
feat: bincode everything
1 parent a7bd5e9 commit 8ad9e02

File tree

2 files changed

+35
-60
lines changed

2 files changed

+35
-60
lines changed

src/bin/pyth_reader.rs

Lines changed: 29 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use tracing::{debug, error, info, warn};
3030
use tracing_subscriber::{fmt, EnvFilter};
3131
use url::Url;
3232

33-
#[derive(Debug, Serialize, Deserialize)]
33+
#[derive(Debug, Serialize, Deserialize, bincode::Encode, bincode::Decode)]
3434
struct PriceUpdate {
3535
#[serde(rename = "type")]
3636
update_type: String,
@@ -48,14 +48,14 @@ struct PublisherPriceUpdate {
4848
type PublisherKey = (String, String); // (feed_id, publisher)
4949
type PublisherBuffer = HashMap<PublisherKey, PublisherPriceUpdate>;
5050

51-
#[derive(Debug, Serialize, Deserialize)]
51+
#[derive(Debug, Serialize, Deserialize, bincode::Encode, bincode::Decode)]
5252
struct PriceFeed {
5353
id: String,
5454
price: PriceInfo,
5555
ema_price: PriceInfo,
5656
}
5757

58-
#[derive(Debug, Serialize, Deserialize)]
58+
#[derive(Debug, Serialize, Deserialize, bincode::Encode, bincode::Decode)]
5959
struct PriceInfo {
6060
price: String,
6161
conf: String,
@@ -152,8 +152,8 @@ async fn publish_price_updates(
152152
},
153153
},
154154
};
155-
let price_update_message = serde_json::to_string(&price_update).unwrap();
156-
155+
let price_update_message: Vec<u8> =
156+
bincode::encode_to_vec(&price_update, bincode::config::standard()).unwrap();
157157
// Create a unique message ID
158158
let price_update_message_id = format!(
159159
"{}:{}",
@@ -213,11 +213,6 @@ async fn fetch_price_updates(jetstream: jetstream::Context, config: &AppConfig)
213213
info!("Starting Pyth reader");
214214
let mut publisher_buffer: PublisherBuffer = HashMap::new();
215215
let client = PubsubClient::new(config.pyth.websocket_addr.as_str()).await?;
216-
info!(
217-
"Connected to Pyth WebSocket at {}",
218-
config.pyth.websocket_addr
219-
);
220-
221216
let rpc_config = RpcProgramAccountsConfig {
222217
account_config: RpcAccountInfoConfig {
223218
commitment: Some(CommitmentConfig::confirmed()),
@@ -230,63 +225,48 @@ async fn fetch_price_updates(jetstream: jetstream::Context, config: &AppConfig)
230225
};
231226

232227
let (mut notif, _unsub) = client
233-
.program_subscribe(&config.pyth.program_key, Some(rpc_config))
228+
.program_subscribe(&config.pyth.program_key, Some(rpc_config.clone()))
234229
.await?;
235230

236-
let update_count = Arc::new(AtomicUsize::new(0));
237-
let duration_count = Arc::new(AtomicUsize::new(0));
231+
info!(
232+
"Connected to Pyth WebSocket at {}",
233+
config.pyth.websocket_addr
234+
);
235+
236+
let mut update_count = 0;
237+
let mut duration_count = 0;
238238
let jetstream_clone = jetstream.clone();
239239
let mut msg_id_counter = 0;
240240

241-
let update_count_clone = update_count.clone();
242-
let duration_count_clone = duration_count.clone();
243-
// tokio::spawn(async move {
244-
// loop {
245-
// tokio::time::sleep(Duration::from_secs(1)).await;
246-
// info!(
247-
// "Total duration: {:?}, Update Count: {:?}",
248-
// duration_count_clone.load(Ordering::Relaxed) as u128,
249-
// update_count_clone.load(Ordering::Relaxed),
250-
// );
251-
// if update_count_clone.load(Ordering::Relaxed) > 0 {
252-
// info!(
253-
// "Average duration: {:?}",
254-
// duration_count_clone.load(Ordering::Relaxed) as u128
255-
// / update_count_clone.load(Ordering::Relaxed) as u128
256-
// );
257-
// }
258-
// duration_count_clone.store(0, Ordering::Relaxed);
259-
// update_count_clone.store(0, Ordering::Relaxed);
260-
// }
261-
// });
262-
263-
let mut interval = tokio::time::interval(Duration::from_millis(100));
241+
let mut interval = tokio::time::interval(Duration::from_millis(50));
264242
let mut last_seen_slot = 0;
265243

266244
loop {
267245
tokio::select! {
268246
_ = interval.tick() => {
269247
let instant = Instant::now();
270-
271248
let updates: Vec<PublisherPriceUpdate> = {
272249
if publisher_buffer.is_empty() {
273250
continue;
274251
}
275252
publisher_buffer.drain().map(|(_, v)| v).collect()
276253
};
277-
info!("Drained publisher buffer, size: {}, elapsed time: {:?}", updates.len(), instant.elapsed());
278254

279255
// Serialize as JSON array
280256
let body: Vec<u8> = bincode::encode_to_vec(&updates, bincode::config::standard()).unwrap();
281-
282-
info!("Serialized publisher updates, size: {}, elapsed time: {:?}", body.len(), instant.elapsed());
283-
284257
// Use a random ID as Nats-Msg-Id for the batch
285258
let msg_id = format!("publisher_batch:{}", msg_id_counter);
286259
msg_id_counter += 1;
287260
let mut headers = HeaderMap::new();
288261
headers.insert("Nats-Msg-Id", msg_id.as_str());
289-
info!("Publishing {} publisher updates in a batch, total size {}, elapsed time: {:?}", updates.len(), body.len(), instant.elapsed());
262+
// info!("Serialized publisher updates, size: {}, elapsed time: {:?}, publisher buffer size: {:?}", body.len(), instant.elapsed(), publisher_buffer.len());
263+
// info!("Publishing {} publisher updates in a batch, total size {}, elapsed time: {:?}", updates.len(), body.len(), instant.elapsed());
264+
// info!(
265+
// "Average duration: {:?}",
266+
// duration_count/ update_count
267+
// );
268+
duration_count = 0;
269+
update_count = 0;
290270
if let Err(e) = jetstream_clone
291271
.publish_with_headers("pyth.publisher.updates", headers, body.into())
292272
.await
@@ -295,20 +275,19 @@ async fn fetch_price_updates(jetstream: jetstream::Context, config: &AppConfig)
295275
} else {
296276
debug!("Published {} publisher updates in a batch", updates.len());
297277
}
298-
299-
info!("Time taken to process publisher updates: {:?}", instant.elapsed());
300278
}
301279
maybe_update = notif.next() => {
280+
let start_time = Instant::now();
281+
302282
let update = match maybe_update {
303283
None => {
304284
let error_msg = "Pythnet network listener stream ended unexpectedly";
305285
error!("{}", error_msg);
306-
return Err(anyhow::anyhow!(error_msg));
286+
break Ok(());
307287
},
308288
Some(update) => update
309289
};
310290

311-
let start_time = Instant::now();
312291
debug!("Received price update");
313292
let price_account: PythnetPriceAccount = match get_price_account_from_update(&update) {
314293
Ok(account) => account,
@@ -329,27 +308,25 @@ async fn fetch_price_updates(jetstream: jetstream::Context, config: &AppConfig)
329308
);
330309

331310
let jetstream_clone = jetstream.clone();
332-
let update_count_clone = update_count.clone();
333-
let duration_count_clone = duration_count.clone();
334311
publish_price_updates(jetstream_clone, price_account, &update).await;
335312
process_publisher_price_updates(price_account, &update, &mut publisher_buffer).await;
336313

337314
let end_time = Instant::now();
338315
let duration = end_time.duration_since(start_time);
339-
update_count_clone.fetch_add(1, Ordering::Relaxed);
340-
duration_count_clone.fetch_add(duration.as_millis() as usize, Ordering::Relaxed);
316+
update_count += 1;
317+
duration_count += duration.as_micros();
341318
if update.context.slot > last_seen_slot {
342319
last_seen_slot = update.context.slot;
343320
info!("Processing price update, slot: {}", update.context.slot);
344321
}
322+
345323
} else {
346324
debug!("Skipping price update due to invalid status or slot difference");
347325
}
326+
348327
}
349328
}
350329
}
351-
352-
// If we exit the loop, it means the stream has ended
353330
}
354331

355332
fn load_config(args: &Args) -> Result<AppConfig> {

src/bin/websocket_server.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -327,7 +327,7 @@ struct PriceUpdate {
327327
price_feed: PriceFeed,
328328
}
329329

330-
#[derive(Debug, Serialize, Deserialize)]
330+
#[derive(Debug, Serialize, Deserialize, bincode::Encode, bincode::Decode)]
331331
struct PublisherPriceUpdate {
332332
publisher: String,
333333
feed_id: String,
@@ -380,14 +380,15 @@ async fn handle_nats_publisher_updates_messages(
380380
match msg {
381381
Ok(msg) => {
382382
let updates: Vec<PublisherPriceUpdate> =
383-
match serde_json::from_slice(&msg.payload) {
384-
Ok(u) => u,
383+
match bincode::decode_from_slice(&msg.payload, bincode::config::standard())
384+
{
385+
Ok((updates, _)) => updates,
385386
Err(e) => {
386387
warn!(error = %e, "Failed to parse publisher price update batch");
387388
continue;
388389
}
389390
};
390-
debug!("Parsed {} publisher updates in batch", updates.len());
391+
info!("Parsed {} publisher updates in batch", updates.len());
391392
// Build per-client payloads while holding the lock,
392393
// but DO NOT send while holding it.
393394
let mut to_send: Vec<(String, mpsc::UnboundedSender<Message>, String)> =
@@ -403,9 +404,6 @@ async fn handle_nats_publisher_updates_messages(
403404
client_data.publisher_subscriptions.contains(&u.publisher)
404405
})
405406
.map(|u| {
406-
if u.feed_id == "7jAVut34sgRj6erznsYvLYvjc9GJwXTpN88ThZSDJ65G" {
407-
info!("publisher_price_update: {:#?}", u);
408-
}
409407
json!({
410408
"publisher": u.publisher,
411409
"feed_id": u.feed_id,
@@ -438,7 +436,7 @@ async fn handle_nats_publisher_updates_messages(
438436
batch_json,
439437
));
440438
}
441-
} // lock dropped here
439+
}
442440

443441
// Now send the prepared batches
444442
for (client_addr, sender, batch_json) in to_send {

0 commit comments

Comments
 (0)