Fix multiple subscriptions for binance
Closed this issue · 3 comments
If I configure multiple pairs for binance, it doesn't work.
To reproduce, configure:
settings_ws.json
:
{
"limit": 5,
"subscriptions": {
"binance": [
"NANO/BTC", "ETH/BTC"
]
}
}
Then run:
./ob_updater.py --debug
Error:
Task exception was never retrieved
future: <Task finished coro=<subscribe_ws() done, defined at /home/firepol/projects-python/ccxtws/utils.py:40> exception=Exception('connection was closed uncleanly (peer dropped the TCP connection without previous WebSocket closing handshake)',)>
Traceback (most recent call last):
File "/home/firepol/projects-python/ccxtws/utils.py", line 58, in subscribe_ws
await exchange.websocket_subscribe(event, symbol, {'limit': limit})
File "/home/firepol/env/ccxtws/lib/python3.6/site-packages/ccxt/async_support/base/exchange.py", line 786, in websocket_subscribe
conxid = await self._websocket_ensure_conx_active(event, symbol, True, params)
File "/home/firepol/env/ccxtws/lib/python3.6/site-packages/ccxt/async_support/base/exchange.py", line 621, in _websocket_ensure_conx_active
await self.websocket_connect(conxid)
File "/home/firepol/env/ccxtws/lib/python3.6/site-packages/ccxt/async_support/base/exchange.py", line 648, in websocket_connect
await websocket_connection.connect()
File "/home/firepol/env/ccxtws/lib/python3.6/site-packages/ccxt/async_support/websocket/websocket_connection.py", line 94, in connect
await future
Exception: connection was closed uncleanly (peer dropped the TCP connection without previous WebSocket closing handshake)
Expected result, like in the other exchanges, the program should work, not crash for multiple pairs in the same exchange.
@firepol, try this changes:
diff --git a/ob_updater.py b/ob_updater.py
index 872c459..e998fdc 100755
--- a/ob_updater.py
+++ b/ob_updater.py
@@ -27,8 +27,7 @@ def main():
settings = utils.get_exchange_settings(exchange_name)
exchange = utils.get_ccxt_exchange(exchange_name, settings)
- for symbol in symbols:
- asyncio.ensure_future(utils.subscribe_ws('ob', exchange, symbol, limit,
+ asyncio.ensure_future(utils.subscribe_ws('ob', exchange, symbols, limit,
loop, pp, args.debug, args.verbose))
loop.run_forever()
diff --git a/utils.py b/utils.py
index 42fad5a..b9d5fcb 100644
--- a/utils.py
+++ b/utils.py
-async def subscribe_ws(event, exchange, symbol, limit, loop, pp, debug=False, verbose=False):
+async def subscribe_ws(event, exchange, symbols, limit, loop, pp, debug=False, verbose=False):
@exchange.on('err')
def websocket_error(err, conxid): # pylint: disable=W0612
print(type(err).__name__ + ":" + str(err))
@@ -54,6 +57,6 @@ async def subscribe_ws(event, exchange, symbol, limit, loop, pp, debug=False, ve
sys.stdout.flush()
sys.stdout.flush()
-
- await exchange.websocket_subscribe(event, symbol, {'limit': limit})
- print(f'subscribed: {exchange.id} {symbol}')
+ for symbol in symbols:
+ await exchange.websocket_subscribe(event, symbol, {'limit': limit})
+ print(f'subscribed: {exchange.id} {symbol}')
binance use streams in websocket connection. You have to open websocket with all symbols you want to observe: wss://stream.binance.com:9443/stream?streams=<streamName1>/<streamName2>/<streamName3>
First time you try to subscribe websocket library open websocket with only one stream:wss://stream.binance.com:9443/stream?streams=NANOBTC@depth
while waiting for first subscription succeeds, if you try to subscribe to second symbol, binance ws driver need to close previous connection and open websocket with a new URL: wss://stream.binance.com:9443/stream?streams=NANOBTC@depth/ETHBTC@depth
, and the first websocket open action is closed before it succeeds.
I don't know how to handle it correctly, maybe before closing binance driver would have to wait to first subscription succeeds. but close function is not async, and we need to be sure previous ws connection is closed before connecting with new URL.
With above changes, you are ensuring not subscribing in paralell on same exchange. It is not the best solution, but it works for now.