vert-x3/vertx-mqtt

MQTT client dies if the broker does not send a CONNACK message

deen13 opened this issue · 5 comments

Version

4.0.0-SNAPSHOT

Context

I encountered an exception which looks suspicious while implementing the reconnect mechanism of our service. I'd like our verticle to retry the initial connection until the broker is up and running, which works quite well if the broker is fast enough. Otherwise, if the broker accepts the TCP connection but does not send a CONNACK message, connectPromise() is never called and the verticle is stuck.
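
For illustration, the "retry until the broker is up" idea can be sketched without any Vert.x types. This is only a minimal sketch on top of the JDK's CompletableFuture: `retryUntilConnected`, `attempt` and `delayMillis` are hypothetical names, not part of vertx-mqtt, and the broker is simulated. Note that it only retries when an attempt fails; an attempt that never completes (the missing CONNACK case described above) would still hang.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper (not a vertx-mqtt API): runs `attempt` until it
// succeeds, waiting `delayMillis` between failed tries.
fun <T> retryUntilConnected(
    scheduler: ScheduledExecutorService,
    delayMillis: Long,
    attempt: () -> CompletableFuture<T>
): CompletableFuture<T> {
    val result = CompletableFuture<T>()
    fun tryOnce() {
        attempt().whenComplete { value, error ->
            if (error == null) {
                result.complete(value)
            } else {
                // Failed attempt: schedule the next one after the delay.
                scheduler.schedule(Runnable { tryOnce() }, delayMillis, TimeUnit.MILLISECONDS)
            }
        }
    }
    tryOnce()
    return result
}

fun main() {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val attempts = AtomicInteger(0)
    // Simulated broker: the first two attempts fail, the third succeeds.
    val connected = retryUntilConnected(scheduler, 100) {
        if (attempts.incrementAndGet() < 3) {
            CompletableFuture.failedFuture<String>(IllegalStateException("broker not ready"))
        } else {
            CompletableFuture.completedFuture("CONNACK")
        }
    }
    // prints "CONNACK after 3 attempts"
    println("${connected.get(5, TimeUnit.SECONDS)} after ${attempts.get()} attempts")
    scheduler.shutdown()
}
```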

Reproducer

I've made a simple repository to demonstrate the behaviour.

Steps to reproduce

  1. Start the Verticle without having the broker up and running
  2. Wait until the first connect attempt fails
  3. Start the broker
  4. Wait for the established connection
  5. Publish a message on the test topic

Extra

The reconnect mechanism works if the retry interval is longer and VerneMQ has some time to start. I wonder whether this is a bug in this library or whether the broker is not behaving correctly.

vietj commented

Do you know if that happens in 3.9?

deen13 commented

Sorry, the version might be misleading. Yes, the behaviour is exactly the same in 3.9.1.

I've just found that the client also gets stuck if it tries to reconnect in the closeHandler immediately after the broker has closed the connection, that is, while the broker is still shutting down.

Erroneous behaviour

The following code example shows our reconnect mechanism without a delay, which ends up with a stuck verticle.

Logs

Connection with localhost:1883 established successfully 
Succeeded in deploying verticle 
Received message on topic test
Connection closed... Next retry now.
Connection with localhost:1883 established successfully 

Code Example

client.connectAwait(mqttServerConfig.getInteger("port"), mqttServerConfig.getString("hostname"))
client.subscribeAwait("presence", 2)

client.closeHandler {
    println("Connection closed... Next retry now.")

    client.connect(mqttServerConfig.getInteger("port"), mqttServerConfig.getString("hostname")) { reconResult ->
        if (reconResult.failed()) {
            println("Failed to reconnect")
        } else {
            println("Successfully reconnected")
            client.subscribe("presence", 2) { subResult ->
                if (subResult.succeeded()) {
                    println("Subscribed successfully")
                } else {
                    println("Subscription attempt failed.")
                }
            }
        }
    }
}

Successful behaviour

The following code shows a successful reconnect attempt, provided the broker recovers within the thirty seconds, which makes this a race condition.

Code Example

client.connectAwait(mqttServerConfig.getInteger("port"), mqttServerConfig.getString("hostname"))
client.subscribeAwait("test", 2)

client.closeHandler {
    println("Connection closed... Next retry in 30 seconds.")

    vertx.setTimer(30_000) {
        client.connect(mqttServerConfig.getInteger("port"), mqttServerConfig.getString("hostname")) { reconResult ->
            if (reconResult.failed()) {
                println("Failed to reconnect")
            } else {
                println("Successfully reconnected")
                client.subscribe("presence", 2) { subResult ->
                    if (subResult.succeeded()) {
                        println("Subscribed successfully")
                    } else {
                        println("Subscription attempt failed.")
                    }
                }
            }
        }
    }
}

Specification:

If the Client does not receive a CONNACK Packet from the Server within a reasonable amount of time, the Client SHOULD close the Network Connection. A "reasonable" amount of time depends on the type of application and the communications infrastructure.
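
As a rough illustration of what the specification asks for, the sketch below puts a deadline on a pending CONNACK and closes the connection when it expires. It uses only the JDK's CompletableFuture, not the vertx-mqtt API: `withConnackTimeout`, `connack` and `closeConnection` are hypothetical names, and the silent broker is simulated with a future that never completes.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionException
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical helper (not a vertx-mqtt API): `connack` stands for the
// pending CONNACK, `closeConnection` for whatever tears down the TCP
// connection in a real client.
fun <T> withConnackTimeout(
    connack: CompletableFuture<T>,
    timeoutMillis: Long,
    closeConnection: () -> Unit
): CompletableFuture<T> =
    connack
        // Fails `connack` with a TimeoutException if it is still pending (Java 9+).
        .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
        .whenComplete { _, error ->
            if (error is TimeoutException) closeConnection()
        }

fun main() {
    val closed = AtomicBoolean(false)
    // Simulated broker that accepts the TCP connection but never sends CONNACK.
    val silentBroker = CompletableFuture<String>()
    val guarded = withConnackTimeout(silentBroker, 200) { closed.set(true) }
    try {
        guarded.join()
    } catch (e: CompletionException) {
        println("connect failed: ${e.cause}")
    }
    println("connection closed: ${closed.get()}")
}
```

With a guard like this, the caller always gets either a result or a failure, so a retry can be scheduled instead of waiting forever.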

vietj commented

Can you provide an actual reproducer? I tried to reproduce it but could not. For the second case, I also tried with master and it works; I added a test for this in f3cab00.