Azure/azure-sdk-for-python

[ServiceBus] Unrecoverable error: "ServiceBusConnectionError('Link detached unexpectedly. Error condition: amqp:unknown-error.')"

saadshaikh3 opened this issue · 8 comments

  • Package Name: azure-servicebus
  • Package Version: 7.12.1
  • Operating System: Linux (Docker python:3.10.12-slim-bullseye)
  • Python Version: 3.10.12

Describe the bug
I am using Azure Service Bus as the broker for Celery in my Django application. After I deploy to an Azure App Service, everything works for a few days, and then I suddenly receive this error in Sentry:

Unrecoverable error: "ServiceBusConnectionError('Link detached unexpectedly. Error condition: amqp:unknown-error.')"

Task chat_app.tasks.ping[f845eb96-722a-4c4d-a3fe-e8759412f358] received
Task chat_app.tasks.ping[75b86cfb-4ccc-4542-aaa3-481898e53542] received
Link state changed: <LinkState.ATTACHED: 3> -> <LinkState.DETACHED: 0>
AMQP error occurred: (AMQPLinkError('Error condition: ErrorCondition.UnknownError\n Error Description: Link detached unexpectedly.')), condition: (<ErrorCondition.UnknownError: b'amqp:unknown-error'>), description: ('Link detached unexpectedly.').
Link state changed: <LinkState.ATTACHED: 3> -> <LinkState.DETACH_SENT: 4>
Management link receiver state changed: <LinkState.ATTACHED: 3> -> <LinkState.DETACH_SENT: 4>
Link state changed: <LinkState.ATTACHED: 3> -> <LinkState.DETACH_SENT: 4>
Management link sender state changed: <LinkState.ATTACHED: 3> -> <LinkState.DETACH_SENT: 4>
Session state changed: <SessionState.MAPPED: 3> -> <SessionState.END_SENT: 4>
Connection state changed: <ConnectionState.OPENED: 9> -> <ConnectionState.CLOSE_SENT: 11>
Connection state changed: <ConnectionState.CLOSE_SENT: 11> -> <ConnectionState.END: 13>
Session state changed: <SessionState.END_SENT: 4> -> <SessionState.DISCARDING: 6>
Link state changed: <LinkState.DETACH_SENT: 4> -> <LinkState.DETACHED: 0>
Management link sender state changed: <LinkState.DETACH_SENT: 4> -> <LinkState.DETACHED: 0>
Link state changed: <LinkState.DETACH_SENT: 4> -> <LinkState.DETACHED: 0>
Management link receiver state changed: <LinkState.DETACH_SENT: 4> -> <LinkState.DETACHED: 0>
Link state changed: <LinkState.DETACHED: 0> -> <LinkState.DETACHED: 0>
'servicebus.pysdk-0deac4de' operation has timed out. 
Last exception before timeout is (ServiceBusConnectionError('Link detached unexpectedly. Error condition: amqp:unknown-error.'))
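
To make the sequence of state changes in a log like this easier to scan, a small parser can pull out each transition. This is only a sketch: the regular expression is inferred from the lines quoted above, and the names parse_transitions and abrupt_detaches are hypothetical helpers, not part of azure-servicebus.

```python
import re

# Matches lines such as:
#   Link state changed: <LinkState.ATTACHED: 3> -> <LinkState.DETACHED: 0>
STATE_CHANGE = re.compile(
    r"^(?P<subject>.+?) state changed: "
    r"<(?P<old>[\w.]+): \d+> -> <(?P<new>[\w.]+): \d+>$"
)


def parse_transitions(lines):
    """Return (subject, old_state, new_state) for every state-change line."""
    transitions = []
    for line in lines:
        match = STATE_CHANGE.match(line.strip())
        if match:
            transitions.append((match["subject"], match["old"], match["new"]))
    return transitions


def abrupt_detaches(transitions):
    """Keep transitions that go from ATTACHED to DETACHED with no DETACH_SENT step."""
    return [
        t for t in transitions
        if t[1].endswith(".ATTACHED") and t[2].endswith(".DETACHED")
    ]


# A few lines copied from the log above.
sample = [
    "Link state changed: <LinkState.ATTACHED: 3> -> <LinkState.DETACHED: 0>",
    "Link state changed: <LinkState.ATTACHED: 3> -> <LinkState.DETACH_SENT: 4>",
    "Management link receiver state changed: <LinkState.ATTACHED: 3> -> <LinkState.DETACH_SENT: 4>",
    "Connection state changed: <ConnectionState.OPENED: 9> -> <ConnectionState.CLOSE_SENT: 11>",
]

transitions = parse_transitions(sample)
abrupt = abrupt_detaches(transitions)
```

In the log above, only the first link goes straight from ATTACHED to DETACHED; the remaining links pass through DETACH_SENT as the client shuts down.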

To Reproduce
I have deployed the same configuration to two different tenants. On one, this behaviour has never occurred; on the other, it occurred today without any apparent trigger. Here are my application's settings:

celery==5.3.6
django==4.2.3

These are my settings in Django:

In base.py:

CELERY_BROKER_URL=f"azureservicebus://{env('SERVICE_BUS_SAS_POLICY')}:{env('SERVICE_BUS_SAS_KEY')}@{env('SERVICE_BUS_NAMESPACE')}"
CELERY_TASK_DEFAULT_QUEUE=env('SERVICE_BUS_QUEUE')
CELERY_ACCEPT_CONTENT = ['pickle','json']
CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True
CELERY_RESULT_BACKEND = 'django-db'
CELERY_RESULT_EXTENDED = True

In celery_app.py :

from __future__ import absolute_import, unicode_literals
import os
import sys
from pathlib import Path

from celery import Celery
from django.conf import settings

# Make the "project" package importable.
BASE_DIR = Path(__file__).resolve(strict=True).parent.parent
sys.path.append(str(BASE_DIR / "project"))

# os.environ values are strings, so compare against text rather than the boolean False.
if os.environ.get("DJANGO_DEBUG", "").lower() == "false":
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings.staging')
else:
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings.local')

# Allow the worker to run as root.
os.environ.setdefault('C_FORCE_ROOT', 'true')

app = Celery('project')

# Read every CELERY_-prefixed setting from the Django settings module.
app.config_from_object('django.conf:settings', namespace='CELERY')

app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
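
When a settings module is chosen from an environment variable such as DJANGO_DEBUG, it helps to remember that os.environ values are always strings, so a comparison with the boolean False can never succeed. A minimal sketch of a safer check follows; env_flag and TRUE_VALUES are hypothetical names introduced only for illustration.

```python
import os

# Strings treated as "true"; anything else (including "False") counts as false.
TRUE_VALUES = {"1", "true", "yes", "on"}


def env_flag(name, default=False):
    """Read a boolean-like environment variable (hypothetical helper)."""
    raw = os.environ.get(name)
    if raw is None:
        return default
    return raw.strip().lower() in TRUE_VALUES


os.environ["DJANGO_DEBUG"] = "False"

# A string never equals the boolean False, so this comparison is always False.
naive_check = os.environ.get("DJANGO_DEBUG") == False  # noqa: E712

# The helper interprets the text instead.
debug_enabled = env_flag("DJANGO_DEBUG")

settings_module = (
    "config.settings.local" if debug_enabled else "config.settings.staging"
)
```

With DJANGO_DEBUG set to "False", the helper selects config.settings.staging, whereas a branch guarded by the naive comparison would never be taken.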

In __init__.py:

from .celery_app import app as celery_app
__all__ = ('celery_app',)

This is the command I execute in start.sh:
nohup celery -A config.celery_app worker -Q $SERVICE_BUS_QUEUE -l INFO --pool threads &

Expected behavior
Even if the link detaches for some unknown reason, the worker should reconnect. The screenshots below show messages piling up in the queue while the worker fails to pick them up, so I have to restart my Azure App Service manually.
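
The behaviour described here amounts to rebuilding the connection when the link drops instead of giving up. A minimal, library-agnostic sketch of that pattern follows. LinkDetachedError, consume_with_reconnect, and the fake connect/receive callables are all hypothetical stand-ins, and this is not how azure-servicebus, kombu, or Celery implement their retries.

```python
import time


class LinkDetachedError(Exception):
    """Stand-in for a broker connection error (hypothetical, not an SDK class)."""


def consume_with_reconnect(connect, receive_once, max_attempts=5,
                           base_delay=1.0, sleep=time.sleep):
    """Call receive_once on a fresh connection, reconnecting on link errors.

    connect:      callable returning a new connection object
    receive_once: callable taking that connection and returning a result
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    last_error = None
    for attempt in range(max_attempts):
        try:
            # Build a new connection on every attempt rather than reusing a dead one.
            return receive_once(connect())
        except LinkDetachedError as exc:
            last_error = exc
            if attempt < max_attempts - 1:
                sleep(base_delay * 2 ** attempt)  # exponential backoff
    raise last_error


# --- Demo with fakes: the first two connections fail, the third succeeds ---
connect_calls = []
delays = []


def fake_connect():
    connect_calls.append(len(connect_calls) + 1)
    return connect_calls[-1]


def fake_receive(conn):
    if conn < 3:
        raise LinkDetachedError("Link detached unexpectedly.")
    return "message"


result = consume_with_reconnect(fake_connect, fake_receive, sleep=delays.append)
```

Passing sleep as a parameter keeps the demo instant; real code would leave the default time.sleep in place.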

Screenshots

image

image

Additional context
I know there is a 10-minute timeout, and I suspected the error occurred because the link gets detached after 10 minutes. So I scheduled a ping task every 8 minutes, expecting that to solve the issue, but the error still occurs.
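
The issue does not say how the ping was scheduled. One common way, shown here purely as an assumption, is a Celery beat entry in the Django settings. The entry name is hypothetical, and chat_app.tasks.ping is the task name that appears in the worker log above. With namespace='CELERY', the CELERY_BEAT_SCHEDULE setting maps to Celery's beat_schedule.

```python
from datetime import timedelta

# Assumed Django settings fragment: run the ping task every 8 minutes,
# i.e. more often than the 10-minute timeout mentioned above.
CELERY_BEAT_SCHEDULE = {
    "ping-every-8-minutes": {          # hypothetical entry name
        "task": "chat_app.tasks.ping",  # task name taken from the worker log
        "schedule": timedelta(minutes=8),
    },
}
```

A beat process has to run alongside the worker for the schedule to fire.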

Thanks for the feedback! We are routing this to the appropriate team for follow-up. cc @EldertGrootenboer.

Thank you for the feedback, @saadshaikh3. Would you be able to provide DEBUG-level logs with logging_enable=True set? That way we can see the frames as well as the timestamps of activity, which will help us investigate further.

import logging
import sys

handler = logging.StreamHandler(stream=sys.stdout)
logger = logging.getLogger('azure.servicebus')
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)

...

from azure.servicebus import ServiceBusClient

client = ServiceBusClient(..., logging_enable=True)
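
Since the timestamps matter for diagnosing a possible timeout, it may help to attach a formatter to the handler so every record carries one. A minimal sketch using only the standard logging module follows; the helper name configure_servicebus_logging and the format string are illustrative choices, not part of the SDK.

```python
import io
import logging
import sys


def configure_servicebus_logging(stream=sys.stdout):
    """Send azure.servicebus DEBUG logs to a stream, prefixed with a timestamp."""
    handler = logging.StreamHandler(stream=stream)
    # %(asctime)s adds a timestamp such as "2024-01-01 12:00:00,123".
    handler.setFormatter(
        logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
    )
    logger = logging.getLogger("azure.servicebus")
    logger.setLevel(logging.DEBUG)
    logger.addHandler(handler)
    return logger


# Demo: write to an in-memory buffer instead of stdout.
buffer = io.StringIO()
demo_logger = configure_servicebus_logging(stream=buffer)
demo_logger.debug("Link state changed")
first_line = buffer.getvalue().splitlines()[0]
```

Each line then starts with the time of the event, which makes gaps between frames visible.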

Hi @saadshaikh3. Thank you for opening this issue and giving us the opportunity to assist. To help our team better understand your issue and the details of your scenario please provide a response to the question asked above or the information requested above. This will help us more accurately address your issue.

Hi @kashifkhan. This is the raw stack trace that I have.

AMQPLinkError: null
  File "/usr/local/lib/python3.10/site-packages/azure/servicebus/_base_handler.py", line 411, in _do_retryable_operation
    return operation(**kwargs)
  File "/usr/local/lib/python3.10/site-packages/azure/servicebus/_servicebus_receiver.py", line 461, in _receive
    receiving = amqp_receive_client.do_work()
  File "/usr/local/lib/python3.10/site-packages/azure/servicebus/_pyamqp/client.py", line 425, in do_work
    if not self.client_ready():
  File "/usr/local/lib/python3.10/site-packages/azure/servicebus/_pyamqp/client.py", line 405, in client_ready
    if not self._client_ready():
  File "/usr/local/lib/python3.10/site-packages/azure/servicebus/_pyamqp/client.py", line 840, in _client_ready
    if self._link.get_state().value != 3:  # ATTACHED
  File "/usr/local/lib/python3.10/site-packages/azure/servicebus/_pyamqp/link.py", line 116, in get_state
    raise self._error
ServiceBusConnectionError: Link detached unexpectedly. Error condition: amqp:unknown-error.
  File "celery/worker/worker.py", line 202, in start
    self.blueprint.start(self)
  File "celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "celery/bootsteps.py", line 365, in start
    return self.obj.start()
  File "celery/worker/consumer/consumer.py", line 340, in start
    blueprint.start(self)
  File "celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "celery/worker/consumer/consumer.py", line 746, in start
    c.loop(*c.loop_args())
  File "celery/worker/loops.py", line 130, in synloop
    connection.drain_events(timeout=2.0)
  File "kombu/connection.py", line 341, in drain_events
    return self.transport.drain_events(self.connection, **kwargs)
  File "kombu/transport/virtual/base.py", line 997, in drain_events
    get(self._deliver, timeout=timeout)
  File "kombu/utils/scheduling.py", line 55, in get
    return self.fun(resource, callback, **kwargs)
  File "kombu/transport/virtual/base.py", line 1035, in _drain_channel
    return channel.drain_events(callback=callback, timeout=timeout)
  File "kombu/transport/virtual/base.py", line 754, in drain_events
    return self._poll(self.cycle, callback, timeout=timeout)
  File "kombu/transport/virtual/base.py", line 414, in _poll
    return cycle.get(callback)
  File "kombu/utils/scheduling.py", line 55, in get
    return self.fun(resource, callback, **kwargs)
  File "kombu/transport/virtual/base.py", line 417, in _get_and_deliver
    message = self._get(queue)
  File "kombu/transport/azureservicebus.py", line 266, in _get
    messages = queue_obj.receiver.receive_messages(
  File "/usr/local/lib/python3.10/site-packages/azure/servicebus/_servicebus_receiver.py", line 702, in receive_messages
    messages: List[ServiceBusReceivedMessage] = self._do_retryable_operation(
  File "/usr/local/lib/python3.10/site-packages/azure/servicebus/_base_handler.py", line 437, in _do_retryable_operation
    self._backoff(
  File "/usr/local/lib/python3.10/site-packages/azure/servicebus/_base_handler.py", line 481, in _backoff
    raise last_exception

This is the grouped exception tree; maybe it helps too:

system
  • chained-exception
    • exception
      • stack-trace
        • frame
          • module: azure.servicebus._base_handler
          • function: _do_retryable_operation
          • context-line: return operation(**kwargs)
        • frame
          • module: azure.servicebus._servicebus_receiver
          • function: _receive
          • context-line: receiving = amqp_receive_client.do_work()
        • frame
          • module: azure.servicebus._pyamqp.client
          • function: do_work
          • context-line: if not self.client_ready():
        • frame
          • module: azure.servicebus._pyamqp.client
          • function: client_ready
          • context-line: if not self._client_ready():
        • frame
          • module: azure.servicebus._pyamqp.client
          • function: _client_ready
          • context-line: if self._link.get_state().value != 3: # ATTACHED
        • frame
          • module: azure.servicebus._pyamqp.link
          • function: get_state
          • context-line: raise self._error
      • type: AMQPLinkError
    • exception
      • stack-trace
        • frame
          • module: celery.worker.worker
          • function: start
          • context-line: self.blueprint.start(self)
        • frame
          • module: celery.bootsteps
          • function: start
          • context-line: step.start(parent)
        • frame
          • module: celery.bootsteps
          • function: start
          • context-line: return self.obj.start()
        • frame
          • module: celery.worker.consumer.consumer
          • function: start
          • context-line: blueprint.start(self)
        • frame
          • module: celery.bootsteps
          • function: start
          • context-line: step.start(parent)
        • frame
          • module: celery.worker.consumer.consumer
          • function: start
          • context-line: c.loop(*c.loop_args())
        • frame
          • module: celery.worker.loops
          • function: synloop
          • context-line: connection.drain_events(timeout=2.0)
        • frame
          • module: kombu.connection
          • function: drain_events
          • context-line: return self.transport.drain_events(self.connection, **kwargs)
        • frame
          • module: kombu.transport.virtual.base
          • function: drain_events
          • context-line: get(self._deliver, timeout=timeout)
        • frame
          • module: kombu.utils.scheduling
          • function: get
          • context-line: return self.fun(resource, callback, **kwargs)
        • frame
          • module: kombu.transport.virtual.base
          • function: _drain_channel
          • context-line: return channel.drain_events(callback=callback, timeout=timeout)
        • frame
          • module: kombu.transport.virtual.base
          • function: drain_events
          • context-line: return self._poll(self.cycle, callback, timeout=timeout)
        • frame
          • module: kombu.transport.virtual.base
          • function: _poll
          • context-line: return cycle.get(callback)
        • frame
          • module: kombu.utils.scheduling
          • function: get
          • context-line: return self.fun(resource, callback, **kwargs)
        • frame
          • module: kombu.transport.virtual.base
          • function: _get_and_deliver
          • context-line: message = self._get(queue)
        • frame
          • module: kombu.transport.azureservicebus
          • function: _get
          • context-line: messages = queue_obj.receiver.receive_messages(
        • frame
          • module: azure.servicebus._servicebus_receiver
          • function: receive_messages
          • context-line: messages: List[ServiceBusReceivedMessage] = self._do_retryable_operation(
        • frame
          • module: azure.servicebus._base_handler
          • function: _do_retryable_operation
          • context-line: self._backoff(
        • frame
          • module: azure.servicebus._base_handler
          • function: _backoff
          • context-line: raise last_exception
      • type: ServiceBusConnectionError

@saadshaikh3 I would need the debug logs to understand what the service sent back to the client that caused it to detach (hence the frame-level logging). The timestamps would also help me tell whether this is a timeout issue or something else. Without them, it'll be difficult to deduce what's happening.