Yakifo/amqtt

Threads being implemented in the background??

dluckey opened this issue · 0 comments

Hi Everyone,

I have a simple object to encapsulate some details and a coroutine that gets added to the event loop for testing as follows:

MQTTStream

import logging
from amqtt.client import MQTTClient, ClientException
from amqtt.mqtt.constants import QOS_0
from amqtt.session import IncomingApplicationMessage as IncomingMessage

class MQTTStream(object):
    address_prefix = "mqtt://"

    config = {
        "keep_alive": 60,
        "reconnect_max_interval": 5,
        "reconnect_retries": 5,
        "ping_delay": 2,
    }

    def __init__(self, conn_details):
        self.logger = logging.getLogger(__name__)
        self.dev_uid = conn_details.get("dev-uid", "")
        self.name = conn_details.get("name", "")
        self.type = conn_details.get("type", "")

        self.username = conn_details.get("username", "")
        self.password = conn_details.get("password", "")
        self.port = conn_details.get("port", 1883)

        self.host_address = conn_details.get("host_address", "")

        self.topic = conn_details.get("topic", "")

        if self.username and self.password:
            self.client_address = MQTTStream.address_prefix + self.username + ":" + self.password + "@" + self.host_address + '/:' + str(self.port)
        elif self.username:
            self.client_address = MQTTStream.address_prefix + self.username + "@" + self.host_address + '/:' + str(self.port)
        else:
            self.client_address = MQTTStream.address_prefix + self.host_address + '/:' + str(self.port)

        self.client = None

    async def mqtt_function(self):
        self.client = MQTTClient(client_id=self.name.upper(), config=MQTTStream.config)
        try:
            await self.client.connect(self.client_address)
            self.logger.info("Connected")
        except Exception as e:
            self.logger.info("Error:" + str(e))

        await self.client.subscribe([(self.topic, QOS_0)])
        self.logger.info("Subscribed")

        while True:
            self.logger.info("Listening %s" % self.name)
            message: IncomingMessage = await self.client.deliver_message()
            self.logger.info("%s, %s" % (self.name, message))

However when I create several instances with some dummy details(5) using the following:

Runner

import asyncio
from classes import MQTTStream

ttn_conn = {...}    #  Details as required in above code 'config'
cat_conn = {...}
cat0_conn = {...}
cat1_conn = {...}
cat2_conn = {...}

ttn = MQTTStream(ttn_conn)
cat = MQTTStream(cat_conn)
cat0 = MQTTStream(cat0_conn)
cat1 = MQTTStream(cat1_conn)
cat2 = MQTTStream(cat2_conn)

loop = asyncio.new_event_loop()
tasks = {}

tasks[ttn.dev_uid] = loop.create_task(ttn.mqtt_function())
tasks[cat.dev_uid] = loop.create_task(cat.mqtt_function())
tasks[cat0.dev_uid] = loop.create_task(cat0.mqtt_function())
tasks[cat1.dev_uid] = loop.create_task(cat1.mqtt_function())
tasks[cat2.dev_uid] = loop.create_task(cat2.mqtt_function())
loop_thread.run_forever()

I get htop reporting the main python process and 5 user threads.
Capture

Everything else seems to work correctly and I get messages as expected for each Client/Connection, just perplexed as to why the user threads are also there.

I did do a little redacting to the above code but it shouldn't be anything that would alter functionality

Has anyone got any idea whether this is an issue or am I(hopefully) missing something?

I am using:
Python 3.8.14
Ubuntu 18.04
PyCharm