For a topic with multiple partitions, sometimes not all partitions are consumed by HighLevelConsumer
tomwang1013 opened this issue · 18 comments
Does this issue happen in version 0.2.30?
@hyperlink: yes
I also encountered this situation, but version 0.2.29 does not have this problem.
@tomwang1013 and what tools do you use to see the partitions? thank you
@sensoroDj are you sure that 0.2.29 does not have this problem? I will revert to 0.2.29.
Sorry, I am using version 0.2.29! My partitions and consumers are one-to-one: I let each consumer consume one partition, e.g. 3 partitions for 3 consumers. But when I use the 0.3.x high-level API, it does not work the way I want. I recommend using a lower version if you use the high-level API; I found it doesn't work well in 0.3.x.
@tomwang1013 can you give the output using the Kafka tools?
Something like
/opt/kafka/kafka_2.11-0.8.2.1/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group kafka-node-group --zookeeper [ZKHOST:ZKPORT] --topic [TOPIC_NAME]
example:
/opt/kafka/kafka_2.11-0.8.2.1/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group kafka-node-group --zookeeper local.zookeeper.net:2181 --topic feedUpdates2
The only scenario in which I have been able to reproduce your issue is when you are only sending messages to a few partitions and you don't have one consumer per partition, for example:
- Publishing to partitions 0,1,2,3,4
- Running 3 consumers
You will get something like:
Group Topic Pid Offset logSize Lag Owner
kafka-node-group feedUpdates2 0 170841 178521 7680 kafka-node-group_2a574686-29d6-4aaa-ad0d-812691bdf8b9
kafka-node-group feedUpdates2 1 160526 167043 6517 kafka-node-group_2a574686-29d6-4aaa-ad0d-812691bdf8b9
kafka-node-group feedUpdates2 2 171471 177998 6527 kafka-node-group_2a574686-29d6-4aaa-ad0d-812691bdf8b9
kafka-node-group feedUpdates2 3 158815 166503 7688 kafka-node-group_2a574686-29d6-4aaa-ad0d-812691bdf8b9
kafka-node-group feedUpdates2 4 159599 166138 6539 kafka-node-group_2a574686-29d6-4aaa-ad0d-812691bdf8b9
kafka-node-group feedUpdates2 5 7537 7537 0 kafka-node-group_2a574686-29d6-4aaa-ad0d-812691bdf8b9
kafka-node-group feedUpdates2 6 7537 7537 0 kafka-node-group_5cf5a5bf-2d45-4a2a-b108-3277f2c3e43a
kafka-node-group feedUpdates2 7 7537 7537 0 kafka-node-group_5cf5a5bf-2d45-4a2a-b108-3277f2c3e43a
kafka-node-group feedUpdates2 8 7536 7536 0 kafka-node-group_5cf5a5bf-2d45-4a2a-b108-3277f2c3e43a
kafka-node-group feedUpdates2 9 7534 7534 0 kafka-node-group_5cf5a5bf-2d45-4a2a-b108-3277f2c3e43a
kafka-node-group feedUpdates2 10 7534 7534 0 kafka-node-group_5cf5a5bf-2d45-4a2a-b108-3277f2c3e43a
kafka-node-group feedUpdates2 11 7533 7533 0 kafka-node-group_d694cb35-b334-4d67-a393-4e3c3de0588d
kafka-node-group feedUpdates2 12 7530 7530 0 kafka-node-group_d694cb35-b334-4d67-a393-4e3c3de0588d
kafka-node-group feedUpdates2 13 7530 7530 0 kafka-node-group_d694cb35-b334-4d67-a393-4e3c3de0588d
kafka-node-group feedUpdates2 14 7530 7530 0 kafka-node-group_d694cb35-b334-4d67-a393-4e3c3de0588d
kafka-node-group feedUpdates2 15 7529 7529 0 kafka-node-group_d694cb35-b334-4d67-a393-4e3c3de0588d
The distribution is basically (partitions / consumers) per consumer, and one consumer gets an additional partition for the fractional part (here 16 / 3 = 5 with a remainder of 1).
So you get:
- 1st consumer gets 6 partitions
- 2nd consumer gets 5 partitions
- 3rd consumer gets 5 partitions
So ONLY one consumer is getting messages since there are only messages on the first 5 partitions
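To make the arithmetic above concrete, here is a minimal sketch of a range-style assignment. The `assignPartitions` helper is hypothetical and is not kafka-node's actual rebalance code; it also assumes that when the remainder is larger than 1, the extra partitions go to the first consumers one each.

```javascript
// Hypothetical helper: splits partition ids 0..partitionCount-1 into
// contiguous ranges, one range per consumer. Not kafka-node's real code.
function assignPartitions(partitionCount, consumerCount) {
  const base = Math.floor(partitionCount / consumerCount);
  const extra = partitionCount % consumerCount; // leftover partitions
  const assignment = [];
  let next = 0;
  for (let c = 0; c < consumerCount; c++) {
    // The first `extra` consumers each take one additional partition.
    const size = base + (c < extra ? 1 : 0);
    const owned = [];
    for (let i = 0; i < size; i++) {
      owned.push(next++);
    }
    assignment.push(owned);
  }
  return assignment;
}

// 16 partitions, 3 consumers, as in the output above.
const groups = assignPartitions(16, 3);
console.log(groups.map((g) => g.length));
console.log(groups[0]);
```

With 16 partitions and 3 consumers the first consumer ends up owning partitions 0 to 5, which covers every partition that is receiving messages in this example, so the other two consumers sit idle.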
@jbarreto in my case, there was only one consumer in this group, so it should consume all the partitions. But sometimes it did not, as I showed above. The correct scenario is:
@sensoroDj sorry, I was a little confused. You said: "but when i use 0.3.x highlevel api will not work i want to be" and "I found it cant work well in 0.3.x". Should we use 0.3.x or 0.2.29 when using the high-level API?
I am seeing this same issue and am using 0.2.27
Group Topic Pid Offset logSize Lag Owner
mygroup my_topic 0 129777 129777 0 mygroup_8000d5a3-6f3f-4810-b550-9964b3399cb0
mygroup my_topic 1 128295 128295 0 mygroup_8000d5a3-6f3f-4810-b550-9964b3399cb0
mygroup my_topic 2 132121 132121 0 mygroup_8000d5a3-6f3f-4810-b550-9964b3399cb0
mygroup my_topic 3 132676 132676 0 mygroup_8000d5a3-6f3f-4810-b550-9964b3399cb0
mygroup my_topic 4 139752 139752 0 mygroup_8000d5a3-6f3f-4810-b550-9964b3399cb0
mygroup my_topic 5 137116 137116 0 mygroup_8000d5a3-6f3f-4810-b550-9964b3399cb0
mygroup my_topic 6 147484 147484 0 mygroup_ac3b93f7-a4c0-4d5b-ac2d-f2a448a7022d
mygroup my_topic 7 125432 125432 0 mygroup_ac3b93f7-a4c0-4d5b-ac2d-f2a448a7022d
mygroup my_topic 8 131383 131383 0 mygroup_ac3b93f7-a4c0-4d5b-ac2d-f2a448a7022d
mygroup my_topic 9 129714 129714 0 mygroup_ac3b93f7-a4c0-4d5b-ac2d-f2a448a7022d
mygroup my_topic 10 158580 158580 0 mygroup_ac3b93f7-a4c0-4d5b-ac2d-f2a448a7022d
mygroup my_topic 11 149554 149554 0 mygroup_fa6da3e1-8ea0-4521-bbf0-e418c97a09c6
mygroup my_topic 12 144542 144542 0 mygroup_fa6da3e1-8ea0-4521-bbf0-e418c97a09c6
mygroup my_topic 13 136326 136326 0 mygroup_fa6da3e1-8ea0-4521-bbf0-e418c97a09c6
mygroup my_topic 14 122674 136842 14168 mygroup_fa6da3e1-8ea0-4521-bbf0-e418c97a09c6
mygroup my_topic 15 152034 152034 0 mygroup_fa6da3e1-8ea0-4521-bbf0-e418c97a09c6
I have 3 consumers which should all be taking an equal share of the messages, but partition 14 is stuck. This was happening before and when a rebalance would occur (like when adding or removing a consumer to the group) the stuck partition would suddenly get drained and all the stuck messages would be processed several days late.
Can someone suggest which version, if any, this issue has been resolved in or how this can be remedied?
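For anyone checking their own ConsumerOffsetChecker output for a stuck partition like partition 14 above, here is a minimal sketch that parses the rows and flags partitions whose lag (logSize minus Offset) is greater than zero. The `parseRows` and `laggingPartitions` helpers are hypothetical names, and the sketch assumes the seven whitespace-separated columns shown in this thread.

```javascript
// Three rows copied from the output above.
const sample = `
Group Topic Pid Offset logSize Lag Owner
mygroup my_topic 13 136326 136326 0 mygroup_fa6da3e1-8ea0-4521-bbf0-e418c97a09c6
mygroup my_topic 14 122674 136842 14168 mygroup_fa6da3e1-8ea0-4521-bbf0-e418c97a09c6
mygroup my_topic 15 152034 152034 0 mygroup_fa6da3e1-8ea0-4521-bbf0-e418c97a09c6
`;

// Hypothetical helper: turns each data row into an object.
// The header row and blank lines are skipped.
function parseRows(text) {
  return text
    .split('\n')
    .map((line) => line.trim().split(/\s+/))
    .filter((cols) => cols.length === 7 && /^\d+$/.test(cols[2]))
    .map(([group, topic, pid, offset, logSize, lag, owner]) => ({
      group,
      topic,
      partition: Number(pid),
      offset: Number(offset),
      logSize: Number(logSize),
      // Recompute the lag instead of trusting the printed column.
      lag: Number(logSize) - Number(offset),
      owner,
    }));
}

// Hypothetical helper: partitions where the consumer is behind the log end.
function laggingPartitions(rows) {
  return rows.filter((row) => row.lag > 0);
}

console.log(laggingPartitions(parseRows(sample)));
```

A non-zero lag on its own is normal for a busy topic; what matters is a lag that keeps growing while the Offset column stays the same between runs.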
@tomwang1013 sorry, my English is not good! I mean it sometimes works well in 0.2.x. I also ran into your problem: one consumer for all partitions, but not all partitions get consumed. I found it may be caused by ZooKeeper: the Kafka consumers were not rebalanced correctly.
This is a non-obvious part of kafka-node, if you ask me. Consumer starvation versus partition "warmup" with the default partitioners can be confusing. It can take several minutes (or longer) for each partition to get messages, so IMO you should always produce with a partition number or a keyed message. Also, once the consumer count exceeds the partition count, any new consumers will ALWAYS starve. This is proper operation by design, but again confusing if you don't know the technical details of a rebalance operation.
0.2.29 also has this problem. I have given up on it and now use a client in another language.
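As a rough illustration of what producing with a keyed message buys you, here is a minimal sketch of a key-based partition chooser. The `partitionForKey` function and its hash are assumptions for illustration only; this is not the hash kafka-node or Kafka itself uses.

```javascript
// Hypothetical keyed partitioner: the same key always maps to the same
// partition, so the target partition does not depend on producer state.
function partitionForKey(key, partitionCount) {
  let hash = 0;
  for (let i = 0; i < key.length; i++) {
    // Simple 32-bit polynomial string hash, kept unsigned with >>> 0.
    hash = (hash * 31 + key.charCodeAt(i)) >>> 0;
  }
  return hash % partitionCount;
}

// Count how many of 1000 sample keys land on each of 16 partitions.
const counts = new Array(16).fill(0);
for (let i = 0; i < 1000; i++) {
  counts[partitionForKey('user-' + i, 16)] += 1;
}
console.log(counts);
```

With enough distinct keys, messages tend to spread across the partitions from the start, rather than waiting for a default partitioner to get around to each one.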
I solved this issue! Maybe it isn't the same reason for others, but here is what it was...
I copied the fetchMaxBytes: 1024 * 10 line from the readme, thinking this was standard, and used it in the options for my HighLevelConsumer. Apparently, this is a very small buffer. The default if this is left out is 1024 * 1024 bytes.
So, what happened was a message was produced which was too big for my fetchMaxBytes setting. Since I couldn't consume the whole message, it just sat there in the partition waiting to be consumed. And since messages must be consumed in order, new messages for that partition just piled up behind it!
Once I removed my config and let the default do its thing, my stuck partitions all got consumed and everything is working perfectly.
Note: I am currently using version 0.2.27. Just before writing this I had 11 out of 15 partitions stuck. Now all are perfectly fine.
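The stuck-partition behaviour described above can be sketched with a small simulation. This is a simplified model under stated assumptions, not kafka-node's fetch logic: `fetchBatch` and `consumeAll` are hypothetical helpers, and a fetch is assumed to hand back only whole messages that fit within fetchMaxBytes.

```javascript
// Hypothetical model of one fetch: return the whole messages, starting
// at `offset`, that fit together within fetchMaxBytes.
function fetchBatch(log, offset, fetchMaxBytes) {
  const batch = [];
  let used = 0;
  for (let i = offset; i < log.length; i++) {
    if (used + log[i].size > fetchMaxBytes) break;
    used += log[i].size;
    batch.push(log[i]);
  }
  return batch;
}

// Keep fetching until the log is drained or a fetch makes no progress.
// Returns the offset the consumer reached.
function consumeAll(log, fetchMaxBytes) {
  let offset = 0;
  while (offset < log.length) {
    const batch = fetchBatch(log, offset, fetchMaxBytes);
    if (batch.length === 0) break; // next message never fits: stuck
    offset += batch.length;
  }
  return offset;
}

// One partition with a 20000-byte message between two small ones.
const partitionLog = [{ size: 500 }, { size: 20000 }, { size: 300 }];

console.log(consumeAll(partitionLog, 1024 * 10));   // small buffer from the readme
console.log(consumeAll(partitionLog, 1024 * 1024)); // default mentioned above
```

With the 1024 * 10 setting the consumer stops at the oversized message and never reaches the small message queued behind it; with 1024 * 1024 all three messages are consumed.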
@ericdolson looks like a bug in the documentation. All the options mirror the defaults except for fetchMaxBytes. And according to issue #192 there's a bit of tweaking to the options to get kafka-node to work with each use case. This should also be documented. Interested in submitting a PR? :)
I noticed the Kafka console consumer will throw an error if the message being fetched exceeds maxBytes. It would be nice if kafka-node could do something like that instead of failing silently.
[2016-04-11 14:58:32,926] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
kafka.common.MessageSizeTooLargeException: Found a message larger than the maximum fetch size of this consumer on topic ProducerConnectivityTest partition 1 at fetch offset 41. Increase the fetch size, or decrease the maximum message size the broker will allow.
at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:90)
at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
at kafka.consumer.OldConsumer.receive(BaseConsumer.scala:79)
at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:110)
at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:69)
at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:47)
at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
Processed a total of 1 messages
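One way a client could surface this instead of failing silently is sketched below. Everything here is hypothetical: `MessageSizeTooLargeError`, `checkFetchResult`, and the shape of the fetch result object are made up for illustration and are not part of kafka-node's API. The assumption is that a fetch which returned bytes but no complete message means the next message is larger than the fetch size.

```javascript
// Hypothetical error type, loosely modelled on the console consumer's message.
class MessageSizeTooLargeError extends Error {
  constructor(topic, partition, offset, fetchMaxBytes) {
    super(
      'Found a message larger than the maximum fetch size (' + fetchMaxBytes +
      ' bytes) on topic ' + topic + ' partition ' + partition +
      ' at fetch offset ' + offset
    );
    this.name = 'MessageSizeTooLargeError';
    this.topic = topic;
    this.partition = partition;
    this.offset = offset;
  }
}

// Hypothetical check: bytes came back but no whole message could be
// decoded, so retrying with the same fetch size can never make progress.
function checkFetchResult(result, fetchMaxBytes) {
  if (result.bytesReturned > 0 && result.messages.length === 0) {
    throw new MessageSizeTooLargeError(
      result.topic, result.partition, result.offset, fetchMaxBytes
    );
  }
  return result.messages;
}

// Example: a fetch that filled the buffer with part of one large message.
const stuckFetch = {
  topic: 'ProducerConnectivityTest',
  partition: 1,
  offset: 41,
  bytesReturned: 1024 * 10,
  messages: [],
};

try {
  checkFetchResult(stuckFetch, 1024 * 10);
} catch (err) {
  console.error(err.name + ': ' + err.message);
}
```

Whether to throw, emit an 'error' event, or just log a warning is a design choice; the point is only that the condition is detectable from the fetch result.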
Facing the same issue with Kafka-node 1.6.2
kafka-node-group nc_delta 0 34620 34620 0 none
kafka-node-group nc_delta 1 34962 34968 6 none
kafka-node-group nc_delta 2 34497 34497 0 none
kafka-node-group nc_delta 3 34595 34595 0 none
kafka-node-group nc_delta 4 35199 35199 0 kafka-node-group_84aa011e-ef75-429f-8378-533a25bd9064
kafka-node-group nc_delta 5 34544 34544 0 kafka-node-group_84aa011e-ef75-429f-8378-533a25bd9064
kafka-node-group nc_delta 6 34782 34782 0 kafka-node-group_84aa011e-ef75-429f-8378-533a25bd9064
kafka-node-group nc_delta 7 33740 33740 0 kafka-node-group_84aa011e-ef75-429f-8378-533a25bd9064
kafka-node-group nc_delta 8 34474 34474 0 kafka-node-group_84aa011e-ef75-429f-8378-533a25bd9064
kafka-node-group nc_delta 9 34180 34180 0 kafka-node-group_84aa011e-ef75-429f-8378-533a25bd9064
kafka-node-group nc_delta 10 34823 34823 0 kafka-node-group_84aa011e-ef75-429f-8378-533a25bd9064
kafka-node-group nc_delta 11 34237 34237 0 kafka-node-group_84aa011e-ef75-429f-8378-533a25bd9064