From 103a92cadd273cd4f3a93e8f4506794948809999 Mon Sep 17 00:00:00 2001 From: Ayaz Abbas Date: Wed, 23 Oct 2024 09:14:13 +0100 Subject: [PATCH] use slot instead of publish_time for unique id --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/bin/pyth_reader.rs | 5 ++++- src/bin/websocket_server.rs | 11 +++++++++-- 4 files changed, 15 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 722bde8..c2c5677 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2868,7 +2868,7 @@ dependencies = [ [[package]] name = "pyth-stream" -version = "0.1.5" +version = "0.1.6" dependencies = [ "anyhow", "async-nats", diff --git a/Cargo.toml b/Cargo.toml index 599368e..b163cbc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pyth-stream" -version = "0.1.5" +version = "0.1.6" edition = "2021" [lib] diff --git a/src/bin/pyth_reader.rs b/src/bin/pyth_reader.rs index 2500ca8..bd7a1d6 100644 --- a/src/bin/pyth_reader.rs +++ b/src/bin/pyth_reader.rs @@ -43,6 +43,7 @@ struct PriceInfo { conf: String, expo: i32, publish_time: i64, + slot: u64, // Add this field } // Add this struct to deserialize the configuration @@ -149,12 +150,14 @@ async fn fetch_price_updates(jetstream: jetstream::Context, config: &AppConfig) conf: price_account.agg.conf.to_string(), expo: price_account.expo, publish_time: price_account.timestamp, + slot: update.context.slot, // Add this field }, ema_price: PriceInfo { price: price_account.ema_price.val.to_string(), conf: price_account.ema_conf.val.to_string(), expo: price_account.expo, publish_time: price_account.timestamp, + slot: update.context.slot, // Add this field }, }, }; @@ -164,7 +167,7 @@ async fn fetch_price_updates(jetstream: jetstream::Context, config: &AppConfig) // Create a unique message ID let message_id = format!( "{}:{}", - price_update.price_feed.id, price_update.price_feed.price.publish_time + price_update.price_feed.id, price_update.price_feed.price.slot ); // Create headers with the Nats-Msg-Id diff --git a/src/bin/websocket_server.rs b/src/bin/websocket_server.rs index 09f9f97..82581d5 100644 --- a/src/bin/websocket_server.rs +++ b/src/bin/websocket_server.rs @@ -263,6 +263,7 @@ struct PriceInfo { conf: String, expo: i32, publish_time: i64, + slot: u64, // Add this field } async fn handle_nats_messages(jetstream: jetstream::Context, clients: Clients) -> Result<()> { @@ -309,7 +310,10 @@ async fn handle_nats_messages(jetstream: jetstream::Context, clients: Clients) - }; price_update.price_feed.id = hex_id.clone(); - debug!("Parsed price update for feed ID: {}", hex_id); + debug!( + "Parsed price update for feed ID: {} at slot: {}", + hex_id, price_update.price_feed.price.slot + ); let clients = clients.lock().await; debug!("Number of connected clients: {}", clients.len()); @@ -320,7 +324,10 @@ async fn handle_nats_messages(jetstream: jetstream::Context, clients: Clients) - if let Err(e) = sender.send(Message::Text(update_json)) { warn!(client_addr = %client_addr, error = %e, "Failed to send price update to client"); } else { - debug!("Successfully sent update to client: {}", client_addr); + debug!( + "Successfully sent update to client: {} for slot: {}", + client_addr, price_update.price_feed.price.slot + ); } } else { debug!(