aws/aws-iot-device-sdk-python-v2

Retrieve topic name with callback passed to named shadow subscription

Frequel opened this issue · 12 comments

Describe the feature

The callback passed to subscribe_to_named_shadow_delta_updated_events should be able to retrieve the topic that triggered it.

This method wraps the regular subscribe function used for normal topics, whose callback receives several arguments (topic, payload, qos, retain, etc.).

It passes a wrapped callback to subscribe, because subscribe_to_named_shadow_delta_updated_events expects a callback with only one argument, whereas subscribe expects a callback that takes more.

I imagine this is done on purpose so that the shadow subscription callback receives only the payload, but I don't understand why. It would be useful for the callback to receive the same information as with the regular subscribe, or at least the topic name.

Use Case

I need this feature because I have to subscribe to more than one shadow topic. Because of the limit on the number of topics a thing can subscribe to, and because I cannot predict how many shadows there will be (there can be any number of things, each with its own shadow), I must subscribe to a wildcard topic.

For example, to subscribe to the shadow named TestShadow on every thing, the topic is:
$aws/things/+/shadow/name/TestShadow/update/delta

When my script is triggered by this subscription, I cannot know the thing name of the shadow that triggered the callback.

Say I need to perform a different action depending on which thing triggered the callback (for example, craft a message that says which thing was triggered, calculate the lumens if the name contains light, or calculate the temperature if it contains tempSensor). I cannot tell which action to perform.
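To make the use case concrete, here is a minimal sketch of the kind of dispatch described above, assuming the thing name is already available to the callback. The function name choose_action and the action labels are hypothetical and only illustrate the idea; they are not part of the SDK.

```python
def choose_action(thing_name: str) -> str:
    """Pick an action label from the thing name (the naming scheme is assumed)."""
    if "light" in thing_name:
        return "calculate_lumens"
    if "tempSensor" in thing_name:
        return "calculate_temperature"
    # Fallback: just report which thing triggered the callback.
    return "report_thing"
```

Without the topic (or the thing name) in the callback, there is nothing to pass to a function like this.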

Proposed Solution

One solution could be to include the name of the thing in the message that triggers the subscription, but there could be situations where that is not possible, and honestly I would prefer another solution. Or rather,

Other Information

No response

Acknowledgements

  • I may be able to implement this feature request
  • This feature might incur a breaking change

SDK version used

1.21.1

Environment details (OS name and version, etc.)

Debian 11 and Debian 12

If you want to know what "thing" a subscribe_..._event callback invocation corresponds to, make the callback a lambda/function object that captures the thing name on creation.

I don't know how to do that. Can you give an example? I don't understand "captures on creation": creation of what? I am subscribing to a wildcard, and the thing that publishes to a topic under that wildcard already exists when it writes the message that triggers my script. Please help me. A solution that doesn't require modifying the SDK would be perfect.

something like this?

def create_custom_callback(topic):
    # The inner function is the actual callback; it remembers `topic` from the enclosing call.
    def on_shadow_delta_updated(delta):
        print(f"Received message from topic: {topic}")
        print(f"Delta state: {delta.state}")
    return on_shadow_delta_updated


shadow_name = "TestShadow"  
thing_name = "+"  


custom_callback = create_custom_callback(f'$aws/things/{thing_name}/shadow/name/{shadow_name}/update/delta')

delta_subscribed_future, _ = shadow_client.subscribe_to_named_shadow_delta_updated_events(
    request=iotshadow.NamedShadowDeltaUpdatedSubscriptionRequest(
        thing_name=thing_name,
        shadow_name=shadow_name
    ),
    qos=mqtt5.QoS.AT_LEAST_ONCE,
    callback=custom_callback
)

In the context of the shadow sample at https://github.com/aws/aws-iot-device-sdk-python-v2/blob/main/samples/shadow.py

Change on_shadow_delta_updated to add the thing name as a parameter:

def on_shadow_delta_updated(delta, thing_name):
    # type: (iotshadow.ShadowDeltaUpdatedEvent, str) -> None
    try:
        print(f"Received shadow delta event for thing `{thing_name}`.")
        if delta.state and (shadow_property in delta.state):
    ... # everything else remains the same

This function can't be passed directly as the callback because it has an additional parameter. Update the subscribe call with a lambda that captures shadow_thing_name and passes it in:

        print("Subscribing to Delta events...")
        delta_subscribed_future, _ = shadow_client.subscribe_to_shadow_delta_updated_events(
            request=iotshadow.ShadowDeltaUpdatedSubscriptionRequest(thing_name=shadow_thing_name),
            qos=mqtt.QoS.AT_LEAST_ONCE,
            callback=lambda event: on_shadow_delta_updated(event, shadow_thing_name))
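The lambda above captures a single fixed thing name. If the same pattern is used to build callbacks for several known things in a loop, Python's late binding of closure variables becomes a concern. The sketch below uses a stand-in handler and hypothetical thing names, with no SDK calls, to show the pitfall and one way around it with functools.partial.

```python
from functools import partial

def on_shadow_delta_updated(event, thing_name):
    # Stand-in for the two-argument handler; returns a string so the result is easy to inspect.
    return f"delta for {thing_name}: {event}"

thing_names = ["device0", "device1", "device2"]  # hypothetical thing names

# Pitfall: each lambda looks up `name` only when it is called, so once the
# comprehension has finished they all see the last value.
late_bound = [lambda event: on_shadow_delta_updated(event, name) for name in thing_names]

# partial() stores the value of `name` at creation time, one per callback.
bound = [partial(on_shadow_delta_updated, thing_name=name) for name in thing_names]
```

Each entry in bound can be passed as the callback of the subscription for the matching thing. This only helps when the thing names are known up front; it does not recover the thing name for a wildcard subscription.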

I am probably still missing something, but my shadow_thing_name will be "+" because I am subscribing to a wildcard. How can this retrieve the name of the thing that triggers my wildcard subscription? If I could retrieve the topic name I could extract the thing name from it, but with this approach I cannot see how. I am also using named shadows, which the sample does not cover.

The client isn't designed with wildcard usage in mind. The expectation is that thing name will be a valid thing name,

Sorry, but my situation is this: I have a number of devices (things) distributed around the world, and they need to communicate with a server in an energy-efficient way. I thought of MQTT, and since I am able to use AWS I opted for AWS IoT Core. The server needs to send MQTT messages to and receive them from those devices, whose number can increase or decrease over time. To make sure the server (another thing, with more functionality than the distributed ones) hears from all of them, I am subscribing it to a wildcard topic in which the + wildcard character replaces the thing name in the shadow topic:
$aws/things/+/shadow/name/TestShadow/update/delta

This way the server can exchange messages with all the devices connected to the same broker.
My problem is that when the server receives a message on some topic, it needs to find out who is contacting it.
The topic that triggers it could be:

$aws/things/device0/shadow/name/TestShadow/update/delta
$aws/things/device1/shadow/name/TestShadow/update/delta
$aws/things/device2/shadow/name/TestShadow/update/delta
$aws/things/device3/shadow/name/TestShadow/update/delta

How can I achieve something similar without using wildcards? And when and how can I use wildcards if the client cannot use them?
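For reference, if the concrete topic were available, the thing name (and the shadow name, for named shadows) could be recovered by splitting the topic on "/". This is a minimal sketch; the helper name parse_shadow_topic is hypothetical, and it assumes the topic layout shown in the list above.

```python
from typing import Optional, Tuple

def parse_shadow_topic(topic: str) -> Optional[Tuple[str, Optional[str]]]:
    """Return (thing_name, shadow_name) for a shadow topic, or None if it is not one.

    shadow_name is None for a classic (unnamed) shadow topic.
    """
    parts = topic.split("/")
    # Expected prefix: $aws / things / <thing name> / shadow / ...
    if len(parts) < 4 or parts[0] != "$aws" or parts[1] != "things" or parts[3] != "shadow":
        return None
    thing_name = parts[2]
    # Named shadows add ".../shadow/name/<shadow name>/..."
    shadow_name = parts[5] if len(parts) > 5 and parts[4] == "name" else None
    return thing_name, shadow_name
```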

Well, you could use wildcards still, just not the service client. Subscribe directly using the mqtt client. For payload unmarshalling, you can probably use the model types and appropriate deserialization utility function/class, like https://github.com/aws/aws-iot-device-sdk-python-v2/blob/main/awsiot/iotshadow.py#L1218 (convert the publish payload to json, then call from_payload with the json object)

Architecturally, I would worry about a server using wildcards. The approach offers no way to handle scaling pains and IoT Core limits (https://docs.aws.amazon.com/general/latest/gr/iot-core.html#message-broker-limits) could easily sabotage a solution that maps an unbounded number of remote devices to a single server listening to all of them, depending on the frequency/shadow-size profile of course.

I am really sorry, but I still don't understand.

from_payload is already used by the wrapped callback created inside the subscribe_to_named_shadow_delta_updated_events method. So I already have an object of the class whose method you linked, and it doesn't contain the topic or the thing name. I cannot find any other model that uses that method and contains the topic name or the thing name, so I am still unable to get the thing name in a proper way.

As for the second part, I used a wildcard so that I hold a single subscription that listens to many topics instead of many subscriptions. Maybe I am wrong and it doesn't work this way, but in the link you provided I haven't found anything about a single subscription to a wildcard topic. I need to read the information at that link more carefully, but if you have more details they would be really appreciated. I only found a maximum of 8 subscriptions per subscribe request, but I am making only one subscription to a topic with a wildcard. Doesn't that count as a single subscription?

Anyway, I found a workaround for my problem that avoids modifying the SDK, but I still don't understand why the callback for a shadow subscription cannot receive the same arguments as the callback for a normal topic subscription.

The workaround:

import inspect

def on_shadow_delta_updated(delta):
	# Access the caller's local variables
	caller_locals = inspect.currentframe().f_back.f_locals
	# Attempt to extract 'topic' from the caller's local scope
	topic = caller_locals.get('topic', 'Unknown Topic')
	print(f"Event: {delta}, Topic: {topic}")

Will this workaround keep working in future releases, or could something break it?

The suggestion is to ignore the shadow service client completely and just use the MQTT client directly.

In your case, all the shadow client gives you is topic construction (which you know how to do) and payload deserialization. The on-publish-received callbacks of the protocol (MQTT311 or MQTT5) clients give you the topic in addition to the payload. The payload is JSON, so convert the bytes to JSON and then invoke the appropriate .from_payload method to get a deserialized update event.

(These are just off the cuff; I have not run them.)

Using the 311 client (connection):

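import json
from awsiot import iotshadow

def handle_event_update(topic, payload, dup, qos, retain, **kwargs):
    # pull thing name out of topic: $aws/things/<thing name>/shadow/update/delta
    thing_name = topic.split("/")[2]
    # convert payload to json
    payload_json = json.loads(payload)
    # call ShadowDeltaUpdatedEvent.from_payload on the json value to deserialize into the service model
    event = iotshadow.ShadowDeltaUpdatedEvent.from_payload(payload_json)
    # do what you want with the deserialized event data
    print(f"Delta for thing {thing_name}: {event.state}")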

...

mqtt_connection.subscribe(
        topic="$aws/things/+/shadow/update/delta",
        qos=mqtt.QoS.AT_LEAST_ONCE,
        callback=handle_event_update)

Using the 5 client (which does not support per-subscribe callbacks, so you must look at the incoming publish's topic and determine if it represents an update event):

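import json
from awsiot import iotshadow

def handle_event_update(topic, payload):
    # pull thing name out of topic: $aws/things/<thing name>/shadow/update/delta
    thing_name = topic.split("/")[2]
    # convert payload to json
    payload_json = json.loads(payload)
    # call ShadowDeltaUpdatedEvent.from_payload on the json value to deserialize into the service model
    event = iotshadow.ShadowDeltaUpdatedEvent.from_payload(payload_json)
    # do what you want with the deserialized event data
    print(f"Delta for thing {thing_name}: {event.state}")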

def on_publish_received(publish_packet_data):
    publish_packet = publish_packet_data.publish_packet
    if is_event_update_topic(publish_packet.topic):
        handle_event_update(publish_packet.topic, publish_packet.payload)

...

    # client is the MQTT client, not a shadow client
    client.subscribe(subscribe_packet=mqtt5.SubscribePacket(
        subscriptions=[mqtt5.Subscription(
            topic_filter="$aws/things/+/shadow/update/delta",
            qos=mqtt5.QoS.AT_LEAST_ONCE)]))
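The MQTT5 snippet calls is_event_update_topic, which is not defined anywhere in this thread. One possible sketch of such a helper is a regular expression that accepts delta topics of both classic and named shadows. The pattern below is an assumption based on the topic shapes quoted in this issue, not something taken from the SDK.

```python
import re

# Matches $aws/things/<thing>/shadow/update/delta and
#         $aws/things/<thing>/shadow/name/<shadow>/update/delta
_DELTA_TOPIC = re.compile(r"^\$aws/things/[^/]+/shadow/(?:name/[^/]+/)?update/delta$")

def is_event_update_topic(topic: str) -> bool:
    """Return True if the topic looks like a shadow delta-updated topic."""
    return _DELTA_TOPIC.match(topic) is not None
```

Because the MQTT5 client delivers every incoming publish to the same callback, a filter of this kind keeps unrelated topics away from the shadow handler.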


A wildcard subscription just counts as one subscription, yes.

There are no guarantees that that kind of internal stack reflection will continue to work across SDK versions, but these service clients are unlikely to change in ways that would break it.
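To illustrate why the stack-reflection workaround is fragile, here is a small self-contained sketch. The three wrapper functions are hypothetical stand-ins for whatever SDK code invokes the callback; they are not the SDK's actual implementation. The lookup only succeeds when the immediate caller happens to have a local variable named topic.

```python
import inspect

def on_shadow_delta_updated(delta):
    # Same trick as the workaround: peek at the caller's local variables.
    caller_locals = inspect.currentframe().f_back.f_locals
    return caller_locals.get("topic", "Unknown Topic")

def wrapper_today(topic, payload):
    # The caller has a local named `topic`, so the lookup succeeds.
    return on_shadow_delta_updated(payload)

def wrapper_renamed(topic_name, payload):
    # Same behaviour, but the local variable was renamed: the lookup fails.
    return on_shadow_delta_updated(payload)

def wrapper_with_extra_layer(topic, payload):
    # An extra call layer sits between the variable and the callback: the lookup fails.
    def invoke():
        return on_shadow_delta_updated(payload)
    return invoke()
```

A rename or an added call layer inside the library would silently turn every topic into "Unknown Topic", without raising an error.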

Ok, perfect. Now I have understood everything, thank you.
I have only one question, and it should be the last one xD. If I use a normal subscription and avoid the shadow client, do I have to implement from scratch the desired/reported state feature that triggers the delta only when there is a difference from the last state, or does using ShadowDeltaUpdatedEvent.from_payload let me keep the rest of the code I already implemented? For example, in the shadow sample script, can I keep using the same callback with only the modification you showed me? Will I lose only the check that the message from the broker contains the state field, or will I lose other features as well?

You could continue to use the shadow client for the update operations. Beyond that, I'm not really able to follow/reason-about hypothetical changes. It seems like something you'll just need to sit down and experiment with.

Edit: Apologies, I seem to have edited your reply instead of replying.

Ok, thank you. I'll experiment with it then. In the meantime I'll close the issue.

This issue is now closed. Comments on closed issues are hard for our team to see.
If you need more assistance, please open a new issue that references this one.