Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[STREAM-KRAKEN] Exception java.util.ConcurrentModificationException: null is thrown #4943

Open
xgabri08 opened this issue Sep 1, 2024 · 5 comments

Comments

@xgabri08
Copy link

xgabri08 commented Sep 1, 2024

Hello,

I have a problem with the fact that when reading data from the Order book from Kraken, sometimes exceptions of the type java.util.ConcurrentModificationException: null appear.

Loading from the Order book runs 99% of the time and seems ok.
But sometimes the exceptions mentioned above start to appear.
Usually a large number of these exceptions come in a row within a short while. Sometimes it's just 3 exceptions, sometimes it's 400 in a row.

Sometimes after exceptions a large number of similar messages start to appear in the log:
Unknown message: [119996416,{"a":[["52686.80000","0.00000000","1725033319.029864"],["52700.90000","0.25000000","1725033315.535775","r"]],"c":"442562974"},"book-10","XBT/EUR"]

I'm not sure about this and just assume that the problem is in the CRC calculation for the OrderBook check. TreeSet is used to store bids and asks, which is not thread safe according to my information. So if there are changes to the order book and my server can't process them fast enough, the above mentioned errors may occur because probably several threads are trying to update the order book that is stored in TreeSet at the same time.

Have you encountered the same problem?
Does anyone have any advice on how to solve this problem?

If necessary, I can provide additional information.
Thank you for your answers

Exceptions occur in different places and that's why I'm posting a few stack traces, but it's probably still not all places

java.util.ConcurrentModificationException: null
at java.base/java.util.TreeMap$KeySpliterator.tryAdvance(TreeMap.java:3125) ~[na:na]
at java.base/java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:129) ~[na:na]
at java.base/java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:527) ~[na:na]
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:513) ~[na:na]
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) ~[na:na]
at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151) ~[na:na]
at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174) ~[na:na]
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[na:na]
at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596) ~[na:na]
at info.bitrich.xchangestream.kraken.KrakenStreamingChecksum.createCrcString(KrakenStreamingChecksum.java:49) ~[xchange-stream-kraken-5.1.1.jar:na]
at info.bitrich.xchangestream.kraken.KrakenStreamingChecksum.createCrcChecksum(KrakenStreamingChecksum.java:60) ~[xchange-stream-kraken-5.1.1.jar:na]
at info.bitrich.xchangestream.kraken.KrakenStreamingAdapters.adaptOrderbookMessage(KrakenStreamingAdapters.java:93) ~[xchange-stream-kraken-5.1.1.jar:na]
at info.bitrich.xchangestream.kraken.KrakenStreamingMarketDataService.lambda$getOrderBook$0(KrakenStreamingMarketDataService.java:49) ~[xchange-stream-kraken-5.1.1.jar:na]
at io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:57) ~[rxjava-2.2.2.jar:na]
at io.reactivex.internal.operators.observable.ObservableFilter$FilterObserver.onNext(ObservableFilter.java:52) ~[rxjava-2.2.2.jar:na]
at io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:62) ~[rxjava-2.2.2.jar:na]
at io.reactivex.internal.operators.observable.ObservableFilter$FilterObserver.onNext(ObservableFilter.java:52) ~[rxjava-2.2.2.jar:na]
at io.reactivex.internal.operators.observable.ObservableRefCount$RefCountObserver.onNext(ObservableRefCount.java:196) ~[rxjava-2.2.2.jar:na]
at io.reactivex.internal.operators.observable.ObservablePublish$PublishObserver.onNext(ObservablePublish.java:172) ~[rxjava-2.2.2.jar:na]
at io.reactivex.internal.observers.DisposableLambdaObserver.onNext(DisposableLambdaObserver.java:58) ~[rxjava-2.2.2.jar:na]
at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onNext(ObservableCreate.java:66) ~[rxjava-2.2.2.jar:na]
at info.bitrich.xchangestream.service.netty.NettyStreamingService.handleChannelMessage(NettyStreamingService.java:498) ~[xchange-stream-service-netty-5.1.1.jar:na]
at info.bitrich.xchangestream.service.netty.NettyStreamingService.handleMessage(NettyStreamingService.java:457) ~[xchange-stream-service-netty-5.1.1.jar:na]
at info.bitrich.xchangestream.kraken.KrakenStreamingService.handleMessage(KrakenStreamingService.java:194) ~[xchange-stream-kraken-5.1.1.jar:na]
at info.bitrich.xchangestream.kraken.KrakenStreamingService.handleMessage(KrakenStreamingService.java:39) ~[xchange-stream-kraken-5.1.1.jar:na]
at info.bitrich.xchangestream.service.netty.JsonNettyStreamingService.messageHandler(JsonNettyStreamingService.java:55) ~[xchange-stream-service-netty-5.1.1.jar:na]
at info.bitrich.xchangestream.service.netty.WebSocketClientHandler.dealWithTextFrame(WebSocketClientHandler.java:101) ~[xchange-stream-service-netty-5.1.1.jar:na]
at info.bitrich.xchangestream.service.netty.WebSocketClientHandler.channelRead0(WebSocketClientHandler.java:85) ~[xchange-stream-service-netty-5.1.1.jar:na]
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289) ~[netty-handler-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[netty-codec-4.1.107.Final.jar:4.1.107.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:333) ~[netty-codec-4.1.107.Final.jar:4.1.107.Final]
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:454) ~[netty-codec-4.1.107.Final.jar:4.1.107.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290) ~[netty-codec-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1475) ~[netty-handler-4.1.107.Final.jar:4.1.107.Final]
at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1338) ~[netty-handler-4.1.107.Final.jar:4.1.107.Final]
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1387) ~[netty-handler-4.1.107.Final.jar:4.1.107.Final]
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529) ~[netty-codec-4.1.107.Final.jar:4.1.107.Final]
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468) ~[netty-codec-4.1.107.Final.jar:4.1.107.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290) ~[netty-codec-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]

java.util.ConcurrentModificationException: null
at java.base/java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1521) ~[na:na]
at java.base/java.util.TreeMap$KeyIterator.next(TreeMap.java:1575) ~[na:na]
at java.base/java.util.Collection.removeIf(Collection.java:583) ~[na:na]
at info.bitrich.xchangestream.kraken.KrakenStreamingAdapters.lambda$updateInBook$1(KrakenStreamingAdapters.java:45) ~[xchange-stream-kraken-5.1.1.jar:na]
at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133) ~[na:na]
at info.bitrich.xchangestream.kraken.KrakenStreamingAdapters.updateInBook(KrakenStreamingAdapters.java:43) ~[xchange-stream-kraken-5.1.1.jar:na]
at info.bitrich.xchangestream.kraken.KrakenStreamingAdapters.lambda$adaptOrderbookMessage$2(KrakenStreamingAdapters.java:82) ~[xchange-stream-kraken-5.1.1.jar:na]
at java.base/java.util.ArrayList$Itr.forEachRemaining(ArrayList.java:1085) ~[na:na]
at info.bitrich.xchangestream.kraken.KrakenStreamingAdapters.adaptOrderbookMessage(KrakenStreamingAdapters.java:66) ~[xchange-stream-kraken-5.1.1.jar:na]
at info.bitrich.xchangestream.kraken.KrakenStreamingMarketDataService.lambda$getOrderBook$0(KrakenStreamingMarketDataService.java:49) ~[xchange-stream-kraken-5.1.1.jar:na]
at io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:57) ~[rxjava-2.2.2.jar:na]
at io.reactivex.internal.operators.observable.ObservableFilter$FilterObserver.onNext(ObservableFilter.java:52) ~[rxjava-2.2.2.jar:na]
at io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:62) ~[rxjava-2.2.2.jar:na]
at io.reactivex.internal.operators.observable.ObservableFilter$FilterObserver.onNext(ObservableFilter.java:52) ~[rxjava-2.2.2.jar:na]
at io.reactivex.internal.operators.observable.ObservableRefCount$RefCountObserver.onNext(ObservableRefCount.java:196) ~[rxjava-2.2.2.jar:na]
at io.reactivex.internal.operators.observable.ObservablePublish$PublishObserver.onNext(ObservablePublish.java:172) ~[rxjava-2.2.2.jar:na]
at io.reactivex.internal.observers.DisposableLambdaObserver.onNext(DisposableLambdaObserver.java:58) ~[rxjava-2.2.2.jar:na]
at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onNext(ObservableCreate.java:66) ~[rxjava-2.2.2.jar:na]
at info.bitrich.xchangestream.service.netty.NettyStreamingService.handleChannelMessage(NettyStreamingService.java:498) ~[xchange-stream-service-netty-5.1.1.jar:na]
at info.bitrich.xchangestream.service.netty.NettyStreamingService.handleMessage(NettyStreamingService.java:457) ~[xchange-stream-service-netty-5.1.1.jar:na]
at info.bitrich.xchangestream.kraken.KrakenStreamingService.handleMessage(KrakenStreamingService.java:194) ~[xchange-stream-kraken-5.1.1.jar:na]
at info.bitrich.xchangestream.kraken.KrakenStreamingService.handleMessage(KrakenStreamingService.java:39) ~[xchange-stream-kraken-5.1.1.jar:na]
at info.bitrich.xchangestream.service.netty.JsonNettyStreamingService.messageHandler(JsonNettyStreamingService.java:55) ~[xchange-stream-service-netty-5.1.1.jar:na]
at info.bitrich.xchangestream.service.netty.WebSocketClientHandler.dealWithTextFrame(WebSocketClientHandler.java:101) ~[xchange-stream-service-netty-5.1.1.jar:na]
at info.bitrich.xchangestream.service.netty.WebSocketClientHandler.channelRead0(WebSocketClientHandler.java:85) ~[xchange-stream-service-netty-5.1.1.jar:na]
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289) ~[netty-handler-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[netty-codec-4.1.107.Final.jar:4.1.107.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) ~[netty-codec-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1475) ~[netty-handler-4.1.107.Final.jar:4.1.107.Final]
at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1338) ~[netty-handler-4.1.107.Final.jar:4.1.107.Final]
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1387) ~[netty-handler-4.1.107.Final.jar:4.1.107.Final]
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529) ~[netty-codec-4.1.107.Final.jar:4.1.107.Final]
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468) ~[netty-codec-4.1.107.Final.jar:4.1.107.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290) ~[netty-codec-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]

java.util.ConcurrentModificationException: null
at java.base/java.util.TreeMap$PrivateEntryIterator.remove(TreeMap.java:1542) ~[na:na]
at java.base/java.util.Collection.removeIf(Collection.java:584) ~[na:na]
at info.bitrich.xchangestream.kraken.KrakenStreamingAdapters.lambda$updateInBook$1(KrakenStreamingAdapters.java:45) ~[xchange-stream-kraken-5.1.1.jar:na]
at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133) ~[na:na]
at info.bitrich.xchangestream.kraken.KrakenStreamingAdapters.updateInBook(KrakenStreamingAdapters.java:43) ~[xchange-stream-kraken-5.1.1.jar:na]
at info.bitrich.xchangestream.kraken.KrakenStreamingAdapters.lambda$adaptOrderbookMessage$2(KrakenStreamingAdapters.java:82) ~[xchange-stream-kraken-5.1.1.jar:na]
at java.base/java.util.ArrayList$Itr.forEachRemaining(ArrayList.java:1085) ~[na:na]
at info.bitrich.xchangestream.kraken.KrakenStreamingAdapters.adaptOrderbookMessage(KrakenStreamingAdapters.java:66) ~[xchange-stream-kraken-5.1.1.jar:na]
at info.bitrich.xchangestream.kraken.KrakenStreamingMarketDataService.lambda$getOrderBook$0(KrakenStreamingMarketDataService.java:49) ~[xchange-stream-kraken-5.1.1.jar:na]
at io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:57) ~[rxjava-2.2.2.jar:na]
at io.reactivex.internal.operators.observable.ObservableFilter$FilterObserver.onNext(ObservableFilter.java:52) ~[rxjava-2.2.2.jar:na]
at io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:62) ~[rxjava-2.2.2.jar:na]
at io.reactivex.internal.operators.observable.ObservableFilter$FilterObserver.onNext(ObservableFilter.java:52) ~[rxjava-2.2.2.jar:na]
at io.reactivex.internal.operators.observable.ObservableRefCount$RefCountObserver.onNext(ObservableRefCount.java:196) ~[rxjava-2.2.2.jar:na]
at io.reactivex.internal.operators.observable.ObservablePublish$PublishObserver.onNext(ObservablePublish.java:172) ~[rxjava-2.2.2.jar:na]
at io.reactivex.internal.observers.DisposableLambdaObserver.onNext(DisposableLambdaObserver.java:58) ~[rxjava-2.2.2.jar:na]
at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onNext(ObservableCreate.java:66) ~[rxjava-2.2.2.jar:na]
at info.bitrich.xchangestream.service.netty.NettyStreamingService.handleChannelMessage(NettyStreamingService.java:498) ~[xchange-stream-service-netty-5.1.1.jar:na]
at info.bitrich.xchangestream.service.netty.NettyStreamingService.handleMessage(NettyStreamingService.java:457) ~[xchange-stream-service-netty-5.1.1.jar:na]
at info.bitrich.xchangestream.kraken.KrakenStreamingService.handleMessage(KrakenStreamingService.java:194) ~[xchange-stream-kraken-5.1.1.jar:na]
at info.bitrich.xchangestream.kraken.KrakenStreamingService.handleMessage(KrakenStreamingService.java:39) ~[xchange-stream-kraken-5.1.1.jar:na]
at info.bitrich.xchangestream.service.netty.JsonNettyStreamingService.messageHandler(JsonNettyStreamingService.java:55) ~[xchange-stream-service-netty-5.1.1.jar:na]
at info.bitrich.xchangestream.service.netty.WebSocketClientHandler.dealWithTextFrame(WebSocketClientHandler.java:101) ~[xchange-stream-service-netty-5.1.1.jar:na]
at info.bitrich.xchangestream.service.netty.WebSocketClientHandler.channelRead0(WebSocketClientHandler.java:85) ~[xchange-stream-service-netty-5.1.1.jar:na]
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289) ~[netty-handler-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[netty-codec-4.1.107.Final.jar:4.1.107.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) ~[netty-codec-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1475) ~[netty-handler-4.1.107.Final.jar:4.1.107.Final]
at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1338) ~[netty-handler-4.1.107.Final.jar:4.1.107.Final]
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1387) ~[netty-handler-4.1.107.Final.jar:4.1.107.Final]
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529) ~[netty-codec-4.1.107.Final.jar:4.1.107.Final]
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468) ~[netty-codec-4.1.107.Final.jar:4.1.107.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290) ~[netty-codec-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]

java.util.ConcurrentModificationException: null
at java.base/java.util.TreeMap$KeySpliterator.forEachRemaining(TreeMap.java:3110) ~[na:na]
at java.base/java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:735) ~[na:na]
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) ~[na:na]
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) ~[na:na]
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921) ~[na:na]
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[na:na]
at java.base/java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:662) ~[na:na]
at java.base/java.util.stream.ReferencePipeline.max(ReferencePipeline.java:698) ~[na:na]
at info.bitrich.xchangestream.kraken.KrakenStreamingAdapters.adaptOrderbookMessage(KrakenStreamingAdapters.java:117) ~[xchange-stream-kraken-5.1.1.jar:na]
at info.bitrich.xchangestream.kraken.KrakenStreamingMarketDataService.lambda$getOrderBook$0(KrakenStreamingMarketDataService.java:49) ~[xchange-stream-kraken-5.1.1.jar:na]
at io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:57) ~[rxjava-2.2.2.jar:na]
at io.reactivex.internal.operators.observable.ObservableFilter$FilterObserver.onNext(ObservableFilter.java:52) ~[rxjava-2.2.2.jar:na]
at io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:62) ~[rxjava-2.2.2.jar:na]
at io.reactivex.internal.operators.observable.ObservableFilter$FilterObserver.onNext(ObservableFilter.java:52) ~[rxjava-2.2.2.jar:na]
at io.reactivex.internal.operators.observable.ObservableRefCount$RefCountObserver.onNext(ObservableRefCount.java:196) ~[rxjava-2.2.2.jar:na]
at io.reactivex.internal.operators.observable.ObservablePublish$PublishObserver.onNext(ObservablePublish.java:172) ~[rxjava-2.2.2.jar:na]
at io.reactivex.internal.observers.DisposableLambdaObserver.onNext(DisposableLambdaObserver.java:58) ~[rxjava-2.2.2.jar:na]
at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onNext(ObservableCreate.java:66) ~[rxjava-2.2.2.jar:na]
at info.bitrich.xchangestream.service.netty.NettyStreamingService.handleChannelMessage(NettyStreamingService.java:498) ~[xchange-stream-service-netty-5.1.1.jar:na]
at info.bitrich.xchangestream.service.netty.NettyStreamingService.handleMessage(NettyStreamingService.java:457) ~[xchange-stream-service-netty-5.1.1.jar:na]
at info.bitrich.xchangestream.kraken.KrakenStreamingService.handleMessage(KrakenStreamingService.java:194) ~[xchange-stream-kraken-5.1.1.jar:na]
at info.bitrich.xchangestream.kraken.KrakenStreamingService.handleMessage(KrakenStreamingService.java:39) ~[xchange-stream-kraken-5.1.1.jar:na]
at info.bitrich.xchangestream.service.netty.JsonNettyStreamingService.messageHandler(JsonNettyStreamingService.java:55) ~[xchange-stream-service-netty-5.1.1.jar:na]
at info.bitrich.xchangestream.service.netty.WebSocketClientHandler.dealWithTextFrame(WebSocketClientHandler.java:101) ~[xchange-stream-service-netty-5.1.1.jar:na]
at info.bitrich.xchangestream.service.netty.WebSocketClientHandler.channelRead0(WebSocketClientHandler.java:85) ~[xchange-stream-service-netty-5.1.1.jar:na]
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289) ~[netty-handler-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[netty-codec-4.1.107.Final.jar:4.1.107.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) ~[netty-codec-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1475) ~[netty-handler-4.1.107.Final.jar:4.1.107.Final]
at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1338) ~[netty-handler-4.1.107.Final.jar:4.1.107.Final]
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1387) ~[netty-handler-4.1.107.Final.jar:4.1.107.Final]
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529) ~[netty-codec-4.1.107.Final.jar:4.1.107.Final]
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468) ~[netty-codec-4.1.107.Final.jar:4.1.107.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290) ~[netty-codec-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]

@rizer1980
Copy link
Contributor

Hello, @xgabri08
I no familiar with code of the Kraken exchange specifically.

  1. There was a patch of recently with the transition of the org.knowm.xchange.dto.marketdata.OrderBook.update method from Synchronized to StampedLock . What version do you have?
  2. Do you read the data from the Orderbook somewhere outside the KrakenStreamingMarketDataService.getOrderBook method?
  3. It seems that Kraken uses its own realization methods for orderbook update, which do not look thread-safe at all.
    At the same time, Kraken Futures already uses the org.knowm.xchange.dto.marketdata.OrderBook update method, which is safe.

@xgabri08
Copy link
Author

Hello @rizer1980,
thank you for your very helpful message.

I'm sorry, but I haven't had much time lately and didn't get around to it sooner.

I'll have to check all the points, but that will take some time.

Today I started with point 2, which I think seems to me to be the most problematic.

I made a mistake in my code and stored an orderBook (TreeSet) in the KrakenStreamingMarketDataService.getOrderBook method that came to me in the input. I then accessed it when I needed information from the orderBook. This could definitely cause problems, due to concurrent access to the object.

I modified my code to make a copy of the orderBook in the KrakenStreamingMarketDataService.getOrderBook method and save that for later use.

I'll try it and let you know if it helped.

@douggie
Copy link
Collaborator

douggie commented Sep 15, 2024 via email

@rizer1980
Copy link
Contributor

In general, it is incorrect.
But, for example, we need to save states of OrderBook at a certain point in time.
Then, only a copy.

@douggie
Copy link
Collaborator

douggie commented Sep 15, 2024 via email

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants
@douggie @xgabri08 @rizer1980 and others