eclipse-paho/paho.mqtt.python

Negative inflight messages counter with QoS 2 for subscribe and publish

Opened this issue · 1 comments

Python 3.9, Windows 10
paho-mqtt 1.6.1

The client._inflight_messages counter of the paho client is negative and decreasing with QoS 2 for publish and subscribe on the same topic. Other combinations of the QoS levels will not show the issue.

The log seems to indicate that the published messages are transfered successfully:

DEBUG:__main__:Sending PUBREC (Mid: 4)
DEBUG:__main__:Received PUBREL (Mid: 4)
DEBUG:__main__:received payload 3, mid: 4
DEBUG:__main__: _out_messages 0, _in_messages 0, _inflight_messages -3
DEBUG:__main__:Sending PUBCOMP (Mid: 4)
DEBUG:__main__:Sending PUBLISH (d0, q2, r0, m6), 'b'test'', ... (1 bytes)
DEBUG:__main__:Received PUBREC (Mid: 6)
DEBUG:__main__:Sending PUBREL (Mid: 6)
DEBUG:__main__:Received PUBCOMP (Mid: 6)
DEBUG:__main__:Received PUBLISH (d0, q2, r0, m5), 'test', ...  (1 bytes)
DEBUG:__main__:Sending PUBREC (Mid: 5)
DEBUG:__main__:Received PUBREL (Mid: 5)
DEBUG:__main__:received payload 4, mid: 5
DEBUG:__main__: _out_messages 0, _in_messages 0, _inflight_messages -4
DEBUG:__main__:Sending PUBCOMP (Mid: 5)

Here is a MCVE

import sys
import paho.mqtt.client as mqtt
from time import sleep
import logging

logging.basicConfig(
    level=logging.DEBUG,
    stream=sys.stdout,
)
logger = logging.getLogger(__name__)

MQTT_TOPIC = "test"
MQTT_PUBLISH_QOS = 2
MQTT_SUBSCRIBE_QOS = 2

def on_message(client, userdata, message):

    logger.debug(f"received payload {message.payload.decode('utf-8')}, mid: {message.mid}")
    logger.debug(f" _out_messages {len(client._out_messages)}, _in_messages {len(client._in_messages)}, _inflight_messages {client._inflight_messages}")


mqttc = mqtt.Client(client_id="client")
mqttc.max_inflight_messages_set(20)
mqttc.max_queued_messages_set(0)
mqttc.enable_logger(logger)

mqttc.on_message = on_message
mqttc.connect("localhost", port=1883, keepalive=5)
mqttc.subscribe(MQTT_TOPIC, qos=MQTT_SUBSCRIBE_QOS)
mqttc.loop_start()

n = 0
while True:
    mqttc.publish(MQTT_TOPIC, n, MQTT_PUBLISH_QOS)
    n += 1
    sleep(1.0)

Closed by mistake. Reopening.