wialon/gmqtt

[RECV EMPTY] Connection will be reset automatically.

Closed this issue · 1 comments

I'm trying to connect to AmazonMQ (Active MQ) broker.
But the connection will close and the whole operation repeats infinitely.
We verified the configuration and were able to connect to the broker with paho client.

Log

INFO:root:Starting server...
DEBUG:asyncio:Get address info b-ba30ea6c-4f58-4370-9b7b-877debbf579b-1.mq.eu-west-1.amazonaws.com:'8883', type=<SocketKind.SOCK_STREAM: 1>
DEBUG:asyncio:Getting address info b-ba30ea6c-4f58-4370-9b7b-877debbf579b-1.mq.eu-west-1.amazonaws.com:'8883', type=<SocketKind.SOCK_STREAM: 1> took 28.175ms: [(<AddressFamily.AF_INET: 2>, <SocketKind.SOCK_STREAM: 1>, 6, '', ('34.243.165.192', 8883))]
DEBUG:asyncio:<asyncio.sslproto.SSLProtocol object at 0x7f3d999d7ed0> starts SSL handshake
DEBUG:asyncio:<asyncio.sslproto.SSLProtocol object at 0x7f3d999d7ed0>: SSL handshake took 86.8 ms
INFO:gmqtt.mqtt.protocol:[CONNECTION MADE]
DEBUG:asyncio:<asyncio.TransportSocket fd=6, family=2, type=1, proto=6, laddr=('192.168.30.13', 59658), raddr=('34.243.165.192', 8883)> connected to b-ba30ea6c-4f58-4370-9b7b-877debbf579b-1.mq.eu-west-1.amazonaws.com:'8883': (<asyncio.sslproto._SSLProtocolTransport object at 0x7f3d99a17e30>, <gmqtt.mqtt.protocol.MQTTProtocol object at 0x7f3d99b4d390>)
DEBUG:gmqtt.mqtt.protocol:[RECV EMPTY] Connection will be reset automatically.
INFO:gmqtt.mqtt.protocol:[CONN CLOSE NORMALLY]
DEBUG:gmqtt.mqtt.handler:[CMD 0xe0] b''
WARNING:gmqtt.mqtt.protocol:[TRYING WRITE TO CLOSED SOCKET]
DEBUG:asyncio:Get address info b-ba30ea6c-4f58-4370-9b7b-877debbf579b-1.mq.eu-west-1.amazonaws.com:'8883', type=<SocketKind.SOCK_STREAM: 1>
DEBUG:asyncio:Getting address info b-ba30ea6c-4f58-4370-9b7b-877debbf579b-1.mq.eu-west-1.amazonaws.com:'8883', type=<SocketKind.SOCK_STREAM: 1> took 27.229ms: [(<AddressFamily.AF_INET: 2>, <SocketKind.SOCK_STREAM: 1>, 6, '', ('34.243.165.192', 8883))]
DEBUG:asyncio:<asyncio.sslproto.SSLProtocol object at 0x7f3d99a09f10> starts SSL handshake
DEBUG:asyncio:<asyncio.sslproto.SSLProtocol object at 0x7f3d99a09f10>: SSL handshake took 90.3 ms
INFO:gmqtt.mqtt.protocol:[CONNECTION MADE]
DEBUG:asyncio:<asyncio.TransportSocket fd=6, family=2, type=1, proto=6, laddr=('192.168.30.13', 44436), raddr=('34.243.165.192', 8883)> connected to b-ba30ea6c-4f58-4370-9b7b-877debbf579b-1.mq.eu-west-1.amazonaws.com:'8883': (<asyncio.sslproto._SSLProtocolTransport object at 0x7f3d99a32150>, <gmqtt.mqtt.protocol.MQTTProtocol object at 0x7f3d999b1c50>)
DEBUG:gmqtt.mqtt.protocol:[RECV EMPTY] Connection will be reset automatically.
INFO:gmqtt.mqtt.protocol:[CONN CLOSE NORMALLY]
DEBUG:gmqtt.mqtt.handler:[CMD 0xe0] b''
WARNING:gmqtt.mqtt.protocol:[TRYING WRITE TO CLOSED SOCKET]
INFO:root:Shutting down the server

gmqtt snippet

    client = Client(client_id=None)  # client ID will be automatic uuid4
    # we need to inject redis client and stream configuration
    # but keep the rest of API compatible with MQTT client
    client.on_message = partial(on_message, redis, config.redis_stream)
    client.on_connect = on_connect
    client.set_auth_credentials(config.broker_username, config.broker_password)
    ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    ssl_context.verify_mode = ssl.CERT_REQUIRED
    ssl_context.set_default_verify_paths()
    await client.connect(host=config.broker_host, port=config.broker_port, ssl=ssl_context)

Paho client snippet

    client = mqtt_client.Client(client_id)
    client.enable_logger(logger=logger)
    client.tls_set(ca_certs=None, certfile=None, keyfile=None, cert_reqs=ssl.CERT_REQUIRED,
                   tls_version=ssl.PROTOCOL_TLS, ciphers=None)
    client.username_pw_set(username, password)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client

Initially, we thought it was a TLS issue, so we experimented with ssl=True and passing context.
From the debugging I found that for some reason transport will close at this if statement: https://github.com/wialon/gmqtt/blob/master/gmqtt/mqtt/protocol.py#L213

Any help or explanation is appreciated.

So, the problem was after all in the version of the MQTT protocol. ActiveMQ doesn't support MQTT 5. Here is the final working version:

await client.connect(host=config.broker_host, port=config.broker_port, ssl=True, version=MQTTv311)