AmpelProject/Ampel-core

Kafka timeout properly taken care of?

Closed this issue · 1 comment

The following log entry appears several times in the output of singularity-stack logs burst alertprocessor -f:

(alertprocessor:12 stdout) Running on epyc.astro.washington.edu:9092 group ampel-v0.3.3
(alertprocessor:15 stderr) %3|1531913500.128|FAIL|rdkafka#consumer-1| [thrd:128.95.79.19:9094/3]: 128.95.79.19:9094/3: 1 request(s) timed out: disconnect (average rtt 282.560ms)
(alertprocessor:14 stdout) Running on epyc.astro.washington.edu:9092 group ampel-v0.3.3

For confirmation: AMPEL handles those timeouts properly, right? ^^

Yes. The ZIAlertFetcher blocks while waiting for messages. When it receives a message, the message is passed out to the caller, and it is marked as consumed (in Kafka jargon, its offset is committed) only when the next message is requested:

def consume(self):
    """
    Block until one message has arrived, and return it.
    Messages returned to the caller are marked for committal
    upon the _next_ call to consume().
    """
    # mark the last emitted message for committal
    if self._last_message is not None:
        self._consumer.store_offsets(self._last_message)
    self._last_message = None
    message = None
    for _ in range(self._poll_attempts):
        # wake up occasionally to catch SIGINT
        message = self._consumer.poll(self._poll_interval)
        if message is not None:
            break
    else:
        # no message arrived within poll_attempts * poll_interval; give up
        return message
    if message.error():
        raise RuntimeError(message.error())
    else:
        self._last_message = message
    return message
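
For context, this behaviour hinges on how the underlying confluent-kafka Consumer is configured: store_offsets() only takes effect when automatic offset storing is disabled, so that the client's background auto-commit commits exactly those offsets the fetcher has explicitly stored. Below is a minimal configuration sketch; the broker address and group id are taken from the log output above, while the topic name is a hypothetical placeholder, not the actual ZIAlertFetcher setup:

from confluent_kafka import Consumer

# Illustrative configuration; the real ZIAlertFetcher settings may differ.
consumer = Consumer({
    'bootstrap.servers': 'epyc.astro.washington.edu:9092',
    'group.id': 'ampel-v0.3.3',
    # Do not mark offsets for commit automatically on poll(); consume()
    # calls store_offsets() itself once a message has been handed out.
    'enable.auto.offset.store': False,
    # The client's background commit then covers only stored offsets.
    'enable.auto.commit': True,
})
consumer.subscribe(['ztf-alerts'])  # hypothetical topic name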

When a new ZIAlertFetcher starts, it will receive only messages newer than those previously consumed. Delivery is thus at-least-once: no alerts can be missed, and an alert is delivered twice only if the AlertProcessor encounters an error after consuming it, but before it can request the next alert. This should not happen in normal operation.
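
To make the failure window concrete, here is a sketch of a driver loop; ZIAlertFetcher's consume() behaves as quoted above, but process_alert and the run() wrapper are hypothetical placeholders, not the actual AlertProcessor code:

def run(fetcher, process_alert):
    """
    Drive a fetcher with at-least-once semantics.

    If the process dies after consume() has returned a message but
    before the next consume() call stores its offset, a restarted
    fetcher in the same consumer group receives that alert again
    (a duplicate), but never skips one.
    """
    while True:
        message = fetcher.consume()
        if message is None:
            # poll timed out without a message; keep waiting
            continue
        process_alert(message.value())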