bitrich-info/xchange-stream

Production Bitfinex throws ConcurrentModificationException, and Kraken does as well.


Hi guys, I ran into issue #253 and found its root cause.

Netty can push messages to the observer from two different threads.
So these methods can be called from different threads at the same time:
info.bitrich.xchangestream.bitfinex.dto.BitfinexOrderbook#toBitfinexDepth
info.bitrich.xchangestream.bitfinex.dto.BitfinexOrderbook#updateLevel

I got the same situation with the Kraken exchange methods:
info.bitrich.xchangestream.kraken.KrakenOrderBookStorage#toKrakenDepth
info.bitrich.xchangestream.bitfinex.dto.BitfinexOrderbook#updateLevel

So I am going to submit a pull request with a fix.
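To illustrate the race described above, here is a minimal sketch of one way to guard a TreeMap-backed order book. It is not the code from the pull request. The class `OrderBookSketch` and its methods `updateLevel` and `snapshotAmounts` are hypothetical names made up for this example, and using `synchronized` on both paths is an assumption about a reasonable fix, not a statement of what the library does.

```java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical stand-in for one side of an order book; not the library's class.
final class OrderBookSketch {
    // TreeMap is not thread-safe: iterating it while another thread mutates it
    // can throw ConcurrentModificationException.
    private final TreeMap<BigDecimal, BigDecimal> levels = new TreeMap<>();

    // Writer path: insert, replace, or remove a price level under the lock.
    synchronized void updateLevel(BigDecimal price, BigDecimal amount) {
        if (amount.signum() == 0) {
            levels.remove(price);
        } else {
            levels.put(price, amount);
        }
    }

    // Reader path: copy the values under the same lock, so the iteration done
    // inside the ArrayList constructor cannot overlap with an update.
    synchronized List<BigDecimal> snapshotAmounts() {
        return new ArrayList<>(levels.values());
    }

    public static void main(String[] args) throws InterruptedException {
        OrderBookSketch book = new OrderBookSketch();
        // Two threads play the role of the two Netty event loop threads.
        Thread writer = new Thread(() -> {
            for (int i = 0; i < 100_000; i++) {
                book.updateLevel(BigDecimal.valueOf(i % 500), BigDecimal.valueOf(i % 7));
            }
        });
        Thread reader = new Thread(() -> {
            for (int i = 0; i < 100_000; i++) {
                book.snapshotAmounts();
            }
        });
        writer.start();
        reader.start();
        writer.join();
        reader.join();
        System.out.println("levels after run: " + book.snapshotAmounts().size());
    }
}
```

With both methods synchronized on the same instance, the copy in `snapshotAmounts` never runs while `updateLevel` is modifying the map. Removing either `synchronized` keyword would bring back the possibility of the exception shown in the logs below.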

From my Bitfinex log:
bitfinex-connector 2019-08-07 01:22:25.103 ERROR [nioEventLoopGroup-1771-1] baseconnector.StreamingInstrumentWorker - Failed to get Order book for symbol BTG/USD because exception: ConcurrentModificationException - null

From my Kraken log:
2019-08-19 13:39:13.450 ERROR [nioEventLoopGroup-3-2] baseconnector.StreamingInstrumentWorker - Failed to get Order book for symbol BTC/EUR because exception: ConcurrentModificationException - null
java.util.ConcurrentModificationException
    at java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1211)
    at java.util.TreeMap$ValueIterator.next(TreeMap.java:1256)
    at java.util.AbstractCollection.toArray(AbstractCollection.java:141)
    at java.util.ArrayList.<init>(ArrayList.java:178)
    at info.bitrich.xchangestream.kraken.KrakenOrderBookStorage.toKrakenDepth(KrakenOrderBookStorage.java:58)
    at info.bitrich.xchangestream.kraken.KrakenStreamingMarketDataService.lambda$getOrderBook$0(KrakenStreamingMarketDataService.java:60)
    at io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:59)
    at io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:64)
    at io.reactivex.internal.operators.observable.ObservableFilter$FilterObserver.onNext(ObservableFilter.java:52)
    at io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:64)
    at io.reactivex.internal.operators.observable.ObservableFilter$FilterObserver.onNext(ObservableFilter.java:52)
    at io.reactivex.internal.operators.observable.ObservableFilter$FilterObserver.onNext(ObservableFilter.java:52)
    at io.reactivex.internal.operators.observable.ObservableRefCount$ConnectionObserver.onNext(ObservableRefCount.java:139)
    at io.reactivex.internal.operators.observable.ObservablePublish$PublishObserver.onNext(ObservablePublish.java:174)
    at io.reactivex.internal.observers.DisposableLambdaObserver.onNext(DisposableLambdaObserver.java:58)
    at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onNext(ObservableCreate.java:67)
    at info.bitrich.xchangestream.service.netty.NettyStreamingService.handleChannelMessage(NettyStreamingService.java:396)
    at info.bitrich.xchangestream.service.netty.NettyStreamingService.handleMessage(NettyStreamingService.java:359)
    at info.bitrich.xchangestream.kraken.KrakenStreamingService.handleMessage(KrakenStreamingService.java:94)
    at info.bitrich.xchangestream.kraken.KrakenStreamingService.handleMessage(KrakenStreamingService.java:29)
    at info.bitrich.xchangestream.service.netty.JsonNettyStreamingService.messageHandler(JsonNettyStreamingService.java:52)
    at info.bitrich.xchangestream.service.netty.WebSocketClientHandler.channelRead0(WebSocketClientHandler.java:80)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:328)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:302)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
    at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1475)
    at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1224)
    at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1271)
    at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:505)
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:444)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:283)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1421)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:697)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)
2019-08-19 13:39:13.647 ERROR [nioEventLoopGroup-3-1] kraken.KrakenStreamingService - Channel book-BTC/EUR has been failed: Subscription with depth10 not Found BTC/EUR

I have created a PR, please review it.

@badgerwithagun Can I close this issue?

Thanks