aws/aws-iot-device-sdk-python-v2

The publish function doesn't raise an error when the topic does not exist (MQTTv5)

dgbarmac opened this issue · 11 comments

Describe the bug

The publish function doesn't raise an error when the topic does not exist while using the MQTT version 5 client.

Expected Behavior

I expected an exception to be raised.

Current Behavior

The output of my code is:

Starting Connection
Lifecycle Connection Success
Connection Started!
Starting Publishing
Publishing Completed
Stopping Client
Lifecycle Stopped
Client Stopped!

No error is generated even when I try to publish to a nonexistent topic.

Reproduction Steps

This is the code:

from awsiot import mqtt5_client_builder
from awscrt import mqtt5, http
from concurrent.futures import Future
import json

future_stopped = Future()
future_connection_success = Future()
TIMEOUT = 4
topic = 'aaaa'

def on_publish_received(publish_packet_data):
    pass

# Callback for the lifecycle event Stopped
def on_lifecycle_stopped(lifecycle_stopped_data: mqtt5.LifecycleStoppedData):
    print("Lifecycle Stopped")
    global future_stopped
    future_stopped.set_result(lifecycle_stopped_data)

# Callback for the lifecycle event Connection Success
def on_lifecycle_connection_success(lifecycle_connect_success_data: mqtt5.LifecycleConnectSuccessData):
    print("Lifecycle Connection Success")
    global future_connection_success
    future_connection_success.set_result(lifecycle_connect_success_data)

# Callback for the lifecycle event Connection Failure
def on_lifecycle_connection_failure(lifecycle_connection_failure: mqtt5.LifecycleConnectFailureData):
    print("Lifecycle Connection Failure")
    print("Connection failed with exception:{}".format(lifecycle_connection_failure.exception))

endpoint = "endpoint"
port = 8883
cert_filepath = "cert"
key_filepath = "key"
root_ca_filepath = "ca"
client_id = "mqtt_test_app"

# Load the certificates
with open(cert_filepath, "rb") as cert_file:
    cert = cert_file.read()
with open(key_filepath, "rb") as key_file:
    key = key_file.read()
with open(root_ca_filepath, "rb") as root_ca_file:
    root_ca = root_ca_file.read()

client = mqtt5_client_builder.mtls_from_path(
    endpoint=endpoint,
    port=port,
    cert_filepath=cert_filepath,
    pri_key_filepath=key_filepath,
    ca_filepath=root_ca_filepath,
    on_publish_received=on_publish_received,
    on_lifecycle_stopped=on_lifecycle_stopped,
    on_lifecycle_connection_success=on_lifecycle_connection_success,
    on_lifecycle_connection_failure=on_lifecycle_connection_failure,
    client_id=client_id)

print("Starting Connection")
client.start()
lifecycle_connect_success_data = future_connection_success.result(TIMEOUT)
print("Connection Started!")

message = {'hello': 'world'}

print("Starting Publishing")
publish_future = client.publish(mqtt5.PublishPacket(
    topic=topic,
    payload=json.dumps(message),
    qos=mqtt5.QoS.AT_LEAST_ONCE
))
publish_completion_data = publish_future.result(TIMEOUT)
print("Publishing Completed")

print("Stopping Client")
client.stop()
future_stopped.result(TIMEOUT)
print("Client Stopped!")

Possible Solution

No response

Additional Information/Context

No response

SDK version used

1.15.1

Environment details (OS name and version, etc.)

macOS Monterey

This behavior is correct. A topic does not need to have any subscribers for a publish to succeed. However, you can check the reason code in the puback and, depending on the broker, it may communicate success + no topic subscribers (No matching subscribers - https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901124). That is still a successful publish.
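To illustrate the point about "success + no matching subscribers", here is a minimal sketch of how PUBACK reason codes split into success and failure under the MQTT 5 specification, where values below 0x80 indicate success and values of 0x80 or greater indicate failure. The `PubackReason` enum and `publish_was_accepted` helper are hypothetical stand-ins written for this example, not SDK types; only the three numeric values are taken from the spec.

```python
from enum import IntEnum


class PubackReason(IntEnum):
    """Hypothetical stand-in with a few PUBACK reason codes from the MQTT 5 spec."""
    SUCCESS = 0x00
    NO_MATCHING_SUBSCRIBERS = 0x10
    NOT_AUTHORIZED = 0x87


def publish_was_accepted(reason_code: int) -> bool:
    """Return True when the reason code is in the MQTT 5 success range (< 0x80)."""
    return int(reason_code) < 0x80


for reason in PubackReason:
    status = "accepted" if publish_was_accepted(reason) else "rejected"
    print(f"{reason.name}: {status}")
```

Under this split, "no matching subscribers" counts as an accepted publish, while a policy violation reported as "not authorized" does not.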

@bretambrose This is not the behavior I saw with the v3.1.1 client: when I publish and the topic does not exist, I get an exception. I expected the behavior in v5 to be the same. Why isn't it?

The 311 client doesn't throw an exception when there are no listeners on a topic either. MQTT311 doesn't have the capability of rejecting QoS 1 publishes; all the broker can do is drop the publish and not send a puback, or shut down the connection. I am not sure what you are experiencing, but "exception on no subscribers" doesn't exist anywhere in the codebase.

Is your definition of "existence" the permission policy?

If so, then with MQTT5, IoT Core doesn't drop your connection on a violation the way it does with MQTT311. You need to look at the reason code in the puback to see that the publish failed.

The client throws exceptions when something prevents it from giving you a result. If the protocol yields a result, there's no exception and it's up to you to interpret the result.

@bretambrose Yes, exactly. I am using AWS IoT Core, and the certificate policy does not allow publishing to the topic "aaaa" (that's what I meant when I said that the topic does not exist).

I see, so the default behavior on v3.1.1 is really to drop the connection, but that is not the case in v5.

About checking the reason code of the publish operation: I was not able to find a way to do it. Do you know how I can do it in code?

In your sample, publish_completion_data should be a PublishCompletionData instance (https://github.com/awslabs/aws-crt-python/blob/v0.16.18/awscrt/mqtt5.py#L1279). QoS 1 publishes that get a puback response will have the puback field set to a PubackPacket (https://github.com/awslabs/aws-crt-python/blob/v0.16.18/awscrt/mqtt5.py#L1108), and you should be able to check the reason_code field against the PubackReasonCode enum (https://github.com/awslabs/aws-crt-python/blob/v0.16.18/awscrt/mqtt5.py#L413).

Any failure that occurs before puback processing will still generate an exception, however.
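A minimal sketch of the two failure paths described above: an exception raised by the future itself, and a failure reason code carried in the puback. It only assumes that the completion object has a `puback` attribute with a `reason_code` that converts to an integer. The `wait_for_publish` helper and the `PublishRejected` exception are hypothetical names introduced for this example, and the `SimpleNamespace` objects are stand-ins so the snippet runs without a broker. With the real client you would pass the future returned by `client.publish(...)`.

```python
from concurrent.futures import Future
from types import SimpleNamespace


class PublishRejected(Exception):
    """Hypothetical exception for a puback that carries a failure reason code."""


def wait_for_publish(publish_future, timeout=4):
    """Return the puback reason code, or raise if the publish failed."""
    # result() re-raises any failure that happened before puback processing.
    completion = publish_future.result(timeout)
    puback = getattr(completion, "puback", None)
    if puback is None:
        # No puback was attached to the result, so there is nothing to inspect.
        return None
    code = puback.reason_code
    # MQTT 5 reason codes of 0x80 or greater indicate failure.
    if int(code) >= 0x80:
        raise PublishRejected(f"broker rejected publish: {code!r}")
    return code


# Stand-in for a completed publish whose puback says "not authorized" (0x87).
fake_future = Future()
fake_future.set_result(SimpleNamespace(puback=SimpleNamespace(reason_code=0x87)))

try:
    wait_for_publish(fake_future)
except PublishRejected as err:
    print(err)
```

Turning a failure reason code into an exception is a design choice for callers who want v3.1.1-style "fail loudly" behavior; the SDK itself returns the result and leaves the interpretation to you.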

@bretambrose I have changed the publish part of the code to:

print("Starting Publishing")
publish_future = client.publish(mqtt5.PublishPacket(
    topic=topic,
    payload=json.dumps(message),
    qos=mqtt5.QoS.AT_LEAST_ONCE
))

publish_completion_data = publish_future.result(TIMEOUT)
print(publish_completion_data.puback.reason_code)
print("Publishing Completed")

The output is now:

Starting Connection
Lifecycle Connection Success
Connection Started!
Starting Publishing
PubackReasonCode.SUCCESS
Publishing Completed
Stopping Client
Lifecycle Stopped
Client Stopped!

I am also getting a success from the PublishCompletionData instance. Is this expected? It appears that checking the reason code did not work.

I'll take a look at this scenario with a narrowly scoped permission policy when I get some time.

Hi @dgbarmac, thank you for bringing up the bug.
After investigating the issue, we found that the publish had actually failed, but the SDK did not expose the error code properly.
The fix is now released in v1.15.2. (The release pipeline just started, so it should be available on PyPI soon.)

I will leave the issue open for a couple of days in case there is any feedback.

@bretambrose @xiazhvera Thank you for your support. I have already tested the new solution!
