dibbhatt/kafka-spark-consumer

AbstractMethodError with Spark 1.6.0 and Kafka 0.10.2

MLNW opened this issue · 9 comments

MLNW commented

I'm trying to use this library with older versions of Spark (1.6.0-cdh5.11.1) and Kafka (0.10.2-kafka-2.2.0), but when I try to persist the offsets after the application logic has run, I get the error below.

It looks to me like a Scala version mismatch. It's not easy for me to switch to Scala 2.11, so my question is: is there a way to make your library work with my versions?

Below are the observed exception, the relevant parts of my pom file, and a sketch of how I wire up the consumer:

java.lang.AbstractMethodError: consumer.kafka.PartitionOffsetPair.call(Ljava/lang/Object;)Ljava/lang/Iterable;
	at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$4$1.apply(JavaDStreamLike.scala:205)
	at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$4$1.apply(JavaDStreamLike.scala:205)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
17/11/17 12:02:52 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-0,5,main]
java.lang.AbstractMethodError: consumer.kafka.PartitionOffsetPair.call(Ljava/lang/Object;)Ljava/lang/Iterable;
	at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$4$1.apply(JavaDStreamLike.scala:205)
	at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$4$1.apply(JavaDStreamLike.scala:205)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
17/11/17 12:02:52 INFO storage.DiskBlockManager: Shutdown hook called
		<dependency>
			<groupId>org.scala-lang</groupId>
			<artifactId>scala-library</artifactId>
		</dependency>
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-core_2.10</artifactId>
			<exclusions>
				<exclusion>
					<groupId>org.scala-lang</groupId>
					<artifactId>scala-library</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-streaming_2.10</artifactId>
			<exclusions>
				<exclusion>
					<groupId>org.apache.kafka</groupId>
					<artifactId>kafka_2.10</artifactId>
				</exclusion>
				<exclusion>
					<groupId>org.scala-lang</groupId>
					<artifactId>scala-library</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.apache.hbase</groupId>
			<artifactId>hbase-spark</artifactId>
		</dependency>
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka_2.10</artifactId>
			<exclusions>
				<exclusion>
					<groupId>org.apache.zookeeper</groupId>
					<artifactId>zookeeper</artifactId>
				</exclusion>
				<exclusion>
					<groupId>log4j</groupId>
					<artifactId>log4j</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>dibbhatt</groupId>
			<artifactId>kafka-spark-consumer</artifactId>
			<version>1.0.12</version>
		</dependency>
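
For context, my offset-persisting call follows the README pattern, roughly like the sketch below (the ReceiverLauncher / ProcessedOffsetManager class names and signatures are what I understand from the project README; this is a sketch, not my exact job code):

    // Sketch only: class names and signatures assumed from the project README
    Properties props = new Properties();   // ZK / Kafka connection properties omitted here

    JavaDStream<MessageAndMetadata> unionStreams =
        ReceiverLauncher.launch(jssc, props, numberOfReceivers, StorageLevel.MEMORY_ONLY());

    // Collect the per-partition offsets of each processed batch
    JavaPairDStream<Integer, Iterable<Long>> partitionOffset =
        ProcessedOffsetManager.getPartitionOffset(unionStreams, props);

    // ... application logic on unionStreams ...

    // Persisting the processed offsets is the step that throws the AbstractMethodError
    ProcessedOffsetManager.persists(partitionOffset, props);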

Which version of the consumer are you running? This happens when your Spark version (1.6.0) and the Spark version in the consumer's pom don't match. You can git clone the code and update the consumer's pom to match your versions and try. With Spark 1.6 you may see a couple of compilation issues, which are easy to solve.

Here are the steps you can try.

  1. git clone the latest code.

  2. Modify pom.xml to match your Kafka and Spark versions (including the Scala version).

e.g.

<spark.version>1.6.0</spark.version>
<kafka.version>0.10.2.0</kafka.version>

  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>

and

  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.10</artifactId>

and

  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.10</artifactId>
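
Putting that together, the relevant sections of the consumer's pom.xml end up roughly like this (layout is illustrative; the actual pom may organize these slightly differently):

    <properties>
      <spark.version>1.6.0</spark.version>
      <kafka.version>0.10.2.0</kafka.version>
    </properties>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.10</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>${kafka.version}</version>
    </dependency>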
  3. Spark 1.6 and 2.0 have some incompatible changes, so you need to remove one listener callback from consumer.kafka.ReceiverStreamListener.java.

Remove this import:

import org.apache.spark.streaming.scheduler.StreamingListenerStreamingStarted;

and remove this callback:

@Override
public void onStreamingStarted(StreamingListenerStreamingStarted arg0) {
}
  4. Spark 1.6 and 2.0 have another incompatibility in the return type of PairFlatMapFunction, so you need to modify consumer.kafka.PartitionOffsetPair.java.

Change the return type of the call method from

public Iterator<Tuple2<Integer, Long>> call(Iterator<MessageAndMetadata> it)

to

public Iterable<Tuple2<Integer, Long>> call(Iterator<MessageAndMetadata> it)

And change the return statement from

    return kafkaPartitionToOffsetList.iterator();

to

    return kafkaPartitionToOffsetList;
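
For reference, after these two changes the method ends up shaped roughly like the sketch below (illustrative only, not the exact library source; the MessageAndMetadata accessor names are assumed and the per-partition bookkeeping is simplified):

    // Spark 1.6 sketch: PairFlatMapFunction.call must return an Iterable,
    // not an Iterator as in Spark 2.0.
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    import org.apache.spark.api.java.function.PairFlatMapFunction;
    import scala.Tuple2;
    import consumer.kafka.MessageAndMetadata;

    public class PartitionOffsetPair implements
        PairFlatMapFunction<Iterator<MessageAndMetadata>, Integer, Long> {

      @Override
      public Iterable<Tuple2<Integer, Long>> call(Iterator<MessageAndMetadata> it) {
        // Track the highest offset seen per Kafka partition in this RDD partition
        Map<Integer, Long> maxOffsets = new HashMap<Integer, Long>();
        while (it.hasNext()) {
          MessageAndMetadata msg = it.next();
          int partition = msg.getPartition();   // accessor name assumed
          long offset = msg.getOffset();        // accessor name assumed
          Long seen = maxOffsets.get(partition);
          if (seen == null || offset > seen) {
            maxOffsets.put(partition, offset);
          }
        }
        List<Tuple2<Integer, Long>> kafkaPartitionToOffsetList =
            new ArrayList<Tuple2<Integer, Long>>();
        for (Map.Entry<Integer, Long> entry : maxOffsets.entrySet()) {
          kafkaPartitionToOffsetList.add(
              new Tuple2<Integer, Long>(entry.getKey(), entry.getValue()));
        }
        // Spark 1.6: return the Iterable itself, not list.iterator()
        return kafkaPartitionToOffsetList;
      }
    }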

That's it. Build the consumer and you should be all set to use it with Spark 1.6 and Kafka 0.10.2.

Let me know if you face any issues.

Dibyendu

Alternatively, you can use consumer version 1.0.9, which works with Spark 1.6:

	<dependency>
		<groupId>dibbhatt</groupId>
		<artifactId>kafka-spark-consumer</artifactId>
		<version>1.0.9</version>
	</dependency>
MLNW commented

Thank you for your quick response!

I used your first approach and modified the latest code to use my versions of Kafka, Spark and Scala. It seems to work.

I will do some more extensive testing this week. If I find anything else, I'll let you know.

Cheers!

Perfect. Do let me know if you see any issues or need any help tuning the various knobs.

When the Spark job was submitted, the system loaded the default CDH assembly jar (spark-assembly-1.6.0-cdh5.14.4-hadoop2.6.0-cdh5.14.4.jar), so the Kafka version picked up is not 0.10 but 0.9.0.

Hi @LinMingQiang, which versions of the jars have you specified in your application pom?

Spark 1.6.0, Kafka 0.10.0

What's the issue you see? Is the streaming job not running?