Can't recover after connection lost
Closed this issue · 13 comments
I can't understand why I can't recover after the connection is lost.
```python
import asyncio

import asyncio_mqtt as aiomqtt

# broker_address, broker_port, PriorityQueue and handle_message are defined
# elsewhere in the original program.
client = aiomqtt.Client(
    hostname=broker_address,  # The only non-optional parameter
    port=broker_port,
)


# Connect, reconnect, subscribe and check messages
async def mqttConnect():
    interval = 5  # Seconds
    # Run forever
    while True:
        try:
            async with client as _client:
                async with _client.messages(queue_class=PriorityQueue) as messages:
                    await _client.subscribe("driver/+/speed/set/w")
                    await _client.subscribe("driver/+/cmd/power")
                    await _client.subscribe("driver/+/cmd/control")
                    async for message in messages:
                        await handle_message(message)
        except aiomqtt.MqttError as e:
            print(f"Connection lost; Reconnecting in {interval} seconds ... {e}")
            await asyncio.sleep(interval)


async def main():
    # Create MQTT connection
    # await client.connect()
    # Create the MQTT task; keep a reference so it isn't garbage-collected
    mqtt_task = asyncio.create_task(mqttConnect(), name="MQTT")
    # Stay here forever
    while True:
        await asyncio.sleep(1000)


# Start the event loop
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("Program exiting...")
```
After restarting my mosquitto server, the client never recovers:
```
Connection lost; Reconnecting in 5 seconds ... [code:7] The connection was lost.
Connection lost; Reconnecting in 5 seconds ... [code:7] The connection was lost.
Connection lost; Reconnecting in 5 seconds ... [code:7] The connection was lost.
```
I changed the code to try to trace it:
```python
async def mqttConnect():
    interval = 5  # Seconds
    # Run forever
    while True:
        try:
            print("Trying to connect...")
            await client.connect()
            async with client.messages(queue_class=PriorityQueue) as messages:
                await client.subscribe("driver/+/speed/set/w")
                await client.subscribe("driver/+/cmd/power")
                await client.subscribe("driver/+/cmd/control")
                async for message in messages:
                    await handle_message(message)
        except aiomqtt.MqttError as e:
            print(f"Connection lost; Reconnecting in {interval} seconds ... {e}")
            await asyncio.sleep(interval)
```
```
Connection lost; Reconnecting in 5 seconds ... Disconnected during message iteration
Trying to connect...
Connection lost; Reconnecting in 5 seconds ... Disconnected during message iteration
Trying to connect...
Connection lost; Reconnecting in 5 seconds ... Disconnected during message iteration
Trying to connect...
Connection lost; Reconnecting in 5 seconds ... Disconnected during message iteration
```
Any idea?
Hi thalesmaoa, thanks for opening this issue. :) Let me take a look.
The `asyncio_mqtt.Client` instance is not reusable. You have to create a new instance for every connection. So do

```python
client = aiomqtt.Client(
    hostname=broker_address,  # The only non-optional parameter
    port=broker_port,
)
```

inside your reconnect loop. That should do the trick.
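A minimal sketch of the "new client for every connection" advice, written against a client factory so it runs without a broker. `reconnect_forever`, `FakeClient`, `FakeMqttError` and `always_disconnects` are made-up names for illustration; `FakeClient` only mimics the single-use behaviour of `aiomqtt.Client`, and `max_attempts` exists only so the sketch terminates.

```python
from __future__ import annotations

import asyncio
from typing import Any, Awaitable, Callable


async def reconnect_forever(
    make_client: Callable[[], Any],
    session: Callable[[Any], Awaitable[None]],
    error_types: tuple[type[BaseException], ...],
    interval: float = 5.0,
    max_attempts: int | None = None,
) -> int:
    """Run `session` against a brand-new client on every connection attempt."""
    attempts = 0
    while max_attempts is None or attempts < max_attempts:
        attempts += 1
        client = make_client()  # new instance each time; a dead one is never re-entered
        try:
            async with client:
                await session(client)
        except error_types as e:
            print(f"Connection lost; reconnecting in {interval} seconds ... {e}")
            await asyncio.sleep(interval)
    return attempts


# --- Demo with a hypothetical single-use stand-in for aiomqtt.Client ---
class FakeClient:
    created = 0

    def __init__(self) -> None:
        FakeClient.created += 1
        self._entered = False

    async def __aenter__(self) -> FakeClient:
        if self._entered:
            raise RuntimeError("client instance reused")
        self._entered = True
        return self

    async def __aexit__(self, *exc_info: object) -> bool:
        return False  # never swallow the exception


class FakeMqttError(Exception):
    pass


async def always_disconnects(client: FakeClient) -> None:
    raise FakeMqttError("The connection was lost.")


attempts = asyncio.run(
    reconnect_forever(FakeClient, always_disconnects, (FakeMqttError,), interval=0, max_attempts=3)
)
```

With asyncio-mqtt you would pass something like `lambda: aiomqtt.Client(hostname=broker_address, port=broker_port)` as `make_client`, `(aiomqtt.MqttError,)` as `error_types`, and a `session(client)` coroutine that opens `client.messages()`, subscribes and iterates, as in the original loop.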
I hope that helps! Let me know if it works or not. :)
Best regards,
~Frederik
Thanks @frederikaalund. In my code, `client` is a global variable. I use it inside other functions to publish messages. Is there a workaround?
I tried this:
```python
async def mqttConnect():
    global client
    interval = 5  # Seconds
    # Run forever
    while True:
        try:
            print("Trying to connect...")
            print(client)
            await client.connect()
            async with client.messages(queue_class=PriorityQueue) as messages:
                await client.subscribe("driver/+/speed/set/w")
                await client.subscribe("driver/+/cmd/power")
                await client.subscribe("driver/+/cmd/control")
                async for message in messages:
                    await handle_message(message)
        except aiomqtt.MqttError as e:
            print(f"Connection lost; Reconnecting in {interval} seconds ... {e}")
            await asyncio.sleep(interval)
            # Replace the dead client with a fresh instance
            client = aiomqtt.Client(
                hostname="10.254.0.253",  # The only non-optional parameter
                port=1883,
            )
```
It does reconnect; however, publishing from outside this function doesn't work.
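One plausible reason publishing from other files keeps using the dead connection, assuming (the thread doesn't show this) that those files do `from <module> import client`: rebinding a module-level global does not update names that other modules have already imported. The sketch simulates this with a hypothetical module `mqtt_conn` and strings standing in for client instances.

```python
import sys
import types

# Hypothetical module that owns the global client (stands in for the main file).
mqtt_conn = types.ModuleType("mqtt_conn")
mqtt_conn.client = "client #1"
sys.modules["mqtt_conn"] = mqtt_conn

# What another file gets from `from mqtt_conn import client`:
from mqtt_conn import client  # binds the object that exists *right now*

# Later, the reconnect loop does `global client; client = aiomqtt.Client(...)`,
# which only rebinds the name inside mqtt_conn:
mqtt_conn.client = "client #2"

print(client)            # stale: still the first (disconnected) instance
print(mqtt_conn.client)  # attribute lookup at call time sees the new one
```

Looking the client up as `mqtt_conn.client` at call time, instead of importing the name once, avoids the stale reference.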
> Thanks @frederikaalund. In my code, client is a global var. I use it inside other functions to publish messages. Is there a workaround?
No. :) That's the short answer.
Hmm... I need to rewrite my code to pass the client variable along.
It's going to be difficult...
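One hypothetical way to avoid passing the client to every function: the rest of the program puts messages on a queue, and the connection loop drains that queue with whichever client is currently connected. `Publisher`, `drain`, `flush` and `RecordingClient` are made-up names for this sketch; `RecordingClient` stands in for a connected `aiomqtt.Client`. If `client.publish` raises, this sketch drops that message and lets the error end `drain`, so the connection loop can reconnect.

```python
from __future__ import annotations

import asyncio


class Publisher:
    """Other code enqueues messages here instead of holding a client reference."""

    def __init__(self, maxsize: int = 100) -> None:
        # Create the Publisher inside a running event loop (e.g. in main()).
        self._queue: asyncio.Queue[tuple[str, str]] = asyncio.Queue(maxsize)

    async def publish(self, topic: str, payload: str) -> None:
        # Safe to call from any module; only waits if the queue is full.
        await self._queue.put((topic, payload))

    async def drain(self, client) -> None:
        # Run by the connection loop for the lifetime of ONE client instance.
        while True:
            topic, payload = await self._queue.get()
            try:
                await client.publish(topic, payload)
            finally:
                self._queue.task_done()  # a failed message is dropped here

    async def flush(self) -> None:
        await self._queue.join()


class RecordingClient:
    """Hypothetical stand-in for a connected aiomqtt.Client."""

    def __init__(self) -> None:
        self.sent: list[tuple[str, str]] = []

    async def publish(self, topic: str, payload: str) -> None:
        self.sent.append((topic, payload))


async def demo() -> list[tuple[str, str]]:
    publisher = Publisher()
    client = RecordingClient()
    drain_task = asyncio.create_task(publisher.drain(client))
    await publisher.publish("driver/1/speed/get", '{"value": 42}')
    await publisher.publish("driver/2/speed/get", '{"value": 7}')
    await publisher.flush()  # wait until the drain task has handled both
    drain_task.cancel()
    try:
        await drain_task
    except asyncio.CancelledError:
        pass
    return client.sent


sent = asyncio.run(demo())
```

In a real program, `drain(client)` would run as a separate task inside each `async with aiomqtt.Client(...) as client:` block, next to the message loop.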
I solved it using this piece of code:
```python
async def mqttConnect():
    global client
    interval = 5  # Seconds
    # Run forever
    while True:
        try:
            async with client.messages(queue_class=PriorityQueue) as messages:
                await client.subscribe("driver/+/speed/set/w")
                await client.subscribe("driver/+/cmd/power")
                await client.subscribe("driver/+/cmd/control")
                async for message in messages:
                    await handle_message(message)
        except aiomqtt.MqttError:
            print(f"Connection lost; Reconnecting in {interval} seconds ...")
            try:
                client = aiomqtt.Client(hostname="10.254.1.253", port=1883)
                await client.connect()
            except aiomqtt.MqttError as e:
                print(f"{e}")
            await asyncio.sleep(interval)
```
For the publish functions inside this file I use:
```python
try:
    await client.publish(f"driver/{x.name}/speed/get", json.dumps({"value": x.speed._get}))
except aiomqtt.MqttError as e:
    print(f"Can't publish driver/{x.name}/speed/get. Error: {e}")
```
And for my class, in the other files I use:
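```python
async def publish_mqtt(self, topic, msg):
    try:
        await client.publish(topic, msg, 0)
    except aiomqtt.MqttError as e:
        try:
            await client.connect()
        except aiomqtt.MqttError:
            # Narrowed from a bare `except:`, which would also swallow
            # asyncio.CancelledError and KeyboardInterrupt.
            pass
        print(f"Can't publish {topic}. Error: {e}")
```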
It is working, but I'm not sure it should, since you stated that it isn't possible to use the client as a global.
Hi, I found the key to the problem. Just reset the `_disconnected` attribute of the client to `asyncio.Future()` after the error is reported.
```python
# Connect, reconnect, subscribe and check messages
async def mqttConnect():
    interval = 5  # Seconds
    # Run forever
    while True:
        try:
            async with client as _client:
                async with _client.messages() as messages:
                    await _client.subscribe("driver/+/speed/set/w")
                    await _client.subscribe("driver/+/cmd/power")
                    await _client.subscribe("driver/+/cmd/control")
                    async for message in messages:
                        print(message.payload)
        except aiomqtt.MqttError as e:
            client._disconnected = asyncio.Future()
            print(f"Connection lost; Reconnecting in {interval} seconds ... {e}")
            await asyncio.sleep(interval)
```
In more detail: when the broker restarts, the `_on_disconnect` callback is called and sets the result of the `_disconnected` future.
https://github.com/sbtinstruments/asyncio-mqtt/blob/511c8c5c6bdd823be3b23f0aca9db69ca14a5e7b/asyncio_mqtt/client.py#L750
The connection can be re-established on the next loop iteration, but by the time you start reading messages, `_disconnected` already has a result. Because `_disconnected` is not reset when reconnecting, the disconnect error is raised again.
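The mechanism can be reproduced with plain asyncio. This is a simplified model, not the asyncio-mqtt source: the hypothetical `next_message` waits on a message queue and a `disconnected` future with `asyncio.wait(..., return_when=asyncio.FIRST_COMPLETED)`, and `ConnectionError` stands in for `aiomqtt.MqttError`. Once the future has a result, every later wait on it completes immediately.

```python
import asyncio


async def next_message(queue: asyncio.Queue, disconnected: asyncio.Future) -> str:
    """Wait for a queued message OR a disconnect, whichever comes first."""
    get = asyncio.create_task(queue.get())
    done, _ = await asyncio.wait({get, disconnected}, return_when=asyncio.FIRST_COMPLETED)
    if get in done:
        return get.result()
    get.cancel()
    # Stand-in for aiomqtt.MqttError("Disconnected during message iteration")
    raise ConnectionError("Disconnected during message iteration")


async def demo() -> list:
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue = asyncio.Queue()
    disconnected = loop.create_future()

    # Broker restart: the disconnect callback resolves the future ...
    disconnected.set_result(None)

    outcomes = []
    # ... and after a successful reconnect, every wait on the SAME (already
    # resolved) future fails at once, even though nothing new went wrong.
    for _ in range(2):
        try:
            outcomes.append(await next_message(queue, disconnected))
        except ConnectionError as e:
            outcomes.append(str(e))

    # Swapping in a fresh, unresolved future (what the private-member
    # workaround does) lets messages through again.
    disconnected = loop.create_future()
    await queue.put("driver/1/cmd/power")
    outcomes.append(await next_message(queue, disconnected))
    return outcomes


outcomes = asyncio.run(demo())
print(outcomes)
```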
It worked! Thanks.
Great detective efforts, @vvanglro. :) Indeed, there is a workaround if you modify the internals of asyncio-mqtt (e.g., change/assign a private member like `_disconnected`).
Just for the record: I want to strongly discourage anyone from modifying the asyncio-mqtt internals. We (the asyncio-mqtt maintainers) can't help you if you do so. The behaviour may break in a future release. We may rename/rearrange the internals in a future release. There will be no warning about it. In other words: "This breaks the warranty". :)
Now, if that is okay with you, then go ahead. It may very well be okay for your specific use case. I don't know. Just be warned!
I hope that makes sense. I'm commenting on this issue for future reference (in case anyone lands here via, e.g., a search engine).
The PR is more interesting! I'll have a look at it right away.
Yes, it is generally not recommended to modify private attributes unless you are familiar with their logic and have tested the change.
The change in the PR is just an ordinary logic check.
Today I had a failure. Here is the mosquitto log:
```
1685387031: New client connected from 172.17.0.1:57404 as nodered_1c4fef16ce462131 (p2, c1, k60, u'admin').
1685436564: Client <unknown> has exceeded timeout, disconnecting.
1685457168: Client <unknown> has exceeded timeout, disconnecting.
1685530324: New connection from 10.254.1.254:64085 on port 1883.
1685530324: New client connected from 10.254.1.254:64085 as mqttx_a64f354a (p2, c1, k60, u'admin').
```
At 1685436564 and 1685457168, mosquitto disconnected me (I'm not sure why). After that, a connection was re-established at 1685530324; that is when I connected to check whether everything was OK. The program was publishing as expected; however, the subscriptions were not working.
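The mosquitto timestamps are Unix epoch seconds, so converting them makes the gaps between events easier to read. A small standard-library helper (the name `log_time` is made up for this sketch), fed with the timestamps from the log above:

```python
from datetime import datetime, timezone


def log_time(ts: int) -> str:
    """Convert a mosquitto log timestamp (Unix epoch seconds) to a UTC string."""
    return datetime.fromtimestamp(ts, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")


# Timestamps copied from the mosquitto log
events = [
    (1685387031, "nodered_1c4fef16ce462131 connected"),
    (1685436564, "first timeout disconnect"),
    (1685457168, "second timeout disconnect"),
    (1685530324, "mqttx_a64f354a connected"),
]
for ts, what in events:
    print(log_time(ts), what)
```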
@vvanglro, I'm not familiar with your reply to @frederikaalund, but can you see any reason for that? Also, I thought that paho would assign a client_id automatically, but from the log it seems not. I should assign it, correct?
```python
import random

client_id = "python_" + str(random.randint(0, 1000))
```
> Great detective efforts, @vvanglro. :) Indeed, there is a workaround if you modify the internals of asyncio-mqtt (e.g., change/assign a private member like `_disconnected`).
A single underscore is for "protected" (okay for subclasses to modify), whereas a double underscore is private (subclasses should not modify it; in most languages they cannot, but in Python we can, albeit via a mangled attribute name).
With that clarification made, I believe the developers of this project are unwilling to subclass the Paho synchronous client and make use of the protected values it provides.
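To make the naming distinction concrete: Python enforces neither convention through access control. A single leading underscore is only a signal (PEP 8 calls it a weak "internal use" indicator), while a double leading underscore inside a class body triggers name mangling. `Base`, `Child` and the attribute names are illustrative.

```python
class Base:
    def __init__(self) -> None:
        self._protected = "single underscore"  # plain attribute; the underscore is convention
        self.__private = "double underscore"   # stored as self._Base__private


class Child(Base):
    def peek(self) -> tuple:
        # The single-underscore name is reachable unchanged from a subclass,
        # but writing self.__private here would look up self._Child__private.
        return self._protected, hasattr(self, "_Child__private")


obj = Child()
print(obj.peek())
print(obj._Base__private)          # the mangled name is still accessible
print(hasattr(obj, "__private"))   # the unmangled name does not exist
```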
> @vvanglro, I'm not familiar with your reply to @frederikaalund, but can you see any reason for that? Also, I thought that paho would assign a client_id automatically, but from the log it seems not. I should assign it, correct?
Yes, it's a good idea to manually assign a client ID so that you can "resume" a session. Otherwise, the broker doesn't know that it's the same client that connects again. Make sure that it's the same client ID (and not a random one for each connection).
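A minimal sketch of deriving a client ID that stays the same across reconnects and restarts (unlike the random one above). The hashing scheme and the name `stable_client_id` are assumptions for illustration, not something asyncio-mqtt or paho provides.

```python
from __future__ import annotations

import hashlib
import socket


def stable_client_id(app_name: str, host: str | None = None) -> str:
    """Derive the same MQTT client ID every time this program runs on this host.

    Hashing keeps the ID short and alphanumeric: MQTT 3.1.1 only guarantees that
    brokers accept IDs of 1-23 characters from [0-9a-zA-Z] (mosquitto allows more).
    """
    host = host if host is not None else socket.gethostname()
    digest = hashlib.sha256(f"{app_name}@{host}".encode()).hexdigest()[:12]
    return f"{app_name}{digest}"[:23]


print(stable_client_id("driver", host="pi-01"))
```

You would then pass `client_id=stable_client_id("driver")` when creating each new `aiomqtt.Client`. With MQTT 3.1.1, the broker only keeps the session if the client also connects with the clean-session flag disabled, so check how your asyncio-mqtt version exposes that setting.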
In general, though, if you start to modify the internals (e.g., stuff like `_disconnected`), then this breaks the warranty. :) This is what skewty points out as well.
> A single underscore is for "protected" (okay for subclasses to modify), whereas a double underscore is private (subclasses should not modify it; in most languages they cannot, but in Python we can, albeit via a mangled attribute name).
> With that clarification made, I believe the developers of this project are unwilling to subclass the Paho synchronous client and make use of the protected values it provides.
In general I agree. However, when the protected members are those of a 3rd party (e.g., paho) class, it's more complicated. paho can change any of its protected members without warning (because they are not part of the public API). That's difficult to maintain and that's why I steer strongly away from this practice. :)