Kafka Sensors stop working after a few days
piby180 opened this issue · 3 comments
Describe the bug
Kafka sensors stop processing messages after a few days. Restarting the pods fixes the issue.
Expected behavior
Kafka sensors should keep processing messages indefinitely, without requiring a pod restart.
Environment:
- Kubernetes: v1.28.12-eks-2f46c53
- Argo-Workflows: v3.5.8
- Argo Events: v1.9.2
Additional context
Here are the error logs I receive
{"level":"error","ts":1725366286.0299656,"logger":"argo-events.sensor","caller":"sensor/kafka_sensor.go:222","msg":"Failed to consume","sensorName":"xxx","error":"kafka: response did not contain all the expected topic/partition blocks","stacktrace":"github.com/argoproj/argo-events/eventbus/kafka/sensor.(*KafkaSensor).Listen\n\t/home/runner/work/argo-events/argo-events/eventbus/kafka/sensor/kafka_sensor.go:222"}
{"level":"info","ts":1725366291.0162213,"logger":"argo-events.sensor","caller":"sensors/listener.go:311","msg":"EventBus connection lost, reconnecting...","sensorName":"xxx","triggerName":"xxx"}
{"level":"info","ts":1725366291.016309,"logger":"argo-events.sensor","caller":"sensors/listener.go:317","msg":"reconnected to EventBus.","sensorName":"xxx","triggerName":"xxx","connection":"KafkaTriggerConnection{Sensor:xxx,Trigger:xxx}"}
{"level":"info","ts":1725366291.0163748,"logger":"argo-events.sensor","caller":"sensor/kafka_sensor.go:210","msg":"Consuming","sensorName":"xxx","topics":["argo_event_bus","argo_event_bus-xxx-trigger","argo_event_bus-xxx-action"],"group":"argo-dev-xxx"}
{"level":"info","ts":1725366291.026031,"logger":"argo-events.sensor","caller":"sensor/kafka_handler.go:75","msg":"Kafka setup","sensorName":"xxx","claims":{"argo_event_bus":[0],"argo_event_bus-xxx-action":[0],"argo_event_bus-xxx-trigger":[0]}}
{"level":"info","ts":1725366291.0272484,"logger":"argo-events.sensor","caller":"sensor/kafka_handler.go:124","msg":"Kafka cleanup","sensorName":"xxx","claims":{"argo_event_bus":[0],"argo_event_bus-xxx-action":[0],"argo_event_bus-xxx-trigger":[0]}}
{"level":"error","ts":1725366291.0272818,"logger":"argo-events.sensor","caller":"sensor/kafka_sensor.go:222","msg":"Failed to consume","sensorName":"xxx","error":"kafka: response did not contain all the expected topic/partition blocks","stacktrace":"github.com/argoproj/argo-events/eventbus/kafka/sensor.(*KafkaSensor).Listen\n\t/home/runner/work/argo-events/argo-events/eventbus/kafka/sensor/kafka_sensor.go:222"}
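For anyone triaging similar output, here is a rough sketch of how the JSON log lines above could be summarized to confirm the pattern: a "Failed to consume" error, a reconnect, a "Kafka setup" immediately followed by "Kafka cleanup", and then the same error again. The `summarize` and `to_utc` helpers are hypothetical names made up for this illustration and are not part of Argo Events. The sample entries are the lines from this issue trimmed to the `level`, `ts`, and `msg` fields.

```python
import json
from collections import Counter
from datetime import datetime, timezone

# Log lines from this issue, trimmed to level/ts/msg for brevity.
SAMPLE = """\
{"level":"error","ts":1725366286.0299656,"msg":"Failed to consume"}
{"level":"info","ts":1725366291.0162213,"msg":"EventBus connection lost, reconnecting..."}
{"level":"info","ts":1725366291.016309,"msg":"reconnected to EventBus."}
{"level":"info","ts":1725366291.0163748,"msg":"Consuming"}
{"level":"info","ts":1725366291.026031,"msg":"Kafka setup"}
{"level":"info","ts":1725366291.0272484,"msg":"Kafka cleanup"}
{"level":"error","ts":1725366291.0272818,"msg":"Failed to consume"}
"""


def summarize(lines):
    """Count entries by (level, msg) and return the (first, last) timestamps seen."""
    counts = Counter()
    timestamps = []
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            entry = json.loads(line)
        except json.JSONDecodeError:
            continue  # ignore anything that is not a JSON log line
        counts[(entry.get("level"), entry.get("msg"))] += 1
        if "ts" in entry:
            timestamps.append(float(entry["ts"]))
    span = (min(timestamps), max(timestamps)) if timestamps else None
    return counts, span


def to_utc(ts):
    """Convert the epoch-seconds 'ts' field to an ISO 8601 UTC string."""
    return datetime.fromtimestamp(ts, tz=timezone.utc).isoformat()


if __name__ == "__main__":
    counts, span = summarize(SAMPLE.splitlines())
    for (level, msg), n in counts.most_common():
        print(f"{n:3d}  {level:5s}  {msg}")
    if span:
        print("from", to_utc(span[0]), "to", to_utc(span[1]))
```

In practice the input would come from the sensor pod's logs rather than the embedded sample, for example by passing the lines of a saved log file to `summarize`.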
Message from the maintainers:
If you wish to see this enhancement implemented please add a 👍 reaction to this issue! We often sort issues this way to know what to prioritize.