empicano/aiomqtt

docs/help: how to run with `uvicorn` alongside FastAPI

Closed this issue ยท 5 comments

The docs should be enough for me to figure this out, but I haven't managed.

The side-by-side with web frameworks section in the docs shows how to set up a lifespan event with FastAPI. Good, fine.

Then listening without blocking shows how to combine an async listener with a another generic async task. Also good.

I'm not really sure how to combine these in the context of FastAPI + uvicorn, where uvicorn wants to run the FastAPI app itself and handle the async loop internally.

# main.py
from fastapi import FastAPI
from contextlib import asynccontextmanager

mqtt_client = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    global mqtt_client
    async with aiomqtt.Client('host') as client:
        mqtt_client = client
        yield

app = FastAPI(
    lifespan=lifespan,
)

then with the uvicorn CLI or:

# run.py

from main import app

if __name__ == "__main__":
	uvicorn.run(app)

Awaiting listen() (from "listening without blocking") like this doesn't seem to work at all (just blocks on server.serve())

# run.py also
import uvicorn
import asyncio
from main import app, mqtt_client

async def main():
    config = uvicorn.Config(app)
        server = uvicorn.Server(config)
        await server.serve()
        await listen(mqtt_client)

if __name__ == "__main__":
    asyncio.run(main())

It's late and I'll take another crack at this tomorrow, but perhaps someone has a simple pattern that will make this sensible to me. Thanks!

Hi Gerrit, thanks for opening this issue. Let me have a look. :)

You already found the relevant docs so that's good. ๐Ÿ‘

Awaiting listen() (from "listening without blocking") like this doesn't seem to work at all (just blocks on server.serve())

That sounds like the culprit. The current code awaits server.serve and then listen(mqtt_client) in sequence. If you want to run both concurrently, try with either an asyncio.TaskGroup (from python 3.11) or manual task handling with asyncio.create_task. Using the modern approach:

# run.py also
import uvicorn
import asyncio
from main import app, mqtt_client

async def main():
    config = uvicorn.Config(app)
    server = uvicorn.Server(config)
    async with asyncio.TaskGroup() as tg:
        tg.create_task(server.serve())
        tg.create_task(listen(mqtt_client))

if __name__ == "__main__":
    asyncio.run(main())

Let me know if that works or if I'm missing something. :)

To add to Frederik's answer, if you want to use the uvicorn CLI, I believe you have to manage the task manually.

Here's a working example, FastAPI side-by-side with MQTT listener and publication on GET /:

import asyncio
import asyncio_mqtt as aiomqtt
import contextlib
import fastapi


async def listen(client):
    async with client.messages() as messages:
        await client.subscribe("humidity/#")
        async for message in messages:
            print(message.payload)


client = None


@contextlib.asynccontextmanager
async def lifespan(app):
    global client
    async with aiomqtt.Client("test.mosquitto.org") as c:
        # Make client globally available
        client = c
        # Start MQTT listener in (unawaited) asyncio task
        loop = asyncio.get_event_loop()
        task = loop.create_task(listen(client))
        yield
        task.cancel()
        # Wait for the MQTT listener task to be cancelled
        try:
            await task
        except asyncio.CancelledError:
            pass


app = fastapi.FastAPI(lifespan=lifespan)


@app.get("/")
async def publish():
    await client.publish("humidity/outside", 0.38)

If you want to use task groups, you have to invoke uvicorn from within the code. I tried around with this but actually didn't get it working. This is what I came up with, somehow the GET / route doesn't have access to the global client object. Maybe I'm not seeing something obvious?

import asyncio
import asyncio_mqtt as aiomqtt
import fastapi
import uvicorn


async def listen(client):
    async with client.messages() as messages:
        await client.subscribe("humidity/#")
        async for message in messages:
            print(message.payload)


client = None
app = fastapi.FastAPI()


@app.get("/")
async def publish():
    await client.publish("humidity/outside", 0.38)


async def main():
    # Initialize uvicorn server
    config = uvicorn.Config("main:app", host="127.0.0.1", port=8000, reload=True)
    server = uvicorn.Server(config)
    global client
    async with aiomqtt.Client("test.mosquitto.org") as c:
        # Make client globally available
        client = c
        # Start uvicorn and MQTT listener inside asyncio task group
        async with asyncio.TaskGroup() as tg:
            tg.create_task(server.serve())
            tg.create_task(listen(client))


if __name__ == "__main__":
    asyncio.run(main())

Also, this second way doesn't (yet?) cancel the listener task on STRG-C and it's not as flexible in that you can't switch out the web server without changing the code.

I've updated the documentation with the more detailed example. If anybody comes up with a better solution to this, or if there are any questions remaining, please let me know ๐Ÿ˜Š

Hey guys thanks so much! I'm also still struggling to figure out passing the client object around; if I create each listener/publisher with its one client, no problem, but if I pass the client object I get a connection error. ๐Ÿคท

I'll follow up here with a simple version of my final solution

Great! If you're stuck, let us know. Or post a minimal example of what doesn't work, always helps to know what people are struggling with to improve the docs ๐Ÿ˜‹