bitrich-info/xchange-stream

[Kraken] KrakenStreamingService possible bug on switch loop

Opened this issue · 25 comments

Hello, yesterday I got an error on the WebSocket with this message:
2020-01-16 11:48:54 ERROR WebSocketClientHandler:112 - WebSocket client encountered exception (IOException - Invalid argument). Closing

Searching the logs, I found that the last log entry before a connection was this:
2020-01-16 12:11:50 ERROR KrakenStreamingService:93 - Channel null has been failed: ESession:Invalid session

Then the channel logged this message endlessly until I finally noticed the error and closed the application:
2020-01-16 12:12:20 ERROR KrakenStreamingService:107 - Unknown message: {"Error":"Exceeded msg rate"}

I looked through KrakenStreamingService and saw that in the switch statement there is no break keyword after line 93, which is the last log line that I found.

I have coded my application to retry indefinitely until the connection succeeds, but this time the retry was not triggered, which indicates that no error was propagated to the stream.

Is it possible that this missing break created the issue? It is the only one that is missing. Thanks
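
For readers unfamiliar with the behaviour in question: in Java, a case without a break falls through into the next case label, so both branches run. A minimal sketch of that effect, using hypothetical event names and log strings (this is not the actual KrakenStreamingService code):

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: event names and log strings are made up for this sketch.
final class FallThroughDemo {

    /** Returns the log lines that handling the given event would produce. */
    static List<String> handle(String event) {
        List<String> log = new ArrayList<>();
        switch (event) {
            case "subscriptionFailed":
                log.add("Channel failed");
                // No break here: execution falls through into the next case.
            case "error":
                log.add("Error received");
                break;
            default:
                log.add("Unknown message");
        }
        return log;
    }
}
```

Whether a fall-through matters depends entirely on what the following case does; if it only logs, the visible effect is an extra log line.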

mdvx commented

No, this missing break would not have created the issue (but it can be fixed for clarity). I saw the error a few times while developing the private channel code, but not recently. With Kraken, I have noticed that the rate limits apply to the API key, not the connection, so if you have 2 connections open, the rate limit is halved. I am not exactly sure why the socket gets rate limited, since it only ever subscribes to the two private channels, unless there are a lot of order book subscriptions?

mdvx commented

I have coded my application to retry indefinitely until the connection succeeds,
Ah, you should probably read up on how the Kraken rate limiter works, it is not obvious.

No, this missing break would not have created the issue (but it can be fixed for clarity). I saw the error a few times while developing the private channel code, but not recently. With Kraken, I have noticed that the rate limits apply to the API key, not the connection, so if you have 2 connections open, the rate limit is halved. I am not exactly sure why the socket gets rate limited, since it only ever subscribes to the two private channels, unless there are a lot of order book subscriptions?

So this is a rate limit issue? The thing is that I have multiple order books, but for userTrades every subscription uses a different API key.
It is a shame it was not so easy :)

Can I create a PR adding this break?

Hello again, I have added the break and restarted the WebSocket. Now I am getting a different error:

2020-01-20 20:50:07 ERROR WebSocketClientHandler:67 - WebSocket Client failed to connect. Invalid handshake response getStatus: 502 Bad Gateway [id: 0x1e8ff6cf, L:/192.168.1.3:38692 - R:ws.kraken.com/104.16.215.191:443]
2020-01-20 20:50:07 WARN KrakenStreamingService:221 - Problem with connection: class io.netty.handler.codec.http.websocketx.WebSocketHandshakeException - Invalid handshake response getStatus: 502 Bad Gateway
2020-01-20 20:50:23 WARN KrakenStreamingService:228 - Resubscribing channels
2020-01-20 20:50:33 WARN KrakenStreamingService:228 - Resubscribing channels
2020-01-20 20:50:53 ERROR KrakenStreamingService:108 - Unknown message: {"event":"error","errorMessage":"Exceeded msg rate"}
2020-01-20 20:51:03 ERROR KrakenStreamingService:108 - Unknown message: {"event":"error","errorMessage":"Exceeded msg rate"}
2020-01-20 20:51:23 ERROR KrakenStreamingService:108 - Unknown message: {"event":"error","errorMessage":"Exceeded msg rate"}

And the "Unknown message" line is printed endlessly until I close the application.

Maybe this has to do with the custom Kraken handler, KrakenWebSocketClientHandler.class. I believe that this error has nothing to do with rate limiting from Kraken. I will investigate further and post what I find. Has anyone encountered the same issue? Thanks

P.S.: This error doesn't happen when I subscribe for the first time. It happens at random times while the WebSocket is running.

Hi guys,

As far as I know, there is a messages-per-second limit on the Kraken side.
E.g. each call of the getOrderbook() method counts against this Kraken limiter because it sends a subscription message to Kraken. The dispose method sends an unsubscription message and counts against the limit as well.
When the connection is broken, NettyStreamingService automatically sends a batch of subscription messages, one for each channel:

    public void resubscribeChannels() {
        for (Entry<String, Subscription> entry : channels.entrySet()) {
            try {
                Subscription subscription = entry.getValue();
                sendMessage(getSubscribeMessage(subscription.channelName, subscription.args));
            } catch (IOException e) {
                LOG.error("Failed to reconnect channel: {}", entry.getKey());
            }
        }
    }

If you have N currency pair subscriptions, this causes N hits on the Kraken limiter at the same time!
I believe this is the reason for {"event":"error","errorMessage":"Exceeded msg rate"}
How to fix:
We should have the possibility to inject a throttle controller before sending messages to the WebSocket:
e.g. in the method NettyStreamingService.sendMessage(String message)

As an example, we can implement this like beforeConnectionHandler in ConnectableService:

    public static final String BEFORE_CONNECTION_HANDLER = "Before_Connection_Event_Handler";

    /**
     * {@link Runnable} handler is called before opening new socket connection.
     */
    private Runnable beforeConnectionHandler = () -> {
    };

    public void setBeforeConnectionHandler(Runnable beforeConnectionHandler) {
        if (beforeConnectionHandler != null) {
            this.beforeConnectionHandler = beforeConnectionHandler;
        }
    }

    protected abstract Completable openConnection();

    public Completable connect() {
        beforeConnectionHandler.run();
        return openConnection();
    }

We can allow specifying a beforeSendHandler and call it before sending.

public void sendMessage(String message) {
    LOG.debug("Sending message: {}", message);

    beforeSendHandler.run();

    if (webSocketChannel == null || !webSocketChannel.isOpen()) {
        LOG.warn("WebSocket is not open! Call connect first.");
        return;
    }

    if (!webSocketChannel.isWritable()) {
        LOG.warn("Cannot send data to WebSocket as it is not writable.");
        return;
    }

    if (message != null) {
        WebSocketFrame frame = new TextWebSocketFrame(message);
        webSocketChannel.writeAndFlush(frame);
    }
}

These are my thoughts regarding this issue.

P.S.
In my application I use my own throttle control wrapper around calls to the getOrderbook() method. But I can't control the automatic resubscribing inside bitrich, which is why I sometimes see "Exceeded msg rate" in my log, just like you guys.
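
A handler of the kind proposed above could be as simple as a minimum-interval throttle implemented as a Runnable. The sketch below is only an illustration: `MinIntervalThrottle` is a hypothetical name, the beforeSendHandler hook is the proposal from this comment rather than an existing library API, and the right interval is a guess because Kraken does not publish its limits.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper: blocks the caller so that consecutive run() calls
// are spaced at least minInterval apart.
final class MinIntervalThrottle implements Runnable {
    private final long minIntervalNanos;
    private long nextAllowedNanos; // earliest time the next call may proceed

    MinIntervalThrottle(long minInterval, TimeUnit unit) {
        this.minIntervalNanos = unit.toNanos(minInterval);
        this.nextAllowedNanos = System.nanoTime();
    }

    @Override
    public synchronized void run() {
        long now = System.nanoTime();
        long waitNanos = nextAllowedNanos - now;
        if (waitNanos > 0) {
            try {
                TimeUnit.NANOSECONDS.sleep(waitNanos);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and let the caller proceed.
                Thread.currentThread().interrupt();
                return;
            }
        }
        // Schedule the next slot relative to this call's slot.
        nextAllowedNanos = (waitNanos > 0 ? nextAllowedNanos : now) + minIntervalNanos;
    }
}
```

Note that this blocks the calling thread; if sendMessage is ever invoked from a Netty event loop thread, a non-blocking design (for example, queuing messages and draining them on a scheduler) would probably be a better fit.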

@pchertalev Thank you so much for your input and the information that you have shared!
@badgerwithagun Can we implement a solution that @pchertalev suggests?

I'm too busy at the moment, but I can do this some time later if nobody has implemented it by then :)

mdvx commented

That ties up with my experiences too

Let me describe why this issue is very serious.
Let's imagine this scenario:

  1. We have subscribed to 20 order book channels
  2. The connection breaks because of some network issue
  3. Bitrich tries to restore the connection:
    3.1 Bitrich successfully sends 20 subscription requests
    3.2 Bitrich expects that all 20 channels are subscribed
    3.3 Bitrich receives "Subscription success" messages for the first 15 requests
    3.4 Bitrich receives {"Error":"Exceeded msg rate"} for all remaining requests (16, 17 ... 20), because the 16th subscription request exceeded the rate limit and all subsequent messages are rejected as well while the Kraken block is active.
  4. In total:
    We think that we are subscribed to all 20 channels, but we are subscribed only to the first 15, because the last 5 subscription requests were rejected!!! And Kraken will never push messages for these blocked channels.

@pchertalev Thank you for your input! So even if we have subscribed to the first 15, Kraken will not send messages for any of them because they have banned our API key? Is this correct?
This applies to every channel connection and not just orderbooks.

Also, Kraken support told me that in times of very high request volume they lower the 15-request threshold even further. So it is possible that we will get banned for fewer than 15 requests.

If a subscription request is rejected by the Kraken rate limiter, we will never get any messages for that channel.
The rate limit error does not contain any information about which message was rejected, and that is a big problem because we don't know which channels are really subscribed.
I see only one way to detect channels that are not working: periodically checking that all requests have been confirmed.
E.g. we can check that the map is empty:
info/bitrich/xchangestream/kraken/KrakenStreamingService.java:36
private final Map<Integer, String> subscriptionRequestMap = new ConcurrentHashMap<>();
If not, we can send the request message again.
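
The periodic check could look roughly like the following. This is a sketch under assumptions, not the library's code: `PendingSubscriptions` and its methods are hypothetical, and unlike subscriptionRequestMap it also records when each request was sent so that only requests older than a timeout are reported.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical tracker for subscription requests awaiting confirmation.
final class PendingSubscriptions {

    private static final class Pending {
        final String channel;
        final long sentAtMillis;

        Pending(String channel, long sentAtMillis) {
            this.channel = channel;
            this.sentAtMillis = sentAtMillis;
        }
    }

    private final Map<Integer, Pending> pending = new ConcurrentHashMap<>();

    /** Record a subscription request at the moment it is sent. */
    void onRequestSent(int reqId, String channel, long nowMillis) {
        pending.put(reqId, new Pending(channel, nowMillis));
    }

    /** Drop the request once the exchange confirms the subscription. */
    void onConfirmed(int reqId) {
        pending.remove(reqId);
    }

    /** Channels whose request has been unconfirmed for longer than the timeout. */
    List<String> overdue(long nowMillis, long timeoutMillis) {
        List<String> result = new ArrayList<>();
        for (Pending p : pending.values()) {
            if (nowMillis - p.sentAtMillis > timeoutMillis) {
                result.add(p.channel);
            }
        }
        return result;
    }
}
```

A scheduled task could call overdue(...) every few seconds and resend (ideally throttled) the subscription for each channel it returns.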

I have asked Kraken support the following questions many, many times:

  • how many subscription requests does Kraken allow per second/minute/hour?
  • what is the block period?
  • what is the blocked entity: IP address, socket, auth token?

But I did not get any feedback. It looks like they don't know the answers :)

LOL) security reasons) All exchanges provide limit info: Bitfinex, Bitstamp, Binance, etc.
You have to be an oracle if you would like to use the Kraken API.
They should hide all method signatures for total security )
YOU SHALL NOT PASS!!! (C) Gandalf
I would like to help the Kraken servers by limiting my client requests, but they don't want to state the expected limits :) LOL

mdvx commented

What are the WebSocket API rate limits?

The trading rate limits across all order entry mediums (web site, Kraken Pro, REST and WebSocket APIs) remain the same, and are explained in detail on our rate limits support page.

The rate limit of a single WebSocket API connection will vary depending upon the load on the system. WebSocket API clients will receive the error message {"Error": "Exceeded msg rate"} when the rate limit is exceeded.

link to limits https://support.kraken.com/hc/en-us/articles/206548367

mdvx commented

https://support.kraken.com/hc/en-us/articles/360022326871-Kraken-WebSocket-API-Frequently-Asked-Questions#12

The WebSocket API allows multiple feed subscriptions via a single WebSocket connection (such as subscribing to all available market data for the XBT/USD currency pair via a single connection), so it is possible to stream all available market data for all currency pairs without reaching the WebSocket connection limits.

https://support.kraken.com/hc/en-us/articles/360022326871-Kraken-WebSocket-API-Frequently-Asked-Questions#12

The WebSocket API allows multiple feed subscriptions via a single WebSocket connection (such as subscribing to all available market data for the XBT/USD currency pair via a single connection), so it is possible to stream all available market data for all currency pairs without reaching the WebSocket connection limits.

Yes, this is doable. But does this create any "burden" on the connection, given that we are talking about updates for at least 50 order books? We would of course filter the output down to only the order book that we want via currency pair filtering. Is this OK from a performance point of view? Will there be any performance reduction?

mdvx commented

It really depends on the requirements of your app. It is probably not a great thing to do for the library, but you are free to fork the code and tailor it to your needs. If it can be done in a generic way, for instance by using an ExchangeSpecification parameter, that might be something worth checking back in.
NB: Coinbase Pro has a similar implementation; it requires all the products to be supplied in ProductSubscription before opening the socket.

Ok,
I checked Kraken docs, we can use one message for subscribing all required instruments

{
  "event": "subscribe",
  "pair": [
    "XBT/USD",
    "XBT/EUR"
    ....
  ],
  "subscription": {
    "name": "ticker"
  }
}

But this is not supported by the bitrich core and needs to be implemented.
The resubscribe method calls getSubscribeMessage(subscription.channelName, subscription.args) for each channel:

    public void resubscribeChannels() {
        for (Entry<String, Subscription> entry : channels.entrySet()) {
            try {
                Subscription subscription = entry.getValue();
                sendMessage(getSubscribeMessage(subscription.channelName, subscription.args));
            } catch (IOException e) {
                LOG.error("Failed to reconnect channel: {}", entry.getKey());
            }
        }
    }

So we need a Kraken-specific resubscribeChannels() method that generates only 3 subscription messages (ticker, book and trades), each containing all the required currency pairs.
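
As a rough illustration of the batching idea, the sketch below builds one subscribe message per subscription name from a map of name to currency pairs. `BatchSubscribe` and `buildMessages` are hypothetical names, the JSON is assembled by hand with no escaping (it assumes pairs and names contain no quotes), and extra subscription options such as book depth are not handled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper: one Kraken-style subscribe message per subscription name.
final class BatchSubscribe {

    /**
     * @param pairsBySubscription subscription name (e.g. "ticker") mapped to its currency pairs
     * @return one JSON subscribe message per map entry, in the map's iteration order
     */
    static List<String> buildMessages(Map<String, List<String>> pairsBySubscription) {
        List<String> messages = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : pairsBySubscription.entrySet()) {
            // Quote each pair and join them into a JSON array body.
            String pairs = entry.getValue().stream()
                    .map(pair -> "\"" + pair + "\"")
                    .collect(Collectors.joining(","));
            messages.add("{\"event\":\"subscribe\",\"pair\":[" + pairs
                    + "],\"subscription\":{\"name\":\"" + entry.getKey() + "\"}}");
        }
        return messages;
    }
}
```

With 20 order book channels this would produce a single message instead of 20, which is the point of the proposal.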

Guys, do you have any progress on this?

E.g. we can check that map is empty:
info/bitrich/xchangestream/kraken/KrakenStreamingService.java:36
private final Map<Integer, String> subscriptionRequestMap = new ConcurrentHashMap<>();
If not we can send request message again.

I'm using a similar approach, using a timeout after a subscription message has been created. After 2 seconds (perhaps could be adjusted/adjustable) check the subscriptionRequestMap and fail if the request is still there, i.e. not subscribed yet. I chose to fail rather than resend the message again, as this way the subscriber is notified via onError() and can choose to retry using various strategies, including the very convenient retryWhen().

See #550 for the changes and a test that shows how it works. It depends on #549, because otherwise the new subscriber does not receive any events.
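
The fail-on-timeout idea can also be sketched without RxJava. The example below is not the code from #550; it swaps in plain `CompletableFuture.orTimeout` (Java 9+) to show the shape of the approach, and `TimedSubscriptions` with its methods is a hypothetical name.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: each subscription request gets a future that fails
// with a TimeoutException if no confirmation arrives in time.
final class TimedSubscriptions {
    private final Map<Integer, CompletableFuture<Void>> pending = new ConcurrentHashMap<>();

    /** Register a request; the returned future fails if it is not confirmed in time. */
    CompletableFuture<Void> register(int reqId, long timeout, TimeUnit unit) {
        CompletableFuture<Void> confirmation = new CompletableFuture<>();
        pending.put(reqId, confirmation);
        confirmation.orTimeout(timeout, unit);
        // Clean up the map entry on success and on failure alike.
        confirmation.whenComplete((ok, err) -> pending.remove(reqId));
        return confirmation;
    }

    /** Called when the exchange confirms the subscription for reqId. */
    void confirm(int reqId) {
        CompletableFuture<Void> confirmation = pending.get(reqId);
        if (confirmation != null) {
            confirmation.complete(null);
        }
    }

    int pendingCount() {
        return pending.size();
    }
}
```

The caller decides what to do with a failed future (retry, back off, surface the error), which mirrors the reasoning above for failing rather than silently resending.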

Guys, I have implemented batch resubscription for the Kraken implementation.