lightbend/kafka-streams-scala

KGroupedStreamS recursion

nitayk opened this issue · 12 comments

Hello,
When using the reduce functions in KGroupedStreamS I get a StackOverflowError. Looking at the package itself in my IDE (IntelliJ), I can see the recursive calls:
[screenshot: screen shot 2018-01-21 at 18 02 18]

Can you please post a complete snippet that generates this exception? And which recursion are you referring to?

@debasishg IntelliJ will try to highlight what it believes to be recursive calls (the little circular arrow in the left margin of the above image). FWIW, I'm seeing the same thing, also for the def merge(stream: KStreamS[K, V]): KStreamS[K, V] = inner.merge(stream) function in KStreamS.

Could it be that inner.reduce(...) is marked as tail recursive by IntelliJ due to implicit conversions between KStream and KStreamS?

@debasishg Sorry, I was in a hurry. Here is a working snippet (by working I mean I can run it):

import com.lightbend.kafka.scala.streams.StreamsBuilderS

// Must be an object (not a class) for the App trait to provide a runnable main method.
object Test extends App {

  // Swap key and value.
  def swap(a: String, b: String) = (b, a)

  // Concatenate two values.
  def concatenate(a: String, b: String) = s"$a$b"

  val builder = new StreamsBuilderS

  builder.stream[String, String]("input-messages")
    .map(swap)
    .groupByKey()
    .reduce(concatenate)
}

Now, the same code, just using one of the other reduce overloads (the one with the storeName param):

  builder.stream[String, String]("input-messages")
    .map(swap)
    .groupByKey()
    .reduce(concatenate _, "this_is_a_store_name")

And when I try to run it, I get:

An exception or error caused a run to abort. 
java.lang.StackOverflowError

As far as I understand, the cause is the wrapKGroupedStream implicit function, which takes inner (a KGroupedStream) and wraps it as a KGroupedStreamS, so the call inner.reduce actually resolves to KGroupedStreamS.reduce, which is recursive.
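
For illustration, here is a minimal, self-contained sketch (using hypothetical simplified types, not the library's actual classes) of how a wrapper plus an implicit conversion can loop back on itself when SAM conversion is unavailable:

import scala.language.implicitConversions

// Hypothetical stand-ins: Inner plays the role of the underlying Java
// KGroupedStream, InnerS the Scala wrapper KGroupedStreamS.
object RecursionSketch extends App {

  class Inner {
    // Java-style overload that expects a SAM interface.
    def reduce(op: java.util.function.BinaryOperator[String]): String =
      op.apply("a", "b")
  }

  class InnerS(inner: Inner) {
    def reduce(f: (String, String) => String): String =
      // On Scala 2.12 the lambda SAM-converts to BinaryOperator and this
      // dispatches to Inner.reduce. On 2.11 there is no SAM conversion, so
      // the compiler applies wrapInner to `inner` instead, and the call
      // resolves back to InnerS.reduce: infinite recursion, hence the
      // StackOverflowError.
      inner.reduce((v1, v2) => f(v1, v2))
  }

  implicit def wrapInner(inner: Inner): InnerS = new InnerS(inner)

  println(new InnerS(new Inner).reduce(_ + _)) // prints "ab" on 2.12
}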

inner points to a KGroupedStream, and there is a reduce function in that class, so I don't think the implicit conversion will take place there unless IntelliJ is doing something special. I don't get the StackOverflowError in my environment.

Can you please try to run the code outside IntelliJ? The best option would be to run it as a unit test of the library, using the local Kafka server that the other unit tests use. Then please post the error that you see.

I found another error with the reduce API that takes the storeName, though it's not a StackOverflowError; it appears to come from within Kafka internals, but I'm not sure. Confluent has deprecated this API in the latest release, maybe not without a reason. Anyway, let's see what you get by running it outside IntelliJ and we can move forward.

I just pushed a debug branch. Please have a look at https://github.com/lightbend/kafka-streams-scala/blob/debug-reduce/src/test/scala/com/lightbend/kafka/scala/streams/StreamToTableJoinScalaIntegrationTest.scala#L102, which does the same thing that you propose. It works, as you can see by running the unit test.

Are you getting the error on Scala 2.11? I just tested on Scala 2.11 and got the StackOverflowError, while it runs clean on Scala 2.12.

I think I found the problem. The reduce method that gives the error was not able to convert the passed-in reducer function to a Reducer instance through SAM conversion on Scala 2.11. Hence it was calling the method on KGroupedStreamS instead of KGroupedStream.

I fixed it in the debug-reduce branch (https://github.com/lightbend/kafka-streams-scala/blob/debug-reduce/src/main/scala/com/lightbend/kafka/scala/streams/KGroupedStreamS.scala#L47). I will push the fix to the develop branch soon.
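
For illustration only (the actual fix is the one in the branch linked above), the general shape of such a fix is to build the SAM instance explicitly so the call binds to the underlying Java method on 2.11 as well. In terms of the hypothetical sketch earlier in this thread:

class InnerS(inner: Inner) {
  def reduce(f: (String, String) => String): String = {
    // Construct the SAM instance explicitly instead of relying on SAM
    // conversion; the argument's type now matches Inner.reduce directly,
    // so no implicit re-wrapping can occur on either 2.11 or 2.12.
    val op = new java.util.function.BinaryOperator[String] {
      override def apply(v1: String, v2: String): String = f(v1, v2)
    }
    inner.reduce(op)
  }
}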

Thanks for pointing this out.

Yep, 2.11.
Maybe tomorrow I'll have the time to invest in it.

Thanks

Thanks a lot!
When will this fix be released?

I will try to release this version within the next couple of days; I need to find some time for this.

Released 0.1.2, available on Maven Central.