[QUERY] How to properly acknowledge IoT hub messages
jsquire opened this issue · 6 comments
Issue Transfer
This issue has been transferred from the Azure SDK for .NET repository, #42405.
Please be aware that @the-programmer is the author of the original issue and include them for any questions or replies.
Details
Library name and version
Microsoft.Azure.Devices 1.39.1
Query/Question
Project overview
In my current project I need to control embedded devices (running the Azure SDK for C V1.1.6 on an ESP32, using MQTT) via Azure IoT hub.
So far I managed to get the Device to Cloud (D2C) and Cloud to Device (C2D) functionality working without issues.
The current communication flow for D2C messages is as follows and works great.
- The device sends a message to the IoT hub via MQTT.
- IoT hub writes directly to a CosmosDB container.
- An Azure function has a CosmosDBTrigger on this container.
- The function decodes the body and stores the result in a different container. (This is done to base64-decode the Body of the message.)
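The decoding step in the last bullet can be sketched as follows. This is a minimal illustration, not the code from the project: it assumes the stored document exposes the payload as a base64 string in a property named `Body` and that the payload is UTF-8 text. The class and method names are hypothetical.

```csharp
using System;
using System.Text;
using System.Text.Json;

public static class D2CBodyDecoder
{
    // Assumption: the document written by IoT hub routing carries the
    // device payload as a base64 string in a top-level "Body" property.
    public static string DecodeBody(string documentJson)
    {
        using var doc = JsonDocument.Parse(documentJson);
        string encoded = doc.RootElement.GetProperty("Body").GetString()
            ?? throw new FormatException("Body is null");

        // Assumption: the device sent UTF-8 encoded text.
        return Encoding.UTF8.GetString(Convert.FromBase64String(encoded));
    }
}
```

For example, `D2CBodyDecoder.DecodeBody("{\"Body\":\"eyJ0ZW1wIjoyMX0=\"}")` returns `{"temp":21}`.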
The current communication flow for C2D messages is as follows and is giving some issues.
- The front-end writes to a CosmosDB container
- An Azure function has a CosmosDBTrigger on this container.
- The function prepares the message and transmits it to IoT hub.
- IoT hub sends the message to the device via MQTT.
Finally, a timed Azure function parses all the feedback from the IoT hub. The feedback is then used to update the container that holds the original command.
This is a timed function because Azure Functions does not allow the "normal" while(true) method due to a maximum runtime of 5 minutes. This method is inspired by this article.
Some additional details
Program.cs
In the Program.cs of the Azure functions I registered the IoT hub client as follows:
//Get the connectionstring for the iotHubServiceClient and create an instance
var iotHubConnectionString = Environment.GetEnvironmentVariable("IotHubConnectionString", EnvironmentVariableTarget.Process);
var iotHubServiceClient = ServiceClient.CreateFromConnectionString(iotHubConnectionString);
var host = new HostBuilder()
    .ConfigureFunctionsWebApplication()
    .ConfigureServices(services =>
    {
        //Other code
        //Add the IotHub Serviceclient
        services.AddSingleton(iotHubServiceClient);
    })
    .Build();
When it started
Before setting the Ack to DeliveryAcknowledgement.Full I didn't notice this issue (and as far as I can tell it wasn't there).
Timed function code
The timed function has the following code:
//Get the feedbackReceiver
var feedbackReceiver = _iotHubServiceClient.GetFeedbackReceiver();
//Create a cancellationToken with a timeout of 1 minute
var timeoutCancellationTokenSource = new CancellationTokenSource(new TimeSpan(0, 1, 0));
//Start waiting for new messages
var message = await feedbackReceiver.ReceiveAsync(timeoutCancellationTokenSource.Token);
_logger.LogInformation("Got {count} status messages from IoT hub", message?.Records.Count() ?? 0);
//Did we get anything
if (message == null)
    //No, abort here
    return;
//Update the statuscode in the database
//[...]
//Complete all the received messages
var cancellationTokenSource = new CancellationTokenSource();
await feedbackReceiver.CompleteAsync(message, cancellationTokenSource.Token);
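The function above receives a single feedback batch per run. As an aside, a run could also drain several batches within a fixed time budget instead of an unbounded while(true). The sketch below shows that idea only and is not part of the original function. `BoundedDrain`, `DrainAsync` and the `receiveAsync` delegate are hypothetical names, and the delegate is assumed to return null when nothing is waiting.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class BoundedDrain
{
    // Calls receiveAsync repeatedly until it returns null or the time
    // budget runs out, and returns every batch that was received.
    public static async Task<List<T>> DrainAsync<T>(
        Func<CancellationToken, Task<T?>> receiveAsync,
        TimeSpan budget) where T : class
    {
        var batches = new List<T>();
        using var cts = new CancellationTokenSource(budget);
        try
        {
            while (!cts.IsCancellationRequested)
            {
                T? batch = await receiveAsync(cts.Token);
                if (batch is null)
                    break; // Assumption: null means the queue is empty.
                batches.Add(batch);
            }
        }
        catch (OperationCanceledException)
        {
            // Budget exhausted while waiting; keep what was received.
        }
        return batches;
    }
}
```

In the timed function the delegate would wrap the feedback receiver's receive call, with each returned batch processed and completed before the function exits.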
The issue
The function that sends the message to the IoT hub has the following code.
var message = new Message(Encoding.ASCII.GetBytes(JsonSerializer.Serialize(commands)))
{
Ack = DeliveryAcknowledgement.Full,
MessageId = command.Id,
};
await _iotHubServiceClient.SendAsync(deviceId, message);
So, when the front-end now inserts a new document into the command container, the message is correctly transmitted to the device (via MQTT). However, the command seems to get "stuck" somewhere in transmission: my device keeps receiving duplicate MQTT messages that it has already received.
My timed function has the same issue. It keeps getting the acknowledgements for the same message over and over.
I would really like to have full acknowledgement since this is the only way for the operators to get (delayed) feedback.
Environment
Hosting platform: Azure function app, .NET 8 (LTS), isolated worker model, windows. Runtime version 4.30.0.22097
Microsoft Visual Studio Professional 2022 (64-bit) Version 17.9.2
Thanks for the transfer @jsquire.
I did some more testing and if I disable the timed function the retry seems to be gone. Somehow it seems that requesting the status causes the MQTT messages to be re-transmitted.
Can you share a code snippet of the device-side code that handles receiving the cloud-to-device message?
I believe acknowledging a received cloud-to-device message with the C SDK should be as simple as this. If you are already doing this, then
"My device keeps on receiving duplicate MQTT messages that have been received already."
I don't believe the message acknowledgement full flag is related to this behavior. However, are you seeing the message feedback being received by your service client in the Azure Function? And if so, are you seeing the same feedback message over and over as well?
Currently I am using this template for the MQTT connection.
From your sample I can tell that, in case of an MQTT connection the "puback" response is sufficient to confirm the message to the IoT hub.
To validate that this is happening correctly I looked up the source of the MQTT client for the ESP32. Here you can find the publish event handler and if the QoS of the message transmitted is 1 the "puback" response is transmitted. Note that I can't find any samples of what the QoS of the IoT hub packets actually are. However I recently had a bug in my code that prevented the "puback" from being transmitted. This caused the messages to get "stuck" in the C2D queue.
So for full reference, here is the complete flow.
1. In my Azure function I make a call to _iotHubServiceClient.SendAsync(deviceId, message); with the Ack = DeliveryAcknowledgement.Full flag set in the message.
2. IoT hub parses the message.
3. IoT hub sends the message to my device (ESP32) with a QoS of 1.
4. The ESP32 MQTT client receives this, parses the message and answers with a "puback".
5. IoT hub removes it from the "outgoing" queue and stores the result in the feedback queue.
6. The timed function comes along and grabs the feedback via the FeedbackReceiver.
7. The result is logged in CosmosDB.
8. The feedback is "Completed" (and according to the documentation "Deletes a received message from the queue.")
However, if I start my timed function (step 6) to handle the feedback somehow step 3 is triggered. So far I have been unable to determine what is causing this.
"Note that I can't find any samples of what the QoS of the IoT hub packets actually are"
Cloud to device messages like these should be QoS 1, FYI
Since you are receiving feedback messages, I believe it is safe to assume that your acknowledgement logic on the ESP32 MQTT client side is correct.
Do you see this same repeated delivery of duplicate messages when you set the DeliveryAcknowledgement to PositiveOnly, NegativeOnly, or None?
Today I found what went wrong. It was a bug in the code, not in IoT hub.
The issue was that I misunderstood the CosmosDBTrigger. It triggers not only when a record is created (what I assumed) but also when a record is updated.
So when my timed function fired, it generated an update in the monitored container. This update caused the CosmosDBTrigger to fire (again) and send out a new IoT hub message (that was still linked to the old database entry).
This was the cause of the loop. The trigger still fires on the update, but by checking the updated element it no longer generates messages.
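One way such a check could look is sketched below. The thread does not show the actual fix, so everything here is an assumption: `CommandDocument`, its `Status` property and `CommandFilter` are hypothetical names, and a document is treated as unsent only while no feedback status has been written back to it.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical shape of a document in the command container.
public sealed record CommandDocument(string Id, string DeviceId, string? Status);

public static class CommandFilter
{
    // Assumption: the timed feedback function fills in Status, so an
    // empty Status marks a freshly inserted command.
    public static bool IsUnsent(CommandDocument doc) =>
        string.IsNullOrEmpty(doc.Status);

    // Keeps only the changed documents that should be sent to IoT hub;
    // updates caused by the feedback write-back are filtered out.
    public static IReadOnlyList<CommandDocument> SelectUnsent(
        IEnumerable<CommandDocument> changed) =>
        changed.Where(IsUnsent).ToList();
}
```

The CosmosDBTrigger function would pass the changed documents it receives through `SelectUnsent` and only build IoT hub messages for the remaining ones, so the update made by the feedback function no longer starts a new send.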
This issue can be closed. Thanks for the help anyway.
Gotcha. Glad to hear the issue was found. I'll close this thread, then.