diff --git a/polygon/rest/__init__.py b/polygon/rest/__init__.py index 7484378e..ed57ee72 100644 --- a/polygon/rest/__init__.py +++ b/polygon/rest/__init__.py @@ -1,4 +1,5 @@ from .aggs import AggsClient +from .futures import FuturesClient from .trades import TradesClient from .quotes import QuotesClient from .snapshot import SnapshotClient @@ -23,6 +24,7 @@ class RESTClient( AggsClient, + FuturesClient, TradesClient, QuotesClient, SnapshotClient, diff --git a/polygon/rest/futures.py b/polygon/rest/futures.py new file mode 100644 index 00000000..6de7669a --- /dev/null +++ b/polygon/rest/futures.py @@ -0,0 +1,334 @@ +from typing import Optional, Any, Dict, List, Union, Iterator +from urllib3 import HTTPResponse +from datetime import datetime, date + +from .base import BaseClient +from .models.futures import ( + FuturesAgg, + FuturesContract, + FuturesProduct, + FuturesQuote, + FuturesTrade, + FuturesSchedule, + FuturesMarketStatus, + FuturesSnapshot, +) +from .models.common import Sort +from .models.request import RequestOptionBuilder + + +class FuturesClient(BaseClient): + """ + Client for the Futures REST Endpoints + (aligned with the paths from /futures/vX/...) + """ + + def list_futures_aggregates( + self, + ticker: str, + resolution: str, + window_start: Optional[str] = None, + window_start_lt: Optional[str] = None, + window_start_lte: Optional[str] = None, + window_start_gt: Optional[str] = None, + window_start_gte: Optional[str] = None, + limit: Optional[int] = None, + sort: Optional[Union[str, Sort]] = None, + params: Optional[Dict[str, Any]] = None, + raw: bool = False, + options: Optional[RequestOptionBuilder] = None, + ) -> Union[Iterator[FuturesAgg], HTTPResponse]: + """ + Endpoint: GET /futures/vX/aggs/{ticker} + + Get aggregates for a futures contract in a given time range. + This endpoint returns data that includes: + - open, close, high, low + - volume, dollar_volume, etc. + If `next_url` is present, it will be paginated. + """ + url = f"/futures/vX/aggs/{ticker}" + return self._paginate( + path=url, + params=self._get_params(self.list_futures_aggregates, locals()), + raw=raw, + deserializer=FuturesAgg.from_dict, + options=options, + ) + + def list_futures_contracts( + self, + product_code: Optional[str] = None, + first_trade_date: Optional[Union[str, date]] = None, + last_trade_date: Optional[Union[str, date]] = None, + as_of: Optional[Union[str, date]] = None, + active: Optional[str] = None, + type: Optional[str] = None, + limit: Optional[int] = None, + sort: Optional[Union[str, Sort]] = None, + params: Optional[Dict[str, Any]] = None, + raw: bool = False, + options: Optional[RequestOptionBuilder] = None, + ) -> Union[Iterator[FuturesContract], HTTPResponse]: + """ + Endpoint: GET /futures/vX/contracts + + The Contracts endpoint returns a paginated list of futures contracts. + """ + url = "/futures/vX/contracts" + return self._paginate( + path=url, + params=self._get_params(self.list_futures_contracts, locals()), + raw=raw, + deserializer=FuturesContract.from_dict, + options=options, + ) + + def get_futures_contract_details( + self, + ticker: str, + as_of: Optional[Union[str, date]] = None, + params: Optional[Dict[str, Any]] = None, + raw: bool = False, + options: Optional[RequestOptionBuilder] = None, + ) -> Union[FuturesContract, HTTPResponse]: + """ + Endpoint: GET /futures/vX/contracts/{ticker} + + Returns details for a single contract at a specified point in time. + (No next_url in the response -> just a single get). + """ + url = f"/futures/vX/contracts/{ticker}" + return self._get( + path=url, + params=self._get_params(self.get_futures_contract_details, locals()), + deserializer=FuturesContract.from_dict, + raw=raw, + result_key="results", + options=options, + ) + + def list_futures_products( + self, + name: Optional[str] = None, + name_search: Optional[str] = None, + as_of: Optional[Union[str, date]] = None, + market_identifier_code: Optional[str] = None, + sector: Optional[str] = None, + sub_sector: Optional[str] = None, + asset_class: Optional[str] = None, + asset_sub_class: Optional[str] = None, + type: Optional[str] = None, + limit: Optional[int] = None, + sort: Optional[Union[str, Sort]] = None, + params: Optional[Dict[str, Any]] = None, + raw: bool = False, + options: Optional[RequestOptionBuilder] = None, + ) -> Union[Iterator[FuturesProduct], HTTPResponse]: + """ + Endpoint: GET /futures/vX/products + + Returns a list of futures products (including combos). + """ + url = "/futures/vX/products" + return self._paginate( + path=url, + params=self._get_params(self.list_futures_products, locals()), + raw=raw, + deserializer=FuturesProduct.from_dict, + options=options, + ) + + def get_futures_product_details( + self, + product_code: str, + type: Optional[str] = None, + as_of: Optional[Union[str, date]] = None, + params: Optional[Dict[str, Any]] = None, + raw: bool = False, + options: Optional[RequestOptionBuilder] = None, + ) -> Union[FuturesProduct, HTTPResponse]: + """ + Endpoint: GET /futures/vX/products/{product_code} + + Returns the details for a single product as it was at a specific day. + (No next_url -> single get). + """ + url = f"/futures/vX/products/{product_code}" + return self._get( + path=url, + params=self._get_params(self.get_futures_product_details, locals()), + deserializer=FuturesProduct.from_dict, + raw=raw, + result_key="results", + options=options, + ) + + def list_futures_quotes( + self, + ticker: str, + timestamp: Optional[str] = None, + timestamp_lt: Optional[str] = None, + timestamp_lte: Optional[str] = None, + timestamp_gt: Optional[str] = None, + timestamp_gte: Optional[str] = None, + session_end_date: Optional[str] = None, + session_end_date_lt: Optional[str] = None, + session_end_date_lte: Optional[str] = None, + session_end_date_gt: Optional[str] = None, + session_end_date_gte: Optional[str] = None, + limit: Optional[int] = None, + sort: Optional[Union[str, Sort]] = None, + params: Optional[Dict[str, Any]] = None, + raw: bool = False, + options: Optional[RequestOptionBuilder] = None, + ) -> Union[Iterator[FuturesQuote], HTTPResponse]: + """ + Endpoint: GET /futures/vX/quotes/{ticker} + + Get quotes for a contract in a given time range (paginated). + """ + url = f"/futures/vX/quotes/{ticker}" + return self._paginate( + path=url, + params=self._get_params(self.list_futures_quotes, locals()), + raw=raw, + deserializer=FuturesQuote.from_dict, + options=options, + ) + + def list_futures_trades( + self, + ticker: str, + timestamp: Optional[str] = None, + timestamp_lt: Optional[str] = None, + timestamp_lte: Optional[str] = None, + timestamp_gt: Optional[str] = None, + timestamp_gte: Optional[str] = None, + session_end_date: Optional[str] = None, + session_end_date_lt: Optional[str] = None, + session_end_date_lte: Optional[str] = None, + session_end_date_gt: Optional[str] = None, + session_end_date_gte: Optional[str] = None, + limit: Optional[int] = None, + sort: Optional[Union[str, Sort]] = None, + params: Optional[Dict[str, Any]] = None, + raw: bool = False, + options: Optional[RequestOptionBuilder] = None, + ) -> Union[Iterator[FuturesTrade], HTTPResponse]: + """ + Endpoint: GET /futures/vX/trades/{ticker} + + Get trades for a contract in a given time range (paginated). + """ + url = f"/futures/vX/trades/{ticker}" + return self._paginate( + path=url, + params=self._get_params(self.list_futures_trades, locals()), + raw=raw, + deserializer=FuturesTrade.from_dict, + options=options, + ) + + def list_futures_schedules( + self, + session_end_date: Optional[str] = None, + market_identifier_code: Optional[str] = None, + limit: Optional[int] = None, + sort: Optional[Union[str, Sort]] = None, + params: Optional[Dict[str, Any]] = None, + raw: bool = False, + options: Optional[RequestOptionBuilder] = None, + ) -> Union[Iterator[FuturesSchedule], HTTPResponse]: + """ + Endpoint: GET /futures/vX/schedules + + Returns a list of trading schedules for multiple futures products on a specific date. + If `next_url` is present, this is paginated. + """ + url = "/futures/vX/schedules" + return self._paginate( + path=url, + params=self._get_params(self.list_futures_schedules, locals()), + raw=raw, + deserializer=FuturesSchedule.from_dict, + options=options, + ) + + def list_futures_schedules_by_product_code( + self, + product_code: str, + session_end_date: Optional[str] = None, + session_end_date_lt: Optional[str] = None, + session_end_date_lte: Optional[str] = None, + session_end_date_gt: Optional[str] = None, + session_end_date_gte: Optional[str] = None, + limit: Optional[int] = None, + sort: Optional[Union[str, Sort]] = None, + params: Optional[Dict[str, Any]] = None, + raw: bool = False, + options: Optional[RequestOptionBuilder] = None, + ) -> Union[Iterator[FuturesSchedule], HTTPResponse]: + """ + Endpoint: GET /futures/vX/products/{product_code}/schedules + + Returns schedule data for a single product across (potentially) many trading dates. + """ + url = f"/futures/vX/products/{product_code}/schedules" + return self._paginate( + path=url, + params=self._get_params( + self.list_futures_schedules_by_product_code, locals() + ), + raw=raw, + deserializer=FuturesSchedule.from_dict, + options=options, + ) + + def list_futures_market_statuses( + self, + product_code_any_of: Optional[str] = None, + product_code: Optional[str] = None, + limit: Optional[int] = None, + sort: Optional[Union[str, Sort]] = None, + params: Optional[Dict[str, Any]] = None, + raw: bool = False, + options: Optional[RequestOptionBuilder] = None, + ) -> Union[Iterator[FuturesMarketStatus], HTTPResponse]: + url = "/futures/vX/market-status" + return self._paginate( + path=url, + params=self._get_params(self.list_futures_market_statuses, locals()), + raw=raw, + deserializer=FuturesMarketStatus.from_dict, + options=options, + ) + + def get_futures_snapshot( + self, + ticker: Optional[str] = None, + ticker_any_of: Optional[str] = None, + ticker_gt: Optional[str] = None, + ticker_gte: Optional[str] = None, + ticker_lt: Optional[str] = None, + ticker_lte: Optional[str] = None, + product_code: Optional[str] = None, + product_code_any_of: Optional[str] = None, + product_code_gt: Optional[str] = None, + product_code_gte: Optional[str] = None, + product_code_lt: Optional[str] = None, + product_code_lte: Optional[str] = None, + limit: Optional[int] = None, + sort: Optional[Union[str, Sort]] = None, + params: Optional[Dict[str, Any]] = None, + raw: bool = False, + options: Optional[RequestOptionBuilder] = None, + ) -> Union[Iterator[FuturesSnapshot], HTTPResponse]: + url = "/futures/vX/snapshot" + return self._paginate( + path=url, + params=self._get_params(self.get_futures_snapshot, locals()), + raw=raw, + deserializer=FuturesSnapshot.from_dict, + options=options, + ) diff --git a/polygon/rest/models/__init__.py b/polygon/rest/models/__init__.py index 2c9a8086..3108ab01 100644 --- a/polygon/rest/models/__init__.py +++ b/polygon/rest/models/__init__.py @@ -5,6 +5,7 @@ from .dividends import * from .exchanges import * from .financials import * +from .futures import * from .indicators import * from .markets import * from .quotes import * diff --git a/polygon/rest/models/futures.py b/polygon/rest/models/futures.py new file mode 100644 index 00000000..8f6263e2 --- /dev/null +++ b/polygon/rest/models/futures.py @@ -0,0 +1,344 @@ +from typing import Optional, List +from ...modelclass import modelclass + + +@modelclass +class FuturesAgg: + """ + A single aggregate bar for a futures contract in a given time window. + Corresponds to /futures/vX/aggs/{ticker}. + """ + + ticker: Optional[str] = None + underlying_asset: Optional[str] = None + open: Optional[float] = None + high: Optional[float] = None + low: Optional[float] = None + close: Optional[float] = None + volume: Optional[float] = None + dollar_volume: Optional[float] = None + transaction_count: Optional[int] = None + window_start: Optional[int] = None + session_end_date: Optional[str] = None + settlement_price: Optional[float] = None + + @staticmethod + def from_dict(d): + return FuturesAgg( + ticker=d.get("ticker"), + underlying_asset=d.get("underlying_asset"), + open=d.get("open"), + high=d.get("high"), + low=d.get("low"), + close=d.get("close"), + volume=d.get("volume"), + dollar_volume=d.get("dollar_volume"), + transaction_count=d.get("transaction_count"), + window_start=d.get("window_start"), + session_end_date=d.get("session_end_date"), + settlement_price=d.get("settlement_price"), + ) + + +@modelclass +class FuturesContract: + """ + Represents a single futures contract (or a 'combo' contract). + Corresponds to /futures/vX/contracts endpoints. + """ + + ticker: Optional[str] = None + product_code: Optional[str] = None + market_identifier_code: Optional[str] = None + name: Optional[str] = None + type: Optional[str] = None + as_of: Optional[str] = None + active: Optional[bool] = None + first_trade_date: Optional[str] = None + last_trade_date: Optional[str] = None + days_to_maturity: Optional[int] = None + min_order_quantity: Optional[int] = None + max_order_quantity: Optional[int] = None + settlement_date: Optional[str] = None + settlement_tick_size: Optional[float] = None + spread_tick_size: Optional[float] = None + trade_tick_size: Optional[float] = None + maturity: Optional[str] = None + + @staticmethod + def from_dict(d): + return FuturesContract( + ticker=d.get("ticker"), + product_code=d.get("product_code"), + market_identifier_code=d.get("market_identifier_code"), + name=d.get("name"), + type=d.get("type"), + as_of=d.get("as_of"), + active=d.get("active"), + first_trade_date=d.get("first_trade_date"), + last_trade_date=d.get("last_trade_date"), + days_to_maturity=d.get("days_to_maturity"), + min_order_quantity=d.get("min_order_quantity"), + max_order_quantity=d.get("max_order_quantity"), + settlement_date=d.get("settlement_date"), + settlement_tick_size=d.get("settlement_tick_size"), + spread_tick_size=d.get("spread_tick_size"), + trade_tick_size=d.get("trade_tick_size"), + maturity=d.get("maturity"), + ) + + +@modelclass +class FuturesProduct: + """ + Represents a single futures product (or product 'combo'). + Corresponds to /futures/vX/products endpoints. + """ + + product_code: Optional[str] = None + name: Optional[str] = None + as_of: Optional[str] = None + market_identifier_code: Optional[str] = None + asset_class: Optional[str] = None + asset_sub_class: Optional[str] = None + sector: Optional[str] = None + sub_sector: Optional[str] = None + type: Optional[str] = None + last_updated: Optional[str] = None + otc_eligible: Optional[bool] = None + price_quotation: Optional[str] = None + settlement_currency_code: Optional[str] = None + settlement_method: Optional[str] = None + settlement_type: Optional[str] = None + trade_currency_code: Optional[str] = None + unit_of_measure: Optional[str] = None + unit_of_measure_quantity: Optional[float] = None + + @staticmethod + def from_dict(d): + return FuturesProduct( + product_code=d.get("product_code"), + name=d.get("name"), + as_of=d.get("as_of"), + market_identifier_code=d.get("market_identifier_code"), + asset_class=d.get("asset_class"), + asset_sub_class=d.get("asset_sub_class"), + sector=d.get("sector"), + sub_sector=d.get("sub_sector"), + type=d.get("type"), + last_updated=d.get("last_updated"), + otc_eligible=d.get("otc_eligible"), + price_quotation=d.get("price_quotation"), + settlement_currency_code=d.get("settlement_currency_code"), + settlement_method=d.get("settlement_method"), + settlement_type=d.get("settlement_type"), + trade_currency_code=d.get("trade_currency_code"), + unit_of_measure=d.get("unit_of_measure"), + unit_of_measure_quantity=d.get("unit_of_measure_quantity"), + ) + + +@modelclass +class FuturesQuote: + """ + Represents a futures NBBO quote within a given time range. + Corresponds to /futures/vX/quotes/{ticker} + """ + + ticker: Optional[str] = None + timestamp: Optional[int] = None + session_end_date: Optional[str] = None + ask_price: Optional[float] = None + ask_size: Optional[float] = None + ask_timestamp: Optional[int] = None + bid_price: Optional[float] = None + bid_size: Optional[float] = None + bid_timestamp: Optional[int] = None + + @staticmethod + def from_dict(d): + return FuturesQuote( + ticker=d.get("ticker"), + timestamp=d.get("timestamp"), + session_end_date=d.get("session_end_date"), + ask_price=d.get("ask_price"), + ask_size=d.get("ask_size"), + ask_timestamp=d.get("ask_timestamp"), + bid_price=d.get("bid_price"), + bid_size=d.get("bid_size"), + bid_timestamp=d.get("bid_timestamp"), + ) + + +@modelclass +class FuturesTrade: + """ + Represents a futures trade within a given time range. + Corresponds to /futures/vX/trades/{ticker} + """ + + ticker: Optional[str] = None + timestamp: Optional[int] = None + session_end_date: Optional[str] = None + price: Optional[float] = None + size: Optional[float] = None + + @staticmethod + def from_dict(d): + return FuturesTrade( + ticker=d.get("ticker"), + timestamp=d.get("timestamp"), + session_end_date=d.get("session_end_date"), + price=d.get("price"), + size=d.get("size"), + ) + + +@modelclass +class FuturesScheduleEvent: + """ + Represents a single market event for a schedule (preopen, open, closed, etc.). + """ + + event: Optional[str] = None + timestamp: Optional[str] = None + + @staticmethod + def from_dict(d): + return FuturesScheduleEvent( + event=d.get("event"), + timestamp=d.get("timestamp"), + ) + + +@modelclass +class FuturesSchedule: + """ + Represents a single schedule for a given session_end_date, with events. + Corresponds to /futures/vX/schedules, /futures/vX/schedules/{product_code} + """ + + session_end_date: Optional[str] = None + product_code: Optional[str] = None + market_identifier_code: Optional[str] = None + product_name: Optional[str] = None + schedule: Optional[List[FuturesScheduleEvent]] = None + + @staticmethod + def from_dict(d): + return FuturesSchedule( + session_end_date=d.get("session_end_date"), + product_code=d.get("product_code"), + market_identifier_code=d.get("market_identifier_code"), + product_name=d.get("product_name"), + schedule=[ + FuturesScheduleEvent.from_dict(ev) for ev in d.get("schedule", []) + ], + ) + + +@modelclass +class FuturesMarketStatus: + market_identifier_code: Optional[str] = None + market_status: Optional[str] = ( + None # Enum: pre_open, open, close, pause, post_close_pre_open + ) + product_code: Optional[str] = None + + @staticmethod + def from_dict(d): + return FuturesMarketStatus( + market_identifier_code=d.get("market_identifier_code"), + market_status=d.get("market_status"), + product_code=d.get("product_code"), + ) + + +@modelclass +class FuturesSnapshotDetails: + open_interest: Optional[int] = None + settlement_date: Optional[int] = None + + +@modelclass +class FuturesSnapshotMinute: + close: Optional[float] = None + high: Optional[float] = None + last_updated: Optional[int] = None + low: Optional[float] = None + open: Optional[float] = None + volume: Optional[float] = None + + +@modelclass +class FuturesSnapshotQuote: + ask: Optional[float] = None + ask_size: Optional[int] = None + ask_timestamp: Optional[int] = None + bid: Optional[float] = None + bid_size: Optional[int] = None + bid_timestamp: Optional[int] = None + last_updated: Optional[int] = None + + +@modelclass +class FuturesSnapshotTrade: + last_updated: Optional[int] = None + price: Optional[float] = None + size: Optional[int] = None + + +@modelclass +class FuturesSnapshotSession: + change: Optional[float] = None + change_percent: Optional[float] = None + close: Optional[float] = None + high: Optional[float] = None + low: Optional[float] = None + open: Optional[float] = None + previous_settlement: Optional[float] = None + settlement_price: Optional[float] = None + volume: Optional[float] = None + + +@modelclass +class FuturesSnapshot: + ticker: Optional[str] = None + product_code: Optional[str] = None + details: Optional[FuturesSnapshotDetails] = None + last_minute: Optional[FuturesSnapshotMinute] = None + last_quote: Optional[FuturesSnapshotQuote] = None + last_trade: Optional[FuturesSnapshotTrade] = None + session: Optional[FuturesSnapshotSession] = None + + @staticmethod + def from_dict(d): + return FuturesSnapshot( + ticker=d.get("ticker"), + product_code=d.get("product_code"), + details=( + FuturesSnapshotDetails.from_dict(d.get("details", {})) + if d.get("details") + else None + ), + last_minute=( + FuturesSnapshotMinute.from_dict(d.get("last_minute", {})) + if d.get("last_minute") + else None + ), + last_quote=( + FuturesSnapshotQuote.from_dict(d.get("last_quote", {})) + if d.get("last_quote") + else None + ), + last_trade=( + FuturesSnapshotTrade.from_dict(d.get("last_trade", {})) + if d.get("last_trade") + else None + ), + session=( + FuturesSnapshotSession.from_dict(d.get("session", {})) + if d.get("session") + else None + ), + ) diff --git a/polygon/websocket/__init__.py b/polygon/websocket/__init__.py index 77865d3f..1304028f 100644 --- a/polygon/websocket/__init__.py +++ b/polygon/websocket/__init__.py @@ -49,16 +49,19 @@ def __init__( ) self.api_key = api_key self.feed = feed - self.market = market + if isinstance(market, str): + self.market = Market(market) # converts str input to enum + else: + self.market = market + + self.market_value = self.market.value self.raw = raw if verbose: logger.setLevel(logging.DEBUG) self.websocket_cfg = kwargs if isinstance(feed, Enum): feed = feed.value - if isinstance(market, Enum): - market = market.value - self.url = f"ws{'s' if secure else ''}://{feed}/{market}" + self.url = f"ws{'s' if secure else ''}://{feed}/{self.market_value}" self.subscribed = False self.subs: Set[str] = set() self.max_reconnects = max_reconnects @@ -140,7 +143,7 @@ async def connect( if m["ev"] == "status": logger.debug("status: %s", m["message"]) continue - cmsg = parse(msgJson, logger) + cmsg = parse(msgJson, logger, self.market) if len(cmsg) > 0: await processor(cmsg) # type: ignore diff --git a/polygon/websocket/models/__init__.py b/polygon/websocket/models/__init__.py index 06cab55d..20c02ce1 100644 --- a/polygon/websocket/models/__init__.py +++ b/polygon/websocket/models/__init__.py @@ -1,49 +1,94 @@ -from typing import Dict, Any, List +from typing import Dict, Any, List, Type, Protocol, cast from .common import * from .models import * import logging -def parse_single(data: Dict[str, Any]): +# Protocol to define classes with from_dict method +class FromDictProtocol(Protocol): + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> "FromDictProtocol": + pass + + +# Define the mapping of market and event type to model class +MARKET_EVENT_MAP: Dict[Market, Dict[str, Type[FromDictProtocol]]] = { + Market.Stocks: { + "A": EquityAgg, + "AM": EquityAgg, + "T": EquityTrade, + "Q": EquityQuote, + "LULD": LimitUpLimitDown, + "FMV": FairMarketValue, + "NOI": Imbalance, + "LV": LaunchpadValue, + }, + Market.Options: { + "A": EquityAgg, + "AM": EquityAgg, + "T": EquityTrade, + "Q": EquityQuote, + "FMV": FairMarketValue, + "LV": LaunchpadValue, + }, + Market.Indices: { + "A": EquityAgg, + "AM": EquityAgg, + "V": IndexValue, + }, + Market.Futures: { + "A": FuturesAgg, + "AM": FuturesAgg, + "T": FuturesTrade, + "Q": FuturesQuote, + }, + Market.Crypto: { + "XA": CurrencyAgg, + "XAS": CurrencyAgg, + "XT": CryptoTrade, + "XQ": CryptoQuote, + "XL2": Level2Book, + "FMV": FairMarketValue, + "AM": EquityAgg, + "LV": LaunchpadValue, + }, + Market.Forex: { + "CA": CurrencyAgg, + "CAS": CurrencyAgg, + "C": ForexQuote, + "FMV": FairMarketValue, + "AM": EquityAgg, + "LV": LaunchpadValue, + }, +} + + +def parse_single( + data: Dict[str, Any], logger: logging.Logger, market: Market +) -> Optional[WebSocketMessage]: event_type = data["ev"] - if event_type in [EventType.EquityAgg.value, EventType.EquityAggMin.value]: - return EquityAgg.from_dict(data) - elif event_type in [ - EventType.CryptoAgg.value, - EventType.CryptoAggSec.value, - EventType.ForexAgg.value, - EventType.ForexAggSec.value, - ]: - return CurrencyAgg.from_dict(data) - elif event_type == EventType.EquityTrade.value: - return EquityTrade.from_dict(data) - elif event_type == EventType.CryptoTrade.value: - return CryptoTrade.from_dict(data) - elif event_type == EventType.EquityQuote.value: - return EquityQuote.from_dict(data) - elif event_type == EventType.ForexQuote.value: - return ForexQuote.from_dict(data) - elif event_type == EventType.CryptoQuote.value: - return CryptoQuote.from_dict(data) - elif event_type == EventType.Imbalances.value: - return Imbalance.from_dict(data) - elif event_type == EventType.LimitUpLimitDown.value: - return LimitUpLimitDown.from_dict(data) - elif event_type == EventType.CryptoL2.value: - return Level2Book.from_dict(data) - elif event_type == EventType.Value.value: - return IndexValue.from_dict(data) - elif event_type == EventType.LaunchpadValue.value: - return LaunchpadValue.from_dict(data) - elif event_type == EventType.BusinessFairMarketValue.value: - return FairMarketValue.from_dict(data) - return None - - -def parse(msg: List[Dict[str, Any]], logger: logging.Logger) -> List[WebSocketMessage]: + # Look up the model class based on market and event type + model_class: Optional[Type[FromDictProtocol]] = MARKET_EVENT_MAP.get( + market, {} + ).get(event_type) + if model_class: + parsed = model_class.from_dict(data) + return cast( + WebSocketMessage, parsed + ) # Ensure the return type is WebSocketMessage + else: + # Log a warning for unrecognized event types, unless it's a status message + if event_type != "status": + logger.warning("Unknown event type '%s' for market %s", event_type, market) + return None + + +def parse( + msg: List[Dict[str, Any]], logger: logging.Logger, market: Market +) -> List[WebSocketMessage]: res = [] for m in msg: - parsed = parse_single(m) + parsed = parse_single(m, logger, market) if parsed is None: if m["ev"] != "status": logger.warning("could not parse message %s", m) diff --git a/polygon/websocket/models/common.py b/polygon/websocket/models/common.py index 38aea4c4..261f664b 100644 --- a/polygon/websocket/models/common.py +++ b/polygon/websocket/models/common.py @@ -28,6 +28,7 @@ class Market(Enum): Forex = "forex" Crypto = "crypto" Indices = "indices" + Futures = "futures" class EventType(Enum): @@ -46,12 +47,10 @@ class EventType(Enum): LimitUpLimitDown = "LULD" CryptoL2 = "XL2" Value = "V" - """Launchpad* EventTypes are only available to Launchpad users. These values are the same across all asset classes ( - stocks, options, forex, crypto). - """ LaunchpadValue = "LV" LaunchpadAggMin = "AM" - """Business* EventTypes are only available to Business users. These values are the same across all asset classes ( - stocks, options, forex, crypto). - """ BusinessFairMarketValue = "FMV" + FuturesTrade = "T" + FuturesQuote = "Q" + FuturesAgg = "A" + FuturesAggMin = "AM" diff --git a/polygon/websocket/models/models.py b/polygon/websocket/models/models.py index d6fa0c29..9b95b302 100644 --- a/polygon/websocket/models/models.py +++ b/polygon/websocket/models/models.py @@ -359,6 +359,93 @@ def from_dict(d): ) +@modelclass +class FuturesTrade: + event_type: Optional[str] = None + symbol: Optional[str] = None + price: Optional[float] = None + size: Optional[int] = None + timestamp: Optional[int] = None + sequence_number: Optional[int] = None + + @staticmethod + def from_dict(d): + return FuturesTrade( + event_type=d.get("ev"), + symbol=d.get("sym"), + price=d.get("p"), + size=d.get("s"), + timestamp=d.get("t"), + sequence_number=d.get("q"), + ) + + +@modelclass +class FuturesQuote: + event_type: Optional[str] = None + symbol: Optional[str] = None + bid_price: Optional[float] = None + bid_size: Optional[int] = None + bid_timestamp: Optional[int] = None + ask_price: Optional[float] = None + ask_size: Optional[int] = None + ask_timestamp: Optional[int] = None + sip_timestamp: Optional[int] = None + + @staticmethod + def from_dict(d): + return FuturesQuote( + event_type=d.get("ev"), + symbol=d.get("sym"), + bid_price=d.get("bp"), + bid_size=d.get("bs"), + bid_timestamp=d.get("bt"), + ask_price=d.get("ap"), + ask_size=d.get("as"), + ask_timestamp=d.get("at"), + sip_timestamp=d.get("t"), + ) + + +@modelclass +class FuturesAgg: + event_type: Optional[str] = None + symbol: Optional[str] = None + volume: Optional[float] = None + accumulated_volume: Optional[float] = None + official_open_price: Optional[float] = None + vwap: Optional[float] = None + open: Optional[float] = None + close: Optional[float] = None + high: Optional[float] = None + low: Optional[float] = None + aggregate_vwap: Optional[float] = None + average_size: Optional[float] = None + start_timestamp: Optional[int] = None + end_timestamp: Optional[int] = None + otc: Optional[bool] = None # If present + + @staticmethod + def from_dict(d): + return FuturesAgg( + event_type=d.get("ev"), + symbol=d.get("sym"), + volume=d.get("v"), + accumulated_volume=d.get("av"), + official_open_price=d.get("op"), + vwap=d.get("vw"), + open=d.get("o"), + close=d.get("c"), + high=d.get("h"), + low=d.get("l"), + aggregate_vwap=d.get("a"), + average_size=d.get("z"), + start_timestamp=d.get("s"), + end_timestamp=d.get("e"), + otc=d.get("otc"), + ) + + WebSocketMessage = NewType( "WebSocketMessage", List[ @@ -376,6 +463,9 @@ def from_dict(d): IndexValue, LaunchpadValue, FairMarketValue, + FuturesTrade, + FuturesQuote, + FuturesAgg, ] ], )