vert-x3/vertx-mqtt

API improvements

vietj opened this issue · 0 comments

vietj commented

The current API lacks of usability regarding the correlation of operations.

NOTE: we are using the client word but it applies to any server endpoint.

Subscription

A single publish handler is set that processes all messages, sub/unsub operations are done separately.

We can provide a message handler when a subscription is done that will deliver only the topic messages on the handler on the context defined at the subscription time.

client.publishHandler(s -> {
  System.out.println("There are new message in topic: " + s.topicName());
  System.out.println("Content(as string) of the message: " + s.payload().toString());
  System.out.println("QoS: " + s.qosLevel());
})
  .subscribe("rpi2/temp", 2);

would be replaced by:

Future<Subscription> fut = client.subscribe("rpi2/temp", 2, s -> {
  System.out.println("There are new message in topic: " + s.topicName());
  System.out.println("Content(as string) of the message: " + s.payload().toString());
  System.out.println("QoS: " + s.qosLevel());
});

fut.onSuccess(sub -> {
  // Subscribed
  vertx.setTimer(1000, id -> {
    sub.cancel(); // Unsubscribe
  });
});

Publish completion

Currently publish completion is performed by a two handlers that provides notification of the completion or the expiration of the completion:

client.publishCompletionHandler(id -> {
  // Id of just received PUBACK or PUBCOMP packet
});
client.publishCompletionExpirationHandler(id -> {
  // Expired
});
client.publish("hello", Buffer.buffer("hello"), MqttQoS.EXACTLY_ONCE, false, false);

We can replace this with a per message notification:

Future<PublishResult> fut = client.publish("hello", Buffer.buffer("hello"));

fut.onSuccess(res -> {
  if (res.expired()) {
    // expired
  } else {
    // ack received
  }
}).onFailure(err -> {
  // Something wrong happened before the message could be sent
});