How to use subscription identifier - Duplicate messages with overlapping topics
TDYJeffreyDevloo opened this issue · 2 comments
Checklist
- I've searched the project's issues.
- I've searched the project's discussions.
Could not look into the discussions as the link does not seem to work.
❓ Question
Mqtt5Client fires each callback multiple times when overlapping subscriptions are used. Mosquitto has an option to disable this (see https://mosquitto.org/man/mosquitto-conf-5.html, allow_duplicate_messages), but MQTT 5 also defines a subscription identifier that the broker includes in the messages it forwards, so the client can route each message to the right callback.
The HiveMQ client does not expose this as an option directly, but it sets the value automatically if the broker supports it.
A small demo app to demonstrate the issue. I am using Mosquitto 2.0.18 with allow_duplicate_messages=true.
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult;
import com.hivemq.client.mqtt.mqtt5.reactor.Mqtt5ReactorClient;
import reactor.core.publisher.Mono;

public class Test {
    public static void main(String[] args) throws InterruptedException {
        Mqtt5ReactorClient serverClient = Mqtt5ReactorClient.from(Mqtt5Client.builder()
                .identifier("myServer")
                .serverHost("localhost")
                .serverPort(1883)
                .build());
        Mqtt5ReactorClient deviceClient = Mqtt5ReactorClient.from(Mqtt5Client.builder()
                .identifier("myDevice")
                .serverHost("localhost")
                .serverPort(1883)
                .build());

        // Subscribe to everything wildcard
        Mono<Void> subToAll = serverClient.subscribePublishesWith()
                .topicFilter("#")
                .qos(MqttQos.AT_LEAST_ONCE)
                .applySubscribe()
                .doOnSingle(subAck -> System.out.println("Successfully subscribed to #"))
                .doOnNext(publish -> System.out.println("# - Received message"))
                .then();
        Mono<Void> subToOne = serverClient.subscribePublishesWith()
                .topicFilter("A")
                .qos(MqttQos.AT_LEAST_ONCE)
                .applySubscribe()
                .doOnSingle(subAck -> System.out.println("Successfully subscribed to A"))
                .doOnNext(publish -> System.out.println("A - Received message"))
                .then();

        // Start server
        serverClient.connect()
                .then(Mono.when(subToAll, subToOne))
                .subscribe();
        Thread.sleep(100);

        // Start client
        Mqtt5Publish a = Mqtt5Publish.builder()
                .topic("A")
                .payload("1".getBytes())
                .build();
        Mono<Mqtt5PublishResult> sendRequest = deviceClient.publish(Mono.just(a))
                .doOnNext(publishResult -> System.out.println("Successfully sent to A"))
                .next();
        deviceClient.connect()
                .then(sendRequest)
                .subscribe();
    }
}
Outputs:
Successfully subscribed to #
Successfully subscribed to A
Successfully sent to A
A - Received message
# - Received message
A - Received message
# - Received message
Is it possible to use this subscription identifier in our subscriptions to avoid duplicate messages on the client side? Or are there other mechanisms that we can use on the client side of things to avoid this problem? Updating the QoS does not have any effect.
The mosquitto output is
1695716431: Sending CONNACK to myDevice (0, 0)
1695716431: Received PUBLISH from myDevice (d0, q0, r0, m0, 'A', ... (1 bytes))
1695716431: Sending PUBLISH to myServer (d0, q0, r0, m0, 'A', ... (1 bytes))
1695716431: Sending PUBLISH to myServer (d0, q0, r0, m0, 'A', ... (1 bytes))
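The four callback invocations are consistent with each of the two PUBLISH packets in the log being matched by topic against both subscriptions. The following is a minimal simulation of that behavior in plain Java. It does not use any HiveMQ classes; `TopicDispatchSketch`, `matches`, and `dispatchByTopic` are hypothetical names, and the matcher is a simplification that only handles `+` and `#` (it ignores `$`-prefixed topics and filter validation).

```java
import java.util.ArrayList;
import java.util.List;

public class TopicDispatchSketch {

    // Simplified MQTT topic filter matching: supports '+' and '#'.
    public static boolean matches(String filter, String topic) {
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                return true; // multi-level wildcard matches the remainder
            }
            if (i >= t.length) {
                return false; // filter has more levels than the topic
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false; // literal level differs
            }
        }
        return f.length == t.length;
    }

    // For every incoming packet, invoke every subscription whose filter
    // matches the topic. Returns the filters whose callbacks were invoked.
    public static List<String> dispatchByTopic(List<String> filters, String topic, int packets) {
        List<String> invoked = new ArrayList<>();
        for (int p = 0; p < packets; p++) {
            for (String filter : filters) {
                if (matches(filter, topic)) {
                    invoked.add(filter);
                }
            }
        }
        return invoked;
    }

    public static void main(String[] args) {
        // Two overlapping subscriptions, two PUBLISH packets for topic "A".
        List<String> invoked = dispatchByTopic(List.of("#", "A"), "A", 2);
        System.out.println(invoked); // [#, A, #, A]
    }
}
```

With purely topic-based matching, two packets and two matching filters yield four invocations, which is the count seen in the demo output (the ordering in the real client may differ).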
📎 Additional context
There is support for the subscription identifier. I've delved into the code and seen that one is automatically assigned to every subscription, starting from 1.
The decoder also reads the identifier, but overlapping subscriptions don't seem to use it to invoke the right callback.
Digging even deeper into the code, I've found that this is because MqttIncomingPublishService#onPublish invokes MqttIncomingPublishFlows#findMatching, which only reads from the global flows. The subscribedFlows contain the right subscription identifier, which can be matched against the identifier of the MqttStatefulPublish, but the callbacks are selected from MqttIncomingPublishFlows#globalFlows. The add functionality there should take the subscription identifier into account.
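As a rough sketch of what identifier-aware dispatch could look like: the code below is plain Java and is not the HiveMQ client's internal design. `SubscriptionIdDispatchSketch`, `register`, `onPublish`, and `demo` are hypothetical names. It assumes the broker sends one copy of the message per overlapping subscription, each copy carrying the identifier of the subscription that caused it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class SubscriptionIdDispatchSketch {

    // Callbacks keyed by the subscription identifier assigned at subscribe time.
    private final Map<Integer, Consumer<String>> callbacksById = new HashMap<>();
    private int nextId = 1; // identifiers start at 1, as observed in the issue

    // Registers a callback and returns the identifier assigned to it.
    public int register(Consumer<String> callback) {
        int id = nextId++;
        callbacksById.put(id, callback);
        return id;
    }

    // Invokes only the callbacks whose identifiers arrived with the publish.
    public void onPublish(String payload, List<Integer> subscriptionIds) {
        for (int id : subscriptionIds) {
            Consumer<String> callback = callbacksById.get(id);
            if (callback != null) {
                callback.accept(payload);
            }
        }
    }

    // Simulates the scenario from the issue and returns the lines logged.
    public static List<String> demo() {
        List<String> log = new ArrayList<>();
        SubscriptionIdDispatchSketch dispatcher = new SubscriptionIdDispatchSketch();
        int allId = dispatcher.register(p -> log.add("# - Received message"));
        int oneId = dispatcher.register(p -> log.add("A - Received message"));
        // Two copies of the same message, one per overlapping subscription.
        dispatcher.onPublish("1", List.of(allId));
        dispatcher.onPublish("1", List.of(oneId));
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Under these assumptions each callback fires exactly once. A broker that sends a single copy carrying both identifiers would be handled by the same loop, since `onPublish` accepts a list.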
The Paho example that does it correctly is:
import com.hivemq.client.mqtt.datatypes.MqttQos;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttAsyncClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.MqttSubscription;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;

public class TestPaho
{
    public static void main(String[] args) throws InterruptedException, MqttException
    {
        MqttAsyncClient serverClient = new MqttAsyncClient("tcp://localhost:1883", "myServer");
        MqttAsyncClient deviceClient = new MqttAsyncClient("tcp://localhost:1883", "myDevice");
        serverClient.connect().waitForCompletion();
        deviceClient.connect().waitForCompletion();
        System.out.println("Connected");
        CountDownLatch receivedLatch = new CountDownLatch(2);

        MqttProperties subProperties1 = new MqttProperties();
        subProperties1.setSubscriptionIdentifiers(List.of(0)); // Bug in paho? This is for publish but subscribe requires it
        subProperties1.setSubscriptionIdentifier(1);
        serverClient.subscribe(new MqttSubscription("#", MqttQos.AT_LEAST_ONCE.getCode()), null, null, (topic, message) ->
        {
            System.out.println("# - Received message");
            receivedLatch.countDown();
        }, subProperties1);

        MqttProperties subProperties2 = new MqttProperties();
        subProperties2.setSubscriptionIdentifiers(List.of(0));
        subProperties2.setSubscriptionIdentifier(2);
        serverClient.subscribe(new MqttSubscription("A", MqttQos.AT_LEAST_ONCE.getCode()), null, null, (topic, message) ->
        {
            System.out.println("A - Received message");
            receivedLatch.countDown();
        }, subProperties2);

        System.out.println("A - Sending");
        deviceClient.publish("A", new MqttMessage("1".getBytes(), MqttQos.AT_LEAST_ONCE.getCode(), false, new MqttProperties()))
                .waitForCompletion();
        System.out.println("A - Sent");
        receivedLatch.await();
        deviceClient.close();
        serverClient.close();
    }
}
Thanks for the thorough investigation @TDYJeffreyDevloo. I'm not sure yet whether this is currently by design, but if so, this should be a feature request to make the behavior configurable to use strict topic alias matching. @SgtSilvio should have better insight into this.