onyx-platform/onyx

Watermark doesn't work with Kafka topic having > 1 partition

neverfox opened this issue · 6 comments

I'm not sure if this is a bug or a known limitation but I have created a sample project to demonstrate.

Just run docker-compose to start up zk and Kafka, then run lein test. The test finishes after 30 seconds. The job logs each new segment and logs when the input watermark is set for it. It also logs the :watermarks from the state-event map and any watermark trigger fire. When num-partitions at the top of the test file is > 1 (default 2), the :input key stays at 0, so the watermark trigger never fires. If you change num-partitions to 1, :input is set properly and the triggers fire as expected.

Happy to move this to the onyx-kafka plugin, if you think the issue lies there.

Hi @neverfox, thanks for the sample project repo, it made this a lot easier for me to look into.

The problem is that all of your data is ending up on one partition, and the watermark is always the minimum watermark over all of the input peers (and, if using the native Kafka watermarks, the minimum watermark over the partitions read by a given peer).

Because you call g/send with small amounts of data and automatic partition assignment, every message lands on the same partition, so the other partition's peer keeps emitting a watermark of 0.
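To make the explanation above concrete, here is a minimal Python sketch of the minimum rule. It is not Onyx code: `job_watermark`, the timestamps, and the two-element list are all hypothetical, chosen only to mirror the two-partition test setup.

```python
def job_watermark(partition_watermarks):
    """Combine per-partition watermarks by taking the minimum (illustrative only)."""
    return min(partition_watermarks)

# One watermark per partition, both starting at 0 (assumed initial value).
partition_watermarks = [0, 0]

# Hypothetical event timestamps in ms; every message lands on partition 0.
for event_time in [1000, 2000, 3000]:
    partition_watermarks[0] = max(partition_watermarks[0], event_time)

stuck = job_watermark(partition_watermarks)
print(partition_watermarks, stuck)  # [3000, 0] 0
```

Partition 0 has advanced to 3000, but the combined value stays at 0 because partition 1 never saw a message, which matches the :input key staying at 0 in the logs.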

If you call (g/send p topic (rand-int 2) k (json/generate-string v)) or push more data in, you should see it work like you expect.
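The effect of picking the partition at random can be sketched the same way in Python. This is a simulation rather than producer code: `random.Random(0).randrange(2)` stands in for `(rand-int 2)`, and the timestamps and message count are made up for illustration.

```python
import random

rng = random.Random(0)          # seeded so the run is repeatable
partition_watermarks = [0, 0]   # one watermark per partition

# 50 hypothetical messages with increasing event timestamps (ms).
for i in range(50):
    event_time = 1000 + i * 100
    partition = rng.randrange(2)  # plays the role of (rand-int 2)
    partition_watermarks[partition] = max(partition_watermarks[partition], event_time)

combined = min(partition_watermarks)
print(partition_watermarks, combined)
```

Once both partitions have received at least one message, the minimum moves off 0 and the trigger can fire.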

@lbradstreet Thanks for looking at this and for the excellent explanation. It certainly makes sense given the way multiple inputs need to stay aligned, but it is somewhat problematic. If topic partitions are being treated as different inputs, then there's a tension between that and the fact (more likely than not) that a topic is really one stream of data, no matter the number of partitions. I wonder if it would be better to treat topic partitions less like independent inputs from an Onyx perspective and more like a single input with parallelism abstracted at a different level. In other words, for a single Kafka topic, maximum might make more sense, but I'm not sure that's something that could be configured uniquely for a type of plugin. Thoughts?

As long as it works this way, we might have to come up with a partition hash that is more random. This is really only an issue, as you point out, if the data is distributed unevenly across partitions with regard to the relevant timestamp.

Right, unfortunately there's no other way for us to do it while also maintaining correct watermark firing. The consumer may lag behind on one partition, and taking the maximum would then cause the watermark trigger to fire prematurely.
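A small Python sketch of that lagging-consumer case, with made-up numbers: the window end, the timestamps, and the `consumed`/`unread` maps are all hypothetical, and the firing check is a simplification of what a watermark trigger does.

```python
window_end = 3000  # hypothetical window boundary (ms)

# Event timestamps read so far, per partition. Partition 1 is lagging.
consumed = {0: [1000, 4000, 5000], 1: [1000]}
# Events still sitting in Kafka on the lagging partition.
unread = {1: [2000]}

# Per-partition watermark: the largest timestamp read so far.
watermarks = {p: max(ts) for p, ts in consumed.items()}

fires_with_max = max(watermarks.values()) >= window_end
fires_with_min = min(watermarks.values()) >= window_end

# Unread events that belong to the window a max-based trigger would close.
missed = [t for ts in unread.values() for t in ts if t < window_end]

print(fires_with_max, fires_with_min, missed)  # True False [2000]
```

With the maximum, the window closes while the event at 2000 is still unread. With the minimum, the trigger waits until partition 1 catches up.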

Another option would be to split it into one job per partition if latency is a big concern.

Understood. In our case, there's really no need for a message key. We can move it to the message itself, null the key, and get automatic random partitioning.