Azure/azure-iot-sdk-python

[V3] Re-using an IoTHubSession breaks receiver pattern

cartertinney opened this issue · 1 comment

If you re-use a previous Session object instead of instantiating a new one for each connection, receive patterns raise a StopAsyncIteration exception through the context-managed incoming data generator.

This can happen after a connection drop caused by:

  • Network failure
  • SAS Token expiration

It does not occur when manually disconnecting.

Sample code demonstrating the behavior (wait for the SAS Token to expire, or disconnect from the internet to force a drop):

import asyncio
import os
from azure.iot.device import IoTHubSession, MQTTConnectionDroppedError, MQTTConnectionFailedError

import logging
# basicConfig() returns None, so its result is not assigned to a name.
logging.basicConfig(level=logging.DEBUG)

CONNECTION_STRING = os.getenv("IOTHUB_DEVICE_CONNECTION_STRING")
TOTAL_MESSAGES_RECEIVED = 0


async def main():
    global TOTAL_MESSAGES_RECEIVED
    print("Starting C2D sample")
    print("Press Ctrl-C to exit")
    session = IoTHubSession.from_connection_string(CONNECTION_STRING, sastoken_ttl=200)
    while True:
        try:
            print("Connecting to IoT Hub...")
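            # The same session object created above is re-entered on every pass.
            # This re-use is what triggers the failure after a dropped connection.
            async with session as session: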
                print("Connected to IoT Hub")
                async with session.messages() as messages:
                    print("Waiting to receive messages...")
                    async for message in messages:
                        TOTAL_MESSAGES_RECEIVED += 1
                        print("Message received with payload: {}".format(message.payload))

        except MQTTConnectionDroppedError:
            # Connection has been lost. Reconnect on next pass of loop.
            print("Dropped connection. Reconnecting in 1 second")
            await asyncio.sleep(1)
        except MQTTConnectionFailedError:
            # Connection failed to be established. Retry on next pass of loop.
            print("Could not connect. Retrying in 10 seconds")
            await asyncio.sleep(10)


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Exit application because user indicated they wish to exit.
        # This will have cancelled `main()` implicitly.
        print("User initiated exit. Exiting")
    finally:
        print("Received {} messages in total".format(TOTAL_MESSAGES_RECEIVED))
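
For contrast, here is a minimal sketch of the re-instantiation pattern that the description above says is not affected: a new session object is created for every connection attempt instead of re-entering an old one. It does not use the SDK. `FakeSession`, `ConnectionDropped`, `receive()` and `run_with_fresh_sessions()` are hypothetical stand-ins invented for illustration, so the sketch runs without an IoT Hub.

```python
import asyncio


class ConnectionDropped(Exception):
    """Hypothetical stand-in for MQTTConnectionDroppedError (not the SDK class)."""


class FakeSession:
    """Hypothetical stand-in for IoTHubSession; the first instance drops its connection."""

    created = 0  # counts how many session objects have been instantiated

    def __init__(self):
        FakeSession.created += 1
        self.number = FakeSession.created

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        return False  # never suppress exceptions

    async def receive(self):
        # Simulate a dropped connection on the first session only.
        if self.number == 1:
            raise ConnectionDropped()
        return "payload from session {}".format(self.number)


async def run_with_fresh_sessions(session_factory, max_attempts=3, retry_delay=0):
    """Call session_factory() on every attempt so no session object is re-used."""
    for _ in range(max_attempts):
        try:
            async with session_factory() as session:
                return await session.receive()
        except ConnectionDropped:
            # Connection lost. The next pass builds a brand-new session.
            await asyncio.sleep(retry_delay)
    raise RuntimeError("no attempt succeeded")


result = asyncio.run(run_with_fresh_sessions(FakeSession))
print(result)
```

With the real SDK, the factory would presumably wrap the `IoTHubSession.from_connection_string(CONNECTION_STRING)` call from the sample above, moved inside the `while True:` loop. That adaptation is an assumption and has not been verified against the library.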

Reverted to v2