Yakifo/amqtt

Broker liveness probe failures in k8s

beshoyabdelmalak opened this issue · 2 comments

Hi,
I'm trying to run the amqtt broker in Kubernetes and health-check it with a liveness probe, but each probe causes a NoDataException in the broker.

The health check is configured as follows:

livenessProbe:
  tcpSocket:
    port: 1883
  timeoutSeconds: 90
  periodSeconds: 120
  successThreshold: 1
  failureThreshold: 3

Here are the Broker's logs:

ERROR :: asyncio :: Task exception was never retrieved
future: <Task finished name='Task-59465' coro=<Broker.stream_connected() done, defined at /Users/b.abdelmalak/Library/Caches/pypoetry/virtualenvs/test-broker-gKCTfSBk-py3.8/lib/python3.8/site-packages/amqtt/broker.py:394> exception=NoDataException('No more data')>
Traceback (most recent call last):
  File "/Users/b.abdelmalak/Library/Caches/pypoetry/virtualenvs/test-broker-gKCTfSBk-py3.8/lib/python3.8/site-packages/amqtt/broker.py", line 395, in stream_connected
    await self.client_connected(
  File "/Users/b.abdelmalak/Library/Caches/pypoetry/virtualenvs/test-broker-gKCTfSBk-py3.8/lib/python3.8/site-packages/amqtt/broker.py", line 416, in client_connected
    handler, client_session = await BrokerProtocolHandler.init_from_connect(
  File "/Users/b.abdelmalak/Library/Caches/pypoetry/virtualenvs/test-broker-gKCTfSBk-py3.8/lib/python3.8/site-packages/amqtt/mqtt/protocol/broker_handler.py", line 130, in init_from_connect
    connect = await ConnectPacket.from_stream(reader)
  File "/Users/b.abdelmalak/Library/Caches/pypoetry/virtualenvs/test-broker-gKCTfSBk-py3.8/lib/python3.8/site-packages/amqtt/mqtt/packet.py", line 249, in from_stream
    variable_header = await cls.VARIABLE_HEADER.from_stream(
  File "/Users/b.abdelmalak/Library/Caches/pypoetry/virtualenvs/test-broker-gKCTfSBk-py3.8/lib/python3.8/site-packages/amqtt/mqtt/connect.py", line 120, in from_stream
    protocol_name = await decode_string(reader)
  File "/Users/b.abdelmalak/Library/Caches/pypoetry/virtualenvs/test-broker-gKCTfSBk-py3.8/lib/python3.8/site-packages/amqtt/codecs.py", line 66, in decode_string
    length_bytes = await read_or_raise(reader, 2)
  File "/Users/b.abdelmalak/Library/Caches/pypoetry/virtualenvs/test-broker-gKCTfSBk-py3.8/lib/python3.8/site-packages/amqtt/codecs.py", line 56, in read_or_raise
    raise NoDataException("No more data")
amqtt.errors.NoDataException: No more data

I also noticed that the broker keeps the connection open even after Kubernetes closes the socket.

To reproduce, open a TCP connection to the broker and close it without sending any data (Python example):

import socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server = ('localhost', 1883)
sock.connect(server)
sock.close()

Hi @beshoyabdelmalak

This is to some extent expected behavior: from the broker's point of view, the connection is closed unexpectedly, which leads to an error message. A better message would be nice, though.

For a proper probe, you should perform an actual MQTT connection rather than just opening a TCP socket.
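
As a rough illustration of what such a probe could look like, here is a minimal sketch that sends an MQTT 3.1.1 CONNECT packet over a plain socket and checks for an accepted CONNACK. The function names (`build_connect_packet`, `connack_accepted`, `mqtt_probe`) and the client ID are made up for this example and are not part of amqtt. It also assumes the broker accepts anonymous connections on port 1883.

```python
import socket


def encode_remaining_length(n: int) -> bytes:
    """Encode the MQTT 'remaining length' field (7 bits per byte, MSB = continuation)."""
    out = bytearray()
    while True:
        byte, n = n % 128, n // 128
        if n > 0:
            byte |= 0x80
        out.append(byte)
        if n == 0:
            return bytes(out)


def build_connect_packet(client_id: str, keepalive: int = 60) -> bytes:
    """Build a minimal MQTT 3.1.1 CONNECT packet (clean session, no credentials)."""
    cid = client_id.encode("utf-8")
    variable_header = (
        b"\x00\x04MQTT"                   # protocol name, length-prefixed
        + b"\x04"                         # protocol level 4 = MQTT 3.1.1
        + b"\x02"                         # connect flags: clean session only
        + keepalive.to_bytes(2, "big")    # keep-alive in seconds
    )
    payload = len(cid).to_bytes(2, "big") + cid
    body = variable_header + payload
    return b"\x10" + encode_remaining_length(len(body)) + body


def connack_accepted(data: bytes) -> bool:
    """Return True if data is a CONNACK packet with return code 0 (accepted)."""
    return len(data) == 4 and data[0] == 0x20 and data[1] == 0x02 and data[3] == 0x00


def mqtt_probe(host: str = "localhost", port: int = 1883, timeout: float = 5.0) -> bool:
    """Hypothetical probe: CONNECT, wait for CONNACK, then DISCONNECT cleanly."""
    try:
        with socket.create_connection((host, port), timeout=timeout) as sock:
            sock.sendall(build_connect_packet("liveness-probe"))
            data = b""
            while len(data) < 4:
                chunk = sock.recv(4 - len(data))
                if not chunk:
                    break  # broker closed the connection early
                data += chunk
            ok = connack_accepted(data)
            if ok:
                sock.sendall(b"\xe0\x00")  # DISCONNECT packet
            return ok
    except OSError:
        # Covers connection refused, timeouts, and resets.
        return False
```

A script like this could be run from an exec-style liveness probe instead of `tcpSocket`, for example by ending it with `sys.exit(0 if mqtt_probe() else 1)`. This requires a Python interpreter in the container image.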

Please have a look at #114. Apparently, the NoDataException was not caught at all, and the other exception cases were missing the call to release the connection.
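
To illustrate the general pattern described here, the following generic asyncio sketch catches the "no data" case and always releases the connection in a `finally` block. It is not amqtt's code and not the actual change in #114. `NoData`, `read_or_raise`, `handle_client`, and `demo` are hypothetical stand-ins written for this example only.

```python
import asyncio


class NoData(Exception):
    """Hypothetical stand-in for a 'peer sent nothing' error."""


async def read_or_raise(reader: asyncio.StreamReader, n: int) -> bytes:
    """Read exactly n bytes or raise NoData if the peer closed first."""
    try:
        return await reader.readexactly(n)
    except asyncio.IncompleteReadError as exc:
        raise NoData("No more data") from exc


async def handle_client(reader, writer, events: list) -> None:
    """Handle one connection; the socket is released on every code path."""
    try:
        header = await read_or_raise(reader, 2)
        events.append(("data", header))
    except NoData:
        # Peer connected and closed without sending anything (e.g. a TCP probe).
        events.append(("no-data", None))
    finally:
        writer.close()
        await writer.wait_closed()
        events.append(("released", None))


async def demo() -> list:
    """Start a throwaway server, connect, and close immediately like a TCP probe."""
    events = []
    done = asyncio.Event()

    async def on_connect(reader, writer):
        await handle_client(reader, writer, events)
        done.set()

    server = await asyncio.start_server(on_connect, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]

    _, client_writer = await asyncio.open_connection("127.0.0.1", port)
    client_writer.close()
    await client_writer.wait_closed()

    await asyncio.wait_for(done.wait(), timeout=5)
    server.close()
    await server.wait_closed()
    return events
```

Running `asyncio.run(demo())` exercises the same scenario as the reproduction script above: the client disconnects without sending data, the handler records it quietly, and the connection is closed instead of leaving an unretrieved task exception.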