argoproj/argo-events

Kafka Sensors stop working after a few days

piby180 opened this issue · 3 comments

Describe the bug
Kafka sensors stop processing messages after a few days. Restarting the pods fixes the issue.

Expected behavior
Kafka sensors should keep processing messages indefinitely, without needing a pod restart.

Environment

  • Kubernetes: [v1.28.12-eks-2f46c53]
  • Argo-Workflows: [v3.5.8]
  • Argo Events: [v1.9.2]

Additional context
Here are the error logs I receive:

{"level":"error","ts":1725366286.0299656,"logger":"argo-events.sensor","caller":"sensor/kafka_sensor.go:222","msg":"Failed to consume","sensorName":"xxx","error":"kafka: response did not contain all the expected topic/partition blocks","stacktrace":"github.com/argoproj/argo-events/eventbus/kafka/sensor.(*KafkaSensor).Listen\n\t/home/runner/work/argo-events/argo-events/eventbus/kafka/sensor/kafka_sensor.go:222"}
{"level":"info","ts":1725366291.0162213,"logger":"argo-events.sensor","caller":"sensors/listener.go:311","msg":"EventBus connection lost, reconnecting...","sensorName":"xxx","triggerName":"xxx"}
{"level":"info","ts":1725366291.016309,"logger":"argo-events.sensor","caller":"sensors/listener.go:317","msg":"reconnected to EventBus.","sensorName":"xxx","triggerName":"xxx","connection":"KafkaTriggerConnection{Sensor:xxx,Trigger:xxx}"}
{"level":"info","ts":1725366291.0163748,"logger":"argo-events.sensor","caller":"sensor/kafka_sensor.go:210","msg":"Consuming","sensorName":"xxx","topics":["argo_event_bus","argo_event_bus-xxx-trigger","argo_event_bus-xxx-action"],"group":"argo-dev-xxx"}
{"level":"info","ts":1725366291.026031,"logger":"argo-events.sensor","caller":"sensor/kafka_handler.go:75","msg":"Kafka setup","sensorName":"xxx","claims":{"argo_event_bus":[0],"argo_event_bus-xxx-action":[0],"argo_event_bus-xxx-trigger":[0]}}
{"level":"info","ts":1725366291.0272484,"logger":"argo-events.sensor","caller":"sensor/kafka_handler.go:124","msg":"Kafka cleanup","sensorName":"xxx","claims":{"argo_event_bus":[0],"argo_event_bus-xxx-action":[0],"argo_event_bus-xxx-trigger":[0]}}
{"level":"error","ts":1725366291.0272818,"logger":"argo-events.sensor","caller":"sensor/kafka_sensor.go:222","msg":"Failed to consume","sensorName":"xxx","error":"kafka: response did not contain all the expected topic/partition blocks","stacktrace":"github.com/argoproj/argo-events/eventbus/kafka/sensor.(*KafkaSensor).Listen\n\t/home/runner/work/argo-events/argo-events/eventbus/kafka/sensor/kafka_sensor.go:222"}
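
For anyone triaging similar logs, here is a minimal Python sketch that converts the `ts` field to UTC and measures the gap between consecutive "Failed to consume" errors. It is not part of Argo Events: the helper names `summarize` and `failure_gaps` are hypothetical, and the two sample lines are trimmed copies of the error entries above, keeping only the `level`, `ts`, `msg` and `error` fields.

```python
import json
from datetime import datetime, timezone

# Trimmed copies of the two "Failed to consume" entries from the logs above.
SAMPLE = [
    '{"level":"error","ts":1725366286.0299656,"msg":"Failed to consume",'
    '"error":"kafka: response did not contain all the expected topic/partition blocks"}',
    '{"level":"error","ts":1725366291.0272818,"msg":"Failed to consume",'
    '"error":"kafka: response did not contain all the expected topic/partition blocks"}',
]


def summarize(lines):
    """Return (UTC time, level, msg) for each JSON log line, skipping blank lines."""
    rows = []
    for line in lines:
        line = line.strip()
        if not line:
            continue
        entry = json.loads(line)
        when = datetime.fromtimestamp(entry["ts"], tz=timezone.utc)
        rows.append((when.strftime("%Y-%m-%d %H:%M:%S"), entry["level"], entry["msg"]))
    return rows


def failure_gaps(lines, msg="Failed to consume"):
    """Return the seconds between consecutive entries whose msg equals `msg`."""
    entries = [json.loads(line) for line in lines if line.strip()]
    stamps = [e["ts"] for e in entries if e["msg"] == msg]
    return [round(later - earlier, 3) for earlier, later in zip(stamps, stamps[1:])]


if __name__ == "__main__":
    for row in summarize(SAMPLE):
        print(*row)
    print("gaps (s):", failure_gaps(SAMPLE))
```

Run against the full sensor log, this makes the pattern easier to see: the sensor fails, reconnects, logs "Kafka setup" and "Kafka cleanup" about a millisecond apart, and fails again roughly five seconds after the previous failure.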


Message from the maintainers:

If you wish to see this enhancement implemented please add a 👍 reaction to this issue! We often sort issues this way to know what to prioritize.