- Creating websocket threads
- Managing the websocket threads
- Managing multiple orderbooks
- Python 3.7.2
- Pipenv
- websocket-client
- requests
- intermediate
Websockets allow for real-time updates while putting less stress on the servers than repeated API calls would. They are especially useful when data is updated frequently, like trades and the orderbooks on cryptocurrency exchanges. This tutorial looks at how to connect to multiple websockets in parallel. It is a direct continuation of the previous two parts; if anything is unclear, refer to those tutorials.
Download the files from GitHub and install the virtual environment:
$ cd ~/
$ git clone https://github.com/lamsd/websockets-scryto
$ cd websockets-scryto
$ pipenv install
$ pipenv shell
$ cd part_3
To allow multiple websocket streams to be active in 'parallel', the threading library is used. Keep in mind that in CPython, threads do not execute Python code at the exact same moment, because the global interpreter lock (GIL) lets only one thread run Python code at a time. However, since websockets mostly deal with network I/O, this is no problem. True parallelism can be achieved by using processes (the multiprocessing library).
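To illustrate why threads are good enough for I/O-bound work, here is a minimal sketch in which time.sleep stands in for waiting on the network. The function name fake_network_wait and the stream names are made up for this example and are not part of the tutorial code.

```python
import threading
import time


def fake_network_wait(seconds, results, name):
    """Stand-in for a blocking network call; the GIL is released while sleeping."""
    time.sleep(seconds)
    results.append(name)


results = []
threads = [
    threading.Thread(target=fake_network_wait, args=(0.2, results, f"stream-{i}"))
    for i in range(3)
]

started = time.perf_counter()
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
elapsed = time.perf_counter() - started

# The three waits overlap instead of running one after another
print(f"{len(results)} streams finished in {elapsed:.2f}s")
```

On an otherwise idle machine the elapsed time should be close to a single wait rather than the sum of all three, which is the behaviour the websocket threads rely on.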
In order to keep the code clean and dynamic, the websocket code has been split up into multiple classes. The Client class contains the fundamentals needed to connect to a websocket stream. This class inherits from Thread, which is in essence just a class by itself. Two more classes, Huobi and Binance, represent the exchanges. Inside these classes goes the custom code for dealing with those particular exchanges. Both classes inherit from Client.
This creates the structure:
# huobi.py
import gzip
from json import dumps, loads

from client import Client  # the Client class from client.py


# inherits from Client
class Huobi(Client):
    # call init from parent class
    def __init__(self, url, exchange):
        super().__init__(url, exchange)

    # decompress and decode message, convert to dict, extract top ask/bid
    def on_message(self, message):
        data = loads(gzip.decompress(message).decode('utf-8'))

        # extract bids/asks
        if 'tick' in data:
            bid = data['tick']['bids'][0]
            ask = data['tick']['asks'][0]
            print(f'Updated {self.exchange}')

        # respond to ping message by echoing the ping value
        elif 'ping' in data:
            params = {"pong": data['ping']}
            self.ws.send(dumps(params))

    # convert dict to string, subscribe to data stream by sending message
    def on_open(self):
        super().on_open()
        params = {"sub": "market.btcusdt.depth.step0", "id": "id1"}
        self.ws.send(dumps(params))
Inheritance is achieved by placing the parent class name between () in the class definition, like: class Huobi(Client). The child class then has access to all the code from the parent class. Functions and variables can be overridden by redefining them in the child class. super().<function_name> allows the original function to be called; code can be added before or after that call.
In this example super().__init__(url, exchange) is called in the __init__ function to pass the variables url and exchange to the Client class. on_message and on_open are redefined, therefore replacing the original functions with the same name from the Client class. Note that on_open still runs the original version first through super().on_open().
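The override mechanics can be tried without any network connection. The sketch below uses two made-up classes, BaseClient and DemoExchange, as stand-ins for Client and an exchange class; they only record what was called and are not the tutorial's real classes.

```python
class BaseClient:
    """Hypothetical stand-in for the tutorial's Client class (no networking)."""

    def __init__(self, url, exchange):
        self.url = url
        self.exchange = exchange
        self.log = []

    def on_open(self):
        self.log.append(f"connected to {self.exchange}")

    def on_message(self, message):
        self.log.append(f"raw: {message}")


class DemoExchange(BaseClient):
    def __init__(self, url, exchange):
        # pass the arguments on to the parent constructor
        super().__init__(url, exchange)

    def on_open(self):
        # run the parent's version first, then add exchange-specific behaviour
        super().on_open()
        self.log.append("subscribed")

    def on_message(self, message):
        # fully replaces the parent's on_message (no super() call)
        self.log.append(f"parsed: {message.upper()}")


client = DemoExchange("wss://example.invalid/ws", "Demo")
client.on_open()
client.on_message("tick")
print(client.log)
```

The log shows that on_open extended the parent behaviour, while on_message replaced it entirely.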
As threads are just classes, they are created like any other class. However, they require one function that manages the thread's work. Usually this is run(). In this case run has been defined inside Client.
# client.py
# keep connection alive
def run(self):
    while True:
        self.ws.run_forever()
run() is called when calling .start() on the thread object.
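The relation between start() and run() can be checked with a small stand-alone sketch. The Worker class is made up for this example; it only records the name of the thread in which run() was executed.

```python
import threading


class Worker(threading.Thread):
    def __init__(self, name):
        super().__init__(name=name, daemon=True)
        self.ran_in = None

    def run(self):
        # executed in the new thread once start() is called
        self.ran_in = threading.current_thread().name


worker = Worker("demo-worker")
worker.start()  # spawns the thread, which then invokes run()
worker.join()   # wait for run() to return
print(worker.ran_in)
```

Calling worker.run() directly would also execute the code, but in the calling thread instead of a new one, which is why start() is used.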
# main.py
from binance import Binance
from huobi import Huobi

if __name__ == "__main__":
    # create websocket threads
    binance = Binance("wss://stream.binance.com:9443/ws/btcusdt@depth", "Binance")
    huobi = Huobi("wss://api.huobipro.com/ws", "Huobi")

    # start threads
    binance.start()
    huobi.start()
At the moment there are three active threads: the two exchange websockets and the main thread that created them. The main thread can be used to process the data from the websockets. For this a shared data structure is needed, as well as a locking mechanism to prevent data corruption.
import threading

# data management
lock = threading.Lock()
orderbooks = {
    "Binance": {},
    "Huobi": {},
}

# create websocket threads
binance = Binance(
    url="wss://stream.binance.com:9443/ws/btcusdt@depth",
    exchange="Binance",
    orderbook=orderbooks['Binance'],
    lock=lock,
)
huobi = Huobi(
    url="wss://api.huobipro.com/ws",
    exchange="Huobi",
    orderbook=orderbooks['Huobi'],
    lock=lock,
)
A Lock has two important functions, acquire() and release(). By default acquire() is blocking, which means that a thread will wait until it can acquire the lock. If the lock is already taken, it waits for the lock to be freed. If something goes wrong while the lock is held and it is never released, every thread waiting on it blocks forever and the program halts.
This is prevented with the following code:
lock.acquire()
try:
    # execute code
    pass
finally:
    lock.release()
This entire code block can be replaced with:
with lock:
    # execute code
    pass
This does exactly the same thing and looks as follows in the code:
# binance.py
# Loop through all bid and ask updates, call manage_orderbook accordingly
def process_updates(self, data):
    with self.lock:
        for update in data['b']:
            self.manage_orderbook('bids', update)
        for update in data['a']:
            self.manage_orderbook('asks', update)
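The same locking pattern can be tried in isolation. The sketch below is not the tutorial's manage_orderbook; it assumes a simplified orderbook layout (a dict mapping price to amount per side) and a made-up helper apply_updates, purely to show writers and a reader sharing one lock.

```python
import threading

lock = threading.Lock()
orderbook = {"bids": {}, "asks": {}}  # assumed layout: price -> amount


def apply_updates(side, updates):
    """Write a batch of (price, amount) pairs while holding the lock."""
    with lock:
        for price, amount in updates:
            orderbook[side][price] = amount


bid_updates = [(100 + i, 1.0) for i in range(50)]
ask_updates = [(200 + i, 2.0) for i in range(50)]
writers = [
    threading.Thread(target=apply_updates, args=("bids", bid_updates)),
    threading.Thread(target=apply_updates, args=("asks", ask_updates)),
]
for writer in writers:
    writer.start()
for writer in writers:
    writer.join()

with lock:
    # readers take the same lock, so they never see a half-applied batch
    print(len(orderbook["bids"]), len(orderbook["asks"]))
```

Because the with statement releases the lock even when an exception is raised inside the block, a failing update cannot leave the other threads waiting forever.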
The same orderbooks dict is also accessible from the main thread and is accessed in the same way. To prevent the main thread from keeping the orderbooks locked, it sleeps for 1 second on every pass. The try/except block is for when the orderbook has not been filled with data yet.
# main.py
import time


# print top bid/ask for each exchange
# run forever
def run(orderbooks, lock):
    while True:
        try:
            with lock:
                # extract and print data
                for key, value in orderbooks.items():
                    bid = value['bids'][0][0]
                    ask = value['asks'][0][0]
                    print(f"{key} bid: {bid} ask: {ask}")
                print()
        except Exception:
            # orderbook has not been filled with data yet
            pass
        # sleep after releasing the lock, also when the pass failed
        time.sleep(1)
A different approach would be to keep track of when the orderbooks were last updated and only access the data when there is a new update. This can be achieved by adding a last_update variable to the shared dict and then comparing this to a local last_update in the main thread.
import time
from datetime import datetime

orderbooks = {
    "Binance": {},
    "Huobi": {},
    "last_update": None,  # new
}


def run(orderbooks, lock):
    # local last_update, set once so it persists between passes
    current_time = datetime.now()
    while True:
        try:
            # check for new update
            if orderbooks['last_update'] != current_time:
                with lock:
                    # do stuff
                    # set local last_update to last_update
                    current_time = orderbooks['last_update']
        except Exception:
            pass
        time.sleep(0.1)
This way the main thread only locks the orderbook and processes the data when there is a new update. In addition, the sleep time has been decreased to 0.1 seconds, making processing almost instant. Also, in both the Huobi and Binance classes changes have been made to update the last_update variable after each update.
# init
self.orderbook = orderbook[exchange]
self.last_update = orderbook
# after update
self.last_update['last_update'] = datetime.now()
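The last_update idea can be tested end to end without a websocket. In the sketch below, producer is a made-up stand-in for an exchange thread and consumer plays the role of the main thread; the "Demo" key, the price values and the polling intervals are all assumptions chosen for the example.

```python
import threading
import time
from datetime import datetime

lock = threading.Lock()
shared = {"Demo": {"bids": [], "asks": []}, "last_update": None}


def producer(prices):
    """Hypothetical exchange thread: writes a new top bid and stamps the update."""
    for price in prices:
        with lock:
            shared["Demo"]["bids"] = [[price, 1.0]]
            shared["last_update"] = datetime.now()
        time.sleep(0.02)


def consumer(max_polls):
    """Poll the shared dict and only read the orderbook after a new update."""
    seen = []
    local_update = None
    for _ in range(max_polls):
        with lock:
            if shared["last_update"] != local_update:
                seen.append(shared["Demo"]["bids"][0][0])
                local_update = shared["last_update"]
        time.sleep(0.005)
    return seen


prices = [100, 101, 102, 103, 104]
thread = threading.Thread(target=producer, args=(prices,))
thread.start()
seen = consumer(max_polls=100)
thread.join()
print(seen)
```

The consumer never reads the same update twice, and it may skip intermediate updates if it polls too slowly. Comparing timestamps assumes that two updates never carry the exact same datetime; an incrementing counter would be an alternative if that assumption is too weak.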
In tutorial 1 there was a part about ping messages. After some testing it appears these messages have to be answered to keep the connection alive.
# huobi.py
# respond to ping message by echoing the ping value
elif 'ping' in data:
    params = {"pong": data['ping']}
    self.ws.send(dumps(params))
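The ping handling can be checked offline with a small helper. The function build_pong and the sample frames below are made up for illustration; the frames are simply gzip-compressed JSON, mirroring how on_message decompresses incoming data, and the ping value is an arbitrary number.

```python
import gzip
import json


def build_pong(raw_message):
    """Return the JSON pong reply for a gzip-compressed ping, else None."""
    data = json.loads(gzip.decompress(raw_message).decode("utf-8"))
    if "ping" in data:
        # echo the value that came with the ping
        return json.dumps({"pong": data["ping"]})
    return None


# Made-up sample frames, compressed the same way on_message expects them
ping_frame = gzip.compress(json.dumps({"ping": 1492420473027}).encode("utf-8"))
tick_frame = gzip.compress(json.dumps({"tick": {"bids": [], "asks": []}}).encode("utf-8"))

print(build_pong(ping_frame))
print(build_pong(tick_frame))
```

The returned string is what would be handed to self.ws.send in the exchange class; a non-ping frame yields None, so nothing is sent.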
$ python main.py
The code for this tutorial can be found on GitHub!