aws/aws-iot-device-sdk-python-v2

Connection interrupted intermittently

jmklix opened this issue · 2 comments

Discussed in #548

Originally posted by tucker-SB January 24, 2024
I have the following Python code, which is modified from the pubsub sample:

from awscrt import mqtt, http
from awsiot import mqtt_connection_builder
import sys
import secrets
import threading
import time
import json
import os
class example_settings:
    def __init__(self, cred_dir, sn, mode):
        self.CREDENTIALS_DIR = cred_dir
        self.SERIAL_NUMBER = sn
        self.HEADLESS_START = mode


def on_connection_interrupted(connection, error, **kwargs):
    print("Connection interrupted. error: {}".format(error))


# Callback when an interrupted connection is re-established.
def on_connection_resumed(connection, return_code, session_present, **kwargs):
    print("Connection resumed. return_code: {} session_present: {}".format(return_code, session_present))

    if return_code == mqtt.ConnectReturnCode.ACCEPTED and not session_present:
        print("Session did not persist. Resubscribing to existing topics...")
        resubscribe_future, _ = connection.resubscribe_existing_topics()

        # Cannot synchronously wait for resubscribe result because we're on the connection's event-loop thread,
        # evaluate result with a callback instead.
        resubscribe_future.add_done_callback(on_resubscribe_complete)


def on_resubscribe_complete(resubscribe_future):
    resubscribe_results = resubscribe_future.result()
    print("Resubscribe results: {}".format(resubscribe_results))

    for topic, qos in resubscribe_results['topics']:
        if qos is None:
            sys.exit("Server rejected resubscribe to topic: {}".format(topic))


# Callback when the subscribed topic receives a message
def on_message_received(topic, payload, dup, qos, retain, **kwargs):
    print("Received message from topic '{}': {}".format(topic, payload))

# Callback when the connection successfully connects
def on_connection_success(connection, callback_data):
    assert isinstance(callback_data, mqtt.OnConnectionSuccessData)
    print("Connection Successful with return code: {} session present: {}".format(callback_data.return_code, callback_data.session_present))

# Callback when a connection attempt fails
def on_connection_failure(connection, callback_data):
    assert isinstance(callback_data, mqtt.OnConnectionFailureData)
    print("Connection failed with error code: {}".format(callback_data.error))

# Callback when a connection has been disconnected or shutdown successfully
def on_connection_closed(connection, callback_data):
    print("Connection closed")

class mqtt_transmitter:
    def __init__(self, cert_fp, pkey_fp, ca_fp, endpoint="a3dkrncnrub5qb-ats.iot.us-east-2.amazonaws.com"):
        self.mqtt_connection = mqtt_connection_builder.mtls_from_path(
            endpoint=endpoint,
            port=8883,
            cert_filepath=os.path.join(settings.CREDENTIALS_DIR, cert_fp),
            pri_key_filepath=os.path.join(settings.CREDENTIALS_DIR, pkey_fp),
            ca_filepath=os.path.join(settings.CREDENTIALS_DIR, ca_fp),
            on_connection_interrupted=on_connection_interrupted,
            on_connection_resumed=on_connection_resumed,
            client_id=settings.SERIAL_NUMBER,
            clean_session=settings.HEADLESS_START == 'receiver',
            keep_alive_secs=30,
            http_proxy_options=None,
            on_connection_success=on_connection_success,
            on_connection_failure=on_connection_failure,
            on_connection_closed=on_connection_closed)

        '''
        if not cmdData.input_is_ci:
            print(f"Connecting to {cmdData.input_endpoint} with client ID '{cmdData.input_clientId}'...")
        else:
            print("Connecting to endpoint with client ID")
        '''
        connect_future = self.mqtt_connection.connect()

        # Future.result() waits until a result is available
        connect_future.result()


    def subscribe(self, topic, action=on_message_received):
        # Subscribe
        print("Subscribing to topic '{}'...".format(topic))
        subscribe_future, packet_id = self.mqtt_connection.subscribe(
            topic=topic,
            qos=mqtt.QoS.AT_LEAST_ONCE,
            callback=action)

        subscribe_result = subscribe_future.result()
        print("Subscribed with {}".format(str(subscribe_result)))

    def publish(self, topic, payload:dict):
        self.mqtt_connection.publish(
            topic=topic,
            payload=json.dumps(payload),
            qos=mqtt.QoS.AT_LEAST_ONCE)

    def publish_example_message(self, topic, count):
        hexdata = secrets.token_hex(1280)
        self.publish(topic,
                     {'device':'labnuc',
                      'stream':'DEV_MFAM_13',
                      'count' :count,
                      'data'  :hexdata})
    def disconnect(self):
        fut = self.mqtt_connection.disconnect()
        fut.result()

if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument('serial_number', help='device serial number, used for mqtt ClientID')
    parser.add_argument('mode', help='receiver or publisher')
    parser.add_argument('cred_path', help='path to directory containing credentials')
    parser.add_argument('cert_fp', help='cert filename')
    parser.add_argument('pkey_fp', help='private key filename')
    parser.add_argument('ca_fp', help='ca filename')
    topic = 'datastream'

    args = parser.parse_args()
    settings = example_settings(args.cred_path, args.serial_number, args.mode)

    transmitter = mqtt_transmitter(args.cert_fp, args.pkey_fp, args.ca_fp)

    if args.mode == 'publisher':
        for i in range(1000):
            transmitter.publish_example_message(topic, i)
            time.sleep(.1)

    if args.mode == 'receiver':
        transmitter.subscribe(topic)
        for i in range(1000): # hold program open
            time.sleep(.1)

else:
    from src.Settings.settingsMgr import settingsManager as settings

I am currently trying to set up a system with one publisher, which publishes data 40 times per second, and one subscriber. Each creates an mqtt_transmitter object. The issue is that both sides intermittently receive the following:

'Connection interrupted. error: AWS_ERROR_MQTT_UNEXPECTED_HANGUP: The connection was closed unexpectedly.'

The published data is time series data, so on the subscriber side I can see that it is correct and arrives with low latency (although out of order), but it drops out completely for seconds at a time.
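To put numbers on "out of order" and "drops out", the `count` field already carried in each payload can be checked on the subscriber side. The following is only a minimal sketch: `SequenceMonitor` is a hypothetical helper, not part of the SDK, and it assumes every message is JSON with an integer `count` as in `publish_example_message`. Since QoS 1 is at-least-once delivery, it also counts duplicates.

```python
import json


class SequenceMonitor:
    """Hypothetical helper: tracks reordering, duplicates and gaps via 'count'."""

    def __init__(self):
        self.seen = set()      # every count value received so far
        self.highest = None    # largest count value received so far
        self.out_of_order = 0  # messages that arrived after a higher count
        self.duplicates = 0    # redeliveries (possible with QoS 1)

    # Same keyword arguments as on_message_received in the code above,
    # so a bound method can be passed as the subscribe callback.
    def on_message_received(self, topic, payload, dup=False, qos=None,
                            retain=False, **kwargs):
        count = json.loads(payload)["count"]
        if count in self.seen:
            self.duplicates += 1
            return
        self.seen.add(count)
        if self.highest is not None and count < self.highest:
            self.out_of_order += 1
        if self.highest is None or count > self.highest:
            self.highest = count

    def missing(self):
        """Count values between the lowest and highest seen that never arrived."""
        if self.highest is None:
            return []
        return [n for n in range(min(self.seen), self.highest + 1)
                if n not in self.seen]
```

It could be wired in with `transmitter.subscribe(topic, action=monitor.on_message_received)`, and `monitor.missing()` inspected after a dropout to see whether messages were lost or only delayed.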

Interestingly, I created a stripped-down example in the main block of the above code, in which a 'publisher' and a 'receiver' can be created and the publisher publishes data structured identically to that of my system. When I run these two, the data comes in order with no issue.

I'm wondering what about my complete system could cause these intermittent AWS_ERROR_MQTT_UNEXPECTED_HANGUP errors. It is a multi-threaded program, but only one thread publishes through the mqtt_transmitter. Does the connection object somehow need to be contained in its own dedicated thread?

Can you provide a log from when this is happening to you? And how often are you seeing these connection interruptions?
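One possible way to collect both pieces of information is sketched below. It assumes the `awscrt` package is installed and uses its `io.init_logging` call to write Debug-level CRT logs; `enable_crt_logging` and `InterruptionTracker` are hypothetical helper names introduced here for illustration, and the log file name is arbitrary.

```python
import time

try:
    from awscrt import io  # CRT logging API; only present if awscrt is installed
except ImportError:
    io = None


def enable_crt_logging(log_file="awscrt_debug.log"):
    """Turn on Debug-level CRT logging. Returns False if awscrt is unavailable."""
    if io is None:
        return False
    # The second argument may also be 'stdout' or 'stderr'.
    io.init_logging(io.LogLevel.Debug, log_file)
    return True


class InterruptionTracker:
    """Hypothetical helper: records when each connection interruption happens."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self.events = []  # list of (timestamp_seconds, error_text)

    # Same keyword arguments as on_connection_interrupted in the code above.
    def on_connection_interrupted(self, connection, error, **kwargs):
        self.events.append((self._clock(), str(error)))
        print("Connection interrupted. error: {}".format(error))

    def intervals(self):
        """Seconds elapsed between consecutive interruptions."""
        times = [t for t, _ in self.events]
        return [later - earlier for earlier, later in zip(times, times[1:])]
```

Calling `enable_crt_logging()` before the connection is built, and passing `tracker.on_connection_interrupted` as the `on_connection_interrupted` argument of `mtls_from_path`, would give a log file to attach plus a list of intervals showing how often the interruptions occur.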

I don't think you're running into any limits with 40 publishes a second, but AWS_ERROR_MQTT_UNEXPECTED_HANGUP is something you might see when you go over rate limits. AWS IoT Core will just stop responding without giving a reason if you exceed them. You can check them out here and make sure that you're not sending too many messages.
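As a rough sanity check against those limits, the per-connection load can be estimated from the payload size and publish rate. This is a sketch only: the two limit constants are assumptions (100 publishes per second and 512 KiB per second per connection) that should be confirmed against the current AWS IoT Core quotas page, and `estimate_load` is a hypothetical helper.

```python
import json
import secrets

# ASSUMED per-connection limits; verify against the AWS IoT Core quotas page.
ASSUMED_MAX_PUBLISHES_PER_SEC = 100
ASSUMED_MAX_BYTES_PER_SEC = 512 * 1024


def estimate_load(payload, publishes_per_sec):
    """Estimate bytes per second for one connection publishing `payload`."""
    payload_bytes = len(json.dumps(payload).encode("utf-8"))
    bytes_per_sec = payload_bytes * publishes_per_sec
    return {
        "payload_bytes": payload_bytes,
        "bytes_per_sec": bytes_per_sec,
        "within_publish_rate": publishes_per_sec <= ASSUMED_MAX_PUBLISHES_PER_SEC,
        "within_throughput": bytes_per_sec <= ASSUMED_MAX_BYTES_PER_SEC,
    }


if __name__ == "__main__":
    # Same structure as publish_example_message in the code above.
    example = {"device": "labnuc",
               "stream": "DEV_MFAM_13",
               "count": 0,
               "data": secrets.token_hex(1280)}
    print(estimate_load(example, publishes_per_sec=40))
```

With the example payload (about 2.6 KB per message) at 40 publishes per second, the estimate stays well under both assumed limits, which is consistent with the remark that 40 publishes a second should not be a problem on a single connection. Other clients sharing the account, or extra connections in the full system, are not covered by this estimate.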

Also, I would recommend trying the new MQTT5 pubsub sample found here. This is a newer version of the protocol that can give better error messages.

Greetings! It looks like this issue hasn’t been active in longer than a week. We encourage you to check if this is still an issue in the latest release. Because it has been longer than a week since the last update on this, and in the absence of more information, we will be closing this issue soon. If you find that this is still a problem, please feel free to provide a comment or add an upvote to prevent automatic closure, or if the issue is already closed, please feel free to open a new one.