[servicebus] AttributeError: 'NoneType' object has no attribute 'client_ready_async' when reusing async ServiceBusSender objects
dougli opened this issue · 3 comments
- Package Name: azure-servicebus
- Package Version: 7.12.1
- Operating System: Mac
- Python Version: 3.11
Describe the bug
Connections and sessions to Service Bus are extremely expensive to set up, taking 0.5 to 1.5 s to initialize and tear down. Reusing the ServiceBusSender object mitigates this, but a race condition in the SDK connection flow causes exceptions:
Traceback (most recent call last):
  File "/opt/venv/lib/python3.11/site-packages/azure/servicebus/aio/_base_handler_async.py", line 260, in _do_retryable_operation
    return await operation(**kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/venv/lib/python3.11/site-packages/azure/servicebus/aio/_servicebus_sender_async.py", line 238, in _send
    await self._amqp_transport.send_messages_async(
  File "/opt/venv/lib/python3.11/site-packages/azure/servicebus/aio/_transport/_pyamqp_transport_async.py", line 141, in send_messages_async
    await sender._open()
  File "/opt/venv/lib/python3.11/site-packages/azure/servicebus/aio/_servicebus_sender_async.py", line 221, in _open
    while not await self._handler.client_ready_async():
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'NoneType' object has no attribute 'client_ready_async'
To Reproduce
Steps to reproduce the behavior:
import asyncio
from azure.servicebus import ServiceBusMessage
from azure.servicebus.aio import ServiceBusClient

async def main():
    client = ServiceBusClient(...)
    topic_sender = client.get_topic_sender(topic_name="my_topic")

    async def send_message(id: int):
        for i in range(100):
            await topic_sender.send_messages(ServiceBusMessage(f"Hello, World from {id}"))
            # Normally you would send in bulk if you knew you had many messages to send in advance.
            # But in a busy webserver, this is usually what happens since you have a variable high load of users
            # and your request handler only sends 1 message per request

    # Ten coroutines share one sender and run concurrently.
    tasks = []
    for i in range(10):
        tasks.append(send_message(i))
    await asyncio.gather(*tasks)

asyncio.run(main())
Expected behavior
Async SDKs should be async-safe and throw no exceptions.
Additional context
I've found the smoking gun for this bug. This bug is a race condition near line 222 in _servicebus_sender_async.py. Here's the relevant code:
if self._running:
    return
if self._handler:
    await self._handler.close_async()
auth = None if self._connection else (await create_authentication(self))
self._create_handler(auth)
try:
    await self._handler.open_async(connection=self._connection)
    while not await self._handler.client_ready_async():
        await asyncio.sleep(0.05)
    self._running = True
It seems impossible for self._handler to be None at the client_ready_async call, since the previous line used it successfully. But because these are all async functions, other async code can run at each await and unset self._handler in the meantime.
The culprit is the if-check right at the top of that code block:
if self._handler:
    await self._handler.close_async()
This calls up into some superclass, which unsets self._handler. When we're starting a connection, there is an indeterminate state where self._running is False but self._handler is set. If another concurrent call enters this code during that state, it will disconnect the handler and null it out while the first call is still waiting in the while-loop.
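To make the interleaving concrete, here is a minimal simulation of the sequence described above. It does not touch the SDK: FakeHandler, FakeSender, late_open and the delay constants are hypothetical stand-ins, and the timings are chosen only so that the second call arrives while the first is still opening. It is a sketch of the suspected mechanism, not the library's actual code.

```python
import asyncio

AUTH_DELAY = 0.05  # assumed stand-in for `await create_authentication(self)`
OPEN_DELAY = 0.04  # assumed stand-in for `await handler.open_async(...)`


class FakeHandler:
    """Hypothetical stand-in for the AMQP handler; not the SDK's class."""

    def __init__(self):
        self._checks = 0

    async def open_async(self):
        await asyncio.sleep(OPEN_DELAY)

    async def client_ready_async(self):
        self._checks += 1
        return self._checks >= 3  # reports "ready" on the third poll

    async def close_async(self):
        await asyncio.sleep(0)  # yields to the event loop once


class FakeSender:
    """Mirrors the shape of the _open() excerpt, with no locking."""

    def __init__(self):
        self._handler = None
        self._running = False

    async def _open(self):
        if self._running:
            return
        if self._handler:
            # A second caller lands here while the first is mid-open.
            await self._handler.close_async()
            self._handler = None
        await asyncio.sleep(AUTH_DELAY)  # handler stays None during this await
        self._handler = FakeHandler()
        await self._handler.open_async()
        # self._handler is re-read here, after an await, so it may be None.
        while not await self._handler.client_ready_async():
            await asyncio.sleep(0.005)
        self._running = True


async def late_open(sender, delay):
    """Start a second _open() after `delay` seconds."""
    await asyncio.sleep(delay)
    await sender._open()


async def demo():
    sender = FakeSender()
    # The second call starts at 0.07 s, while the first is inside open_async.
    return await asyncio.gather(
        sender._open(),
        late_open(sender, 0.07),
        return_exceptions=True,
    )


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this sketch the first call fails with the same AttributeError as in the traceback, while the second call completes normally, which matches the "first caller loses its handler" reading of the bug.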
Thanks for the feedback! We are routing this to the appropriate team for follow-up. cc @EldertGrootenboer.
Thank you for the feedback, @dougli. As you know, the library is currently not coroutine-safe, so our recommendation to users is to use a lock when accessing the producer in cases like your repro.
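For anyone landing here, the lock recommendation could look roughly like the wrapper below. LockedSender is a hypothetical helper, not part of azure-servicebus; it only assumes the wrapped object exposes an async send_messages method, as ServiceBusSender does. Treat it as a sketch rather than an official pattern.

```python
import asyncio


class LockedSender:
    """Hypothetical wrapper: lets one coroutine at a time use the sender."""

    def __init__(self, sender):
        self._sender = sender
        # Create this inside the running event loop on older Python versions.
        self._lock = asyncio.Lock()

    async def send_messages(self, message, **kwargs):
        # Only one send (and therefore one implicit open) runs at a time.
        async with self._lock:
            return await self._sender.send_messages(message, **kwargs)
```

With the repro above, this would mean wrapping once, e.g. topic_sender = LockedSender(client.get_topic_sender(topic_name="my_topic")), and leaving the send loop unchanged. The trade-off is that sends through this wrapper are serialized, so throughput is bounded by one in-flight send per sender.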