turtlesoupy/haskakafka

Segmentation fault when looping with consumeMessageBatch

ibizaman opened this issue · 9 comments

Hi,

This may not be an issue with haskakafka itself, but being relatively new to Haskell in general, I don't know exactly what's going on. Sorry in advance if this turns out not to be a library problem.

So I created a simple consumer that consumes and prints all messages in a topic until it is exhausted, then prints the number of messages received. Well, that's what I'm trying to achieve. See the code below.
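Roughly, it looks like this. It is a simplified sketch rather than my exact file; the withKafkaConsumer and consumeMessageBatch signatures are what I understand from the README, and the broker address, topic name, timeout and batch size are placeholders:

import Haskakafka

-- Simplified sketch of the consumer: keep fetching batches until the topic
-- is exhausted, then report how many messages were read.
main :: IO ()
main = do
  let partition = 0
      host      = "localhost:9092"   -- placeholder broker address
      topic     = "ttp"              -- placeholder topic name
  withKafkaConsumer [] [] host topic partition KafkaOffsetBeginning $ \_kafka t -> do
    let loop total = do
          -- Fetch up to 100 messages, waiting at most 1000 ms.
          ebatch <- consumeMessageBatch t partition 1000 100
          case ebatch of
            Left err   -> putStrLn ("error: " ++ show err) >> return total
            Right []   -> return total                  -- nothing left to read
            Right msgs -> do
              mapM_ (print . messagePayload) msgs
              loop (total + length msgs)
    n <- loop 0
    putStrLn ("Consumed " ++ show n ++ " messages")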

But when I launch it:

$ ghc consumer.hs && ./consumer
[1 of 1] Compiling Main             ( consumer.hs, consumer.o )
Linking consumer ...
...
[messages]
...
Segmentation fault (core dumped)
$

See the segmentation fault? I suppose that's not the expected behavior. I correctly see all messages printed out though.

I then tried the consumer script provided by Kafka, which does the same thing as my script, and there nothing seems to go wrong:

$ kafka-console-consumer.sh --zookeeper localhost:2181 --topic ttp --from-beginning
...
[messages]
...
^CConsumed 200 messages
$

I checked Kafka's server log, and with both programs I get:

[2015-08-09 08:35:21,556] INFO Closing socket connection to /0:0:0:0:0:0:0:1. (kafka.network.Processor)
[2015-08-09 08:35:21,564] ERROR Closing socket for /127.0.0.1 because of error (kafka.network.Processor)
java.io.IOException: Broken pipe
    at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
    at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
    at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
    at sun.nio.ch.IOUtil.write(IOUtil.java:65)
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:487)
    at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:123)
    at kafka.network.MultiSend.writeTo(Transmission.scala:101)
    at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:231)
    at kafka.network.Processor.write(SocketServer.scala:472)
    at kafka.network.Processor.run(SocketServer.scala:342)
    at java.lang.Thread.run(Thread.java:745)

To be precise, with the Kafka-provided consumer, the ERROR and subsequent lines appear only after issuing the ^C. There is an error, but since the log is the same in both cases, I suppose nothing is wrong on the server side.

After a bit more debugging, the segmentation fault appears on the consumeMessageBatch line when there are no more messages to fetch. So it consumes every message, loops one last time, and boom. I would have expected consumeMessageBatch to return a Left KafkaError in that case, but instead it produces a segmentation fault.
I have the same behavior with consumeMessage.

Am I misusing the library in some way?

EDIT: I do not have the same behavior with consumeMessage. It was a false alarm.

I tried the official librdkafka example and there everything is fine:

$ gcc consumer.c -lrdkafka -lz -lpthread -lrt && ./a.out -C -t ttp -p 0 -e
...
[messages]
...
% Consumer reached end of ttp [0] message queue at offset 200
$

And thanks to the -e option, Kafka's server log does not show a broken pipe exception:

[2015-08-11 07:18:07,373] INFO Closing socket connection to /0:0:0:0:0:0:0:1. (kafka.network.Processor)
[2015-08-11 07:18:07,373] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)

The latter discovery makes me suspect that haskakafka would behave nicely if only there were no segmentation fault and this line was reached.

This happens to me too! Looks like this is not a known problem at this time.

@ibizaman Thanks for the debugging!

@donatello, in the meantime I investigated a bit more and found a workaround, although I'm not satisfied with it. I'll post my findings when I'm back home.

@ibizaman Thank you in advance!

So the easy fix is to add an if clause around the forM statement inside the consumeMessageBatch function. Something like the sketch below.
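The internal helpers here (rdKafkaConsumeBatch, fromMessagePtr, kafkaRespErr) and the KafkaTopic pattern are written from memory, so take this as the shape of the change rather than a verbatim copy of the library source; the point is only that the output array is not walked when librdkafka hands back zero messages:

-- Approximate sketch of the patched consumeMessageBatch; helper names may
-- not match the library source exactly.
consumeMessageBatch :: KafkaTopic -> Int -> Int -> Int
                    -> IO (Either KafkaError [KafkaMessage])
consumeMessageBatch (KafkaTopic topicPtr _ _) partition timeout maxMessages =
  allocaArray maxMessages $ \outputPtr -> do
    numMessages <- rdKafkaConsumeBatch topicPtr (fromIntegral partition) timeout
                                       outputPtr (fromIntegral maxMessages)
    if numMessages < 0
      -- A negative count is a real error reported through errno, as before.
      then Left . kafkaRespErr <$> getErrno
      else if numMessages == 0
        -- The workaround: librdkafka returned zero messages (e.g. a timeout),
        -- so skip the forM instead of peeking into the empty output array.
        then return (Right [])
        else fmap Right $ forM [0 .. numMessages - 1] $ \i ->
               peekElemOff outputPtr (fromIntegral i) >>= fromMessagePtr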

I'm not really satisfied because I don't understand exactly what's happening in the forM statement, so my "fix" is more of a workaround, and I suspect something cleverer exists.

This is weird. I tried to dig a bit more. It looks like rd_kafka_consume_batch() in librdkafka returns 0 when there is a timeout. The relevant function is here: https://github.com/edenhill/librdkafka/blob/b09ff60c9e3e93815b3491f007460291858b3caf/src/rdkafka.c#L523-L579

A possibly better fix is to just change the if condition to numMessages <= 0.

In fact, using <= was my first guess, but it doesn't behave nicely with getErrno, which then ends up returning an error whose code is 0.
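To illustrate what I mean, here is a minimal, self-contained sketch of the errno behaviour (not library code): getErrno simply reports whatever errno currently is, and after a batch that returned 0 without a real failure that is typically eOK, i.e. code 0, which is not a meaningful error to wrap and hand back to the caller.

import Foreign.C.Error (Errno (..), eOK, getErrno)

-- After a call that succeeded without setting errno, getErrno usually yields
-- eOK (code 0); turning that into a KafkaError is misleading.
main :: IO ()
main = do
  Errno code <- getErrno
  putStrLn ("errno is currently " ++ show code ++
            (if Errno code == eOK then " (eOK, i.e. no error)" else ""))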

The way I fixed it follows the way librdkafka works, though. I suppose it would be better not to introduce behavior specific to this Kafka binding.

But still, I don't understand why a segfault appears.

Yes, the segfault is a mystery. And I agree, returning a Right on 0 messages is better since it matches what librdkafka does. Makes sense.

However, rather than reading errno with getErrno, it is probably better to inspect the error returned in the librdkafka message object itself; this seems to be the approach recommended by librdkafka.
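Something along these lines, as a sketch only; the binding names (err'RdKafkaMessageT, RdKafkaRespErrNoError, KafkaResponseError) are my assumption of how the generated internals are called and should be checked against the source:

-- Hypothetical helper: classify a message using the per-message error field
-- that librdkafka fills in, rather than the process-wide errno.
-- (Names are assumed, not verified against the haskakafka internals.)
messageError :: RdKafkaMessageT -> Either KafkaError RdKafkaMessageT
messageError msg =
  case err'RdKafkaMessageT msg of
    RdKafkaRespErrNoError -> Right msg                    -- a real message
    e                     -> Left (KafkaResponseError e)  -- e.g. partition EOF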

Merged fix