SOHU-Co/kafka-node

For a topic with multiple partitions, sometimes not all partitions are consumed by HighLevelConsumer

tomwang1013 opened this issue · 18 comments

It has happened quite a few times: I use a HighLevelConsumer to consume a topic with 4 partitions, but only 2 partitions were consumed, like:
[screenshot: consumer offsets showing only 2 of the 4 partitions being consumed]

Sometimes restarting the consumer fixed it, sometimes not. Why?
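
Roughly, the setup looks like this (a minimal sketch; the ZooKeeper connection string, topic name and group id are placeholders):

var kafka = require('kafka-node');

// One HighLevelConsumer in the group; the topic has 4 partitions.
var client = new kafka.Client('localhost:2181');
var consumer = new kafka.HighLevelConsumer(
  client,
  [{ topic: 'my_topic' }],
  { groupId: 'kafka-node-group' }
);

consumer.on('message', function (message) {
  // In the failure case, messages only ever arrive from a subset of the partitions.
  console.log(message.partition, message.offset);
});

consumer.on('error', function (err) {
  console.error(err);
});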

Does this issue happen in version 0.2.30?

I also encountered this situation, but version 0.2.29 does not have this problem.

@tomwang1013 And what tools do you use to see the partitions? Thank you.

@sensoroDj Are you sure that 0.2.29 does not have this problem? I will revert to 0.2.29.

Sorry, I use version 0.2.29! My partitions and consumers are one-to-one: I let one consumer consume one partition, e.g. 3 partitions to 3 consumers. But when I use 0.3.x, the high-level API does not work the way I want it to. I recommend you use a lower version when you use the high-level API; I found it can't work well in 0.3.x.

@tomwang1013 Can you give the output using the Kafka tools?

Something like
/opt/kafka/kafka_2.11-0.8.2.1/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group kafka-node-group --zookeeper [ZKHOST:ZKPORT] --topic [TOPIC_NAME]
example:
/opt/kafka/kafka_2.11-0.8.2.1/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group kafka-node-group --zookeeper local.zookeeper.net:2181 --topic feedUpdates2

The only scenario in which I have been able to reproduce your issue is when you are only sending messages to a few partitions and you don't have one consumer per partition, for example:

  • Publishing to partitions 0,1,2,3,4
  • Running 3 consumers

You will get something like:

Group           Topic                          Pid Offset          logSize         Lag             Owner
kafka-node-group feedUpdates2                   0   170841          178521          7680            kafka-node-group_2a574686-29d6-4aaa-ad0d-812691bdf8b9
kafka-node-group feedUpdates2                   1   160526          167043          6517            kafka-node-group_2a574686-29d6-4aaa-ad0d-812691bdf8b9
kafka-node-group feedUpdates2                   2   171471          177998          6527            kafka-node-group_2a574686-29d6-4aaa-ad0d-812691bdf8b9
kafka-node-group feedUpdates2                   3   158815          166503          7688            kafka-node-group_2a574686-29d6-4aaa-ad0d-812691bdf8b9
kafka-node-group feedUpdates2                   4   159599          166138          6539            kafka-node-group_2a574686-29d6-4aaa-ad0d-812691bdf8b9
kafka-node-group feedUpdates2                   5   7537            7537            0               kafka-node-group_2a574686-29d6-4aaa-ad0d-812691bdf8b9
kafka-node-group feedUpdates2                   6   7537            7537            0               kafka-node-group_5cf5a5bf-2d45-4a2a-b108-3277f2c3e43a
kafka-node-group feedUpdates2                   7   7537            7537            0               kafka-node-group_5cf5a5bf-2d45-4a2a-b108-3277f2c3e43a
kafka-node-group feedUpdates2                   8   7536            7536            0               kafka-node-group_5cf5a5bf-2d45-4a2a-b108-3277f2c3e43a
kafka-node-group feedUpdates2                   9   7534            7534            0               kafka-node-group_5cf5a5bf-2d45-4a2a-b108-3277f2c3e43a
kafka-node-group feedUpdates2                   10  7534            7534            0               kafka-node-group_5cf5a5bf-2d45-4a2a-b108-3277f2c3e43a
kafka-node-group feedUpdates2                   11  7533            7533            0               kafka-node-group_d694cb35-b334-4d67-a393-4e3c3de0588d
kafka-node-group feedUpdates2                   12  7530            7530            0               kafka-node-group_d694cb35-b334-4d67-a393-4e3c3de0588d
kafka-node-group feedUpdates2                   13  7530            7530            0               kafka-node-group_d694cb35-b334-4d67-a393-4e3c3de0588d
kafka-node-group feedUpdates2                   14  7530            7530            0               kafka-node-group_d694cb35-b334-4d67-a393-4e3c3de0588d
kafka-node-group feedUpdates2                   15  7529            7529            0               kafka-node-group_d694cb35-b334-4d67-a393-4e3c3de0588d

The distribution is basically floor(partitions / consumers) per consumer, with one consumer getting an additional partition for the remainder (see the quick sketch below).

So you get:

  • 1st consumer gets 6 partitions
  • 2nd consumer gets 5 partitions
  • 3rd consumer gets 5 partitions

So ONLY one consumer is getting messages, since messages exist only on the first 5 partitions (0-4), and all of those are owned by the first consumer.
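
A quick arithmetic sketch of that split, using the numbers from the output above (plain Node, purely illustrative):

// Range-style assignment from the example: 16 partitions, 3 consumers.
var partitions = 16;
var consumers = 3;
var base = Math.floor(partitions / consumers); // 5 partitions per consumer
var extra = partitions % consumers;            // 1 consumer gets one more

// -> 1st consumer: base + 1 = 6 partitions (0-5)
// -> 2nd consumer: base     = 5 partitions (6-10)
// -> 3rd consumer: base     = 5 partitions (11-15)
// Messages are only produced to partitions 0-4, so only the 1st consumer sees any.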

@jbarreto In my case there was only one consumer in this group, so it should consume all the partitions. But *sometimes* it did not, as shown above. The correct scenario is:
[screenshot: consumer offsets showing all 4 partitions being consumed by the single consumer]

@sensoroDj Sorry, I was a little confused. You said the high-level API "does not work the way I want it to" in 0.3.x and that it "can't work well in 0.3.x". Should we use 0.3.x or 0.2.29 when using the high-level API?

I am seeing this same issue and am using 0.2.27

Group              Topic                      Pid Offset          logSize         Lag             Owner
mygroup            my_topic                   0   129777          129777          0               mygroup_8000d5a3-6f3f-4810-b550-9964b3399cb0
mygroup            my_topic                   1   128295          128295          0               mygroup_8000d5a3-6f3f-4810-b550-9964b3399cb0
mygroup            my_topic                   2   132121          132121          0               mygroup_8000d5a3-6f3f-4810-b550-9964b3399cb0
mygroup            my_topic                   3   132676          132676          0               mygroup_8000d5a3-6f3f-4810-b550-9964b3399cb0
mygroup            my_topic                   4   139752          139752          0               mygroup_8000d5a3-6f3f-4810-b550-9964b3399cb0
mygroup            my_topic                   5   137116          137116          0               mygroup_8000d5a3-6f3f-4810-b550-9964b3399cb0
mygroup            my_topic                   6   147484          147484          0               mygroup_ac3b93f7-a4c0-4d5b-ac2d-f2a448a7022d
mygroup            my_topic                   7   125432          125432          0               mygroup_ac3b93f7-a4c0-4d5b-ac2d-f2a448a7022d
mygroup            my_topic                   8   131383          131383          0               mygroup_ac3b93f7-a4c0-4d5b-ac2d-f2a448a7022d
mygroup            my_topic                   9   129714          129714          0               mygroup_ac3b93f7-a4c0-4d5b-ac2d-f2a448a7022d
mygroup            my_topic                   10  158580          158580          0               mygroup_ac3b93f7-a4c0-4d5b-ac2d-f2a448a7022d
mygroup            my_topic                   11  149554          149554          0               mygroup_fa6da3e1-8ea0-4521-bbf0-e418c97a09c6
mygroup            my_topic                   12  144542          144542          0               mygroup_fa6da3e1-8ea0-4521-bbf0-e418c97a09c6
mygroup            my_topic                   13  136326          136326          0               mygroup_fa6da3e1-8ea0-4521-bbf0-e418c97a09c6
mygroup            my_topic                   14  122674          136842          14168           mygroup_fa6da3e1-8ea0-4521-bbf0-e418c97a09c6
mygroup            my_topic                   15  152034          152034          0               mygroup_fa6da3e1-8ea0-4521-bbf0-e418c97a09c6

I have 3 consumers which should all be taking an equal share of the messages, but partition 14 is stuck. This has happened before: when a rebalance occurred (such as when adding or removing a consumer from the group), the stuck partition would suddenly get drained and all the stuck messages would be processed several days late.

Can someone suggest which version, if any, this issue has been resolved in or how this can be remedied?

@tomwang1013 Sorry, my English is not good! I mean that it sometimes works well in 0.2.x. I have also met your problem: one consumer for all partitions, but not all partitions get consumed. I found it may be caused by ZooKeeper; the Kafka consumer does not rebalance correctly.

This is a non-obvious part of kafka-node if you ask me. Consumer starvation versus partition "warm-up" with the default partitioner can be confusing: it can take several minutes (or longer) for each partition to start receiving messages, so you should always produce with an explicit partition number or a keyed message, IMO. Also, once the consumer count exceeds the partition count, any extra consumers ALWAYS starve. This is proper operation by design, but again confusing if you don't know the technical details of a rebalance operation.
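
For example, a rough sketch of both approaches (topic name, key and ZooKeeper address are placeholders; partitionerType: 3 selects the keyed partitioner in later kafka-node versions, so treat that option as an assumption for older releases):

var kafka = require('kafka-node');

var client = new kafka.Client('localhost:2181');
// partitionerType: 3 = keyed partitioner (omit if your version does not support it).
var producer = new kafka.Producer(client, { partitionerType: 3 });

producer.on('ready', function () {
  // Option 1: target an explicit partition number.
  producer.send(
    [{ topic: 'my_topic', partition: 2, messages: ['explicit partition'] }],
    function (err) { if (err) console.error(err); }
  );

  // Option 2: keyed message -- the key is hashed to pick the partition.
  var km = new kafka.KeyedMessage('some-key', 'keyed message');
  producer.send(
    [{ topic: 'my_topic', messages: [km] }],
    function (err) { if (err) console.error(err); }
  );
});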

0.2.29 also has this problem. I have given up on it and switched to a client in another language.

I solved this issue! Maybe it isn't the same reason for others, but here is what it was...

I copied the fetchMaxBytes: 1024 * 10 line from the README thinking this was standard and used it in the options for my HighLevelConsumer. Apparently, this is a very small buffer; the default if it is left out is 1024 * 1024 bytes.

So, what happened was a message was produced which was too big for my fetchMaxBytes setting. Since I couldn't consume the whole message, it just sat there in the partition waiting to be consumed. And since messages must be consumed in order, new messages for that partition just piled up behind it!

Once I removed my config and let the default do its thing, my stuck partitions all got consumed and everything is working perfectly.
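
For anyone else hitting this, roughly what the change looked like (a sketch; topic, group and connection string are placeholders, not my real ones):

var kafka = require('kafka-node');
var client = new kafka.Client('localhost:2181');

// Too small: copied from the README example. Any message larger than 10 KB
// cannot be fetched, so its partition silently stops advancing.
var tooSmall = {
  groupId: 'kafka-node-group',
  fetchMaxBytes: 1024 * 10
};

// Fine: leave fetchMaxBytes out entirely (the default is 1024 * 1024) or set it
// to at least the largest message the producers are allowed to send.
var fixed = {
  groupId: 'kafka-node-group',
  fetchMaxBytes: 1024 * 1024
};

var consumer = new kafka.HighLevelConsumer(client, [{ topic: 'my_topic' }], fixed);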

Note: I am currently using version 0.2.27. Just before writing this I had 11 out of 15 partitions stuck. Now all are perfectly fine.

@ericdolson looks like a bug in the documentation. All the options mirror the defaults except for fetchMaxBytes. And according to issue #192 there's a bit of tweaking to the options to get kafka-node to work with each use case. This should also be documented. Interested in submitting a PR? :)

I noticed the Kafka console consumer will throw an error if the message being fetched exceeds maxBytes. It would be nice if kafka-node could do something like that instead of failing silently.

[2016-04-11 14:58:32,926] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
kafka.common.MessageSizeTooLargeException: Found a message larger than the maximum fetch size of this consumer on topic ProducerConnectivityTest partition 1 at fetch offset 41. Increase the fetch size, or decrease the maximum message size the broker will allow.
    at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:90)
    at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
    at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
    at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
    at kafka.consumer.OldConsumer.receive(BaseConsumer.scala:79)
    at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:110)
    at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:69)
    at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:47)
    at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
Processed a total of 1 messages

Facing the same issue with kafka-node 1.6.2:

Group            Topic    Pid Offset logSize Lag Owner
kafka-node-group nc_delta 0   34620  34620   0   none
kafka-node-group nc_delta 1   34962  34968   6   none
kafka-node-group nc_delta 2   34497  34497   0   none
kafka-node-group nc_delta 3   34595  34595   0   none
kafka-node-group nc_delta 4   35199  35199   0   kafka-node-group_84aa011e-ef75-429f-8378-533a25bd9064
kafka-node-group nc_delta 5   34544  34544   0   kafka-node-group_84aa011e-ef75-429f-8378-533a25bd9064
kafka-node-group nc_delta 6   34782  34782   0   kafka-node-group_84aa011e-ef75-429f-8378-533a25bd9064
kafka-node-group nc_delta 7   33740  33740   0   kafka-node-group_84aa011e-ef75-429f-8378-533a25bd9064
kafka-node-group nc_delta 8   34474  34474   0   kafka-node-group_84aa011e-ef75-429f-8378-533a25bd9064
kafka-node-group nc_delta 9   34180  34180   0   kafka-node-group_84aa011e-ef75-429f-8378-533a25bd9064
kafka-node-group nc_delta 10  34823  34823   0   kafka-node-group_84aa011e-ef75-429f-8378-533a25bd9064
kafka-node-group nc_delta 11  34237  34237   0   kafka-node-group_84aa011e-ef75-429f-8378-533a25bd9064