dibbhatt/kafka-spark-consumer

Not working with Spark 2.2.0


After upgrading Spark to version 2.2.0, the following exception is thrown:

Exception in thread "SparkListenerBus" java.lang.AbstractMethodError
ERROR: org.apache.spark.util.Utils - uncaught error in thread SparkListenerBus, stopping SparkContext
java.lang.AbstractMethodError
at org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:69)
at org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:29)
at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:63)
at org.apache.spark.streaming.scheduler.StreamingListenerBus.postToAll(StreamingListenerBus.scala:29)
at org.apache.spark.streaming.scheduler.StreamingListenerBus.onOtherEvent(StreamingListenerBus.scala:43)
at org.apache.spark.scheduler.SparkListenerBus$class.doPostEvent(SparkListenerBus.scala:75)
at org.apache.spark.scheduler.LiveListenerBus.doPostEvent(LiveListenerBus.scala:36)
at org.apache.spark.scheduler.LiveListenerBus.doPostEvent(LiveListenerBus.scala:36)
at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:63)
at org.apache.spark.scheduler.LiveListenerBus.postToAll(LiveListenerBus.scala:36)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(LiveListenerBus.scala:94)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(LiveListenerBus.scala:78)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1279)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1.run(LiveListenerBus.scala:77)

Ok, I will take a look at it, @swrholdj.

hi @swrholdj just now I tested with Spark 2.2.0 and my test application runs fine. It is consuming from Kafka and updating ZooKeeper offsets. There must be some issue with your pom.xml; please check it. You are probably pulling in an older Spark version. By the way, I was testing with the latest consumer release, 1.0.11, which is the most up-to-date one. Which version of the consumer are you running?

In your application pom you need to include the Spark dependencies like this.

Below is an example that includes dependencies for Spark 2.2.0 and Kafka 0.11.0:

<properties>
  <spark.version>2.2.0</spark.version>
  <kafka.version>0.11.0.0</kafka.version>
</properties>

<dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>${spark.version}</version>
      <exclusions>
        <exclusion>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>${spark.version}</version>
      <exclusions>
        <exclusion>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>${kafka.version}</version>
      <exclusions>
        <exclusion>
          <groupId>org.apache.zookeeper</groupId>
          <artifactId>zookeeper</artifactId>
        </exclusion>
        <exclusion>
          <groupId>log4j</groupId>
          <artifactId>log4j</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
</dependencies>

Hi @dibbhatt

I am using the latest consumer release 1.0.11. I'm using SBT for my build and my SBT file looks like this:

name := "StreamingTest"

version := "1.0"

scalaVersion := "2.11.11"

resolvers ++= Seq(
  "Spark Packages Repo" at "http://dl.bintray.com/spark-packages/maven")

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.2.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.2.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.2.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.2.0"
libraryDependencies += "dibbhatt" % "kafka-spark-consumer" % "1.0.11"

If I change the Spark version from 2.2.0 to 2.1.1, it all works fine.

When does this exception occur? Is it while stopping the SparkContext, or as soon as you start the streaming application? Does this exception mean your streaming job is not launching at all? Is it possible to share a code snippet of your logic so that I can try to reproduce it? In my hello-world test (which is very simple) I am not able to reproduce it.

The exception occurs when trying to start the streaming context.

The code that I'm running is basically the example you have provided. This is the code:

import consumer.kafka.{ProcessedOffsetManager, ReceiverLauncher}
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingJob {

  def main(args: Array[String]): Unit = {

    // Create a spark session and context
    val spark = SparkSession
      .builder()
      .appName("Spark Streaming Job")
      .config("spark.cores.max", "10")
      .config("spark.sql.warehouse.dir", "file:///C:/tmp")
      .master("local[*]")
      .getOrCreate()

    val sc = spark.sparkContext
    val ssc = new StreamingContext(sc, Seconds(10))

    //Specify number of Receivers you need.
    val numberOfReceivers = 1

    val kafkaProperties: Map[String, String] =
      Map("zookeeper.hosts" -> "x.x.x.x",
        "zookeeper.port" -> "2181",
        "kafka.topic" -> "topic",
        "zookeeper.consumer.connection" -> "x.x.x.x:2181",
        "kafka.consumer.id" -> "kafka-consumer"
      )

    val props = new java.util.Properties()
    kafkaProperties foreach { case (key, value) => props.put(key, value) }

    val kafkaInput = ReceiverLauncher.launch(ssc, props, numberOfReceivers, StorageLevel.MEMORY_ONLY)

    //Get the Max offset from each RDD Partitions. Each RDD Partition belongs to One Kafka Partition
    val partitonOffset_stream = ProcessedOffsetManager.getPartitionOffset(kafkaInput, props)

    kafkaInput.foreachRDD(rdd => {
      println("Number of records in this batch : " + rdd.count())
    })

    //Persists the Max Offset of given Kafka Partition to ZK
    ProcessedOffsetManager.persists(partitonOffset_stream, props)

    ssc.start()
    ssc.awaitTermination()
  }
}

@swrholdj you are right. A new abstract method was added to StreamingListener in Spark 2.2.0 which I had missed implementing in ReceiverStreamListener, hence this error. I made the fix and cut a new release, 1.0.12. Please update your consumer dependency to 1.0.12 and it should now work with Spark 2.2.0. Thanks for pointing out this issue.
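In case it helps anyone writing their own listeners, here is a minimal Scala sketch of a listener compiled against Spark 2.2.0. It assumes the callback in question is onStreamingStarted (the StreamingListenerStreamingStarted event added to StreamingListener in 2.2.0); MyStreamingListener is just an illustrative name, not the consumer's ReceiverStreamListener.

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted, StreamingListenerStreamingStarted}

// Illustrative listener, not the consumer's ReceiverStreamListener.
// Compiling against Spark 2.2.0 and implementing the callback added in 2.2.0
// avoids the AbstractMethodError raised from StreamingListenerBus.doPostEvent.
class MyStreamingListener extends StreamingListener {

  // New event in Spark 2.2.0; listeners compiled against 2.1.x do not implement it.
  override def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted): Unit = {
    println("Streaming application started at " + streamingStarted.time)
  }

  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    println("Records in batch: " + batchCompleted.batchInfo.numRecords)
  }
}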

Just to note, these changes are not backward compatible, so 1.0.12 won't work with earlier Spark versions such as 2.1.1. I will update the ReadMe with these details soon.
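For reference, a rough build.sbt sketch of pairing the consumer release with the Spark release, based only on what was reported in this thread (1.0.11 working with Spark 2.1.1, 1.0.12 targeting Spark 2.2.0); treat the exact mapping as an assumption until the ReadMe is updated.

// Sketch only: pick the consumer release that matches the Spark line in use.
// Assumption from this thread: 1.0.12 targets Spark 2.2.0, 1.0.11 works with Spark 2.1.x.
val sparkVersion = "2.2.0"
val consumerVersion = if (sparkVersion.startsWith("2.2")) "1.0.12" else "1.0.11"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"           % sparkVersion,
  "org.apache.spark" %% "spark-streaming"      % sparkVersion,
  "dibbhatt"          % "kafka-spark-consumer" % consumerVersion
)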

Do let me know if this works; then I will close the issue.

I can confirm that version 1.0.12 of the consumer is now working with Spark 2.2.0. Thanks @dibbhatt.

Nice. Thanks @swrholdj.

hi @swrholdj, if you do not mind, can you tell me the name of your organization, and whether this consumer is running in production in your organization? I need it just for my reference.