gerritjvv/kafka-fast

Producer information appears to be received by Kafka out of order

Closed this issue · 3 comments

I'm new to Kafka and this library, though not to Clojure. I'm trying to get a minimal Kafka installation up and send basic stuff to it from Clojure. To that end, I have installed a boot2docker/docker Kafka installation (spotify/kafka), and it seems to work. I used the basic setup in the README to send data:

(use 'kafka-clj.client :reload)

(def c (create-connector [{:host "192.168.59.103" :port 9092}] {}))

(I'm not using localhost, the IP is from boot2docker ip.)

(doseq [i (range 300)] (send-msg c "test" (.getBytes (str "Test " i))))

When I do this, the consumer I have on the topic "test" sees some of the information, but it appears to be out of order and sometimes missing, in chunks of about 25 or 50. That is to say, within a group of 25, things look great. But some of the groups of 25 are mixed up with others (out of order globally), and the last several groups of 25 don't show up at all. Some of those show up when I do the next send-msg run, but some never show up at all.

Here is output from the line above:

Test 25
Test 26
Test 27
Test 28
Test 29
Test 30
Test 31
Test 32
Test 33
Test 34
Test 35
Test 36
Test 37
Test 38
Test 39
Test 40
Test 41
Test 42
Test 43
Test 44
Test 45
Test 46
Test 47
Test 48
Test 49
Test 0
Test 1
Test 2
Test 3
Test 4
Test 5
Test 6
Test 7
Test 8
Test 9
Test 10
Test 11
Test 12
Test 13
Test 14
Test 15
Test 16
Test 17
Test 18
Test 19
Test 20
Test 21
Test 22
Test 23
Test 24

Now, I've poked around and found the actual Kafka data store, and looked at the log files, and they seem to be mixed up in the same way that the consumer is seeing them. So I'm thinking that something that the producer is doing is causing Kafka to receive the messages in some way out of order. I must be doing something wrong, but I have no idea what.

This is all in the REPL, for what that is worth, and I am using the default values for pretty much everything. I started a thread to read the producer-error-ch, and nothing came out of there.

Any idea as to what I might be doing wrong?

hi,
Two parts:

A:
the producer has a property called :batch-num-messages, which defaults to 25. This means it will batch up 25 messages and then send them to Kafka. The sending is done asynchronously, i.e. each batch might arrive at, or be processed by, Kafka at a different time from the others.

Using batches you are guaranteed partially ordered messages, but there is no particular order between the batches. Note that Kafka does not give you globally ordered messages on send; especially if you're sending from multiple machines or threads, this is not possible in a performant way. For that it would need to do something like Cassandra or HBase, where you order on a key, e.g. a timestamp, and I believe offsets are only assigned by the brokers themselves and cannot be overridden from the client.
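To make the batching behaviour concrete, here is a toy sketch (plain Clojure, not kafka-clj internals): messages are grouped into batches of 25 and each batch is "sent" on its own future with a random delay standing in for network latency, so batches stay intact internally while their overall order is nondeterministic — the same pattern you see in your output.

```clojure
;; Toy illustration of asynchronous batch sending, NOT the library's code.
(def out (atom []))

(let [batches (partition-all 25 (range 100))           ; 4 contiguous batches
      sends   (doall
                (map (fn [b]
                       (future
                         (Thread/sleep (rand-int 10))  ; simulated network latency
                         (swap! out conj (vec b))))    ; batch arrives as a unit
                     batches))]
  (run! deref sends))                                  ; wait for all batches

;; Each element of @out is an intact, internally ordered batch of 25,
;; but the order of the batches themselves varies from run to run.
```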

If you want ordered messages from a single-machine producer you can try reducing :io-threads to 1, at the expense of lower send throughput, but from multiple machines/processes/threads you will still see out-of-order messages.

(create-connector bootstrap-brokers {:io-threads 1 :batch-num-messages 25})

B:

Sending over the TCP connection is done without flushing the BufferedOutputStream, which is done for maximum performance.

If you want to flush on every write of a batch, set :flush-on-write to true when creating the connector:

(create-connector bootstrap-brokers {:flush-on-write true})
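If ordering matters more to you than throughput, the two suggestions above can be combined in one connector; a sketch using the host/port from your question (this needs a live broker, so treat it as illustrative):

```clojure
;; Single IO thread + flush on every batch write: the closest approximation
;; to ordered sends from one process, at the cost of send throughput.
(def c (create-connector [{:host "192.168.59.103" :port 9092}]
                         {:io-threads 1
                          :batch-num-messages 25
                          :flush-on-write true}))
```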

Thank you for a very clear explanation of where my assumptions were wrong, as well as how to configure the producer to try to make it match my assumptions. I was naively assuming that since I was using the producer in a simplistic way, the underlying mechanisms would also be simple (and serial). Thanks again!

No probs, glad I could explain it clearly :).