riemann/riemann

Auto-reload the Kafka Consumer

vitorbrandao opened this issue · 3 comments

I'm running Riemann 0.3.6, Dockerised, in an AWS ECS task, producing events to and consuming events from Kafka. Occasionally, for a reason yet to be determined, I see these messages in the logs:

ERROR [2021-01-21 20:58:34,560] clojure-agent-send-off-pool-3 - riemann.kafka - Interrupted consumption
java.net.SocketException: Broken pipe (Write failed)
at java.base/java.net.SocketOutputStream.socketWrite0(Native Method)
at java.base/java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
at java.base/java.net.SocketOutputStream.write(SocketOutputStream.java:155)
at java.base/sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:233)
at java.base/sun.nio.cs.StreamEncoder.implFlushBuffer(StreamEncoder.java:312)
at java.base/sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:316)
at java.base/sun.nio.cs.StreamEncoder.flush(StreamEncoder.java:153)
at java.base/java.io.OutputStreamWriter.flush(OutputStreamWriter.java:254)
at riemann.graphite.GraphiteTCPClient.send_lines(graphite.clj:41)
at riemann.graphite$graphite$fn__10762.invoke(graphite.clj:174)
at riemann.core$stream_BANG_$fn__10000.invoke(core.clj:20)
at riemann.core$stream_BANG_.invokeStatic(core.clj:19)
at riemann.core$stream_BANG_.invoke(core.clj:15)
at riemann.kafka$start_kafka_thread$fn__11398.invoke(kafka.clj:86)
at clojure.core$binding_conveyor_fn$fn__5476.invoke(core.clj:2022)
at clojure.lang.AFn.call(AFn.java:18)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:844)

DEBUG [2021-01-21 20:58:34,570] kafka-coordinator-heartbeat-thread | metrics - org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=riemann-consumer-2-252, groupId=metrics] Heartbeat thread has closed

DEBUG [2021-01-21 20:58:34,575] clojure-agent-send-off-pool-3 - org.apache.kafka.common.metrics.Metrics - Removed sensor with name connections-closed:
... (more similar "Removed sensor with name [sensor]" messages)

DEBUG [2021-01-21 20:58:34,585] clojure-agent-send-off-pool-3 - org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=riemann-consumer-2-252, groupId=metrics] Kafka consumer has been closed

WARN [2021-01-22 22:15:06,631] Thread-4 - riemann.core - instrumentation service caught
java.net.SocketException: Connection reset
at java.base/java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:115)
at java.base/java.net.SocketOutputStream.write(SocketOutputStream.java:155)
at java.base/sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:233)
at java.base/sun.nio.cs.StreamEncoder.implFlushBuffer(StreamEncoder.java:312)
at java.base/sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:316)
at java.base/sun.nio.cs.StreamEncoder.flush(StreamEncoder.java:153)
at java.base/java.io.OutputStreamWriter.flush(OutputStreamWriter.java:254)
at riemann.graphite.GraphiteTCPClient.send_lines(graphite.clj:41)
at riemann.graphite$graphite$fn__10762.invoke(graphite.clj:174)
at riemann.core$stream_BANG_$fn__10000.invoke(core.clj:20)
at riemann.core$stream_BANG_.invokeStatic(core.clj:19)
at riemann.core$stream_BANG_.invoke(core.clj:15)
at riemann.core$instrumentation_service$measure__10009.invoke(core.clj:59)
at riemann.service.ThreadService$thread_service_runner__6715$fn__6716.invoke(service.clj:101)
at riemann.service.ThreadService$thread_service_runner__6715.invoke(service.clj:100)
at clojure.lang.AFn.run(AFn.java:22)
at java.base/java.lang.Thread.run(Thread.java:844)

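The first trace shows the exception being thrown by the Graphite client (riemann.graphite.GraphiteTCPClient.send_lines) while the Kafka consumer thread was streaming an event (riemann.kafka$start_kafka_thread), so a broken Graphite connection appears to be what ended consumption. At this point the Kafka consumer has exited and no more metrics are pulled from Kafka. However, everything else, such as the Graphite output and the Riemann process itself, happily continues running. Because the ECS task still reports as healthy, I receive no alerts, nor does AWS automatically replace the task.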
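One common way to keep a consume loop alive when a downstream write fails is to catch failures per event, so a broken connection costs individual events rather than the whole consumer. The following is a minimal sketch in generic Java, assuming you control the loop; it is not Riemann's code, and `IsolatingLoop`, `Sink` and `drain` are hypothetical names, with `Sink` standing in for something like the Graphite TCP client.

```java
import java.io.IOException;
import java.util.List;

// Hypothetical sketch, not Riemann's implementation: a consume loop that
// isolates downstream failures per event instead of letting them end the loop.
final class IsolatingLoop {

    // Hypothetical stand-in for a downstream writer such as a Graphite TCP client.
    interface Sink {
        void send(String event) throws IOException;
    }

    // Sends every event to the sink and returns how many sends failed.
    // A failed event is logged and dropped; the loop moves on to the next one.
    static int drain(List<String> events, Sink sink) {
        int failures = 0;
        for (String event : events) {
            try {
                sink.send(event);
            } catch (IOException e) {
                // e.g. java.net.SocketException: Broken pipe (Write failed)
                failures++;
                System.err.println("dropped event after sink error: " + e.getMessage());
            }
        }
        return failures;
    }
}
```

For example, draining `List.of("a", "b", "c")` into a sink that throws `SocketException` for `"b"` delivers `"a"` and `"c"` and returns `1`. The trade-off is that events hitting a broken connection are dropped unless they are buffered and retried, so this complements alerting on the sink rather than replacing it.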

Is there a mechanism to force Riemann to stop if one or all of the Kafka consumer threads exit? Alternatively, can the Kafka consumer thread be respawned?
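Both options in the question map onto a common supervision pattern: restart the worker when it ends and, after a bounded number of attempts, give up so the process can exit. The sketch below is generic Java, not a Riemann feature; `Supervisor` and `superviseRestarts` are hypothetical, the worker is assumed to be any `Runnable` wrapping a consume loop, and a worker that returns normally is treated as dead, since a loop that has closed its consumer (as in the logs above) has stopped working either way.

```java
// Hypothetical sketch, not a Riemann feature: restart a worker (e.g. a Kafka
// consume loop) each time it ends, with linear backoff, up to maxRestarts times.
final class Supervisor {

    // Runs the worker in the calling thread. Any exit, normal or via a
    // RuntimeException, counts as a death. Returns how many times the worker
    // was started, so the caller can decide what to do after giving up.
    static int superviseRestarts(Runnable worker, int maxRestarts, long backoffMillis) {
        int starts = 0;
        while (true) {
            starts++;
            try {
                worker.run();
            } catch (RuntimeException e) {
                System.err.println("worker died on start " + starts + ": " + e);
            }
            if (starts > maxRestarts) {
                return starts; // initial start plus maxRestarts restarts used up
            }
            try {
                Thread.sleep(backoffMillis * starts); // back off longer after each death
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return starts;
            }
        }
    }
}
```

After `superviseRestarts` returns, calling `System.exit(1)` stops the JVM and hence the container; if the task runs as part of an ECS service, the service scheduler should then start a replacement, which covers the fail-fast option. A production version would also need a shutdown flag so a deliberate stop is not mistaken for a crash, and would run the supervisor on its own thread.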

I've posted this same question to the riemann-users mailing list as I'm unsure which medium is more appropriate for this type of query.

sanel commented

I'll try to reply on the mailing list.

Please re-open if you still have issues.