What is the best practice for handling offsets when consuming asynchronously?
asaarashi opened this issue · 6 comments
Hi, thanks for sharing the great library!
I have a case like the following: the consumer processes messages asynchronously, so offsets may not be committed serially. What is the best practice for handling offsets so that they are committed correctly?
for {
    select {
    case m, ok := <-consumer.Messages():
        if ok {
            go func(m *sarama.ConsumerMessage) {
                exportMessage := worker.Ingest(m)
                // TODO: Commit the offsets
                _ = exportMessage // placeholder until the TODO is filled in
            }(m)
        }
    }
}
You could use a channel to tell the main goroutine about messages that have been completely processed.
However, you should be careful with processing messages asynchronously: because not all messages take equally long to process, a message with offset 3 could be done before a message with offset 2. What do you want to do in that case?
Thanks for the quick reply.
I need to consume messages with high concurrency, so I spawn many goroutines to do that.
To avoid the risks of offset management when consuming asynchronously, I figured out what might be a safer way: commit the offset immediately, and if uploading the message fails, save it for re-consumption.
for {
    select {
    case m, ok := <-consumer.Messages():
        if ok {
            if err := consumer.CommitUpto(m); err == nil {
                go func(m *sarama.ConsumerMessage) {
                    exportMessage := worker.Ingest(m)
                    // TODO: Upload the consumed message. If the upload fails,
                    // save the message for re-consumption.
                    _ = exportMessage // placeholder until the TODO is filled in
                }(m)
            }
        }
    }
}
Is this a possible solution?
The standard solution to consume faster is to have more partitions and more consumer instances.
If you can get that to work, it would be by far my preferred solution, because you can let go of all the complexity and you get to keep all the guarantees Kafka offers.
If that doesn't work for you, it really depends on what kind of guarantees your application needs.
Saving failed messages for re-consumption could work, but it really depends on your application.
You will lose the ordering guarantee with this approach, for instance.
Willem
Yes, that may be a better approach: more standard and less complex.
Assuming there are 2 partitions in topic1, would it look like the following?
consumer1, consumerErr1 := consumergroup.JoinConsumerGroup("main-group", []string{"topic1"}, zookeeperNodes, kafkaConfig)
consumer2, consumerErr2 := consumergroup.JoinConsumerGroup("main-group", []string{"topic1"}, zookeeperNodes, kafkaConfig)

go func() {
    for {
        select {
        case m := <-consumer1.Messages():
            // Process message...
            _ = m // placeholder: process m here
        }
    }
}()

go func() {
    for {
        select {
        case m := <-consumer2.Messages():
            // Process message...
            _ = m // placeholder: process m here
        }
    }
}()
// ......
No, you just call JoinConsumerGroup once per process. Then you simply start the application several times simultaneously. An added benefit is that you can run the instances on multiple servers.
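To illustrate, here is a minimal sketch of that setup, using the consumergroup API from the snippets above; the error handling and log calls are illustrative additions. Each running instance of this program claims its share of topic1's partitions:

consumer, err := consumergroup.JoinConsumerGroup("main-group", []string{"topic1"}, zookeeperNodes, kafkaConfig)
if err != nil {
    log.Fatalln(err)
}
defer consumer.Close()

// Messages() delivers from whichever partitions this instance currently owns.
for m := range consumer.Messages() {
    // Process message...
    if err := consumer.CommitUpto(m); err != nil {
        log.Println("commit failed:", err)
    }
}

Start the same binary twice, on the same or different machines, and the group should balance the two partitions between the instances.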
Understood. Thank you!