empicano/aiomqtt

Can't recover after connection lost

Closed this issue · 13 comments

I can't understand why I can't recover after the connection is lost.

import asyncio
from asyncio import PriorityQueue  # assumed source of PriorityQueue

import asyncio_mqtt as aiomqtt


client = aiomqtt.Client(
    hostname=broker_address,  # The only non-optional parameter
    port=broker_port
)

# Connect, reconnect, subscribe and check messages
async def mqttConnect():
    interval = 5  # Seconds
    # Run for ever
    while True:
        try:
            async with client as _client:
                async with _client.messages(queue_class=PriorityQueue) as messages:
                    await _client.subscribe("driver/+/speed/set/w")
                    await _client.subscribe("driver/+/cmd/power")
                    await _client.subscribe("driver/+/cmd/control")  
                    async for message in messages:
                        await handle_message(message)

        except aiomqtt.MqttError as e:
            print(f'Connection lost; Reconnecting in {interval} seconds ... {e}')
            await asyncio.sleep(interval)

async def main():
    # Create MQTT connection
    # await client.connect()
    # Create MQTT
    asyncio.create_task(mqttConnect(), name="MQTT")
    # Stay here for ever
    while True:
        await asyncio.sleep(1000)

# start the event loop
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    except KeyboardInterrupt:
        print("Program exiting...")

After restarting my mosquitto server I can't recover:

Connection lost; Reconnecting in 5 seconds ... [code:7] The connection was lost.
Connection lost; Reconnecting in 5 seconds ... [code:7] The connection was lost.
Connection lost; Reconnecting in 5 seconds ... [code:7] The connection was lost.

I've changed the code trying to trace it:

async def mqttConnect():
    interval = 5  # Seconds
    # Run for ever
    while True:
        try:
            print(f"Trying to connect...")
            await client.connect()
            async with client.messages(queue_class=PriorityQueue) as messages:
                await client.subscribe("driver/+/speed/set/w")
                await client.subscribe("driver/+/cmd/power")
                await client.subscribe("driver/+/cmd/control")  
                async for message in messages:
                    await handle_message(message)

        except aiomqtt.MqttError as e:
            print(f'Connection lost; Reconnecting in {interval} seconds ... {e}')
            await asyncio.sleep(interval)

Connection lost; Reconnecting in 5 seconds ... Disconnected during message iteration
Trying to connect...
Connection lost; Reconnecting in 5 seconds ... Disconnected during message iteration
Trying to connect...
Connection lost; Reconnecting in 5 seconds ... Disconnected during message iteration
Trying to connect...
Connection lost; Reconnecting in 5 seconds ... Disconnected during message iteration

Any idea?

Hi thalesmaoa, thanks for opening this issue. :) Let me take a look.

The asyncio_mqtt.Client instance is not reusable. You have to create a new instance for every connection. So put

client = aiomqtt.Client(
    hostname=broker_address,  # The only non-optional parameter
    port=broker_port
)

inside your reconnect loop. That should do the trick. 👍
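The advice above can be sketched without a broker. This is a minimal sketch, not asyncio-mqtt's API: SingleUseClient, ConnectionLost, make_client and run_with_reconnect are hypothetical stand-ins invented for illustration. With the real library, the factory would return a new aiomqtt.Client(hostname=..., port=...) on every pass through the loop.

```python
import asyncio


class ConnectionLost(Exception):
    """Hypothetical stand-in for aiomqtt.MqttError."""


class SingleUseClient:
    """Hypothetical stand-in for a client that must not be reused."""

    def __init__(self, fail: bool) -> None:
        self._fail = fail

    async def __aenter__(self) -> "SingleUseClient":
        return self

    async def __aexit__(self, *exc_info) -> None:
        return None

    async def listen(self) -> str:
        if self._fail:
            raise ConnectionLost("The connection was lost.")
        return "ok"


async def run_with_reconnect(make_client, interval: float, max_attempts: int) -> int:
    """Build a fresh client for every attempt; return the attempt that succeeded."""
    for attempt in range(1, max_attempts + 1):
        client = make_client(attempt)  # new instance for every connection
        try:
            async with client:
                await client.listen()
                return attempt
        except ConnectionLost as error:
            print(f"Connection lost; reconnecting in {interval} seconds ... {error}")
            await asyncio.sleep(interval)
    raise ConnectionLost("gave up after the last attempt")


def make_client(attempt: int) -> SingleUseClient:
    # Fail the first two attempts to simulate a broker restart.
    return SingleUseClient(fail=attempt < 3)


attempts_used = asyncio.run(
    run_with_reconnect(make_client, interval=0.01, max_attempts=5)
)
print(f"Connected on attempt {attempts_used}")
```

The point of the factory is that no state from a failed connection survives into the next attempt.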

I hope that helps! Let me know if it works or not. :)

Best regards,
~Frederik

Thanks @frederikaalund. In my code, client is a global variable. I use it inside other functions to publish messages. Is there a workaround?

I tried this:

async def mqttConnect():
    interval = 5  # Seconds
    # Run for ever
    while True:
        try:
            global client
            print(f"Trying to connect...")
            print(client)
            
            await client.connect()
            async with client.messages(queue_class=PriorityQueue) as messages:
                await client.subscribe("driver/+/speed/set/w")
                await client.subscribe("driver/+/cmd/power")
                await client.subscribe("driver/+/cmd/control")  
                async for message in messages:
                    await handle_message(message)

        except aiomqtt.MqttError as e:
            print(f'Connection lost; Reconnecting in {interval} seconds ... {e}')
            await asyncio.sleep(interval)
            client = aiomqtt.Client(
                hostname="10.254.0.253",  # The only non-optional parameter
                port=1883,
            )

It does connect again; however, the publish calls made outside this function don't work.
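One possible explanation, assuming the other code picked up the client with something like `from some_module import client` (the thread does not show those imports): such an import copies the reference once, so rebinding the global later does not update it. The Client class below is a hypothetical stand-in, not the asyncio-mqtt class.

```python
class Client:
    """Hypothetical stand-in for an MQTT client object."""

    def __init__(self, label: str) -> None:
        self.label = label


client = Client("first connection")

# This mimics what `from some_module import client` does in another file:
# it binds a second name to the object that exists at import time.
imported_client = client


def reconnect() -> None:
    global client
    # Rebinding the global name creates a new object; names bound earlier
    # elsewhere still point at the old one.
    client = Client("second connection")


reconnect()
print(client.label)           # the module-level name sees the new client
print(imported_client.label)  # the earlier binding still sees the old client
```

Looking the client up through the module at call time (for example `some_module.client.publish(...)`), or passing it as an argument, avoids the stale reference.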

Thanks @frederikaalund. In my code, client is a global variable. I use it inside other functions to publish messages. Is there a workaround?

No. :) That's the short answer.

Hmm... I need to rewrite my code to pass the client variable along.
It's gonna be difficult...

I solved it using this piece of code:

async def mqttConnect():
    global client
    interval = 5  # Seconds
    # Run for ever
    while True:
        try:
            async with client.messages(queue_class=PriorityQueue) as messages:
                await client.subscribe("driver/+/speed/set/w")
                await client.subscribe("driver/+/cmd/power")
                await client.subscribe("driver/+/cmd/control")  
                async for message in messages:
                    await handle_message(message)

        except aiomqtt.MqttError:
            print(f'Connection lost; Reconnecting in {interval} seconds ...')
            try:
                client = aiomqtt.Client(hostname="10.254.1.253",port=1883)
                await client.connect()
            except aiomqtt.MqttError as e:
                print(f"{e}")
                await asyncio.sleep(interval)

For the publish functions inside this file I use:

try:
    await client.publish(f"driver/{x.name}/speed/get", json.dumps({"value": x.speed._get}))
except aiomqtt.MqttError as e:
    print(f"Can't publish driver/{x.name}/speed/get. Error: {e}")

And for my class, inside the other files I use:

    async def publish_mqtt(self, topic, msg):
        try:
            await client.publish(topic, msg, 0)
        except aiomqtt.MqttError as e:
            try:
                await client.connect()
            except:
                pass
            print(f"Can't publish {topic}. Error: {e}")

It is working, but I'm not sure it should, since you stated that it wasn't possible to use it as a global.

Hi, I found the key to the problem. Just reset the _disconnected attribute of the client to a new asyncio.Future() after the error is reported.

# Connect, reconnect, subscribe and check messages
async def mqttConnect():
    interval = 5  # Seconds
    # Run for ever
    while True:
        try:
            async with client as _client:
                async with _client.messages() as messages:
                    await _client.subscribe("driver/+/speed/set/w")
                    await _client.subscribe("driver/+/cmd/power")
                    await _client.subscribe("driver/+/cmd/control")
                    async for message in messages:
                        print(message.payload)

        except aiomqtt.MqttError as e:
            client._disconnected = asyncio.Future()
            print(f'Connection lost; Reconnecting in {interval} seconds ... {e}')
            await asyncio.sleep(interval)

In more detail: after you restart the broker, the _on_disconnect callback is invoked and sets the result of _disconnected.
https://github.com/sbtinstruments/asyncio-mqtt/blob/511c8c5c6bdd823be3b23f0aca9db69ca14a5e7b/asyncio_mqtt/client.py#L750

The connection can be re-established on the next loop iteration, but by the time you start reading messages, _disconnected already has a result set.
_disconnected is not reset when reconnecting, so the "disconnected" error is triggered again immediately.

https://github.com/sbtinstruments/asyncio-mqtt/blob/511c8c5c6bdd823be3b23f0aca9db69ca14a5e7b/asyncio_mqtt/client.py#L647-L649
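The behaviour described above follows from how asyncio futures work in general: once a result is set, the future stays done and cannot be reset, so the only way to "clear" it is to create a new one. The snippet below is a stdlib-only illustration of that property; it does not touch asyncio-mqtt, and the variable names are chosen for this sketch.

```python
import asyncio


async def demo() -> tuple:
    loop = asyncio.get_running_loop()

    # A future playing the role of a "disconnected" flag.
    disconnected = loop.create_future()
    disconnected.set_result(7)  # what a disconnect callback would do

    # Awaiting a finished future returns its result immediately, every time.
    code = await disconnected

    # A finished future cannot be given a new result.
    try:
        disconnected.set_result(0)
        reset_failed = False
    except asyncio.InvalidStateError:
        reset_failed = True

    # The only way to get a pending flag again is a brand-new future.
    fresh = loop.create_future()
    return code, reset_failed, fresh.done()


result = asyncio.run(demo())
print(result)
```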

It worked! Thanks.

Great detective efforts, @vvanglro. :) Indeed, there is a workaround if you modify the internals of asyncio-mqtt (e.g., change/assign a private member like _disconnected).

Just for the record: I want to strongly discourage anyone from modifying the asyncio-mqtt internals. We (the asyncio-mqtt maintainers) can't help you if you do so. The behaviour may break in a future release. We may rename/rearrange the internals in a future release. There will be no warning about it. In other words: "This breaks the warranty". :)

Now, if that is okay with you then go ahead. It may very well be okay for your specific use case. I don't know. Just be warned!

I hope that makes sense. I'm commenting on this issue for future reference (if anyone lands here via, e.g., a search engine).

The PR is more interesting! I'll have a look at it right away. 👍

Yes, it is generally not recommended to modify private attributes unless you are familiar with the logic behind them and have tested the change.
What the PR modifies is just an ordinary logic check.

Today I had a failure. Here is the mosquitto log:

1685387031: New client connected from 172.17.0.1:57404 as nodered_1c4fef16ce462131 (p2, c1, k60, u'admin').
1685436564: Client <unknown> has exceeded timeout, disconnecting.
1685457168: Client <unknown> has exceeded timeout, disconnecting.
1685530324: New connection from 10.254.1.254:64085 on port 1883.
1685530324: New client connected from 10.254.1.254:64085 as mqttx_a64f354a (p2, c1, k60, u'admin').

At 1685436564 and 1685457168 mosquitto disconnected me (not sure why). After that, the connection was re-established at 1685530324, which is when I logged in to check if everything was OK. The program was publishing as expected; however, subscribing was not working.
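The log timestamps are Unix epoch seconds. As a small aid for reading them, the sketch below converts the three values quoted above to UTC and computes the gap between the last timeout and the next logged connection. The helper name to_utc is made up for this sketch.

```python
from datetime import datetime, timezone

# Epoch timestamps copied from the mosquitto log above.
timeout_disconnects = [1685436564, 1685457168]
next_connection = 1685530324


def to_utc(epoch_seconds: int) -> str:
    """Render an epoch timestamp as an ISO 8601 string in UTC."""
    return datetime.fromtimestamp(epoch_seconds, tz=timezone.utc).isoformat()


readable = [to_utc(ts) for ts in timeout_disconnects + [next_connection]]

# Time between the last timeout disconnect and the next logged connection.
gap_seconds = next_connection - timeout_disconnects[-1]
gap_hours = gap_seconds / 3600

for line in readable:
    print(line)
print(f"Gap after the last timeout: {gap_seconds} s ({gap_hours:.1f} h)")
```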

@vvanglro, I'm not familiar with your reply to @frederikaalund, but can you see any reason for that? Also, I thought that paho would assign a client_id automatically, but from the log, it seems not. I should assign it, correct?

client_id = "python_" + str(random.randint(0, 1000))

skewty commented

Great detective efforts, @vvanglro. :) Indeed, there is a workaround if you modify the internals of asyncio-mqtt (e.g., change/assign a private member like _disconnected).

A single underscore is for "protected" (okay for subclasses to modify) whereas a double underscore is private (subclasses should not modify -- in most languages they cannot -- but in Python we can, albeit using a mangled attribute name).
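The naming convention described above can be seen in a few lines of plain Python. Base and Child are hypothetical classes for illustration and have nothing to do with paho or asyncio-mqtt.

```python
class Base:
    def __init__(self) -> None:
        self._protected = "single underscore"
        self.__private = "double underscore"  # stored as _Base__private


class Child(Base):
    def read_protected(self) -> str:
        # Single-underscore names are reachable from subclasses as written.
        return self._protected

    def read_private(self) -> str:
        # Inside Child this name is mangled to _Child__private,
        # which was never assigned, so the lookup fails.
        return self.__private


child = Child()
protected_value = child.read_protected()

# The double-underscore attribute is still reachable via its mangled name.
mangled_value = child._Base__private

try:
    child.read_private()
    private_lookup_failed = False
except AttributeError:
    private_lookup_failed = True

print(protected_value, mangled_value, private_lookup_failed)
```

Neither form is enforced by the interpreter; both are conventions, with name mangling only making accidental clashes less likely.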

That clarification made, I believe the developers of this project are unwilling to subclass the Paho synchronous client and make use of the protected values it provides.

@vvanglro, I'm not familiar with your reply to @frederikaalund, but can you see any reason for that? Also, I thought that paho would assign a client_id automatically, but from the log, it seems not. I should assign it, correct?

Yes, it's a good idea to manually assign a client ID so that you can "resume" a session. Otherwise, the broker doesn't know that it's the same client that connects again. Make sure that it's the same client ID (and not a random one for each connection).
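A minimal sketch of a client ID that stays the same across reconnects and restarts, assuming one client process per machine. Deriving the ID from the hostname is an assumption of this sketch, not something the thread prescribes, and stable_client_id is a made-up helper name. Note also that, in MQTT 3.1.1, a broker only keeps session state for a client that does not request a clean session.

```python
import socket


def stable_client_id(prefix: str = "python") -> str:
    """Build a client ID that is identical on every call on this machine."""
    return f"{prefix}_{socket.gethostname()}"


# The same value comes back on every call, so the broker can recognise
# the reconnecting client as the one it saw before.
first_id = stable_client_id()
second_id = stable_client_id()
print(first_id)
```

The resulting string would then be passed as the client ID when constructing the client for each new connection.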

In general, though, if you start to modify the internals (e.g., stuff like _disconnected) then this breaks the warranty. :) This is what skewty points out as well. 👍

A single underscore is for "protected" (okay for subclasses to modify) whereas a double underscore is private (subclasses should not modify -- in most languages they cannot -- but in Python we can, albeit using a mangled attribute name).

That clarification made, I believe the developers of this project are unwilling to subclass the Paho synchronous client and make use of the protected values it provides.

In general I agree. However, when the protected members belong to a third-party class (e.g., paho's), it's more complicated. paho can change any of its protected members without warning (because they are not part of the public API). That's difficult to maintain, and that's why I steer well clear of this practice. :)