slackapi/python-slack-sdk

aiohttp based socket_mode failed to reconnect and enter a broken state

woolen-sheep opened this issue · 6 comments

Reproducible in:

pip freeze | grep slack
python --version
sw_vers && uname -v # or `ver`

The Slack SDK version

(slack-py3.11) ➜  slack pip freeze | grep slack
slack-bolt==1.18.1
slack-sdk==3.26.1

Python runtime version

(slack-py3.11) ➜  slack python --version
Python 3.11.6

OS info

(slack-py3.11) ➜  slack uname -a
Linux homenas 6.6.4-zen1-1-zen #1 ZEN SMP PREEMPT_DYNAMIC Mon, 04 Dec 2023 00:28:58 +0000 x86_64 GNU/Linux

Steps to reproduce:

( All the following are talking about aiohttp based socket_mode AsyncBaseSocketModeClient )

  1. Receive a disconnect message in run_message_listeners
  2. After run_message_listeners calls connect_to_new_endpoint, meet an Exception at:
    await self.current_session.ping(f"sdk-ping-pong:{t}")

Expected result:

The AsyncBaseSocketModeClient should handle the Exception correctly and reconnect to slack server successfully.

Actual result:

The AsyncBaseSocketModeClient failed to reconnect and then entered a broken state that can't be recovered (without restart the process).

Explaination of the cause

  • The bug starts from run_message_listeners when receives a disconnect mesage:
    await self.connect_to_new_endpoint(force=True)
  • Then it will call SocketModeClient.connect(), and update current_session at:
    self.current_session = await self.aiohttp_client_session.ws_connect(
  • After that it will send a SDK ping message, that's where we meet Exception:
    await self.current_session.ping(f"sdk-ping-pong:{t}")

    If we meet exception here(for example a network issue on host causing the connection closed), it will raise from function connect(). At this moment, the new session is already "dead".
  • Note that we exit from connect() before we execute self.monitor_current_session() at:
    self.current_session_monitor = asyncio.ensure_future(self.monitor_current_session())

    so there is no new monitor created.
  • The old monitor will break because we've already updated current_session:
    if session != self.current_session:
    if self.logger.level <= logging.DEBUG:
    self.logger.debug(f"The monitor_current_session task for {session_id} is now cancelled")
    break
  • Finally, we get a broken session and have no monitor running, nothing will call connect_to_new_endpoint again. AsyncBaseSocketModeClient will enter a bad state and can't be recovered.

Hi, @woolen-sheep! Thank you for submitting this - I am sorry to hear you're running into some issues 😞

I'd like to get some more information on this issue so I know how to proceed with attempting to replicate - at what point does your app disconnect typically? Have you had any successful reconnections at all (intermittent behavior), or is it just consistently failing to connect each time?

My App usually kept running for 3-4 days fine and then entered this bad state - so yes, it has been connected and working fine.

The reason of receive disconnection is too_many_websockets: The SDK will auto renew the session every 5 hours (actually this is requested by the slack server backend) but it didn't call disconnect() explicitly so the connection_num will keep growing and finally reache the limitation of slcak (max 10 connections). This should be fine because I am only using the latest renewed one, all other connections are staled.

However, in this case, the reason why received disconnect message is NOT important. Please check the Explaination of the cause section of my issue. I think it's a logical corner case of slack SDK that rarely happens.

Thanks for your quick reply! @hello-ashleyintech

It's another topic:
I think the SDK needs to call disconnect() explicitly when receive a disconnect message because it seem the slack server side didn't cut off the old connection... I am not very sure about this. I might do some tests and open another issue to talk about that.

@woolen-sheep thanks for the additional info and for providing such comprehensive info in your original issue! 🙇

It seems like a potential solution here might be to check before this code block whether the current_session is currently active and running before comparing it to the session and then implement some sort of retry to attempt to recreate a successful connection for current_session, and then cancel out after a certain amount of failed retry attempts. What do you think?

However, if it was working successfully and is now causing an exception in the line await self.current_session.ping(f"sdk-ping-pong:{t}"), I do wonder if it's also something on the aiohttp side that is causing it to suddenly start failing to reconnect. I will do some more digging to see if there are any recent updates or issues or anything that may have caused this on that end. If it is an aiohttp side issue, then the above retry will likely not be a good solution to move forward with since it will continue to consistent fail even with that implemented.

Let me know if you end up coming across any helpful additional info in the meantime! 🙌

It is an aiohttp Exception ConnectionResetError but I think it's not a continuous issue because if I restart the process it can back to normal immediately. So it's more like something caused by network issue within a very short period.

For the slack-sdk side, we don't need to care about the Exception type when consider the logic. The root issue is that: The following code block should be kind of "atomic":

self.current_session = await self.aiohttp_client_session.ws_connect(
self.wss_uri,
autoping=False,
heartbeat=self.ping_interval,
proxy=self.proxy,
ssl=self.web_client.ssl,
)
session_id: str = await self.session_id()
self.auto_reconnect_enabled = self.default_auto_reconnect_enabled
self.stale = False
self.logger.info(f"A new session ({session_id}) has been established")
# The first ping from the new connection
if self.logger.level <= logging.DEBUG:
self.logger.debug(f"Sending a ping message with the newly established connection ({session_id})...")
t = time.time()
await self.current_session.ping(f"sdk-ping-pong:{t}")
if self.current_session_monitor is not None:
self.current_session_monitor.cancel()
self.current_session_monitor = asyncio.ensure_future(self.monitor_current_session())
if self.logger.level <= logging.DEBUG:
self.logger.debug(f"A new monitor_current_session() executor has been recreated for {session_id}")
if self.message_receiver is not None:
self.message_receiver.cancel()
self.message_receiver = asyncio.ensure_future(self.receive_messages())
if self.logger.level <= logging.DEBUG:
self.logger.debug(f"A new receive_messages() executor has been recreated for {session_id}")
if old_session is not None:
await old_session.close()
old_session_id = self.build_session_id(old_session)
self.logger.info(f"The old session ({old_session_id}) has been abandoned")

By "atomic" I mean: If you update self.current_session to a new session, you MUST ensure a new monitor_current_session() and a new receive_messages() start to run.

One of the possible solutions here is wrapping this block with try ... catch and then retry connect when Exception happens.

If you don't mind, I can try to open a PR to fix this at weekend.

@woolen-sheep That would be fantastic! Thank you so much - please tag me in the PR once it's ready and I'll be happy to take a look! 🙌