forked from slazarov/python-bittrex-websocket
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathwebsocket_client.py
179 lines (154 loc) · 5.87 KB
/
websocket_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
#!/usr/bin/python
# -*- coding: utf-8 -*-
# bittrex_websocket/websocket_client.py
# Stanislav Lazarov
from threading import Thread
from time import sleep
import cfscrape
from requests import Session
from signalr import Connection
class BittrexSocket(object):
def __init__(self, tickers=None, conn_type='normal'):
"""
:param tickers: a list of tickers, single tickers should also be supplied as a list
:type tickers: []
:param conn_type: 'normal' direct connection or 'cloudflare' workaround
:type conn_type: str
"""
if tickers is None:
self.tickers = ['BTC-ETH']
else:
self.tickers = tickers
self.conn_type = conn_type
self.timeout = 120000
self.conn_list = []
self.threads = []
self.url = 'http://socket-stage.bittrex.com/signalr' # ''http://socket.bittrex.com/signalr'
self.client_callbacks = ['updateExchangeState'] # ['updateSummaryState']
self.server_callbacks = ['SubscribeToExchangeDeltas']
def run(self):
self.on_open()
thread = Thread(target=self._go)
thread.daemon = True
self.threads.append(thread)
self.threads[0].start()
def _go(self):
# Create socket connections
self._start()
def stop(self):
# To-do: come up with better handling of websocket stop
for conn in self.conn_list:
for cb in self.client_callbacks:
conn['corehub'].client.off(cb, self.on_message)
conn['connection'].close()
self.on_close()
def _start(self):
def get_chunks(l, n):
# Yield successive n-sized chunks from l.
for i in range(0, len(l), n):
yield l[i:i + n]
# Initiate a generator that splits the ticker list into chunks
ticker_gen = get_chunks(self.tickers, 20)
while True:
try:
chunk_list = next(ticker_gen)
except StopIteration:
break
if chunk_list is not None:
# Create connection object
conn_obj = self._create_connection()
# Create thread
thread = Thread(target=self._subscribe, args=(conn_obj, chunk_list))
self.threads.append(thread)
conn_obj['thread-name'] = thread.getName()
self.conn_list.append(conn_obj)
thread.start()
return
def _create_connection(self):
# Sometimes Bittrex blocks the normal connection, so
# we have to use a Cloudflare workaround
if self.conn_type == 'normal':
with Session() as connection:
conn = Connection(self.url, connection)
elif self.conn_type == 'cloudflare':
with cfscrape.create_scraper() as connection:
conn = Connection(self.url, connection)
else:
raise Exception('Connection type is invalid, set conn_type to \'normal\' or \'cloudflare\'')
conn.received += self.on_debug
conn.error += self.on_error
corehub = conn.register_hub('coreHub')
conn_object = {'connection': conn, 'corehub': corehub}
return conn_object
def _subscribe(self, conn_object, tickers):
conn, corehub = conn_object['connection'], conn_object['corehub']
print('Establishing ticker update connection...')
try:
conn.start()
print('Ticker update connection established.')
# Subscribe
for cb in self.client_callbacks:
corehub.client.on(cb, self.on_message)
for k, cb in enumerate(self.server_callbacks):
for i, ticker in enumerate(tickers):
corehub.server.invoke(cb, ticker)
if i == len(tickers) - 1:
sleep(5)
# Close the connection if no message is received after timeout value.
conn.wait(self.timeout)
except Exception as e:
print(e)
print('Failed to establish connection')
return
def on_open(self):
# Called before initiating the websocket connection
# Use it when you want to add optional parameters
pass
def on_close(self):
# Called after closing the websocket connection
# Use it when you want to add any closing logic.
print('Bittrex websocket closed.')
def on_error(self, error):
# Error handler
print(error)
self.stop()
def on_debug(self, **kwargs):
# Debug information, shows all data
print(kwargs)
def on_message(self, *args, **kwargs):
"""
This is where you get the order flow stream.
Subscribed via 'updateExchangeState'
Access it from args[0]
Example output:
{
'MarketName': 'BTC-ETH',
'Nounce': 101846,
'Buys': [{'Type': 1, 'Rate': 0.05369548, 'Quantity': 0.0}],
'Sells': [{'Type': 2, 'Rate': 0.05373854, 'Quantity': 62.48260112}],
'Fills': [{'OrderType': 'BUY', 'Rate': 0.05373854, 'Quantity': 0.88839888,
'TimeStamp': '2017-11-24T13:18:43.44'}]
}
"""
print(args[0])
if __name__ == "__main__":
class MyBittrexSocket(BittrexSocket):
def on_open(self):
self.client_callbacks = ['updateExchangeState']
self.nounces = []
self.msg_count = 0
def on_debug(self, **kwargs):
pass
def on_message(self, *args, **kwargs):
self.nounces.append(args[0])
self.msg_count += 1
tickers = ['BTC-ETH', 'ETH-1ST', 'BTC-1ST', 'BTC-NEO', 'ETH-NEO']
ws = MyBittrexSocket(tickers)
ws.run()
while ws.msg_count < 20:
sleep(1)
continue
else:
for msg in ws.nounces:
print(msg)
ws.stop()