dibbhatt/kafka-spark-consumer

Consumer commit not advancing the offset anymore

Closed this issue · 16 comments

We are using kafka-spark-consumer version 1.0.6 with Kafka 0.8.2, and here are our configurations:

zookeeper.consumer.path=/listingmetrics-impressions-kafka
kafka.consumer.id=listingmetricsimpressionssparkconsumer-prod-2
group.id=ListingMetricsImpressionsSparkConsumer-prod
consumer.forcefromstart=false
consumer.fetchsizebytes=102400
consumer.fillfreqm=250
consumer.receivers=1

The issue we are facing is that the consumer commit is not advancing the offset anymore.

Here's what we found upon debugging the code:
PartitionManager -> next() method, which then calls
-> the fill() method.
However, the KafkaUtils.fetchMessages call is not returning any validBytes, and there are no errors either.
The following call

ByteBufferMessageSet msgs =
          fetchResponse.messageSet(topic, _partition.partition);

returns no messages, and hence _emittedToOffset is not getting incremented.
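For reference, a small check right after the fetch can help distinguish "no new data" from "fetch error" at this point. This is only a rough sketch, assuming the Kafka 0.8 javaapi FetchResponse and ByteBufferMessageSet are what the library exposes here; topic and _partition are the same fields used in PartitionManager above:

// Debugging sketch only, not part of the library code.
if (fetchResponse.hasError()) {
    // A non-zero error code (e.g. OffsetOutOfRange) explains why no messages came back.
    short errorCode = fetchResponse.errorCode(topic, _partition.partition);
    System.out.println("Fetch error for partition " + _partition.partition + ", error code: " + errorCode);
} else {
    ByteBufferMessageSet msgs = fetchResponse.messageSet(topic, _partition.partition);
    System.out.println("Fetched " + msgs.validBytes() + " valid bytes for partition " + _partition.partition);
}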

When the commit is called next in the flow, we are getting the following message:

Last Enqueued offset "
          + _lastEnquedOffset
            + " not incremented since previous Comitted Offset "
            + _lastComittedOffset
            + " for partition  "
            + _partition
            + " for Consumer "
            + _ConsumerId
            + ". Some issue in Process!!"

Can you help us with the resolution? Let us know if you need any other info.
Thanks.

Hi, how are you using the consumer? Are you getting it from spark-packages (https://spark-packages.org/package/dibbhatt/kafka-spark-consumer)? Did you specify the correct Kafka version in your application pom/sbt? Consumer 1.0.6 has Kafka 0.9 by default, but with the scope defined as provided, so it will take the version you specify in your pom/sbt. Kindly check which Kafka version your application is taking.

Also, if the Consumer is not fetching messages (with forcefromstart=false), it might mean that no new messages are coming to Kafka. You can also check whether your Kafka producer is running properly.

Yes, we are using 1.0.6 from spark-packages.

  1. We are, however, using Kafka version 0.8.2.2, since that is what we have installed in our environments.
  2. Also, we have set forcefromstart=false, and new messages are coming into Kafka (verified via Kafka Tool).

Hi,
I do not find any issue in the consumer logic. Have you been using it for some time and are now facing this issue, or did the issue appear the first time you used it? Can you share the full properties (including the ZooKeeper consumer connection details) you are using to launch the Receiver?

If you use zkCli and go to the path /listingmetrics-impressions-kafka, do you see any offset details under this path? (You have to do a get.)

e.g. get /listingmetrics-impressions-kafka/listingmetricsimpressionssparkconsumer-prod-2/<topic>/partition_XX

Hello,

So, here is what's happening for us.

  1. The Spark Streaming application runs fine for a few days, and then (we are guessing) due to some environment issue (like a Kafka broker going down and a subsequent rolling restart), the consumer can't proceed beyond the offset.
  2. Also, we noticed that if we restart the application using a different consumer ID, it starts to work fine.

Here are the full properties we are using:

#Kafka ZK host details
zookeeper.hosts=host1.company.com,host2.company.com,host3.company.com
#Kafka ZK Port
zookeeper.port=2181
# Kafka Broker path in ZK 
zookeeper.broker.path=/brokers
#Kafka Topic to consume 
kafka.topic=impression

# ZK Details of Consumer where pulled offset  will be stored.
#Consumer ZK Path
zookeeper.consumer.connection=host2.company.com:2181
#ZK Path for storing Kafka Blur Consumer offset
zookeeper.consumer.path=/listingmetrics-impressions-kafka
# Blur Kafka Consumer ID. This ID will be used for storing fetched offset details in $zookeeper.blur.consumer.path
kafka.consumer.id=listingmetricsimpressionssparkconsumer-prod-2
group.id=ListingMetricsImpressionsSparkConsumer-prod
consumer.forcefromstart=false
consumer.fetchsizebytes=102400
consumer.fillfreqm=250
consumer.receivers=1
kafka.message.handler.class=


#Spark details
spark.agent=yarn-cluster
spark.streaming.receiver.writeAheadLog.enable=false
spark.appName=InventoryDemandMetricsImpressions-prod
spark.batch.duration=10000

I will try to get you more details on the zkCli util tomorrow and update this comment.

I see. When you did the rolling restart, did you check what Kafka's latest offset is and what offset is in ZK from the previous run? I suspect that during the rolling restart Kafka's offset got reset to 0, but the consumer still has a different offset (probably higher than Kafka's latest offset). That probably caused the consumer to get stuck. In such cases you can remove the ZK path for the consumer, which avoids having to create a new consumer ID. I will try to fix this in an upcoming release: if the consumer offset is larger than Kafka's latest offset (which is a very unlikely situation), it should reset back to Kafka's latest offset.
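As a side note, one way to compare the two values by hand is to ask the broker for the partition's latest (log-end) offset with a Kafka 0.8 OffsetRequest and put it next to the "offset" field stored under the consumer's ZK path. A minimal sketch, assuming the javaapi SimpleConsumer and the host/topic/partition values from this thread:

import java.util.HashMap;
import java.util.Map;

import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class OffsetCheck {
    public static void main(String[] args) {
        // Values taken from this thread; adjust to your environment.
        String topic = "impression";
        int partition = 8;
        String clientId = "offset-check";

        SimpleConsumer consumer =
                new SimpleConsumer("host.company.com", 9092, 10000, 64 * 1024, clientId);
        try {
            TopicAndPartition tp = new TopicAndPartition(topic, partition);
            Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
            // LatestTime() asks for the log-end offset; EarliestTime() would give the start of the log.
            requestInfo.put(tp, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));

            OffsetRequest request =
                    new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientId);
            OffsetResponse response = consumer.getOffsetsBefore(request);

            long latest = response.offsets(topic, partition)[0];
            System.out.println("Kafka latest offset for partition " + partition + " = " + latest);
            // Compare this against the "offset" value stored under
            // /listingmetrics-impressions-kafka/<consumer-id>/<topic>/partition_<N> in ZooKeeper.
        } finally {
            consumer.close();
        }
    }
}

If the ZK offset is ahead of the latest offset reported by the broker, that matches the stuck-consumer scenario described above.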

Here is what I am seeing when using zkCli. It would help if you could give us clarity on our findings.

  1. When I use zkCli and do a get:
get /listingmetrics-impressions-kafka/listingmetricsimpressionssparkconsumer-ft-2/impression/partition_8
{"consumer":{"id":"listingmetricsimpressionssparkconsumer-ft-2"},"offset":1052051217,"partition":8,"broker":{"host":"host.company.com","port":9092},"topic":"impression"}
cZxid = 0x58001f636e
ctime = Tue Aug 30 17:22:48 CDT 2016
mZxid = 0x58001f64e9
mtime = Tue Aug 30 17:31:33 CDT 2016
pZxid = 0x58001f636e
cversion = 0
dataVersion = 3
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 172
numChildren = 0

The one thing I am noticing is that when I do some activity that ingests data into Kafka, I am not seeing the offset increment when I run this query again. Isn't this offset supposed to increase if data is continuously coming into Kafka? The current offset for partition 8 is at 1055300919, which is way ahead of the offset from the get command.

We don't seem to be having the issue of offset going to 0 as per your earlier comments. Here are additional findings that might give you more context on our issue.

The last time we had this issue of the consumer not being able to read any data beyond an offset (stuck at an offset, let's say 1234), we noticed that:

  1. When I wrote a test program that calls SimpleConsumer.fetch, I noticed that fetching at that offset (1234) does not return any data, but prior offsets (let's say 100) or later offsets (1500) do return valid data.

Here is the relevant part of the code:

The SimpleConsumer:
SimpleConsumer consumer = new SimpleConsumer("host.company.com", 9092, 10000, 1024 * 1024, "listingmetricsimpressionssparkconsumer-prod-2");

Fetching data at an offset:

FetchRequestBuilder builder = new FetchRequestBuilder();
FetchRequest fetchRequest = builder
        .addFetch("impression", 0, 2681308903L, 512 * 1024) // 102400, 2539904832, 2452199282L, 2442381106L
        .clientId("listingmetricsimpressionssparkconsumer-prod-2")
        .maxWait(1000)
        .minBytes(1)
        .build();

FetchResponse response = consumer.fetch(fetchRequest);

System.out.println("High WaterMark is : " + response.highWatermark("impression", 0));

// Convert the Scala message set into a Java list so it can be iterated here
List<MessageAndOffset> moList =
        JavaConverters.asJavaListConverter(response.messageSet("impression", 0).toList()).asJava();

for (MessageAndOffset mo : moList) {
    // Copy the raw message payload out of the ByteBuffer
    ByteBuffer payload = mo.message().payload();
    byte[] bytes = new byte[payload.limit()];
    payload.get(bytes);

    System.out.println(String.valueOf(mo.offset()) + ": " + new String(bytes, "UTF-8") + ": ");
    System.out.println("******************************" + response.messageSet("impression", 0).isEmpty());
}

By any chance, did you publish any message to Kafka which is much larger than the consumer.fetchsizebytes you used (1 MB)? That could also cause this issue. It would mean that at offset 1234 the message size is larger than 1 MB, and with the specified pull size of 1 MB it is not able to pull that message...
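One rough way to test for that with the SimpleConsumer program above is to retry the fetch at the stuck offset with a progressively larger fetch size: an empty message set while the high watermark is still ahead of the requested offset is the usual symptom of a message bigger than the fetch size. A sketch only, reusing the consumer, topic, partition, and offset values from the test program above:

long stuckOffset = 2681308903L;
int fetchSize = 512 * 1024;

for (int attempt = 0; attempt < 5; attempt++) {
    FetchRequest probe = new FetchRequestBuilder()
            .clientId("listingmetricsimpressionssparkconsumer-prod-2")
            .addFetch("impression", 0, stuckOffset, fetchSize)
            .maxWait(1000)
            .minBytes(1)
            .build();
    FetchResponse probeResponse = consumer.fetch(probe);

    boolean empty = probeResponse.messageSet("impression", 0).isEmpty();
    long highWatermark = probeResponse.highWatermark("impression", 0);

    if (!empty || highWatermark <= stuckOffset) {
        // Either we got data back, or there is genuinely nothing past this offset.
        System.out.println("Fetch returned data (or no data exists) with fetchSize=" + fetchSize);
        break;
    }
    // An empty result while the high watermark is ahead of the offset suggests a message
    // larger than the current fetch size; double it and try again.
    fetchSize *= 2;
    System.out.println("Empty fetch at offset " + stuckOffset + ", retrying with fetchSize=" + fetchSize);
}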

Thank you for your response. I will need to look at this the next time it happens. We keep data in Kafka for the last 7 days, and this offset had already expired.
Also, any thoughts on the zkCli comments that I posted?

That means you are pulling at a much slower rate than your ingestion rate, or you are using too few Receivers for your number of Kafka partitions. Data is coming into Kafka but the Receiver is not pulling at the same rate, so the receiver is lagging. How many Kafka partitions do you have? If you have, say, 20 partitions, you can use, say, 5 receivers to check if that works better.

You can increase both your number of receivers and consumer.fetchsizebytes (say, set it to 4 MB) and observe.

Sure, we will try your suggestions - thanks for the insights on lagging. This will help us troubleshoot problems in the future.
Also, we are using 1 receiver with 9 partitions and are not using backpressure.
Do you foresee any Spark memory issues if data is coming in at a rate of 102 KB per 250 msec (batch size 10 sec) and we have only one receiver without backpressure?

What kind of machines are you using? For 1 Receiver handling N partitions, it spawns N threads, one per partition, and pulls from Kafka. In our production we have around 160 partitions and we are using around 20 receivers, with each receiver handling 8 partitions. But our pull interval is much higher (2 sec) and the batch time is around 2 min, so all threads get scheduled on CPU cores. As a Receiver is a task in Spark that is allocated 1 core (or the number of cores per task), I believe your threads are not getting a fair share of CPU cycles. You can try increasing consumer.fillfreq to, say, 1/2 sec and setting fetchSize to 4 MB and see if that works better. What that means is you go to Kafka less often, but each time you pull a larger chunk. A higher number of Receivers will also work. Say you set 3 Receivers to start with and check if that can manage 9 partitions.
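To make that concrete, with the property names from the configuration earlier in this thread, the suggestion would look roughly like this (values are only the ones proposed above, and assuming the fill frequency is in milliseconds, as the existing value of 250 suggests):

# Pull less often but in larger chunks, and spread the 9 partitions over more receivers
consumer.receivers=3
consumer.fetchsizebytes=4194304
consumer.fillfreqm=500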

Also, you need to make sure your Kafka publisher is distributing equally to all partitions. Sometimes we see that, due to wrong partitioning logic in the publisher, some partitions do not get messages at all.

Closing this

Hi @krantig, if you do not mind, can you tell me the name of your organization, and also whether this consumer is running in production there? I need it just for my reference.