"Interrupted consumption" from riemann.kafka and it never recovers
szrumi opened this issue · 3 comments
The Riemann Kafka client stops with "Interrupted consumption" and it never recovers:
ERROR [2022-06-02 13:31:13,434] clojure-agent-send-off-pool-3 - riemann.kafka - Interrupted consumption
java.lang.NullPointerException: null
at riemann.common$event.invokeStatic(common.clj:142)
at riemann.common$event.invoke(common.clj:139)
at riemann.kafka$start_kafka_thread$fn__11154.invoke(kafka.clj:85)
at clojure.core$binding_conveyor_fn$fn__5476.invoke(core.clj:2022)
at clojure.lang.AFn.call(AFn.java:18)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
This has happened sporadically. It correlates with an accumulated backlog, for example after the consumer was stopped by a Riemann restart, or sometimes just a busy period in which the consumer has to read lots of messages quickly.
Riemann 0.3.6 - 0.3.8 on recent Linux/amd64, OpenJDK.
Happens both using Kafka 2.7.0/ZooKeeper and Kafka 3.2.0/KRaft.
The Riemann Kafka client should attempt to recover gracefully after most issues with a Kafka connection that has been working before.
My investigation suggests that this particular java.lang.NullPointerException is triggered by a Fetch v6 Response that's got no Message Set attached to it. The riemann.kafka code is getting a record like this: {:key nil, :offset 7053063, :partition 2, :topic "riemann", :value nil} and then (event nil) is throwing the java.lang.NullPointerException.
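For illustration, here is a minimal sketch of how such a record could be filtered out before anything downstream is called with nil. This is not the riemann.kafka code: records-with-values is a hypothetical helper, and the first sample record (its offset and its :value) is made up for the example. Only the second record is the one quoted above.

```clojure
;; Hypothetical helper, not part of riemann.kafka: keep only records that
;; actually carry a payload, so nothing downstream receives a nil value.
(defn records-with-values
  [records]
  (remove #(nil? (:value %)) records))

;; Made-up sample data; the second entry is the record from this report.
(def sample-records
  [{:key nil, :offset 7053062, :partition 2, :topic "riemann", :value {:service "cpu"}}
   {:key nil, :offset 7053063, :partition 2, :topic "riemann", :value nil}])

;; Only the record with a non-nil :value survives the filter.
(map :offset (records-with-values sample-records))
```

Whether silently dropping such records is the right behaviour is a separate question; the sketch only shows that the nil value can be detected before it is turned into an event.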
Shortly after this empty Fetch v6 Response arrives, the client sends a deliberate LeaveGroup v1 Request and gets disconnected. According to KIP-219, the empty Fetch v6 Response may be legitimate throttling behaviour from Kafka, signalling the client to slow down its requests. However, I'm not sure this is the reason, because throttle_time_ms in the empty Fetch v6 Response is set to 0, whereas in my understanding it should be set to >0 in these throttling situations.
Actually, the bug could be here [1]. In essence, when the function is called as (event nil), opts is nil, so (opts :time) evaluates as (nil :time), which throws the exception. Instead, it should be (:time opts), because (:time nil) correctly returns nil.
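The difference between the two lookup styles can be reproduced outside Riemann. The sketch below uses two stand-in functions whose names are illustrative only and which are not part of riemann.common:

```clojure
;; Illustrative stand-ins, not Riemann code.
(defn time-via-map-call [opts]
  (opts :time))   ; calls opts as a function; fails when opts is nil

(defn time-via-keyword [opts]
  (:time opts))   ; calls the keyword as a function; nil-safe

(time-via-map-call {:time 5})  ; => 5
(time-via-keyword {:time 5})   ; => 5
(time-via-keyword nil)         ; => nil

;; Calling nil as a function throws at runtime.
(try
  (time-via-map-call nil)
  (catch NullPointerException _
    :npe))                     ; => :npe
```

With the keyword-first form, (or (:time opts) (unix-time)) in the patched function would fall through to (unix-time) when opts is nil instead of throwing.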
@jamtur01 will you be able to commit it directly, or should I create a PR for this? I think the fix looks too small for the PR machinery :D
diff --git a/src/riemann/common.clj b/src/riemann/common.clj
index 8c86ffe..c41d15f 100644
--- a/src/riemann/common.clj
+++ b/src/riemann/common.clj
@@ -139,7 +139,7 @@
(defn event
"Create a new event from a map."
[opts]
- (let [t (long (round (or (opts :time)
+ (let [t (long (round (or (:time opts)
(unix-time))))]
(map->Event (merge opts {:time t}))))
[1] https://github.com/riemann/riemann/blob/main/src/riemann/common.clj#L142