riemann/riemann

"Interrupted consumption" from riemann.kafka and it never recovers

szrumi opened this issue · 3 comments

The Riemann Kafka client stops with "Interrupted consumption" and it never recovers:

ERROR [2022-06-02 13:31:13,434] clojure-agent-send-off-pool-3 - riemann.kafka - Interrupted consumption
java.lang.NullPointerException: null
        at riemann.common$event.invokeStatic(common.clj:142)
        at riemann.common$event.invoke(common.clj:139)
        at riemann.kafka$start_kafka_thread$fn__11154.invoke(kafka.clj:85)
        at clojure.core$binding_conveyor_fn$fn__5476.invoke(core.clj:2022)
        at clojure.lang.AFn.call(AFn.java:18)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

This happens sporadically. It correlates with an accumulated backlog, for example after the consumer was stopped by a Riemann restart, or sometimes just during a busy period when the consumer has to read many messages quickly.

Riemann 0.3.6 - 0.3.8 on recent Linux/amd64, OpenJDK.
Happens both using Kafka 2.7.0/ZooKeeper and Kafka 3.2.0/KRaft.

The Riemann Kafka client should attempt to recover gracefully after most issues with a Kafka connection that has been working before.

My investigation suggests that this particular java.lang.NullPointerException is triggered by a Fetch v6 Response that's got no Message Set attached to it. The riemann.kafka code is getting a record like this: {:key nil, :offset 7053063, :partition 2, :topic "riemann", :value nil} and then (event nil) is throwing the java.lang.NullPointerException.

Shortly after this empty Fetch v6 Response arrives, the client sends a deliberate LeaveGroup v1 Request and gets disconnected. According to KIP-219, the empty Fetch v6 Response may be legitimate throttling behaviour from Kafka, signalling the client to slow down its requests. However, I'm not sure this is the reason, because throttle_time_ms in the empty Fetch v6 Response is set to 0, whereas in my understanding it should be >0 in these throttling situations.
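
To illustrate the kind of graceful handling asked for above, here is a minimal sketch of a consumer step that skips a record whose :value is nil and handles failures per record rather than letting one bad record end the loop. This is not the riemann.kafka code: process-record, process-batch, to-event and on-error are hypothetical names, and good-record is made-up sample data (only empty-record is copied from the record quoted above).

```clojure
;; Hypothetical helpers for illustration only; not the riemann.kafka implementation.

(defn process-record
  "Converts one consumed record with `to-event`, or returns nil when the
  record carries no value (as in the empty record described above)."
  [to-event record]
  (when-some [v (:value record)]
    (to-event v)))

(defn process-batch
  "Processes records one at a time. A failure on a single record is reported
  through `on-error` and that record is dropped; the rest are still processed."
  [to-event on-error records]
  (doall
    (keep (fn [record]
            (try
              (process-record to-event record)
              (catch Exception e
                (on-error e record)
                nil)))
          records)))

;; The record from the report: no key and no value.
(def empty-record
  {:key nil, :offset 7053063, :partition 2, :topic "riemann", :value nil})

;; Made-up record with a payload, for comparison.
(def good-record
  {:key nil, :offset 7053064, :partition 2, :topic "riemann",
   :value {:service "demo", :time 1}})

(process-batch identity
               (fn [e record] (println "skipping record" (:offset record) e))
               [empty-record good-record])
;; => ({:service "demo", :time 1})
```

Whether an empty record should be skipped silently, logged, or counted is a design choice for the maintainers; the sketch only shows that the decision can be made per record.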

sanel commented

Actually, the bug could be here [1]. In essence, if it receives (event nil), it will try to evaluate (nil :time), which throws the exception. Instead, it should be (:time opts), since (:time nil) correctly returns nil.

@jamtur01 will you be able to commit it directly, or should I create a PR for this? I think the fix looks too small for the PR machinery :D

diff --git a/src/riemann/common.clj b/src/riemann/common.clj
index 8c86ffe..c41d15f 100644
--- a/src/riemann/common.clj
+++ b/src/riemann/common.clj
@@ -139,7 +139,7 @@
 (defn event
   "Create a new event from a map."
   [opts]
-  (let [t (long (round (or (opts :time)
+  (let [t (long (round (or (:time opts)
                            (unix-time))))]
     (map->Event (merge opts {:time t}))))

[1] https://github.com/riemann/riemann/blob/main/src/riemann/common.clj#L142
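
The difference between the two lookup forms in the diff can be checked at a REPL. The two functions below are hypothetical stand-ins that isolate just the lookup; they are not part of riemann.common.

```clojure
(defn time-via-map-call
  "Calls the map as a function, like the original (opts :time)."
  [opts]
  (opts :time))

(defn time-via-keyword
  "Calls the keyword as a function, like the patched (:time opts)."
  [opts]
  (:time opts))

;; Both forms agree on a real map.
(time-via-map-call {:time 5})   ; => 5
(time-via-keyword {:time 5})    ; => 5

;; On nil, the keyword lookup returns nil ...
(time-via-keyword nil)          ; => nil

;; ... while invoking nil as a function throws.
(try
  (time-via-map-call nil)
  (catch NullPointerException _
    :npe))                      ; => :npe
```

With the keyword form, (or (:time opts) (unix-time)) falls through to the current time when opts is nil, so the lookup itself no longer throws.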

pyr commented

@sanel I still went ahead and created a pr