Azure/azure-sdk-for-python

[servicebus] AttributeError: 'NoneType' object has no attribute 'client_ready_async' when reusing async ServiceBusSender objects

dougli opened this issue · 3 comments

  • Package Name: azure-servicebus
  • Package Version: 7.12.1
  • Operating System: Mac
  • Python Version: 3.11

Describe the bug
Connections and sessions to Service Bus are extremely expensive to set up, taking 0.5~1.5s to initialize and tear down. Reusing the ServiceBusSender object mitigates this, but a race condition in the SDK connection flow causes exceptions:

Traceback (most recent call last):
  File "/opt/venv/lib/python3.11/site-packages/azure/servicebus/aio/_base_handler_async.py", line 260, in _do_retryable_operation
    return await operation(**kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/venv/lib/python3.11/site-packages/azure/servicebus/aio/_servicebus_sender_async.py", line 238, in _send
    await self._amqp_transport.send_messages_async(
  File "/opt/venv/lib/python3.11/site-packages/azure/servicebus/aio/_transport/_pyamqp_transport_async.py", line 141, in send_messages_async
    await sender._open()
  File "/opt/venv/lib/python3.11/site-packages/azure/servicebus/aio/_servicebus_sender_async.py", line 221, in _open
    while not await self._handler.client_ready_async():
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'NoneType' object has no attribute 'client_ready_async'

To Reproduce
Steps to reproduce the behavior:

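import asyncio
from azure.servicebus import ServiceBusMessage
from azure.servicebus.aio import ServiceBusClient

client = ServiceBusClient(...)
topic_sender = client.get_topic_sender(topic_name="my_topic")

# Must be a coroutine function, since it awaits send_messages
async def send_message(id: int):
    for i in range(100):
        await topic_sender.send_messages(ServiceBusMessage(f"Hello, World from {id}"))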

# Normally you would send in bulk if you knew you had many messages to send in advance.
# But in a busy webserver, this is usually what happens since you have a variable high load of users
# and your request handler only sends 1 message per request
tasks = []
for i in range(10):
    tasks.append(send_message(i))

await asyncio.gather(*tasks)

Expected behavior
Async SDKs should be safe to call from concurrent coroutines and should not raise exceptions when used this way.

Additional context
I've found the smoking gun for this bug. This bug is a race condition near line 222 in _servicebus_sender_async.py. Here's the relevant code:

if self._running:
    return
if self._handler:
    await self._handler.close_async()
auth = None if self._connection else (await create_authentication(self))
self._create_handler(auth)
try:
    await self._handler.open_async(connection=self._connection)
    while not await self._handler.client_ready_async():
        await asyncio.sleep(0.05)
    self._running = True

It seems impossible for self._handler to be None at the client_ready_async call, since the previous line just used it successfully. But every step here is awaited, so other coroutines can run in between and unset self._handler.

The culprit is the if-check right at the top of that code block:

if self._handler:
    await self._handler.close_async()

This calls into a superclass that unsets self._handler. While a connection is starting, the sender is in an intermediate state where self._running is False but self._handler is already set. If a second concurrent call enters this code during that state, it closes the handler and sets it to None while the first call is still waiting in the while-loop.
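
The interleaving described above can be reproduced without the SDK. The following is a minimal sketch, not the SDK's code: FakeHandler and ToySender are hypothetical stand-ins, the sleeps are arbitrary, and the toy assumes (as described above) that the close path leaves self._handler as None until the handler is re-created after another awaited step.

```python
import asyncio


class FakeHandler:
    """Hypothetical stand-in for the AMQP handler; not an SDK class."""

    def __init__(self):
        self._polls = 0

    async def open_async(self):
        await asyncio.sleep(0.01)

    async def client_ready_async(self):
        # Report "ready" only on the third poll so the caller has to wait.
        self._polls += 1
        return self._polls >= 3

    async def close_async(self):
        await asyncio.sleep(0)


class ToySender:
    """Toy model of the open logic quoted above; timings are made up."""

    def __init__(self):
        self._handler = None
        self._running = False

    async def _open(self):
        if self._running:
            return
        if self._handler:
            await self._handler.close_async()
            self._handler = None  # assumed effect of the close path, per the report
        await asyncio.sleep(0.1)  # stand-in for awaited setup such as authentication
        self._handler = FakeHandler()
        await self._handler.open_async()
        while not await self._handler.client_ready_async():
            await asyncio.sleep(0.05)
        self._running = True


async def demo():
    sender = ToySender()

    async def late_open():
        # Arrive while the first call is still polling for readiness.
        await asyncio.sleep(0.12)
        await sender._open()

    return await asyncio.gather(sender._open(), late_open(), return_exceptions=True)


results = asyncio.run(demo())
print(results)
```

In this toy, the first call fails with the same AttributeError as in the traceback, while the late call that caused the teardown completes normally.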

Thanks for the feedback! We are routing this to the appropriate team for follow-up. cc @EldertGrootenboer.

Thank you for the feedback @dougli. As you know, the library is currently not coroutine-safe, so our recommendation to users is to use a lock when accessing the producer from multiple coroutines, as your repro does.
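
A minimal sketch of the lock-based workaround, assuming a single event loop and a sender whose send_messages is a coroutine. LockedSender and RecordingSender are hypothetical names used for illustration only; neither is part of azure-servicebus.

```python
import asyncio


class LockedSender:
    """Hypothetical wrapper that serializes send_messages on a shared sender."""

    def __init__(self, sender):
        self._sender = sender
        self._lock = asyncio.Lock()

    async def send_messages(self, message, **kwargs):
        # Only one coroutine at a time may touch the underlying sender.
        async with self._lock:
            return await self._sender.send_messages(message, **kwargs)


class RecordingSender:
    """Test double that records how many sends overlap."""

    def __init__(self):
        self.in_flight = 0
        self.max_in_flight = 0
        self.sent = []

    async def send_messages(self, message, **kwargs):
        self.in_flight += 1
        self.max_in_flight = max(self.max_in_flight, self.in_flight)
        await asyncio.sleep(0)  # yield, giving other coroutines a chance to interleave
        self.sent.append(message)
        self.in_flight -= 1


async def demo():
    inner = RecordingSender()
    sender = LockedSender(inner)  # created inside the running loop
    await asyncio.gather(*(sender.send_messages(f"msg {i}") for i in range(10)))
    return inner


inner = asyncio.run(demo())
print(inner.max_in_flight, len(inner.sent))
```

With the real SDK, the wrapper would presumably be applied as LockedSender(client.get_topic_sender(topic_name="my_topic")). The trade-off is that sends are serialized, so concurrent requests queue behind each other on the lock.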