firepol/ccxt-websockets-db-updater

Fix multiple subscriptions for binance

Closed this issue · 3 comments

If I configure multiple pairs for binance, it doesn't work.

To reproduce, configure:
settings_ws.json:

{
  "limit": 5,
  "subscriptions": {
    "binance": [
      "NANO/BTC", "ETH/BTC"
    ]
  }
}

Then run:

./ob_updater.py --debug

Error:

Task exception was never retrieved
future: <Task finished coro=<subscribe_ws() done, defined at /home/firepol/projects-python/ccxtws/utils.py:40> exception=Exception('connection was closed uncleanly (peer dropped the TCP connection without previous WebSocket closing handshake)',)>
Traceback (most recent call last):
  File "/home/firepol/projects-python/ccxtws/utils.py", line 58, in subscribe_ws
    await exchange.websocket_subscribe(event, symbol, {'limit': limit})
  File "/home/firepol/env/ccxtws/lib/python3.6/site-packages/ccxt/async_support/base/exchange.py", line 786, in websocket_subscribe
    conxid = await self._websocket_ensure_conx_active(event, symbol, True, params)
  File "/home/firepol/env/ccxtws/lib/python3.6/site-packages/ccxt/async_support/base/exchange.py", line 621, in _websocket_ensure_conx_active
    await self.websocket_connect(conxid)
  File "/home/firepol/env/ccxtws/lib/python3.6/site-packages/ccxt/async_support/base/exchange.py", line 648, in websocket_connect
    await websocket_connection.connect()
  File "/home/firepol/env/ccxtws/lib/python3.6/site-packages/ccxt/async_support/websocket/websocket_connection.py", line 94, in connect
    await future
Exception: connection was closed uncleanly (peer dropped the TCP connection without previous WebSocket closing handshake)

Expected result, like in the other exchanges, the program should work, not crash for multiple pairs in the same exchange.

lfern commented

@firepol, try this changes:

diff --git a/ob_updater.py b/ob_updater.py
index 872c459..e998fdc 100755
--- a/ob_updater.py
+++ b/ob_updater.py
@@ -27,8 +27,7 @@ def main():
             settings = utils.get_exchange_settings(exchange_name)
             exchange = utils.get_ccxt_exchange(exchange_name, settings)
 
-            for symbol in symbols:
-                asyncio.ensure_future(utils.subscribe_ws('ob', exchange, symbol, limit,
+            asyncio.ensure_future(utils.subscribe_ws('ob', exchange, symbols, limit,
                                                          loop, pp, args.debug, args.verbose))
 
         loop.run_forever()
diff --git a/utils.py b/utils.py
index 42fad5a..b9d5fcb 100644
--- a/utils.py
+++ b/utils.py
-async def subscribe_ws(event, exchange, symbol, limit, loop, pp, debug=False, verbose=False):
+async def subscribe_ws(event, exchange, symbols, limit, loop, pp, debug=False, verbose=False):
     @exchange.on('err')
     def websocket_error(err, conxid):  # pylint: disable=W0612
         print(type(err).__name__ + ":" + str(err))
@@ -54,6 +57,6 @@ async def subscribe_ws(event, exchange, symbol, limit, loop, pp, debug=False, ve
         sys.stdout.flush()
 
     sys.stdout.flush()
-
-    await exchange.websocket_subscribe(event, symbol, {'limit': limit})
-    print(f'subscribed: {exchange.id} {symbol}')
+    for symbol in symbols:
+        await exchange.websocket_subscribe(event, symbol, {'limit': limit})
+        print(f'subscribed: {exchange.id} {symbol}')
lfern commented

binance use streams in websocket connection. You have to open websocket with all symbols you want to observe: wss://stream.binance.com:9443/stream?streams=<streamName1>/<streamName2>/<streamName3>

First time you try to subscribe websocket library open websocket with only one stream:wss://stream.binance.com:9443/stream?streams=NANOBTC@depth

while waiting for first subscription succeeds, if you try to subscribe to second symbol, binance ws driver need to close previous connection and open websocket with a new URL: wss://stream.binance.com:9443/stream?streams=NANOBTC@depth/ETHBTC@depth, and the first websocket open action is closed before it succeeds.

I don't know how to handle it correctly, maybe before closing binance driver would have to wait to first subscription succeeds. but close function is not async, and we need to be sure previous ws connection is closed before connecting with new URL.

With above changes, you are ensuring not subscribing in paralell on same exchange. It is not the best solution, but it works for now.

Thanks a lot @lfern ! I implemented your fix, updated the sample config and it looks like it works like a charm.