adafruit/Adafruit_CircuitPython_MiniMQTT

MQTT connect() does not timeout, causing hang

Closed this issue · 3 comments

This is pretty much the same as #110 except:

  • it happens for connect()
  • I have fully reproducible test case

I think this is actually a CircuitPython problem, however this was discovered while trying to get adafruit_minimqtt to work on (supposedly) flaky network, so I am filing this here for initial investigation.

This is happening on Adafruit CircuitPython 7.3.2 on 2022-07-20; Adafruit Feather ESP32S2 with ESP32S2, particularly on Adafruit ESP32-S2 Feather with BME280 Sensor - STEMMA QT - 4MB Flash + 2 MB PSRAM. There is 2000 mAh battery and the Adafruit TMP117 ±0.1°C High Accuracy I2C Temperature Sensor connected. I don't think the sensor and battery matter for this issue, I am adding them for completeness.

Assume the following code:

# SPDX-FileCopyrightText: 2021 Kattni Rembor for Adafruit Industries
# SPDX-License-Identifier: Unlicense
"""
Demonstrates MQTT client hang. Run 'nc -k -l 4444' on the '172.40.0.3' server first.
"""
import alarm
import time
import board
import wifi
import digitalio
import ssl
import socketpool
import json
import time
import sys
import adafruit_minimqtt.adafruit_minimqtt as MQTT
try:
    from secrets import secrets
except ImportError:
    print("WiFi and Adafruit IO credentials are kept in secrets.py, please add them there!")
    raise
    
# MQTT Topic
# Use this topic if you'd like to connect to a standard MQTT broker
mqtt_topic = "devices/terasa/shield"

# Duration of sleep in seconds. Default is 600 seconds (10 minutes).
# Feather will sleep for this duration between sensor readings.
sleep_duration = 20

def log(msg):    
    print(f"{time.monotonic()}: {msg}")

### Code ###
# Define callback methods which are called when events occur
# pylint: disable=unused-argument, redefined-outer-name
def connect(mqtt_client, userdata, flags, rc):
    # This function will be called when the mqtt_client is connected
    # successfully to the broker.
    log("Connected to MQTT Broker!")
    log("Flags: {0}\n RC: {1}".format(flags, rc))


def disconnect(mqtt_client, userdata, rc):
    # This method is called when the mqtt_client disconnects
    # from the broker.
    log("Disconnected from MQTT Broker!")


def publish(mqtt_client, userdata, topic, pid):
    # This method is called when the mqtt_client publishes data to a feed.
    log("Published to {0} with PID {1}".format(topic, pid))


def go_to_sleep(sleep_period):
    # Turn off I2C power by setting it to input
    i2c_power = digitalio.DigitalInOut(board.I2C_POWER)
    i2c_power.switch_to_input()

    # Create an alarm that will trigger sleep_period number of seconds from now.
    time_alarm = alarm.time.TimeAlarm(monotonic_time=time.monotonic() + sleep_period)
    # Exit and deep sleep until the alarm wakes us.
    alarm.exit_and_deep_sleep_until_alarms(time_alarm)


def main():
    log(f"Running with {MQTT.__version__}")
    try:
        # Connect to Wi-Fi
        log("Connecting to wifi")
        wifi.radio.connect(secrets["ssid"], secrets["password"], timeout=10)
        log("Connected to {}!".format(secrets["ssid"]))
        log(f"IP: {wifi.radio.ipv4_address}")
    except Exception as e:  # pylint: disable=broad-except
        logger.error("Troubles getting IP connectivity: {e}")
        go_to_sleep(60)
        return

    # Create a socket pool
    pool = socketpool.SocketPool(wifi.radio)

    # Set up a MiniMQTT Client
    mqtt_client = MQTT.MQTT(
        broker=secrets["broker"],
        port=secrets["broker_port"],
        socket_pool=pool,
        ssl_context=ssl.create_default_context(),
    )

    # Connect callback handlers to mqtt_client
    mqtt_client.on_connect = connect
    mqtt_client.on_disconnect = disconnect
    mqtt_client.on_publish = publish

    log("Attempting to connect to MQTT broker %s" % mqtt_client.broker)
    
    try:
        mqtt_client.connect()
    except Exception as e:
        log(f"Got exception when connecting to MQTT broker, shortening sleep timeout to 60s: {e}")
        go_to_sleep(60)
        return

    logger.info(f"Publishing to {mqtt_topic}")
    data = {
        "foo": "bar",
    }
    mqtt_client.publish(mqtt_topic, json.dumps(data))
    log("Disconnecting from MQTT broker")
    mqtt_client.disconnect()

    log("Going to sleep")
    go_to_sleep(sleep_duration)
    

try:
    main()
except Exception as e:
    log(f"error: {e}")

The secrets.py file looks like this:

# This file is where you keep secret settings, passwords, and tokens!
# If you put them in the code you risk committing that info or sharing it

secrets = {
    "ssid": "foo",
    "password": "bar",
    "broker": "172.40.0.3",
    "broker_port": 4444,
}

This is based on CircuitPython example code on https://learn.adafruit.com/mqtt-in-circuitpython/connecting-to-a-mqtt-broker

In my case, sometimes, for some reason, the MQTT broker does not respond to messages sent by the MQTT client, using the above code. The reason might be e.g. packet loss or some software/environment issue, however for this case it does not matter. What happens in such case is that the client hangs, instead of timing out.

If there is similar problem with MQTT broker communication, the MQTT client should properly timeout, however this does not happen. Internally, the socket timeout is set to 1 second (#112). This timeout should matter for various operations as stated on https://docs.python.org/3/library/socket.html#socket.socket.settimeout

Now, instead of letting the above MQTT client connect to a real MQTT broker, I will let it to connect to a fake server, which accepts any messages, however does not send anything back. This is done via Netcat like so: nc -l 4444

Now, when the MQTT client connects to such server, it reports this:

code.py output:
302.687: Running with 5.3.2
302.688: Connecting to wifi
307.472: Connected to Robocop!
307.473: IP: 172.40.0.13
307.476: Attempting to connect to MQTT broker 172.40.0.3

and that's it. It would stay like this for minutes or hours.

Observing the traffic, it goes like this:

00:41:52.160359 IP 172.40.0.13.58368 > 172.40.0.3.4444: Flags [S], seq 1769067182, win 2880, options [mss 1440], length 0
00:41:52.160432 IP 172.40.0.3.4444 > 172.40.0.13.58368: Flags [S.], seq 1220632846, ack 1769067183, win 64240, options [mss 1460], length 0
00:41:52.161571 IP 172.40.0.13.58368 > 172.40.0.3.4444: Flags [.], ack 1, win 2880, length 0
00:41:52.163309 IP 172.40.0.13.58368 > 172.40.0.3.4444: Flags [P.], seq 1:4, ack 1, win 2880, length 3
00:41:52.163354 IP 172.40.0.3.4444 > 172.40.0.13.58368: Flags [.], ack 4, win 64237, length 0
00:41:52.164651 IP 172.40.0.13.58368 > 172.40.0.3.4444: Flags [P.], seq 4:13, ack 1, win 2880, length 9
00:41:52.164677 IP 172.40.0.3.4444 > 172.40.0.13.58368: Flags [.], ack 13, win 64228, length 0
00:41:52.166419 IP 172.40.0.13.58368 > 172.40.0.3.4444: Flags [P.], seq 13:15, ack 1, win 2880, length 2
00:41:52.166450 IP 172.40.0.3.4444 > 172.40.0.13.58368: Flags [.], ack 15, win 64226, length 0
00:41:52.169880 IP 172.40.0.13.58368 > 172.40.0.3.4444: Flags [P.], seq 15:23, ack 1, win 2880, length 8
00:41:52.169909 IP 172.40.0.3.4444 > 172.40.0.13.58368: Flags [.], ack 23, win 64218, length 0

So, the TCP connection is still up, MQTT client is waiting for reply which it is not going to get and the socket timeout value is not honored.

Interesingly, when I terminate the Netcat process, the TCP connection goes down:

00:47:09.406795 IP 172.40.0.3.4444 > 172.40.0.13.58368: Flags [F.], seq 1, ack 23, win 64218, length 0
00:47:09.568277 IP 172.40.0.13.58368 > 172.40.0.3.4444: Flags [.], ack 2, win 2879, length 0

yet the MQTT client would not budge - it is still hung.

Terminating the client via Ctrl-C on the serial console, I get this:

Traceback (most recent call last):
  File "code.py", line 117, in <module>
  File "code.py", line 98, in main
  File "adafruit_minimqtt/adafruit_minimqtt.py", line 518, in connect
  File "adafruit_minimqtt/adafruit_minimqtt.py", line 818, in _wait_for_msg
  File "adafruit_minimqtt/adafruit_minimqtt.py", line 907, in _sock_exact_recv
KeyboardInterrupt:

This means the recv_into() called inside _sock_exact_recv() does not return at all even though the socket timeout was set to 1 second:

Hence my initial claim about this being actually a CircuitPython problem.

Now, when I do not start the Netcat process and run the MQTT client code, the TCP connection cannot be established, and the program properly reports Repeated socket failures and exits.

For some other details see https://forums.adafruit.com/viewtopic.php?p=937186#p937186

nice work. will be keeping an eye on this

I adapted the code to run both in CircuitPython and also standard Python environment (it lives on https://github.com/vladak/shield/tree/mqtt_bare) and when running it on Raspbian (64-bit) with Python 3.9.2 against the "rogue" Netcat server, it also hangs.

strace-ing the Python process, it just loops in poll() syscall:

strace: Process 3214351 attached
ppoll([{fd=3, events=POLLIN}], 1, {tv_sec=0, tv_nsec=749073324}, NULL, 0) = 0 (Timeout)
ppoll([{fd=3, events=POLLIN}], 1, {tv_sec=1, tv_nsec=0}, NULL, 0) = 0 (Timeout)
ppoll([{fd=3, events=POLLIN}], 1, {tv_sec=1, tv_nsec=0}, NULL, 0) = 0 (Timeout)
ppoll([{fd=3, events=POLLIN}], 1, {tv_sec=1, tv_nsec=0}, NULL, 0) = 0 (Timeout)
ppoll([{fd=3, events=POLLIN}], 1, {tv_sec=1, tv_nsec=0}, NULL, 0) = 0 (Timeout)
ppoll([{fd=3, events=POLLIN}], 1, {tv_sec=1, tv_nsec=0}, NULL, 0) = 0 (Timeout)
ppoll([{fd=3, events=POLLIN}], 1, {tv_sec=1, tv_nsec=0}, NULL, 0) = 0 (Timeout)
ppoll([{fd=3, events=POLLIN}], 1, {tv_sec=1, tv_nsec=0}, NULL, 0) = 0 (Timeout)
ppoll([{fd=3, events=POLLIN}], 1, {tv_sec=1, tv_nsec=0}, NULL, 0) = 0 (Timeout)
ppoll([{fd=3, events=POLLIN}], 1, {tv_sec=1, tv_nsec=0}, NULL, 0) = 0 (Timeout)
ppoll([{fd=3, events=POLLIN}], 1, {tv_sec=1, tv_nsec=0}, NULL, 0) = 0 (Timeout)
ppoll([{fd=3, events=POLLIN}], 1, {tv_sec=1, tv_nsec=0}, NULL, 0) = 0 (Timeout)
ppoll([{fd=3, events=POLLIN}], 1, {tv_sec=1, tv_nsec=0}, NULL, 0) = 0 (Timeout)
ppoll([{fd=3, events=POLLIN}], 1, {tv_sec=1, tv_nsec=0}, NULL, 0) = 0 (Timeout)
...

ad infinitum (15 minutes to be precise :-) however I am fairly confident this will be looping forever, as long as the TCP connection is alive).

On the terminal running the Netcat process I can see the initial MQTT message being accepted:

$ nc -l 4444
MQTTcpy37452

Interrupting the program it produces this:

$ ./code.py 
2311289.08784875: Running with 5.3.3
2311289.092742285: Attempting to connect to MQTT broker 172.40.0.3
^CTraceback (most recent call last):
  File "/home/pi/shield/./code.py", line 128, in <module>
    main()
  File "/home/pi/shield/./code.py", line 110, in main
    mqtt_client.connect()
  File "/home/pi/shield/env/lib/python3.9/site-packages/adafruit_minimqtt/adafruit_minimqtt.py", line 518, in connect
    op = self._wait_for_msg()
  File "/home/pi/shield/env/lib/python3.9/site-packages/adafruit_minimqtt/adafruit_minimqtt.py", line 813, in _wait_for_msg
    res = self._sock_exact_recv(1)
  File "/home/pi/shield/env/lib/python3.9/site-packages/adafruit_minimqtt/adafruit_minimqtt.py", line 907, in _sock_exact_recv
    self._sock.recv_into(rc, bufsize)
KeyboardInterrupt

so this is likely looping in

while True:
op = self._wait_for_msg()
if op == 32:
rc = self._sock_exact_recv(3)
assert rc[0] == 0x02
if rc[2] != 0x00:
raise MMQTTException(CONNACK_ERRORS[rc[2]])
self._is_connected = True
result = rc[0] & 1
if self.on_connect is not None:
self.on_connect(self, self._user_data, result, rc[2])
return result

I confirmed this hypothesis by editing the adafruit_minimqtt.py inside the Python env directory and adding a debug print (print(f"_wait_for_msg() returned {op}")) right after line 518 and sure enough:

$ ./code.py 
2313334.942731167: Running with 5.3.3
2313334.946518151: Attempting to connect to MQTT broker 172.40.0.3
_wait_for_msg() returned None
_wait_for_msg() returned None
_wait_for_msg() returned None
_wait_for_msg() returned None
_wait_for_msg() returned None
_wait_for_msg() returned None
_wait_for_msg() returned None
_wait_for_msg() returned None
...

The frequency of these debug prints correlates with the timeout values of the poll() syscall observed with strace.

So, this while True cycle needs to employ some timeout. That said, it is still a question whether the code in CircuitPython environment also loops like this.

This sort of while True pattern is also present in other functions like publish(), subscribe(), unsubcribe(). All of these need to be fixed.