empicano/aiomqtt

Reconnect and shared connection

YraganTron opened this issue · 8 comments

I have the following code example that I'm trying to get working.
I have two coroutines that I start from a FastAPI startup event: one consumes messages and the other publishes them.
I can't use the latest FastAPI version with lifespan right now (and even with lifespan, how to handle reconnection is still an open question). I also need reconnection so that a crash doesn't stop the whole application.
Running the following code fails:

import uvicorn
import asyncio

from fastapi import FastAPI
from aiomqtt import Client, MqttError

from app.config import settings

def create_app():
    app = FastAPI()
    client = Client(hostname=settings.mqtt_host, port=settings.mqtt_port)
    Test(client=client).init_app(app)
    return app

async def _handle_mqtt_msg(message):
    print(message)


async def consume_mqtt(client):
    reconnect_interval = 5
    while True:
        try:
            await client.__aenter__()
            async with client.messages() as messages:
                await client.subscribe('vehicles/v1/+/events/telemetry')
                async for message in messages:
                    await _handle_mqtt_msg(message=message)
        except MqttError as e:
            await client.__aexit__(None, None, None)
            await asyncio.sleep(reconnect_interval)


async def publish_some_messages(client):
    await client.publish(topic='test', payload='test')

class Test:
    def __init__(self, client):
        self.client = client
    def init_app(self, app):
        @app.on_event('startup')
        async def startup():
            loop = asyncio.get_running_loop()
            loop.create_task(consume_mqtt(self.client), name='blabla')
            loop.create_task(publish_some_messages(self.client))


if __name__ == '__main__':
    port = settings.port
    uvicorn.run(app=create_app(), port=port, access_log=False)
future: <Task finished name='Task-4' coro=<publish_some_messages() done, defined at /Users/yragantron/evo_projects/message-processor/./main.py:35> exception=MqttCodeError('Could not publish message')>
Traceback (most recent call last):
  File "/Users/yragantron/evo_projects/message-processor/./main.py", line 36, in publish_some_messages
    await client.publish(topic='test', payload='test')
  File "/Users/yragantron/Library/Caches/pypoetry/virtualenvs/message-proccesor-yNLPJJ9m-py3.10/lib/python3.10/site-packages/aiomqtt/client.py", line 117, in decorated
    return await method(self, *args, **kwargs)
  File "/Users/yragantron/Library/Caches/pypoetry/virtualenvs/message-proccesor-yNLPJJ9m-py3.10/lib/python3.10/site-packages/aiomqtt/client.py", line 514, in publish
    raise MqttCodeError(info.rc, "Could not publish message")
aiomqtt.error.MqttCodeError: [code:4] The client is not currently connected.

This must be a common use case, but for some reason I can't find a solution. Can you tell me how to get this code working?

Hi Arseny,

Thank you for the reproducible example 👍

The code does not initialize the client object correctly. Specifically, you need to call __aenter__ inside the startup event itself. Tom has some good, related examples in issue #230.
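A minimal sketch of entering the client inside the startup event and leaving it on shutdown. It assumes a hypothetical FakeClient in place of aiomqtt.Client (so it runs without a broker) and plain startup()/shutdown() coroutines in place of FastAPI's event handlers. contextlib.AsyncExitStack keeps the entered context open between the two events, which avoids calling __aenter__/__aexit__ by hand:

```python
import asyncio
from contextlib import AsyncExitStack


class FakeClient:
    """Hypothetical stand-in for aiomqtt.Client: connects on enter, disconnects on exit."""

    def __init__(self) -> None:
        self.connected = False
        self.published: list[str] = []

    async def __aenter__(self) -> "FakeClient":
        await asyncio.sleep(0)  # pretend to do the network handshake
        self.connected = True
        return self

    async def __aexit__(self, *exc_info) -> None:
        self.connected = False

    async def publish(self, topic: str) -> None:
        if not self.connected:
            raise RuntimeError("The client is not currently connected.")
        self.published.append(topic)


stack = AsyncExitStack()
app_state: dict[str, FakeClient] = {}


async def startup() -> None:
    # Create the client inside the running event loop, enter its context and keep it open
    app_state["client"] = await stack.enter_async_context(FakeClient())


async def shutdown() -> None:
    # Unwinds everything entered in startup(), which calls the client's __aexit__
    await stack.aclose()


async def demo() -> None:
    # Stands in for the server lifecycle: startup, a route publishing, shutdown
    await startup()
    await app_state["client"].publish("test")
    await shutdown()


asyncio.run(demo())
```

In FastAPI, the two coroutines would be registered with @app.on_event('startup') and @app.on_event('shutdown'); this sketch doesn't handle reconnection.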

how to ensure reconnect is still a question there

Very true; that's a nice opportunity for a self-contained first contribution to update the docs on that 😉

Hi, I ran and debugged the example. The Client has to be created inside the startup event, otherwise you get errors about different event loops, and the asyncio.sleep(0.5) gives the consumer task time to connect before publishing. I don't recommend this approach.

I'm still thinking about how to gracefully handle both scenarios: receiving messages in an infinite loop, and sending messages from route handlers in the web framework.

import uvicorn
import asyncio

from fastapi import FastAPI
from aiomqtt import Client, MqttError


def create_app():
    app = FastAPI()

    @app.on_event('startup')
    async def startup():
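        loop = asyncio.get_running_loop()
        client = Client(hostname="localhost")  # added: create the client inside the startup event
        loop.create_task(consume_mqtt(client), name='blabla')
        await asyncio.sleep(0.5)  # added: give consume_mqtt time to connect (not recommended)
        loop.create_task(publish_some_messages(client))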
    return app


async def _handle_mqtt_msg(message):
    print(message.payload)


async def consume_mqtt(client):
    reconnect_interval = 5
    while True:
        try:
            await client.__aenter__()
            async with client.messages() as messages:
                await client.subscribe('test')
                await client.publish(topic='test', payload='test')
                async for message in messages:
                    await _handle_mqtt_msg(message=message)
        except MqttError as e:
            print(e)
            await client.__aexit__(None, None, None)
            await asyncio.sleep(reconnect_interval)


async def publish_some_messages(client):
    await client.publish(topic='test', payload='test')

if __name__ == '__main__':

    uvicorn.run(app=create_app(), access_log=False)
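The fixed asyncio.sleep(0.5) only works if connecting happens to take less than half a second. One alternative, sketched here with a hypothetical FakeClient and hypothetical connected/stop events, is to have the consumer signal once it has entered the client context and have the publisher wait for that signal instead of sleeping:

```python
import asyncio


class FakeClient:
    """Hypothetical stand-in for aiomqtt.Client."""

    def __init__(self) -> None:
        self.connected = False
        self.published: list[str] = []

    async def __aenter__(self) -> "FakeClient":
        await asyncio.sleep(0.05)  # simulate a slow connection handshake
        self.connected = True
        return self

    async def __aexit__(self, *exc_info) -> None:
        self.connected = False

    async def publish(self, topic: str) -> None:
        if not self.connected:
            raise RuntimeError("The client is not currently connected.")
        self.published.append(topic)


async def consume(client: FakeClient, connected: asyncio.Event, stop: asyncio.Event) -> None:
    async with client:
        connected.set()  # tell other tasks the connection is up
        await stop.wait()  # stands in for `async for message in messages: ...`


async def publish_some_messages(client: FakeClient, connected: asyncio.Event) -> None:
    await connected.wait()  # no guessing how long connecting takes
    await client.publish("test")


async def main() -> FakeClient:
    client = FakeClient()
    connected, stop = asyncio.Event(), asyncio.Event()
    consumer = asyncio.create_task(consume(client, connected, stop))
    await publish_some_messages(client, connected)
    stop.set()
    await consumer
    return client


client = asyncio.run(main())
```

With a reconnect loop around async with client, the consumer would presumably also call connected.clear() after leaving the context, so publishers wait again while it reconnects.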

This seems like a worry-free solution. The disadvantage is that, unless the route is rate-limited, concurrent requests each open their own connection, which can lead to too many connections.

import uvicorn
import asyncio

from fastapi import FastAPI
from aiomqtt import Client, MqttError

app = FastAPI()


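@app.on_event('startup')
async def startup():
    # Keep a reference to the task so it isn't garbage-collected while running
    app.state.mqtt_task = asyncio.create_task(consume_mqtt())


@app.on_event("shutdown")
async def shutdown():
    app.state.mqtt_task.cancel()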


async def consume_mqtt():
    reconnect_interval = 5
    while True:
        try:
            async with Client(hostname="localhost") as client:
                await client.subscribe('vehicles/v1')
                await client.publish(topic='vehicles/v1', payload='test')
                async with client.messages() as messages:
                    async for message in messages:
                        await asyncio.sleep(0.1)
                        print(message.payload)
        except MqttError as e:
            print(e)
            await asyncio.sleep(reconnect_interval)


@app.get("/qwer")
async def q():
    async with Client(hostname="localhost") as client:
        await client.publish(topic='vehicles/v1', payload='test')

    return "success"

if __name__ == '__main__':
    uvicorn.run(app=app, access_log=False)
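If one connection per request is a concern (or the broker allows only a single connection per client ID, as comes up later in the thread), one option is to let a single background task own the connection and have route handlers hand messages to it through an asyncio.Queue. A minimal sketch, with a hypothetical FakeClient in place of aiomqtt.Client and a plain coroutine standing in for the /qwer route:

```python
import asyncio


class FakeClient:
    """Hypothetical stand-in for aiomqtt.Client that counts connections."""

    connections = 0

    def __init__(self) -> None:
        self.connected = False
        self.published: list[tuple[str, str]] = []

    async def __aenter__(self) -> "FakeClient":
        FakeClient.connections += 1
        self.connected = True
        return self

    async def __aexit__(self, *exc_info) -> None:
        self.connected = False

    async def publish(self, topic: str, payload: str) -> None:
        if not self.connected:
            raise RuntimeError("The client is not currently connected.")
        self.published.append((topic, payload))


async def publisher(client: FakeClient, queue: asyncio.Queue) -> None:
    # The only code that touches the connection; runs for the app's lifetime
    async with client:
        while True:
            topic, payload = await queue.get()
            await client.publish(topic, payload)
            queue.task_done()


async def route_handler(queue: asyncio.Queue) -> str:
    # Stands in for the /qwer route: enqueue instead of opening a connection
    await queue.put(("vehicles/v1", "test"))
    return "success"


async def main() -> FakeClient:
    client = FakeClient()
    queue: asyncio.Queue = asyncio.Queue()
    task = asyncio.create_task(publisher(client, queue))
    # Simulate ten concurrent requests
    results = await asyncio.gather(*(route_handler(queue) for _ in range(10)))
    assert results == ["success"] * 10
    await queue.join()  # wait until every queued message has been published
    task.cancel()  # cancellation exits the client context, i.e. disconnects
    try:
        await task
    except asyncio.CancelledError:
        pass
    return client


client = asyncio.run(main())
```

The trade-off is that the route returns once the message is queued, not once it is published. A reconnect loop like consume_mqtt above could presumably wrap the async with client block in the same way, although messages in flight when the connection drops would be lost in this sketch.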

I'm running into this too. I have a consumer task and a producer task. At first I got aiomqtt.error.MqttReentrantError: The client context manager is reusable, but not reentrant. I realized this came from a second async with client inside one of my tasks, yet the producer and consumer both still worked fine.

However, after fixing this by removing the redundant async with client, I get the same error as the original author.
I am not (yet) using FastAPI with my service but the intention was to do so in future so I am trying to follow this pattern in anticipation.

Your proposed solution @vvanglro would not work for me as I am connecting to IoT Core which only allows a single connection per Client ID, so I wish to re-use the same Client and therefore client ID for both subscribe and publish operations.

I'm using aiomqtt for the first time today and am also fairly new to asyncio in general so I may be doing something wrong, but I will just have to use the redundant context manager for now until I can perhaps provide more detail/reproduction steps in addition to the author's.

Hi Dylan,

Maybe the documentation on sharing the connection helps? From what you wrote, I don't believe your problem has the same root cause as this issue. We also have some information specifically on aiomqtt with FastAPI in case you haven't seen it yet. Let me know if that helps 😋

I was following your Sharing the connection doc already, but with the added complexity of a TaskGroup.

Attached is my reproduction of the issue, cut down as much as I could before losing all semblance of my actual code. The gist is that I have two classes, one for handling messages to and from an MQTT broker and another for messages to and from RabbitMQ. That is why I have a TaskGroup: to subscribe to both brokers and forward messages from one to the other. I've mocked out the MQ side in this base case.

Thankfully, I think I found the issue. (Or at least the most visible one. I know not what other sins I may be committing with your library. For one I should probably now move the retry loop to main() to go along with the context manager.)

Running the code as-is, I get an error on publishing saying the client isn't connected. However, simply swapping the order of the two context managers in main() (the TaskGroup and mqtt_client) solves my issue! I'm not sure whether this is expected.

My issue may now be unrelated to the original author's, but they, I, and perhaps others could probably benefit from some documentation on best practices for this kind of two-way communication scenario, or on potential gotchas. The Sharing the connection example is just two fire-and-forget publish calls, not a long-running example that brings together your guides such as Reconnection or parallel processing. I guess I had trouble merging them all together myself.

I thought my code was close enough to your Listening without blocking example, as my TaskGroup is on the outset as well, but perhaps this has some gotchas when trying to share the connection as well. Or just let me know this was some asyncio rookie mistake and I can go kick rocks. πŸ˜‚


import asyncio
import logging
from asyncio import sleep as AsyncSleep
from functools import partial
from time import time
from typing import Any, Awaitable, Callable

import aiomqtt
from aiomqtt import Client as MQTTClient
from aiomqtt import MqttError
from aiomqtt.client import Message as MQTTMessage
from aiomqtt.types import PayloadType

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)


class MqttDao:
    def __init__(self):
        self.client = aiomqtt.Client("test.mosquitto.org")


    async def start(self, client: aiomqtt.Client, callback: Callable[[MQTTMessage], Awaitable[Any]]):
        retry_delay = 0 - 5
        while True:
            try:
                # FIXME - This redundant context manager is illegal: https://sbtinstruments.github.io/aiomqtt/reconnection.html#reconnection
                # async with client:
                async with client.messages() as messages:
                    logger.info('[start] Subscribing to topics temperature/outside and humidity/inside...')

                    await client.subscribe('temperature/outside', qos=1)
                    await client.subscribe('humidity/inside', qos=1)

                    retry_delay = 0 - 5

                    async for message in messages:
                        logger.debug(f'[start] Invoking callback for message from topic {message.topic}')
                        await callback(message)
            except MqttError:
                logger.exception('[start] An MqttError occurred...')
                if retry_delay <= 30:
                    retry_delay += 5
                logger.info(f"[start] Reconnecting to MQTT broker in {retry_delay} seconds ...")
                await AsyncSleep(retry_delay)
            except Exception:
                logger.exception('[start] An unknown exception occurred...')
                raise


    # Publishes a given payload to our MQTT broker with the given topic and QoS level
    async def publish_message(self, client: MQTTClient, topic: str, payload: PayloadType, qos: int):
        logger.debug(f"[publish_message] Publishing MQTT message to topic '{topic}' with qos {qos}: {payload}")
        await client.publish(topic=topic, payload=payload, qos=qos)
        logger.debug(f"[publish_message] Successfully published MQTT message to topic '{topic}' with qos {qos}: {payload}")


class MqDao:
    def __init__(self):
        pass


    async def start(self, callback: Callable[[MqttDao, MQTTClient, dict], Awaitable[Any]], mqtt_dao: MqttDao, mqtt_client: MQTTClient):
        while True:
            logger.info('[start] Sleep for 2 seconds instead of waiting on a message from RabbitMQ...')
            await AsyncSleep(2)
            await callback(mqtt_dao, mqtt_client, {'body': 'Mock message from mock broker!', 'routing_key': 'humidity.inside'})


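    async def publish_message(self, body: bytes, routing_key: str, timestamp: float | None = None):
        # A default of time() would be evaluated only once, at definition time; compute it per call instead
        timestamp = time() if timestamp is None else timestamp
        logger.info('[publish_message] Sleep for 0.25 seconds instead of actually sending a message to RabbitMQ...')
        await AsyncSleep(0.25)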


# Callback to be invoked by MQTT client responsible for converting MQTT messages into MQ Messages to be published
async def inbound_consumer(mq_dao: MqDao, mqtt_message: MQTTMessage):
    payload: bytes = mqtt_message.payload  # type: ignore - Our payloads are bytes not floats
    routing_key = mqtt_message.topic.value.replace('/', '.')  # Slash delimiters in MQTT are dots in AMQP
    await mq_dao.publish_message(body=payload, routing_key=routing_key)


# Callback to be invoked by AMQP client responsible for converting MQ messages into MQTT messages to be published
async def outbound_consumer(mqtt_dao: MqttDao, mqtt_client: MQTTClient, amqp_message: dict):
    logger.debug(f"[outbound_consumer] Received MQ message with routing key '{amqp_message['routing_key']}': {amqp_message['body']}")
    topic = amqp_message['routing_key'].replace('.', '/')  # type: ignore - We will always have a routing key defined
    await mqtt_dao.publish_message(client=mqtt_client, topic=topic, payload=amqp_message['body'], qos=1)


def init():
    mqtt_dao = MqttDao()
    mq_dao = MqDao()

    return (mqtt_dao, mq_dao)


async def main():
    (mqtt_dao, mq_dao) = init()

    try:
        async with asyncio.TaskGroup() as tg:
            async with mqtt_dao.client as mqtt_client:
                inbound_producer_task = tg.create_task(mqtt_dao.start(mqtt_client, partial(inbound_consumer, mq_dao)))
                outbound_consumer_task = tg.create_task(mq_dao.start(outbound_consumer, mqtt_dao, mqtt_client))
        logger.info('[main] Producer and consumer tasks have now finished: '
                    + f'{inbound_producer_task.result()}, {outbound_consumer_task.result()}')
    except (asyncio.CancelledError, Exception):
        logger.exception('[main] An error occurred during the consumer TaskGroup.')
        raise


asyncio.run(main())

That's a fun one 😄 I simplified your example down to this, which still gives the same error and still lets us swap the TaskGroup line with the async with client line to fix it:

import asyncio
import aiomqtt


class ABC:
    async def start(self, client):
        await asyncio.sleep(1)
        await client.publish(topic='foo')


class XYZ:
    async def start(self, client):
        await asyncio.sleep(1)
        await client.publish(topic='bar')


async def main():
    client = aiomqtt.Client("test.mosquitto.org")
    async with asyncio.TaskGroup() as tg:
        async with client:
            task_1 = tg.create_task(ABC().start(client))
            task_2 = tg.create_task(XYZ().start(client))


asyncio.run(main())

The asyncio documentation describes a TaskGroup as "An asynchronous context manager holding a group of tasks. Tasks can be added to the group using create_task(). All tasks are awaited when the context manager exits." (emphasis mine)

The problem here is that task_1 and task_2 are created inside the Client context manager, but they aren't awaited inside it; they're awaited when the TaskGroup exits.

So what happens is something like:

  1. Enter TaskGroup context
  2. Enter Client context (connect to broker)
  3. Start the two publish tasks
  4. Exit Client context (disconnect from broker)
  5. Exit TaskGroup context (await the two publish tasks, which now find themselves with a disconnected client)

Does that make sense? I hope that also explains why switching the two with statements fixes the error 😋


... could probably benefit from some documentation on what is the best practice for this kind of two-way communications scenario

This is out of scope for our docs, in my opinion. We're explicitly focusing on small examples to keep them understandable and to cover as many use cases as possible.

Ahhhhh, I believe that makes sense to me! You're a legend for this response.

If I understand correctly from your explanation and some additional reading, what you're saying is that the client connects when entering its context, but since nothing is being awaited inside its context (because no awaits occur until reaching the end of the outer context, the TaskGroup), the code is able to progress to the end of the client's context manager, which triggers a disconnect.
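A small stdlib-only illustration of that point: asyncio.create_task() only schedules a coroutine, and the coroutine doesn't start running until the code that created it reaches an await that yields to the event loop. The events list records the order:

```python
import asyncio

events: list[str] = []


async def worker() -> None:
    events.append("worker started")


async def main() -> None:
    task = asyncio.create_task(worker())
    events.append("task created")  # worker has not run yet
    await asyncio.sleep(0)  # first point where control returns to the event loop
    events.append("after first await")
    await task


asyncio.run(main())
print(events)  # ['task created', 'worker started', 'after first await']
```

In the simplified example, each task's first step is await asyncio.sleep(1), so by the time either one publishes, the client's context has already exited.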

Context managers are cool, but I've never written one myself. Looking at the __aenter__/__aexit__ methods in the aiomqtt source code helped me understand further. It's nice to be back in Python and learn how async is done here after several years of doing it only in TypeScript.

I agree that this seems beyond the scope of the docs, and hope your excellent response may help another confused individual in future digging through Issues like me. 😄