leonoel/missionary

nullpointerexception with subscribe

Closed this issue · 6 comments

I am trying to use missionary with reactive streams:

(def x (.executeReactive cql-session "select * from chat"))
(def flow (y/subscribe x))
(y/? (y/reduce conj flow))

where executeReactive returns a https://docs.datastax.com/en/latest-java-driver-api/com/datastax/dse/driver/api/core/cql/reactive/ReactiveResultSet.html

This results in an exception, apparently the error being passed to sneakyThrow here is null
https://github.com/leonoel/missionary/blob/07930608ff6dbf91dae1f51ee44df0d9612f1a61/java/missionary/impl/Sub.java#LL72C25-L72C25

don't know if I'm doing something wrong, but I think the error should be clearer and not NPE.

full trace:

1. Unhandled java.lang.NullPointerException
   (No message)

                 Util.java:  237  clojure.lang.Util/sneakyThrow
                  Sub.java:   72  missionary.impl.Sub/deref
               Reduce.java:   33  missionary.impl.Reduce/transfer
               Reduce.java:   55  missionary.impl.Reduce/ready
               Reduce.java:   69  missionary.impl.Reduce$1/invoke
                  Sub.java:  141  missionary.impl.Sub/onComplete
ReactiveResultSetSubscription.java:  377  com.datastax.dse.driver.internal.core.cql.reactive.ReactiveResultSetSubscription/doOnComplete
ReactiveResultSetSubscription.java:  240  com.datastax.dse.driver.internal.core.cql.reactive.ReactiveResultSetSubscription/drain
ReactiveResultSetSubscription.java:  358  com.datastax.dse.driver.internal.core.cql.reactive.ReactiveResultSetSubscription/lambda$fetchNextPageAndEnqueue$2
    CompletableFuture.java:  718  java.util.concurrent.CompletableFuture$UniAccept/tryFire
    CompletableFuture.java:  510  java.util.concurrent.CompletableFuture/postComplete
    CompletableFuture.java: 2179  java.util.concurrent.CompletableFuture/complete
    CqlRequestHandler.java:  349  com.datastax.oss.driver.internal.core.cql.CqlRequestHandler/setFinalResult
    CqlRequestHandler.java:  104  com.datastax.oss.driver.internal.core.cql.CqlRequestHandler/access$1500
    CqlRequestHandler.java:  680  com.datastax.oss.driver.internal.core.cql.CqlRequestHandler$NodeResponseCallback/onResponse
      InFlightHandler.java:  257  com.datastax.oss.driver.internal.core.channel.InFlightHandler/channelRead
AbstractChannelHandlerContext.java:  379  com.datastax.oss.driver.shaded.netty.channel.AbstractChannelHandlerContext/invokeChannelRead
AbstractChannelHandlerContext.java:  365  com.datastax.oss.driver.shaded.netty.channel.AbstractChannelHandlerContext/invokeChannelRead
AbstractChannelHandlerContext.java:  357  com.datastax.oss.driver.shaded.netty.channel.AbstractChannelHandlerContext/fireChannelRead
     IdleStateHandler.java:  286  com.datastax.oss.driver.shaded.netty.handler.timeout.IdleStateHandler/channelRead
AbstractChannelHandlerContext.java:  379  com.datastax.oss.driver.shaded.netty.channel.AbstractChannelHandlerContext/invokeChannelRead
AbstractChannelHandlerContext.java:  365  com.datastax.oss.driver.shaded.netty.channel.AbstractChannelHandlerContext/invokeChannelRead
AbstractChannelHandlerContext.java:  357  com.datastax.oss.driver.shaded.netty.channel.AbstractChannelHandlerContext/fireChannelRead
 ByteToMessageDecoder.java:  324  com.datastax.oss.driver.shaded.netty.handler.codec.ByteToMessageDecoder/fireChannelRead
 ByteToMessageDecoder.java:  296  com.datastax.oss.driver.shaded.netty.handler.codec.ByteToMessageDecoder/channelRead
AbstractChannelHandlerContext.java:  379  com.datastax.oss.driver.shaded.netty.channel.AbstractChannelHandlerContext/invokeChannelRead
AbstractChannelHandlerContext.java:  365  com.datastax.oss.driver.shaded.netty.channel.AbstractChannelHandlerContext/invokeChannelRead
AbstractChannelHandlerContext.java:  357  com.datastax.oss.driver.shaded.netty.channel.AbstractChannelHandlerContext/fireChannelRead
ChannelInboundHandlerAdapter.java:   93  com.datastax.oss.driver.shaded.netty.channel.ChannelInboundHandlerAdapter/channelRead
  InboundTrafficMeter.java:   44  com.datastax.oss.driver.internal.core.channel.InboundTrafficMeter/channelRead
AbstractChannelHandlerContext.java:  379  com.datastax.oss.driver.shaded.netty.channel.AbstractChannelHandlerContext/invokeChannelRead
AbstractChannelHandlerContext.java:  365  com.datastax.oss.driver.shaded.netty.channel.AbstractChannelHandlerContext/invokeChannelRead
AbstractChannelHandlerContext.java:  357  com.datastax.oss.driver.shaded.netty.channel.AbstractChannelHandlerContext/fireChannelRead
DefaultChannelPipeline.java: 1410  com.datastax.oss.driver.shaded.netty.channel.DefaultChannelPipeline$HeadContext/channelRead
AbstractChannelHandlerContext.java:  379  com.datastax.oss.driver.shaded.netty.channel.AbstractChannelHandlerContext/invokeChannelRead
AbstractChannelHandlerContext.java:  365  com.datastax.oss.driver.shaded.netty.channel.AbstractChannelHandlerContext/invokeChannelRead
DefaultChannelPipeline.java:  919  com.datastax.oss.driver.shaded.netty.channel.DefaultChannelPipeline/fireChannelRead
AbstractNioByteChannel.java:  166  com.datastax.oss.driver.shaded.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe/read
         NioEventLoop.java:  719  com.datastax.oss.driver.shaded.netty.channel.nio.NioEventLoop/processSelectedKey
         NioEventLoop.java:  655  com.datastax.oss.driver.shaded.netty.channel.nio.NioEventLoop/processSelectedKeysOptimized
         NioEventLoop.java:  581  com.datastax.oss.driver.shaded.netty.channel.nio.NioEventLoop/processSelectedKeys
         NioEventLoop.java:  493  com.datastax.oss.driver.shaded.netty.channel.nio.NioEventLoop/run
SingleThreadEventExecutor.java:  989  com.datastax.oss.driver.shaded.netty.util.concurrent.SingleThreadEventExecutor$4/run
    ThreadExecutorMap.java:   74  com.datastax.oss.driver.shaded.netty.util.internal.ThreadExecutorMap$2/run
FastThreadLocalRunnable.java:   30  com.datastax.oss.driver.shaded.netty.util.concurrent.FastThreadLocalRunnable/run
               Thread.java: 1589  java.lang.Thread/run

Looks like ReactiveResultSet needs to be adapted to a missionary flow. I can help with the missionary adapter if you make it easy for us by providing a working usage of this API from ordinary clojure? And links to the appropriate documentation

Oh, I now recognize y/subscribe as a missionary builtin adapter

Please can you provide more instructions as to how I can reproduce this? deps.edn + how to get a live connection

https://github.com/nivekuil/missionary-repro

docker run -p 9042:9042 docker.io/scylladb/scylla first

Thank you for reporting. This is a bug, will be fixed in next release.

fixed in b.29