openwpm/OpenWPM

Freezing During Shutdown: Exception Handling in finalize_visit_id Causes Controller to Halt

Closed this issue · 9 comments

Hi,

When the storage controller receives a shutdown signal, it waits for all tasks associated with each visit_id. However, if an exception is thrown while processing records in finalize_visit_id during this shutdown, the controller freezes and doesn't continue.
Why does it freeze? Is there a way to overcome this issue? In the case of an exception, shouldn't it log the error and continue?

An exception (KeyError) is thrown at

del self.store_record_tasks[visit_id]

which is being called from

t = await self.finalize_visit_id(visit_id, success=False)

and ultimately causes an infinite loop at

while True:
    if self.closing and not self.unsaved_command_sequences:
        # we're shutting down and have no unprocessed callbacks
        break
    visit_id_list = self.storage_controller_handle.get_new_completed_visits()
    if not visit_id_list:
        time.sleep(1)
        continue

because it is waiting for the visit_id to appear in the completion queue.

In my logs I can already see that the visit_id has been awaited:
storage_controller - INFO - Awaited all tasks for visit_id 5095731113063583 while finalizing
which indicates it has already been finalized by

self.logger.debug(
    "Awaited all tasks for visit_id %d while finalizing", visit_id
)

But three lines below that I get the above KeyError saying the visit_id wasn't found, which makes sense, since it was already finalized three lines up and deleted afterwards.
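
For illustration, the hazard can be reproduced in isolation with a small standalone asyncio sketch (hypothetical names, not OpenWPM code): two concurrent finalizations of the same visit_id both pass the dict lookup, both yield at the await, and whichever resumes second hits the KeyError on the delete.

import asyncio

async def main():
    # one pending store-record task for visit_id 42
    tasks_by_visit = {42: [asyncio.create_task(asyncio.sleep(0.1))]}

    async def finalize(visit_id):
        for task in tasks_by_visit[visit_id]:
            await task  # both callers suspend here until the task finishes
        del tasks_by_visit[visit_id]  # second deletion raises KeyError

    results = await asyncio.gather(
        finalize(42), finalize(42), return_exceptions=True
    )
    print(results)  # [None, KeyError(42)]

asyncio.run(main())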

Hey,

The function gets called outside of the handler, during shutdown. See here:

async def shutdown(self, completion_queue_task: Task[None]) -> None:
    self.logger.info("Entering self.shutdown")
    completion_tokens = {}
    visit_ids = list(self.store_record_tasks.keys())
    for visit_id in visit_ids:
        t = await self.finalize_visit_id(visit_id, success=False)
        if t is not None:
            completion_tokens[visit_id] = t
    await self.structured_storage.flush_cache()

I didn't expect storage providers to throw exceptions, but if this is an issue you are encountering, please submit a PR to catch errors that might occur.
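
A rough sketch of what catching them in the shutdown loop above could look like (untested, just the shape of the error handling):

for visit_id in visit_ids:
    try:
        t = await self.finalize_visit_id(visit_id, success=False)
    except Exception:
        # log and keep going so a single bad visit_id cannot wedge the shutdown
        self.logger.exception(
            "Failed to finalize visit_id %d during shutdown", visit_id
        )
        continue
    if t is not None:
        completion_tokens[visit_id] = t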

What is currently confusing me is that there should be no concurrent access to this list, because we make sure the server is fully shut down before we call self.shutdown.

However, from your description, we enter finalize_visit_id with the same visit_id multiple times in close succession (because it doesn't get caught by this check), which shouldn't be possible, because we are iterating over the keys of a dict.
That means you should see two "Awaiting all tasks for visit_id 5095731113063583" lines before you see the "Awaited all tasks for visit_id 5095731113063583 while finalizing" line.

This points to a deeper logic bug: while your fix in #1070 will prevent the error, it would also put the same visit_id into the completion queue twice, which would break other things.

As I can't reproduce your issue, could you add something like

self.logger.error(traceback.format_stack(limit=2))

just before the del so that we can figure out where the second caller is coming from?
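
Note that traceback.format_stack returns a list of strings rather than printing anything, so joining them first makes the log entry easier to read:

self.logger.error("".join(traceback.format_stack(limit=2)))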

In the logs we have "Awaiting all tasks for visit_id 1780613490525049" two times. I provide a new log with the requested format_stack:

'File "python3.10/asyncio/events.py", line 80, in _run\n    self._context.run(self._callback, *self._args)\n', 

'File "storage/storage_controller.py", line 430, in _run\n    await self.shutdown(update_completion_queue)\n', 
                               
'File "storage/storage_controller.py", line 319, in shutdown\n    t = await self.finalize_visit_id(visit_id, success=False)\n', 
                               
'File "storage/storage_controller.py", line 253, in finalize_visit_id\n    {traceback.format_stack()}\n'

Additional logs which might be useful:

browser_manager      - INFO     - Starting to work on CommandSequence with visit_id 1780613490525049 on browser with id 3578416253

browser_manager      - INFO     - Finished working on CommandSequence with visit_id 1780613490525049 on browser with id 3578416253
storage_controller   - INFO     - Terminating handler, because the underlying socket closed
storage_controller   - INFO     - Awaiting all tasks for visit_id 1780613490525049
storage_controller   - INFO     - ========================================
storage_controller   - INFO     - Received shutdown signal! in async def should_shutdown in StorageController
storage_controller   - INFO     - Entering self.shutdown
storage_controller   - INFO     - Awaiting all tasks for visit_id 1200
storage_controller   - INFO     - Awaited all tasks for visit_id 1200 while finalizing
sql_provider         - WARNING  - Visit with visit_id 1200 got interrupted

storage_controller   - INFO     - Awaiting all tasks for visit_id 1339
storage_controller   - INFO     - Awaited all tasks for visit_id 1339 while finalizing
sql_provider         - WARNING  - Visit with visit_id 1339 got interrupted
storage_controller   - INFO     - MY DEBUG: completion token achieved

storage_controller   - INFO     - Awaiting all tasks for visit_id 1780613490525049

Executing <Task pending name='Task-1' coro=<StorageController._run() running at storage/storage_controller.py:430> wait_for=<Task pending name='Task-69839' coro=<BCSQLiteStorageProvider.store_record() running at storage/sql_provider.py:81> cb=[Task.task_wakeup(), Task.task_wakeup()] created at python3.10/asyncio/tasks.py:337> cb=[_run_until_complete_cb() at python3.10/asyncio/base_events.py:184] created at python3.10/asyncio/tasks.py:636> took 0.951 seconds

Executing <Task finished name='Task-69966' coro=<BCSQLiteStorageProvider.store_record() done, defined at storage/sql_provider.py:81> result=None created at python3.10/asyncio/tasks.py:337> took 1.971 seconds

Executing <Task finished name='Task-69968' coro=<BCSQLiteStorageProvider.store_record() done, defined at storage/sql_provider.py:81> result=None created at python3.10/asyncio/tasks.py:337> took 0.168 seconds

Executing <Task finished name='Task-69969' coro=<BCSQLiteStorageProvider.store_record() done, defined at storage/sql_provider.py:81> result=None created at python3.10/asyncio/tasks.py:337> took 0.112 seconds

Executing <Task finished name='Task-70038' coro=<BCSQLiteStorageProvider.store_record() done, defined at storage/sql_provider.py:81> result=None created at python3.10/asyncio/tasks.py:337> took 1.976 seconds
Executing <Task finished name='Task-70046' coro=<BCSQLiteStorageProvider.store_record() done, defined at storage/sql_provider.py:81> result=None created at python3.10/asyncio/tasks.py:337> took 0.166 seconds
Executing <Task finished name='Task-70048' coro=<BCSQLiteStorageProvider.store_record() done, defined at storage/sql_provider.py:81> result=None created at python3.10/asyncio/tasks.py:337> took 0.112 seconds
Executing <Task finished name='Task-70053' coro=<BCSQLiteStorageProvider.store_record() done, defined at storage/sql_provider.py:81> result=None created at python3.10/asyncio/tasks.py:337> took 0.168 seconds

storage_controller   - INFO     - Awaited all tasks for visit_id 1780613490525049 while finalizing
storage_controller   - ERROR    - visit_id 1780613490525049, is already awaited, skipping...
storage_controller   - ERROR    - ['File "python3.10/asyncio/events.py", line 80, in _run\n    self._context.run(self._callback, *self._args)\n',
'File "storage/storage_controller.py", line 430, in _run\n    await self.shutdown(update_completion_queue)\n',
'File "storage/storage_controller.py", line 319, in shutdown\n    t = await self.finalize_visit_id(visit_id, success=False)\n',
'File "storage/storage_controller.py", line 253, in finalize_visit_id\n    {traceback.format_stack()}\n']
storage_controller   - INFO     - Awaiting all tasks for visit_id 1340
storage_controller   - INFO     - Awaited all tasks for visit_id 1340 while finalizing
sql_provider         - WARNING  - Visit with visit_id 1340 got interrupted

And at the top of the logs I have:

- ERROR    - Traceback (most recent call last):
    manager.execute_command_sequence(command_sequence)
  File "openwpm/task_manager.py", line 428, in execute_command_sequence
    agg_queue_size = self.storage_controller_handle.get_most_recent_status()
  File "storage/storage_controller.py", line 614, in get_most_recent_status
    raise RuntimeError(
RuntimeError: No status update from the storage controller process for 131 seconds.

And after this exception I cannot run any manager.execute_command_sequence anymore. Are the two related?

But since the function returns None

        if visit_id not in self.store_record_tasks:
            self.logger.error(
                "visit_id %d, is already awaited, skipping...",
                visit_id,
            )
            return None

https://github.com/openwpm/OpenWPM/pull/1070/commits/57adefc07167035faa1cdf9351f927d328d48d93#diff-5c57c8ae8ef4f387b82ff0868d46bde18281d7b78861bf03c0f7dd440effcb22R225#L225

it doesn't go into completion_tokens, which means it also never gets put into the completion_queue at

self.completion_queue.put((visit_id, False))

    t = await self.finalize_visit_id(visit_id, success=False)
    if t is not None:
        completion_tokens[visit_id] = t
await self.structured_storage.flush_cache()
await completion_queue_task
for visit_id, token in completion_tokens.items():
    await token
    self.completion_queue.put((visit_id, False))
await self.structured_storage.shutdown()

🚀 thank you so much for discovering this bug!

     t = await self.finalize_visit_id(visit_id, success=False) 
-    if t is not None: 
-        completion_tokens[visit_id] = t
+    completion_tokens[visit_id] = t

This would be the first part of the fix: it puts items into the completion queue during shutdown even when they don't have a completion token. (Since a bare await None raises a TypeError, the loop that awaits the tokens then needs to skip None entries, as sketched below.)
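
A minimal sketch of that guarded loop, reusing the names from the shutdown snippet above:

for visit_id, token in completion_tokens.items():
    # finalize_visit_id may have returned None for this visit_id;
    # only real tokens can be awaited, but every visit is still reported
    if token is not None:
        await token
    self.completion_queue.put((visit_id, False))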

As for the original issue, I now think the correct patch is

+ store_record_tasks = self.store_record_tasks[visit_id]
+ del self.store_record_tasks[visit_id]
- for task in self.store_record_tasks[visit_id]:
+ for task in store_record_tasks:
      await task
- del self.store_record_tasks[visit_id]

This way, when we enter the function a second time after yielding in await task, we won't see the tasks in the dict anymore.
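
Putting the snippets from this thread together, the patched function would look roughly like this (a sketch only; the final hand-off to the storage provider is elided because it isn't shown in this thread):

async def finalize_visit_id(self, visit_id, success):
    if visit_id not in self.store_record_tasks:
        self.logger.error(
            "visit_id %d, is already awaited, skipping...", visit_id
        )
        return None
    self.logger.info("Awaiting all tasks for visit_id %d", visit_id)
    # detach the task list and delete the key BEFORE the first await,
    # so a re-entrant call during a yield sees the visit_id as gone
    store_record_tasks = self.store_record_tasks[visit_id]
    del self.store_record_tasks[visit_id]
    for task in store_record_tasks:
        await task
    self.logger.debug(
        "Awaited all tasks for visit_id %d while finalizing", visit_id
    )
    ...  # pass the visit to the storage provider and return its completion token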

Hey @MohammadMahdiJavid,

I deployed the fix I described as PR #1073.

Could you test on the latest master branch and confirm everything is fixed?

Hi,

Apologies for the delay in response; each experiment takes a few days to finish.
I changed things as shown in the PR and ran the experiment twice, but couldn't reproduce the error.
I believe the fix works.

Although I messed up by forgetting to put some visit_ids into the completion_queue.

I'm going to test the deployed changes and come back if anything goes wrong.

Just a quick note for enhancement: since we raise an exception, the exception handling needs to set
self._last_status = None
so that the previous status isn't reused for a new command, right? (Applied in the PR.)
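
Something like this rough sketch of the check in get_most_recent_status (seconds_since_update and STATUS_TIMEOUT are made-up names; only self._last_status comes from the note above):

if seconds_since_update > STATUS_TIMEOUT:
    # reset the cached status before raising, so a later command
    # cannot act on a stale value from the previous run
    self._last_status = None
    raise RuntimeError(
        "No status update from the storage controller process "
        f"for {seconds_since_update} seconds."
    )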