njouanin/hbmqtt

Call publish from a different thread

Opened this issue · 3 comments

Hello,

I have issues with mqtt_client.publish_json when it is called from a different thread:


class SomeApp:
    # will be called with a loop using asyncio
    def __init__(self, client_id, connection_string):
        #Init and connect MqttClient

    # Called from a different thread without any loop
    def __on_event(self, ev):
        asyncio.run(self.client.publish(topic, msg))

This is working but sometimes get the error the following error:

Future <Future pending> attached to a different loop

I tried the following:

    def __on_event(self, ev):
        loop = asyncio.new_event_loop()
        loop.run_until_complete(mqtt_client.publish(topic, msg))

but the same error occurs sometimes.

If I'm using

    def __on_event(self, topic, msg):
        loop = self.running_publish_loop # This is a running loop in a separate thread only for publishing
        loop.run_coroutine_threadsafe(mqtt_client.publish(topic, msg))

then it results sometimes in an Assertion error:

File "...\AppData\Local\Programs\Python\Python38-32\lib\asyncio\proactor_events.py", line 375, in _loop_writing
    assert f is self._write_fut

Now, I don't know how to call publish() from a different thread without facing these issues. Could someone help me on this topic?

Many thanks in advance

This library is meant to be used with asyncio and is not thread safe. So this is expected.

It is generally not a good idea to mix threading and asyncio. So to fix your issue please look into asyncio how to use it to replace your threading needs.

have you found the way to solve this problem? i have the same confusion with fastapi websocket @JochenKienzle

@streamsunshine
Unfortunately, no. We changed our approach to avoid this issue.