@@ -3,12 +3,14 @@ use async_nats::jetstream::{self};
33use async_nats:: HeaderMap ;
44use clap:: Parser ;
55use config:: Config ;
6+ use futures:: future:: join_all;
67use pyth_sdk_solana:: state:: { load_price_account, PriceStatus , PythnetPriceAccount } ;
78use pyth_stream:: utils:: setup_jetstream;
89use serde:: { Deserialize , Deserializer , Serialize } ;
910use solana_account_decoder:: UiAccountEncoding ;
1011use solana_client:: nonblocking:: pubsub_client:: PubsubClient ;
1112use solana_client:: rpc_config:: { RpcAccountInfoConfig , RpcProgramAccountsConfig } ;
13+ use solana_client:: rpc_response:: { Response , RpcKeyedAccount } ;
1214use solana_metrics:: datapoint_info;
1315use solana_sdk:: account:: Account ;
1416use solana_sdk:: commitment_config:: CommitmentConfig ;
@@ -17,6 +19,7 @@ use std::collections::HashMap;
1719use std:: collections:: HashSet ;
1820use std:: path:: PathBuf ;
1921use std:: str:: FromStr ;
22+ use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
2023use std:: sync:: Arc ;
2124use std:: time:: Instant ;
2225use tokio:: sync:: Mutex ;
@@ -26,6 +29,7 @@ use tokio_stream::StreamExt;
2629use tracing:: { debug, error, info, warn} ;
2730use tracing_subscriber:: { fmt, EnvFilter } ;
2831use url:: Url ;
32+
2933#[ derive( Debug , Serialize , Deserialize ) ]
3034struct PriceUpdate {
3135 #[ serde( rename = "type" ) ]
@@ -101,6 +105,111 @@ struct Args {
101105 config : Option < PathBuf > ,
102106}
103107
108+ fn get_price_account_from_update (
109+ update : & Response < RpcKeyedAccount > ,
110+ ) -> Result < PythnetPriceAccount > {
111+ let account: Account = match update. value . account . decode ( ) {
112+ Some ( account) => account,
113+ _none => {
114+ warn ! ( "Failed to decode account from update" ) ;
115+ return Err ( anyhow:: anyhow!( "Failed to decode account from update" ) ) ;
116+ }
117+ } ;
118+
119+ let price_account: PythnetPriceAccount = match load_price_account ( & account. data ) {
120+ Ok ( pyth_account) => * pyth_account,
121+ Err ( _) => {
122+ debug ! ( "Not a price account, skipping" ) ;
123+ return Err ( anyhow:: anyhow!( "Not a price account, skipping" ) ) ;
124+ }
125+ } ;
126+
127+ Ok ( price_account)
128+ }
129+
130+ async fn publish_price_updates (
131+ jetstream : jetstream:: Context ,
132+ price_account : PythnetPriceAccount ,
133+ update : & Response < RpcKeyedAccount > ,
134+ ) {
135+ let price_update = PriceUpdate {
136+ update_type : "price_update" . to_string ( ) ,
137+ price_feed : PriceFeed {
138+ id : update. value . pubkey . to_string ( ) ,
139+ price : PriceInfo {
140+ price : price_account. agg . price . to_string ( ) ,
141+ conf : price_account. agg . conf . to_string ( ) ,
142+ expo : price_account. expo ,
143+ publish_time : price_account. timestamp ,
144+ slot : update. context . slot ,
145+ } ,
146+ ema_price : PriceInfo {
147+ price : price_account. ema_price . val . to_string ( ) ,
148+ conf : price_account. ema_conf . val . to_string ( ) ,
149+ expo : price_account. expo ,
150+ publish_time : price_account. timestamp ,
151+ slot : update. context . slot ,
152+ } ,
153+ } ,
154+ } ;
155+ let price_update_message = serde_json:: to_string ( & price_update) . unwrap ( ) ;
156+
157+ // Create a unique message ID
158+ let price_update_message_id = format ! (
159+ "{}:{}" ,
160+ price_update. price_feed. id, price_update. price_feed. price. slot
161+ ) ;
162+
163+ // Create headers with the Nats-Msg-Id
164+ let mut price_update_headers = HeaderMap :: new ( ) ;
165+ price_update_headers. insert ( "Nats-Msg-Id" , price_update_message_id. as_str ( ) ) ;
166+
167+ match jetstream
168+ . publish_with_headers (
169+ "pyth.price.updates" ,
170+ price_update_headers,
171+ price_update_message. into ( ) ,
172+ )
173+ . await
174+ {
175+ Ok ( _) => debug ! (
176+ "Published price update to JetStream with ID: {}" ,
177+ price_update_message_id
178+ ) ,
179+ Err ( e) => warn ! ( "Failed to publish price update to JetStream: {}" , e) ,
180+ }
181+ }
182+
183+ /**
184+ * Process the publisher price updates for a given price account and update
185+ * @param price_account: The price account
186+ * @param update: The update
187+ * @param publisher_buffer: The publisher buffer
188+ */
189+ async fn process_publisher_price_updates (
190+ price_account : PythnetPriceAccount ,
191+ update : & Response < RpcKeyedAccount > ,
192+ publisher_buffer : PublisherBuffer ,
193+ ) {
194+ for component in price_account. comp {
195+ if component. publisher != Pubkey :: default ( ) {
196+ let publisher_price_update = PublisherPriceUpdate {
197+ feed_id : update. value . pubkey . to_string ( ) ,
198+ publisher : component. publisher . to_string ( ) ,
199+ price : price_account. agg . price . to_string ( ) ,
200+ slot : update. context . slot ,
201+ } ;
202+
203+ let key = (
204+ publisher_price_update. feed_id . clone ( ) ,
205+ publisher_price_update. publisher . clone ( ) ,
206+ ) ;
207+ let mut buf = publisher_buffer. lock ( ) . await ;
208+ buf. insert ( key, publisher_price_update) ;
209+ }
210+ }
211+ }
212+
104213async fn fetch_price_updates ( jetstream : jetstream:: Context , config : & AppConfig ) -> Result < ( ) > {
105214 info ! ( "Starting Pyth reader" ) ;
106215 let publisher_buffer: PublisherBuffer = Arc :: new ( Mutex :: new ( HashMap :: new ( ) ) ) ;
@@ -125,10 +234,9 @@ async fn fetch_price_updates(jetstream: jetstream::Context, config: &AppConfig)
125234 . program_subscribe ( & config. pyth . program_key , Some ( rpc_config) )
126235 . await ?;
127236
128- let mut update_count = 0 ;
129- let mut duration_count = 0 ;
130- let mut unique_price_feeds = HashSet :: new ( ) ;
131- let mut last_report_time = Instant :: now ( ) ;
237+ let update_count = Arc :: new ( AtomicUsize :: new ( 0 ) ) ;
238+ let duration_count = Arc :: new ( AtomicUsize :: new ( 0 ) ) ;
239+ let unique_price_feeds = Arc :: new ( Mutex :: new ( HashSet :: < Pubkey > :: new ( ) ) ) ;
132240 let jetstream_clone = jetstream. clone ( ) ;
133241 let buffer_clone = publisher_buffer. clone ( ) ;
134242 let mut msg_id_counter = 0 ;
@@ -170,26 +278,40 @@ async fn fetch_price_updates(jetstream: jetstream::Context, config: &AppConfig)
170278 }
171279 } ) ;
172280
281+ let update_count_clone = update_count. clone ( ) ;
282+ let duration_count_clone = duration_count. clone ( ) ;
283+ tokio:: spawn ( async move {
284+ loop {
285+ tokio:: time:: sleep ( Duration :: from_secs ( 1 ) ) . await ;
286+ info ! (
287+ "Total duration: {:?}, Update Count: {:?}" ,
288+ duration_count_clone. load( Ordering :: Relaxed ) as u128 ,
289+ update_count_clone. load( Ordering :: Relaxed ) ,
290+ ) ;
291+ if update_count_clone. load ( Ordering :: Relaxed ) > 0 {
292+ info ! (
293+ "Average duration: {:?}" ,
294+ duration_count_clone. load( Ordering :: Relaxed ) as u128
295+ / update_count_clone. load( Ordering :: Relaxed ) as u128
296+ ) ;
297+ }
298+ duration_count_clone. store ( 0 , Ordering :: Relaxed ) ;
299+ update_count_clone. store ( 0 , Ordering :: Relaxed ) ;
300+ }
301+ } ) ;
302+
173303 while let Some ( update) = notif. next ( ) . await {
174- debug ! ( "Received price update" ) ;
175304 let start_time = Instant :: now ( ) ;
176305
177- let account: Account = match update. value . account . decode ( ) {
178- Some ( account) => account,
306+ debug ! ( "Received price update" ) ;
307+ let price_account: PythnetPriceAccount = match get_price_account_from_update ( & update) {
308+ Ok ( account) => account,
179309 _none => {
180310 warn ! ( "Failed to decode account from update" ) ;
181311 continue ;
182312 }
183313 } ;
184314
185- let price_account: PythnetPriceAccount = match load_price_account ( & account. data ) {
186- Ok ( pyth_account) => * pyth_account,
187- Err ( _) => {
188- debug ! ( "Not a price account, skipping" ) ;
189- continue ;
190- }
191- } ;
192-
193315 // We want to send price updates whenever the aggregate changes but sometimes the accounts can change without the aggregate changing
194316 if price_account. agg . status == PriceStatus :: Trading
195317 && ( update. context . slot - price_account. agg . pub_slot )
@@ -200,98 +322,27 @@ async fn fetch_price_updates(jetstream: jetstream::Context, config: &AppConfig)
200322 price_account. prod
201323 ) ;
202324
203- let price_update = PriceUpdate {
204- update_type : "price_update" . to_string ( ) ,
205- price_feed : PriceFeed {
206- id : update. value . pubkey . to_string ( ) ,
207- price : PriceInfo {
208- price : price_account. agg . price . to_string ( ) ,
209- conf : price_account. agg . conf . to_string ( ) ,
210- expo : price_account. expo ,
211- publish_time : price_account. timestamp ,
212- slot : update. context . slot , // Add this field
213- } ,
214- ema_price : PriceInfo {
215- price : price_account. ema_price . val . to_string ( ) ,
216- conf : price_account. ema_conf . val . to_string ( ) ,
217- expo : price_account. expo ,
218- publish_time : price_account. timestamp ,
219- slot : update. context . slot , // Add this field
220- } ,
221- } ,
222- } ;
223-
224- let price_update_message = serde_json:: to_string ( & price_update) ?;
225-
226- // Create a unique message ID
227- let price_update_message_id = format ! (
228- "{}:{}" ,
229- price_update. price_feed. id, price_update. price_feed. price. slot
230- ) ;
231-
232- // Create headers with the Nats-Msg-Id
233- let mut price_update_headers = HeaderMap :: new ( ) ;
234- price_update_headers. insert ( "Nats-Msg-Id" , price_update_message_id. as_str ( ) ) ;
235-
236325 let jetstream_clone = jetstream. clone ( ) ;
326+ let publisher_buffer_clone = publisher_buffer. clone ( ) ;
327+ let unique_price_feeds_clone = unique_price_feeds. clone ( ) ;
328+ let update_count_clone = update_count. clone ( ) ;
329+ let duration_count_clone = duration_count. clone ( ) ;
237330 task:: spawn ( async move {
238- match jetstream_clone
239- . publish_with_headers (
240- "pyth.price.updates" ,
241- price_update_headers,
242- price_update_message. into ( ) ,
243- )
244- . await
245- {
246- Ok ( _) => debug ! (
247- "Published price update to JetStream with ID: {}" ,
248- price_update_message_id
249- ) ,
250- Err ( e) => warn ! ( "Failed to publish price update to JetStream: {}" , e) ,
251- }
331+ publish_price_updates ( jetstream_clone, price_account, & update) . await ;
332+ process_publisher_price_updates ( price_account, & update, publisher_buffer_clone)
333+ . await ;
334+
335+ let mut feeds = unique_price_feeds_clone. lock ( ) . await ;
336+ feeds. insert ( price_account. prod ) ;
337+ let end_time = Instant :: now ( ) ;
338+ let duration = end_time. duration_since ( start_time) ;
339+ update_count_clone. fetch_add ( 1 , Ordering :: Relaxed ) ;
340+ duration_count_clone. fetch_add ( duration. as_millis ( ) as usize , Ordering :: Relaxed ) ;
341+ info ! ( "Processing price update, slot: {}" , update. context. slot) ;
252342 } ) ;
253-
254- for component in price_account. comp {
255- let publisher = component. publisher . to_string ( ) ;
256- let publisher_price_update = PublisherPriceUpdate {
257- feed_id : update. value . pubkey . to_string ( ) ,
258- publisher : publisher. clone ( ) ,
259- price : price_account. agg . price . to_string ( ) ,
260- slot : update. context . slot , // Add this field
261- } ;
262-
263- let key = ( publisher_price_update. feed_id . clone ( ) , publisher) ;
264- let mut buf = publisher_buffer. lock ( ) . await ;
265- buf. insert ( key, publisher_price_update) ;
266- }
267-
268- let end_time = Instant :: now ( ) ;
269- let duration = end_time. duration_since ( start_time) ;
270- duration_count += duration. as_micros ( ) ;
271- update_count += 1 ;
272- unique_price_feeds. insert ( price_account. prod ) ;
273343 } else {
274344 debug ! ( "Skipping price update due to invalid status or slot difference" ) ;
275345 }
276-
277- // Report aggregate information every minute and emit metrics
278- if last_report_time. elapsed ( ) >= Duration :: from_secs ( 1 ) {
279- info ! (
280- "Processed {} updates from {} unique price feeds in the last 1 secs" ,
281- update_count,
282- unique_price_feeds. len( )
283- ) ;
284- info ! (
285- "last slot: {}, comp: {}" ,
286- update. context. slot,
287- price_account. comp. len( )
288- ) ;
289- info ! ( "Average duration: {:?}" , duration_count / update_count) ;
290- duration_count = 0 ;
291- update_count = 0 ;
292- unique_price_feeds. clear ( ) ;
293- last_report_time = Instant :: now ( ) ;
294- }
295346 }
296347
297348 // If we exit the loop, it means the stream has ended
0 commit comments