77
88from pusher .config import Config , STALE_TIMEOUT_SECONDS
99from pusher .exception import StaleConnectionError
10- from pusher .price_state import PriceState , PriceUpdate
10+ from pusher .price_state import PriceSourceState , PriceUpdate
1111
1212# This will be in config, but note here.
1313# Other RPC providers exist but so far we've seen their support is incomplete.
@@ -20,10 +20,11 @@ class HyperliquidListener:
2020 Subscribe to any relevant Hyperliquid websocket streams
2121 See https://hyperliquid.gitbook.io/hyperliquid-docs/for-developers/api/websocket
2222 """
23- def __init__ (self , config : Config , price_state : PriceState ):
23+ def __init__ (self , config : Config , hl_oracle_state : PriceSourceState , hl_mark_state : PriceSourceState ):
2424 self .hyperliquid_ws_urls = config .hyperliquid .hyperliquid_ws_urls
25- self .market_symbol = config .hyperliquid .market_symbol
26- self .price_state = price_state
25+ self .asset_context_symbols = config .hyperliquid .asset_context_symbols
26+ self .hl_oracle_state = hl_oracle_state
27+ self .hl_mark_state = hl_mark_state
2728
2829 def get_subscribe_request (self , asset ):
2930 return {
@@ -44,9 +45,10 @@ async def subscribe_single(self, url):
4445
4546 async def subscribe_single_inner (self , url ):
4647 async with websockets .connect (url ) as ws :
47- subscribe_request = self .get_subscribe_request (self .market_symbol )
48- await ws .send (json .dumps (subscribe_request ))
49- logger .info ("Sent subscribe request to {}" , url )
48+ for symbol in self .asset_context_symbols :
49+ subscribe_request = self .get_subscribe_request (symbol )
50+ await ws .send (json .dumps (subscribe_request ))
51+ logger .info ("Sent subscribe request for symbol: {} to {}" , symbol , url )
5052
5153 # listen for updates
5254 while True :
@@ -76,10 +78,10 @@ async def subscribe_single_inner(self, url):
7678 def parse_hyperliquid_ws_message (self , message ):
7779 try :
7880 ctx = message ["data" ]["ctx" ]
81+ symbol = message ["data" ]["coin" ]
7982 now = time .time ()
80- self .price_state .hl_oracle_price = PriceUpdate (ctx ["oraclePx" ], now )
81- self .price_state .hl_mark_price = PriceUpdate (ctx ["markPx" ], now )
82- logger .debug ("on_activeAssetCtx: oraclePx: {} marketPx: {}" , self .price_state .hl_oracle_price ,
83- self .price_state .hl_mark_price )
83+ self .hl_oracle_state .put (symbol , PriceUpdate (ctx ["oraclePx" ], now ))
84+ self .hl_mark_state .put (symbol , PriceUpdate (ctx ["markPx" ], now ))
85+ logger .debug ("on_activeAssetCtx: oraclePx: {} marketPx: {}" , ctx ["oraclePx" ], ctx ["markPx" ])
8486 except Exception as e :
8587 logger .error ("parse_hyperliquid_ws_message error: message: {} e: {}" , message , e )
0 commit comments