bitrich-info/xchange-stream

[Bitmex] There is a bug on orderChanges endpoint

Opened this issue · 11 comments

Hello, the implementation of orderChanges in BitmexStreamingTradeService has a bug when an order has been filled. When Bitmex sends a FILLED order message, it doesn't include many required fields in order to create a new LimitOrder and send it to the stream. These data are null from the response:
side,price,ordType and orderQty have null values and because of that a NullPointException occures.
Here is the output which i have encounter:

2020-01-09 16:54:37 INFO BitmexStreamingMarketDataService:37 - Bitmex connection succeeded. Clearing orderbooks.
2020-01-09 16:54:37 INFO BitmexStreamingService:312 - Subscribing to channel order
BitmexOrder{orderID='9f2f8774-0573-9d3c-a45c-216ed7eabadd', account=4644, side='Buy', price=7907.5, avgPx=null, ordType='Limit', ordStatus=NEW, clOrdID='', orderQty=1, cumQty=0, workingIndicator=false, timestamp='2020-01-09T14:55:10.125Z', symbol='XBTUSD'}
LimitOrder [limitPrice=null, Order [type=BID, originalAmount=1, cumulativeAmount=0, averagePrice=null, fee=null, currencyPair=XBT/USD, id=9f2f8774-0573-9d3c-a45c-216ed7eabadd, timestamp=null, status=NEW, flags=[], userReference=null]]
BitmexOrder{orderID='9f2f8774-0573-9d3c-a45c-216ed7eabadd', account=4644, side='null', price=null, avgPx=7905, ordType='null', ordStatus=FILLED, clOrdID='', orderQty=null, cumQty=1, workingIndicator=false, timestamp='2020-01-09T14:55:10.125Z', symbol='XBTUSD'}
null

io.reactivex.exceptions.OnErrorNotImplementedException: The exception was not handled due to missing onError handler in the subscribe() method call. Further reading: https://github.com/ReactiveX/RxJava/wiki/Error-Handling | java.lang.NullPointerException
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
at io.reactivex.internal.observers.LambdaObserver.onError(LambdaObserver.java:77)
at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:117)
at io.reactivex.internal.operators.observable.ObservableFlattenIterable$FlattenIterableObserver.onError(ObservableFlattenIterable.java:125)
at io.reactivex.internal.operators.observable.ObservableFlattenIterable$FlattenIterableObserver.onNext(ObservableFlattenIterable.java:81)
at io.reactivex.internal.operators.observable.ObservableRefCount$RefCountObserver.onNext(ObservableRefCount.java:207)
at io.reactivex.internal.operators.observable.ObservablePublishAlt$PublishConnection.onNext(ObservablePublishAlt.java:177)
at io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:62)
at io.reactivex.internal.operators.observable.ObservableRefCount$RefCountObserver.onNext(ObservableRefCount.java:207)
at io.reactivex.internal.operators.observable.ObservablePublishAlt$PublishConnection.onNext(ObservablePublishAlt.java:177)
at io.reactivex.internal.observers.DisposableLambdaObserver.onNext(DisposableLambdaObserver.java:58)
at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onNext(ObservableCreate.java:66)
at info.bitrich.xchangestream.service.netty.NettyStreamingService.handleChannelMessage(NettyStreamingService.java:395)
at info.bitrich.xchangestream.service.netty.NettyStreamingService.handleMessage(NettyStreamingService.java:358)
at info.bitrich.xchangestream.bitmex.BitmexStreamingService.handleMessage(BitmexStreamingService.java:129)
at info.bitrich.xchangestream.bitmex.BitmexStreamingService.handleMessage(BitmexStreamingService.java:33)
at info.bitrich.xchangestream.service.netty.JsonNettyStreamingService.messageHandler(JsonNettyStreamingService.java:52)
at info.bitrich.xchangestream.service.netty.WebSocketClientHandler.dealWithTextFrame(WebSocketClientHandler.java:96)
at info.bitrich.xchangestream.service.netty.WebSocketClientHandler.channelRead0(WebSocketClientHandler.java:80)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:328)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:302)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1475)
at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1224)
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1271)
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:505)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:444)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:283)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514)
at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.NullPointerException
at info.bitrich.xchangestream.bitmex.dto.BitmexOrder.toOrder(BitmexOrder.java:85)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
at java.base/java.util.stream.ReferencePipeline$11$1.accept(ReferencePipeline.java:442)
at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
at info.bitrich.xchangestream.bitmex.BitmexStreamingTradeService.lambda$getOrderChanges$2(BitmexStreamingTradeService.java:39)
at io.reactivex.internal.operators.observable.ObservableFlattenIterable$FlattenIterableObserver.onNext(ObservableFlattenIterable.java:77)
... 45 more
Exception in thread "nioEventLoopGroup-2-1" io.reactivex.exceptions.OnErrorNotImplementedException: The exception was not handled due to missing onError handler in the subscribe() method call. Further reading: https://github.com/ReactiveX/RxJava/wiki/Error-Handling | java.lang.NullPointerException
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
at io.reactivex.internal.observers.LambdaObserver.onError(LambdaObserver.java:77)
at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:117)
at io.reactivex.internal.operators.observable.ObservableFlattenIterable$FlattenIterableObserver.onError(ObservableFlattenIterable.java:125)
at io.reactivex.internal.operators.observable.ObservableFlattenIterable$FlattenIterableObserver.onNext(ObservableFlattenIterable.java:81)
at io.reactivex.internal.operators.observable.ObservableRefCount$RefCountObserver.onNext(ObservableRefCount.java:207)
at io.reactivex.internal.operators.observable.ObservablePublishAlt$PublishConnection.onNext(ObservablePublishAlt.java:177)
at io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:62)
at io.reactivex.internal.operators.observable.ObservableRefCount$RefCountObserver.onNext(ObservableRefCount.java:207)
at io.reactivex.internal.operators.observable.ObservablePublishAlt$PublishConnection.onNext(ObservablePublishAlt.java:177)
at io.reactivex.internal.observers.DisposableLambdaObserver.onNext(DisposableLambdaObserver.java:58)
at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onNext(ObservableCreate.java:66)
at info.bitrich.xchangestream.service.netty.NettyStreamingService.handleChannelMessage(NettyStreamingService.java:395)
at info.bitrich.xchangestream.service.netty.NettyStreamingService.handleMessage(NettyStreamingService.java:358)
at info.bitrich.xchangestream.bitmex.BitmexStreamingService.handleMessage(BitmexStreamingService.java:129)
at info.bitrich.xchangestream.bitmex.BitmexStreamingService.handleMessage(BitmexStreamingService.java:33)
at info.bitrich.xchangestream.service.netty.JsonNettyStreamingService.messageHandler(JsonNettyStreamingService.java:52)
at info.bitrich.xchangestream.service.netty.WebSocketClientHandler.dealWithTextFrame(WebSocketClientHandler.java:96)
at info.bitrich.xchangestream.service.netty.WebSocketClientHandler.channelRead0(WebSocketClientHandler.java:80)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:328)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:302)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1475)
at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1224)
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1271)
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:505)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:444)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:283)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514)
at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.NullPointerException
at info.bitrich.xchangestream.bitmex.dto.BitmexOrder.toOrder(BitmexOrder.java:85)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
at java.base/java.util.stream.ReferencePipeline$11$1.accept(ReferencePipeline.java:442)
at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
at info.bitrich.xchangestream.bitmex.BitmexStreamingTradeService.lambda$getOrderChanges$2(BitmexStreamingTradeService.java:39)
at io.reactivex.internal.operators.observable.ObservableFlattenIterable$FlattenIterableObserver.onNext(ObservableFlattenIterable.java:77)
... 45 more

How can we send a limitOrder/marketOrder to the stream in which we don't have the side field?

mdvx commented

You may notice that Kraken too will send orderChanges with out a side or currencyPair, this is because the updates are deltas, and the fields did not change. Maybe this is a larger discussing, because 'fixing' this requires an order cache. @badgerwithagun

Indeed, we need to discuss about it. I am waiting for @badgerwithagun input.

I suggest to make an openOrderList cache and everytime an update occurs to send the updated list of openOrders. So instead of having Observable.Order we should have Observable.OpenOrders. What do you thing? Because openOrders updates depend to one another and they are not like userTrades which they are one time only events.

mdvx commented

How do we handle delta orderbook updates? should we try and use the same pattern. I am not sold on the idea of caching order at the exchange level, because it requires synchronisation (however if we could do something where the Exchange holds a cache instance, then different exchanges could implement different caching strategues.

synchronisation: When we have updates with 2+ fields (without sync, the order state is temporarily invalid)

What do you mean temporarily invalid? If we save a local copy of a List of openOrders, every update that will come from the stream will update this list. So even if there are 2 updates for the same order, they will be 2 separate messages and we will handle with FIFO. Can you please give an example? Thanks

mdvx commented

lets say you have an order, that get an update to executedQuantity, averagePrice and state: you can:
A) copy the existing order in the cache, update the copy, and replace the orignal order with the copy.

  1. This can leave hanging (previous) order objects
    B) sync(lock) the exiting order, upate all the fields, release lock
  2. lots of locking, but probably ok
    C) Other means?
    but basically, my thought is cache should operate in a maner that best matches the exchanges update cycle (Still one cache per exchange, but maybe cache for Kraken is different to BitMex)
mdvx commented

When we broadcast an OpenOrder object, we broadcast a reference to an OpenOrder object.

When another thread gets this broadcast it recieves the reference to the OpenOrder object, which might now have processed next updates, or be in the middle of processing them, or not. The OpenOrder is in an indeterminate state.

mdvx commented

Whereas the are always going to some states where and order is expected to be in situe (PENDING,WORKING) there are other states are considered to final and non-changeable (FILLED, DONE, CANCELED). This allows the algos to make decisions abount sending the next set of orders.