Azure/azure-iot-sdk-csharp

Unobserved task exception when using MQTT websocket when connection cannot be established

ramondeklein opened this issue · 4 comments

Context

  • OS: Windows 10/11 and Linux
  • Platforms: ARM and Intel platforms are affected
  • .NET Core SDK: v7.0.203
  • Microsoft.Azure.Devices.Client package v1.41.3 (although the latest preview is probably also affected, when I look at the code).

Description of the issue

When using Mqtt_WebSocket_Only transport and a host that cannot be reached (i.e. due to DNS issues, because the device is offline) then a WebSocketException is thrown that doesn't seem to be catched, because it is picked up by TaskScheduler.UnobservedTaskException.

Code sample exhibiting the issue

using System.Runtime;
using Microsoft.Azure.Devices.Client;

// Ensure that task are collected to detect the unobserved task exceptions
// (no production code, but just to trigger the bug)
_ = Task.Run(async () =>
{
    while (true)
    {
        await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false);
        GCSettings.LargeObjectHeapCompactionMode = GCLargeObjectHeapCompactionMode.CompactOnce;
        GC.Collect(2, GCCollectionMode.Forced, true, true);
    }
});

// Catch unobserved task exceptions
TaskScheduler.UnobservedTaskException += (_, args) =>
{
    Console.WriteLine($"Caught {(args.Observed ? "observed" : "non-observed")} task exception");
    Console.WriteLine($"Top level [{args.Exception.GetType().Name}]: {args.Exception.Message}\n{args.Exception.StackTrace}");

    foreach (var exc in args.Exception.InnerExceptions)
        Console.WriteLine($"Inner [{exc.GetType().Name}]: {exc.Message}\n{exc.StackTrace}");
};


var deviceId = "invalid-device-id";
var primaryKey = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa=";
var auth = new DeviceAuthenticationWithRegistrySymmetricKey(deviceId, primaryKey);
var assignedHub = "example.com";

await using var deviceClient = DeviceClient.Create(assignedHub, auth, TransportType.Mqtt_WebSocket_Only);
deviceClient.SetConnectionStatusChangesHandler((status, reason) =>
    Console.WriteLine($"Status changed to {status}: {reason}"));
await deviceClient.OpenAsync();

Console log of the issue

The following information is dumped to the console:

Status changed to Disconnected_Retrying: Communication_Error
Caught non-observed task exception
Top level [AggregateException]: A Task's exception(s) were not observed either by Waiting on the Task or accessing its Exception property. As a result, the unobserved exception was rethrown by the finalizer thread. (The server returned status code '404' when status code '101' was expected.)

Inner [WebSocketException]: The server returned status code '404' when status code '101' was expected.
   at System.Net.WebSockets.WebSocketHandle.ValidateResponse(HttpResponseMessage response, String secValue, ClientWebSocketOptions options)
   at System.Net.WebSockets.WebSocketHandle.ConnectAsync(Uri uri, HttpMessageInvoker invoker, CancellationToken cancellationToken, ClientWebSocketOptions options)
   at System.Net.WebSockets.ClientWebSocket.ConnectAsyncCore(Uri uri, HttpMessageInvoker invoker, CancellationToken cancellationToken)
   at Microsoft.Azure.Devices.Client.Transport.Mqtt.MqttTransportHandler.<>c__DisplayClass104_0.<<CreateWebSocketChannelFactory>b__0>d.MoveNext()
--- End of stack trace from previous location ---
   at Microsoft.Azure.Devices.Client.Transport.Mqtt.MqttTransportHandler.OpenInternalAsync(CancellationToken cancellationToken)
   at Microsoft.Azure.Devices.Client.Transport.Mqtt.MqttTransportHandler.OpenAsync(CancellationToken cancellationToken)
   at Microsoft.Azure.Devices.Client.Transport.ProtocolRoutingDelegatingHandler.OpenAsync(CancellationToken cancellationToken)
   at Microsoft.Azure.Devices.Client.Transport.ErrorDelegatingHandler.<>c__DisplayClass27_0.<<ExecuteWithErrorHandlingAsync>b__0>d.MoveNext()
--- End of stack trace from previous location ---
   at Microsoft.Azure.Devices.Client.Transport.ErrorDelegatingHandler.ExecuteWithErrorHandlingAsync[T](Func`1 asyncOperation)

...

Probable cause

This is probably caused by the MqttTransportHandler.OnError method. When an error is encountered, then it sets the exception to both _connectCompletion and _subscribeCompletionSource. Because the tasks are never awaited, because of the exception the task leaks with an un-observed exception.

It would probably best to add the following code to MqttTransportHandler.Dispose(bool):

// touch the underlying tasks to avoid un-observed exceptions
_ = _connectCompletion.Task.Exception;
_ = _subscribeCompletionSource.Task.Exception;

This will briefly touch the exception and this marks the exception as handled inside the TaskExceptionHolder and prevents it from being reported. An even better solution would be to not create the task completion sources, until they are actually needed. This would also release some strain on the GC.

This issue has been fixed as of this release, so I'll close this thread

Hi @timtay-microsoft

I guess that this fix is now causing an MQTT exception on the code below:

    private static async Task<DeviceRegistrationResult> ProvisionDeviceAsync(Parameters parameters)
    {
        Logger.LogInformation("Provisioning the device");

        using var symmetricKeyProvider =
            new SecurityProviderSymmetricKey(parameters.DeviceId, parameters.DeviceSymmetricKey, null);
        using var mqttTransportHandler = new ProvisioningTransportHandlerMqtt();
        var provisioningDeviceClient = ProvisioningDeviceClient.Create(
            parameters.DpsEndpoint,
            parameters.DpsScopeId,
            symmetricKeyProvider,
            mqttTransportHandler);

        var pnpPayload = new ProvisioningRegistrationAdditionalData
        {
            JsonData = PnpConvention.CreateDpsPayload(ModelId),
        };

        while (true)
        {
            try
            {
                return await provisioningDeviceClient.RegisterAsync(pnpPayload);
            }
            catch (Exception ex)
            {
                Logger.LogError("Failed to provision the device. Exception: {ExceptionMessage}", ex.Message);
                while (ex.InnerException is not null)
                {
                    ex = ex.InnerException;
                    Logger.LogError("Inner Exception: {ExceptionMessage}", ex.Message);
                }
            }

            Logger.LogInformation("Retrying to provision the device");
        }
    }

There's no exception if I use the versions below:

    <PackageReference Include="Microsoft.Azure.Devices.Client" Version="1.42.0" />                      <!-- Version 1.42.1 has MQQT Connectivity issues -->
    <PackageReference Include="Microsoft.Azure.Devices.Provisioning.Client" Version="1.19.3" />         <!-- Version 1.19.4 has MQQT Connectivity issues -->
    <PackageReference Include="Microsoft.Azure.Devices.Provisioning.Transport.Mqtt" Version="1.17.3" /> <!-- Version 1.17.4 has MQQT Connectivity issues -->

@nelsonmorais can you open a new thread for the issue you are seeing? And can you include the exception that you are seeing after upgrading?

@nelsonmorais can you open a new thread for the issue you are seeing? And can you include the exception that you are seeing after upgrading?

@timtay-microsoft Just started Christmas Holidays, will do that once I'm back at work.