[ServiceBus] Unrecoverable error: "ServiceBusConnectionError('Link detached unexpectedly. Error condition: amqp:unknown-error.')"
saadshaikh3 opened this issue · 8 comments
- Package Name: azure-servicebus
- Package Version: 7.12.1
- Operating System: Linux (Docker python:3.10.12-slim-bullseye)
- Python Version: 3.10.12
Describe the bug
I am using service bus as a broker for celery in my django application. When I start the server and deploy on an azure app service, after a few days of everything working, suddenly I receive an error on sentry:
Unrecoverable error: "ServiceBusConnectionError('Link detached unexpectedly. Error condition: amqp:unknown-error.')"
Task chat_app.tasks.ping[f845eb96-722a-4c4d-a3fe-e8759412f358] received
Task chat_app.tasks.ping[75b86cfb-4ccc-4542-aaa3-481898e53542] received
Link state changed: <LinkState.ATTACHED: 3> -> <LinkState.DETACHED: 0>
AMQP error occurred: (AMQPLinkError('Error condition: ErrorCondition.UnknownError\n Error Description: Link detached unexpectedly.')), condition: (<ErrorCondition.UnknownError: b'amqp:unknown-error'>), description: ('Link detached unexpectedly.').
Link state changed: <LinkState.ATTACHED: 3> -> <LinkState.DETACH_SENT: 4>
Management link receiver state changed: <LinkState.ATTACHED: 3> -> <LinkState.DETACH_SENT: 4>
Link state changed: <LinkState.ATTACHED: 3> -> <LinkState.DETACH_SENT: 4>
Management link sender state changed: <LinkState.ATTACHED: 3> -> <LinkState.DETACH_SENT: 4>
Session state changed: <SessionState.MAPPED: 3> -> <SessionState.END_SENT: 4>
Connection state changed: <ConnectionState.OPENED: 9> -> <ConnectionState.CLOSE_SENT: 11>
Connection state changed: <ConnectionState.CLOSE_SENT: 11> -> <ConnectionState.END: 13>
Session state changed: <SessionState.END_SENT: 4> -> <SessionState.DISCARDING: 6>
Link state changed: <LinkState.DETACH_SENT: 4> -> <LinkState.DETACHED: 0>
Management link sender state changed: <LinkState.DETACH_SENT: 4> -> <LinkState.DETACHED: 0>
Link state changed: <LinkState.DETACH_SENT: 4> -> <LinkState.DETACHED: 0>
Management link receiver state changed: <LinkState.DETACH_SENT: 4> -> <LinkState.DETACHED: 0>
Link state changed: <LinkState.DETACHED: 0> -> <LinkState.DETACHED: 0>
'servicebus.pysdk-0deac4de' operation has timed out.
Last exception before timeout is (ServiceBusConnectionError('Link detached unexpectedly. Error condition: amqp:unknown-error.'))
To Reproduce
I have deployed the same setting to two different tenants. On one this kind of behaviour has not occurred. On the other it occurred today out of nowhere. I can mention the settings of my application:
celery==5.3.6
django==4.2.3
These are my settings in Django:
In base.py:
CELERY_BROKER_URL=f"azureservicebus://{env('SERVICE_BUS_SAS_POLICY')}:{env('SERVICE_BUS_SAS_KEY')}@{env('SERVICE_BUS_NAMESPACE')}"
CELERY_TASK_DEFAULT_QUEUE=env('SERVICE_BUS_QUEUE')
CELERY_ACCEPT_CONTENT = ['pickle','json']
CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True
CELERY_RESULT_BACKEND = 'django-db'
CELERY_RESULT_EXTENDED = True
In celery_app.py :
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings
import sys
from pathlib import Path
BASE_DIR = Path(__file__).resolve(strict=True).parent.parent
sys.path.append(str(BASE_DIR / "project"))
if os.environ.get("DJANGO_DEBUG") == False :
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings.staging')
else:
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings.local')
os.environ.setdefault('C_FORCE_ROOT', 'true')
app = Celery('project')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
In init.py:
from .celery_app import app as celery_app
__all__ = ('celery_app',)
This is the command I execute in start.sh
nohup celery -A config.celery_app worker -Q $SERVICE_BUS_QUEUE -l INFO --pool threads &
Expected behavior
Even if it detaches for some unknown reason, it should connect back. You can see below in screen shots that messages are piling up in queue and the worker is not able to pick up these messages. So now I have to restart my azure app service manually.
Screenshots
Additional context
I know there is a timeout of 10 mins, and I feared that this error was coming because link gets detached after 10 mins. So I scheduled a ping every 8 mins and thought that would solve the issue. but this error still coming.
Thanks for the feedback! We are routing this to the appropriate team for follow-up. cc @EldertGrootenboer.
Thank you for the feedback @saadshaikh3 . Would you be able to provide DEBUG
level logs along with setting logging_enable=True
- This way we can see the frames as well as the timestamps of activity to further help you out.
import logging
import sys
handler = logging.StreamHandler(stream=sys.stdout)
logger = logging.getLogger('azure.servicebus')
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)
...
from azure.servicebus import ServiceBusClient
client = ServiceBusClient(..., logging_enable=True)
Hi @saadshaikh3. Thank you for opening this issue and giving us the opportunity to assist. To help our team better understand your issue and the details of your scenario please provide a response to the question asked above or the information requested above. This will help us more accurately address your issue.
Hi @kashifkhan. This is the raw stack trace that I have.
AMQPLinkError: null
File "/usr/local/lib/python3.10/site-packages/azure/servicebus/_base_handler.py", line 411, in _do_retryable_operation
return operation(**kwargs)
File "/usr/local/lib/python3.10/site-packages/azure/servicebus/_servicebus_receiver.py", line 461, in _receive
receiving = amqp_receive_client.do_work()
File "/usr/local/lib/python3.10/site-packages/azure/servicebus/_pyamqp/client.py", line 425, in do_work
if not self.client_ready():
File "/usr/local/lib/python3.10/site-packages/azure/servicebus/_pyamqp/client.py", line 405, in client_ready
if not self._client_ready():
File "/usr/local/lib/python3.10/site-packages/azure/servicebus/_pyamqp/client.py", line 840, in _client_ready
if self._link.get_state().value != 3: # ATTACHED
File "/usr/local/lib/python3.10/site-packages/azure/servicebus/_pyamqp/link.py", line 116, in get_state
raise self._error
ServiceBusConnectionError: Link detached unexpectedly. Error condition: amqp:unknown-error.
File "celery/worker/worker.py", line 202, in start
self.blueprint.start(self)
File "celery/bootsteps.py", line 116, in start
step.start(parent)
File "celery/bootsteps.py", line 365, in start
return self.obj.start()
File "celery/worker/consumer/consumer.py", line 340, in start
blueprint.start(self)
File "celery/bootsteps.py", line 116, in start
step.start(parent)
File "celery/worker/consumer/consumer.py", line 746, in start
c.loop(*c.loop_args())
File "celery/worker/loops.py", line 130, in synloop
connection.drain_events(timeout=2.0)
File "kombu/connection.py", line 341, in drain_events
return self.transport.drain_events(self.connection, **kwargs)
File "kombu/transport/virtual/base.py", line 997, in drain_events
get(self._deliver, timeout=timeout)
File "kombu/utils/scheduling.py", line 55, in get
return self.fun(resource, callback, **kwargs)
File "kombu/transport/virtual/base.py", line 1035, in _drain_channel
return channel.drain_events(callback=callback, timeout=timeout)
File "kombu/transport/virtual/base.py", line 754, in drain_events
return self._poll(self.cycle, callback, timeout=timeout)
File "kombu/transport/virtual/base.py", line 414, in _poll
return cycle.get(callback)
File "kombu/utils/scheduling.py", line 55, in get
return self.fun(resource, callback, **kwargs)
File "kombu/transport/virtual/base.py", line 417, in _get_and_deliver
message = self._get(queue)
File "kombu/transport/azureservicebus.py", line 266, in _get
messages = queue_obj.receiver.receive_messages(
File "/usr/local/lib/python3.10/site-packages/azure/servicebus/_servicebus_receiver.py", line 702, in receive_messages
messages: List[ServiceBusReceivedMessage] = self._do_retryable_operation(
File "/usr/local/lib/python3.10/site-packages/azure/servicebus/_base_handler.py", line 437, in _do_retryable_operation
self._backoff(
File "/usr/local/lib/python3.10/site-packages/azure/servicebus/_base_handler.py", line 481, in _backoff
raise last_exception
This is the grouped exception tree, maybe this could help too :
system
- chained-exception
exception
stack-trace
frame
module
azure.servicebus._base_handler
function
_do_retryable_operation
context-line
return operation(**kwargs)
frame
module
azure.servicebus._servicebus_receiver
function
_receive
context-line
receiving = amqp_receive_client.do_work()
frame
module
azure.servicebus._pyamqp.client
function
do_work
context-line
if not self.client_ready():
frame
module
azure.servicebus._pyamqp.client
function
client_ready
context-line
if not self._client_ready():
frame
module
azure.servicebus._pyamqp.client
function
_client_ready
context-line
if self._link.get_state().value != 3: # ATTACHED
frame
module
azure.servicebus._pyamqp.link
function
get_state
context-line
raise self._error
type
AMQPLinkError
exception
stack-trace
frame
module
celery.worker.worker
function
start
context-line
self.blueprint.start(self)
frame
module
celery.bootsteps
function
start
context-line
step.start(parent)
frame
module
celery.bootsteps
function
start
context-line
return self.obj.start()
frame
module
celery.worker.consumer.consumer
function
start
context-line
blueprint.start(self)
frame
module
celery.bootsteps
function
start
context-line
step.start(parent)
frame
module
celery.worker.consumer.consumer
function
start
context-line
c.loop(*c.loop_args())
frame
module
celery.worker.loops
function
synloop
context-line
connection.drain_events(timeout=2.0)
frame
module
kombu.connection
function
drain_events
context-line
return self.transport.drain_events(self.connection, **kwargs)
frame
module
kombu.transport.virtual.base
function
drain_events
context-line
get(self._deliver, timeout=timeout)
frame
module
kombu.utils.scheduling
function
get
context-line
return self.fun(resource, callback, **kwargs)
frame
module
kombu.transport.virtual.base
function
_drain_channel
context-line
return channel.drain_events(callback=callback, timeout=timeout)
frame
module
kombu.transport.virtual.base
function
drain_events
context-line
return self._poll(self.cycle, callback, timeout=timeout)
frame
module
kombu.transport.virtual.base
function
_poll
context-line
return cycle.get(callback)
frame
module
kombu.utils.scheduling
function
get
context-line
return self.fun(resource, callback, **kwargs)
frame
module
kombu.transport.virtual.base
function
_get_and_deliver
context-line
message = self._get(queue)
frame
module
kombu.transport.azureservicebus
function
_get
context-line
messages = queue_obj.receiver.receive_messages(
frame
module
azure.servicebus._servicebus_receiver
function
receive_messages
context-line
messages: List[ServiceBusReceivedMessage] = self._do_retryable_operation(
frame
module
azure.servicebus._base_handler
function
_do_retryable_operation
context-line
self._backoff(
frame
module
azure.servicebus._base_handler
function
_backoff
context-line
raise last_exception
type
ServiceBusConnectionError
@saadshaikh3 I would need the debug logs to understand what the service sent back to the client that it had to detach ( hence frame level logging). The timestamps could also help me understand if there is a timeout issue or something else. Without that itll be difficult to deduce whats happening.
Hi @saadshaikh3. Thank you for opening this issue and giving us the opportunity to assist. To help our team better understand your issue and the details of your scenario please provide a response to the question asked above or the information requested above. This will help us more accurately address your issue.