Threads being implemented in the background??
dluckey opened this issue · 0 comments
dluckey commented
Hi Everyone,
I have a simple object to encapsulate some details and a coroutine that gets added to the event loop for testing as follows:
MQTTStream
import logging
from amqtt.client import MQTTClient, ClientException
from amqtt.mqtt.constants import QOS_0
from amqtt.session import IncomingApplicationMessage as IncomingMessage
class MQTTStream(object):
address_prefix = "mqtt://"
config = {
"keep_alive": 60,
"reconnect_max_interval": 5,
"reconnect_retries": 5,
"ping_delay": 2,
}
def __init__(self, conn_details):
self.logger = logging.getLogger(__name__)
self.dev_uid = conn_details.get("dev-uid", "")
self.name = conn_details.get("name", "")
self.type = conn_details.get("type", "")
self.username = conn_details.get("username", "")
self.password = conn_details.get("password", "")
self.port = conn_details.get("port", 1883)
self.host_address = conn_details.get("host_address", "")
self.topic = conn_details.get("topic", "")
if self.username and self.password:
self.client_address = MQTTStream.address_prefix + self.username + ":" + self.password + "@" + self.host_address + '/:' + str(self.port)
elif self.username:
self.client_address = MQTTStream.address_prefix + self.username + "@" + self.host_address + '/:' + str(self.port)
else:
self.client_address = MQTTStream.address_prefix + self.host_address + '/:' + str(self.port)
self.client = None
async def mqtt_function(self):
self.client = MQTTClient(client_id=self.name.upper(), config=MQTTStream.config)
try:
await self.client.connect(self.client_address)
self.logger.info("Connected")
except Exception as e:
self.logger.info("Error:" + str(e))
await self.client.subscribe([(self.topic, QOS_0)])
self.logger.info("Subscribed")
while True:
self.logger.info("Listening %s" % self.name)
message: IncomingMessage = await self.client.deliver_message()
self.logger.info("%s, %s" % (self.name, message))
However when I create several instances with some dummy details(5) using the following:
Runner
import asyncio
from classes import MQTTStream
ttn_conn = {...} # Details as required in above code 'config'
cat_conn = {...}
cat0_conn = {...}
cat1_conn = {...}
cat2_conn = {...}
ttn = MQTTStream(ttn_conn)
cat = MQTTStream(cat_conn)
cat0 = MQTTStream(cat0_conn)
cat1 = MQTTStream(cat1_conn)
cat2 = MQTTStream(cat2_conn)
loop = asyncio.new_event_loop()
tasks = {}
tasks[ttn.dev_uid] = loop.create_task(ttn.mqtt_function())
tasks[cat.dev_uid] = loop.create_task(cat.mqtt_function())
tasks[cat0.dev_uid] = loop.create_task(cat0.mqtt_function())
tasks[cat1.dev_uid] = loop.create_task(cat1.mqtt_function())
tasks[cat2.dev_uid] = loop.create_task(cat2.mqtt_function())
loop_thread.run_forever()
I get htop reporting the main python process and 5 user threads.
Everything else seems to work correctly and I get messages as expected for each Client/Connection, just perplexed as to why the user threads are also there.
I did do a little redacting to the above code but it shouldn't be anything that would alter functionality
Has anyone got any idea whether this is an issue or am I(hopefully) missing something?
I am using:
Python 3.8.14
Ubuntu 18.04
PyCharm