pytransitions/transitions

Race condition when using AsyncTimeout

rgov opened this issue · 8 comments

rgov commented

Consider the following example:

import asyncio

from transitions.extensions.asyncio import AsyncMachine, AsyncTimeout
from transitions.extensions.states import add_state_features


@add_state_features(AsyncTimeout)
class CustomMachine(AsyncMachine):
    pass


class Model:
    async def on_enter_completed(self):
        print('entered completed state')
    
    async def on_enter_waiting(self):
        print('entered waiting state')


async def main():
    model = Model()

    states = [
        'start', 
        { 'name': 'waiting', 'timeout': 0.5, 'on_timeout': 'to_waiting' },
        'completed',
    ]
    machine = CustomMachine(model, states, initial='start')
    machine.add_transition('wait', 'start', 'waiting')
    machine.add_transition('complete', 'waiting', 'completed')

    await model.wait()
    await asyncio.sleep(1.0)
    await model.complete()


loop = asyncio.get_event_loop()
loop.create_task(main())
loop.run_forever()
loop.close()

The intended behavior is for it to loop on the waiting state a few times, then transition to the completed state and stop. However, the output looks like this:

entered waiting state
entered waiting state
entered completed state
entered waiting state
entered waiting state
entered waiting state
entered waiting state
[...repeats forever...]

It seems that the timer that transitions back to the waiting state does not get properly canceled when we transition to the completed state.

rgov commented

Just to clarify the desired behavior: It should never move from completed back to waiting. It doesn't matter whether it decides to invoke the timeout callback first, or the complete event first, but interleaving them violates the state machine's rules.

The issue happens if the coroutines are interleaved like this:

  1. The _timeout coroutine wakes up from sleep, then awaits the callback coroutine.
  2. But before the callback is invoked, execution passes back to the main coroutine, which moves us to the completed state. Along the way, it tries to cancel the callback, but it can't because it is shielded.
  3. The timeout callback routine gets executed next, and transitions us back to waiting.

I understand the reason to shield the callback (the timer elapsed, and is no longer cancelable). However, I don't know how to resolve this race.
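The shield semantics can be reproduced in isolation. Here is a minimal standalone sketch (made-up coroutine names, not transitions code) of why a cancel that arrives after the timer has elapsed cannot stop a shielded callback:

import asyncio

async def callback():
    await asyncio.sleep(0.1)  # simulate work inside the timeout callback
    print('callback finished despite outer cancellation')

async def timeout_task():
    await asyncio.sleep(0.2)          # the timer elapses here...
    await asyncio.shield(callback())  # ...and the callback is shielded from now on

async def main():
    task = asyncio.create_task(timeout_task())
    await asyncio.sleep(0.25)  # timer elapsed; callback already running inside the shield
    task.cancel()              # cancels timeout_task at the await, but not the inner callback
    await asyncio.sleep(0.2)   # the shielded callback still runs to completion

asyncio.run(main())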

Hi @rgov,

really thoughtful bug report. Much appreciated! The last action of _process_timeout is to remove the future from the runners list, which is assumed to be the timeout task that's currently running. But you showed that this assumption can be wrong and may lead to dropping the newly instantiated timeout processor. I removed this action and added a done check when an AsyncState is entered. Your example should behave as intended with a1fa55c. I will prepare a release this week since we are reaching a critical mass of bugfixes. Feel free to comment if you observe more issues with AsyncTimeout or other transitions components.
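A rough sketch of the idea behind the fix (a toy stand-in with invented names, not the actual a1fa55c diff):

import asyncio

class MiniTimeoutState:
    """Toy stand-in for a timeout-decorated state; not the library's code."""

    def __init__(self, timeout, on_timeout):
        self.timeout = timeout
        self.on_timeout = on_timeout
        self.runner = None  # the currently scheduled timeout task, if any

    async def enter(self):
        # the gist of the fix: a stale (done) runner can simply be replaced,
        # but a still-running one must be cancelled before scheduling anew
        if self.runner is not None and not self.runner.done():
            self.runner.cancel()
        self.runner = asyncio.ensure_future(self._process_timeout())

    async def exit(self):
        if self.runner is not None and not self.runner.done():
            self.runner.cancel()

    async def _process_timeout(self):
        await asyncio.sleep(self.timeout)
        await asyncio.shield(self.on_timeout())
        # crucially, the runner does NOT clear self.runner here: by this point
        # it may already hold a NEWER task created by a re-entry, and dropping
        # that task was what broke the example above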

rgov commented

That's a good catch on the assumption.

How confident are you that the async state machine does not have race conditions? It seems like the sequence 1-3 above could still happen, even if it is less common now. But I'm fairly new to Python coroutines, so I may be missing how there's a guarantee of no race.

It feels like what I really want is a synchronous state machine where the enter/leave callbacks fire asynchronously, as long as the ordering is guaranteed.

As far as I understand async/await logic, there is no classic race condition as we know from threading, since everything runs in the same thread. However, every await allows another event to be processed, and since there are many callbacks being awaited in AsyncState and AsyncTimeout, there is a chance of undesired entanglement. With debug output and some timer tweaking I found such an event:

DEBUG:root:ENTERED WAITING
INFO:transitions.extensions.asyncio:Entered state waiting
DEBUG:transitions.extensions.asyncio:Executed callback after transition.
DEBUG:transitions.extensions.asyncio:Executed machine finalize callbacks

[1] DEBUG:transitions.extensions.asyncio:Timeout state waiting. Timeout triggered. <-- timeout triggered 
[2] DEBUG:transitions.extensions.asyncio:Executed machine preparation callbacks before conditions. <--- but task was suspended and the new trigger is processed.
DEBUG:transitions.extensions.asyncio:Initiating transition from state waiting to state completed...
DEBUG:transitions.extensions.asyncio:Executed callbacks before conditions.

So the suspension after [1] needs to be prevented to avoid [2] here.
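The interleaving at such a suspension point is easy to demonstrate in isolation (made-up coroutine names, not transitions internals):

import asyncio

async def timeout_path():
    print('[1] timeout triggered')
    await asyncio.sleep(0)  # every await is a potential suspension point...
    print('[3] ...so the timeout callback runs only now, after the trigger')

async def trigger_path():
    print("[2] 'complete' trigger processed while the timeout path is suspended")

async def main():
    task = asyncio.create_task(timeout_path())
    await asyncio.sleep(0)  # let timeout_path run up to its first await
    await trigger_path()    # interleaves exactly between [1] and [3]
    await task

asyncio.run(main())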

> It feels like what I really want is a synchronous state machine where the enter/leave callbacks fire asynchronously, as long as the ordering is guaranteed.

Technically, AsyncMachine is synchronous since only callbacks of the same category are processed asynchronously. The order of the callback categories mentioned in the readme (prepare, before, exit, enter, after, finalize, ...) is guaranteed. But async/await logic 'creeps' its way through the whole code.
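To see the guarantee in action, here is a small demo (model and callback names invented): the two before callbacks are processed concurrently, yet on_enter_b only runs after both have finished:

import asyncio
from transitions.extensions.asyncio import AsyncMachine

class Model:
    async def slow_before(self):
        await asyncio.sleep(0.1)
        print('before (slow)')

    async def fast_before(self):
        print('before (fast)')

    async def on_enter_b(self):
        print('enter b (always last, after every before callback)')

async def main():
    model = Model()
    machine = AsyncMachine(model, states=['a', 'b'], initial='a')
    machine.add_transition('go', 'a', 'b', before=['slow_before', 'fast_before'])
    await model.go()

asyncio.run(main())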

One way of making sure that every event is processed without interruption is to introduce locks in AsyncEvent for each model. This way, events will be 'queued'. This alone won't be enough since, while complete is being processed, another timeout might already be in the pipe. Adding a transition which is valid from every state (source: '*') but is conditional on being in state waiting solves this. The timeout will be triggered, but since the condition is False, the transition will be halted.

import asyncio

from transitions.extensions.asyncio import AsyncMachine, AsyncTimeout, AsyncEvent
from transitions.extensions.states import add_state_features

import logging


class LockedAsyncEvent(AsyncEvent):

    async def trigger(self, _model, *args, **kwargs):
        # lazily create one lock per model so that all events for the same
        # model are processed one after another
        self.machine.locks[id(_model)] = self.machine.locks.get(id(_model), asyncio.Lock())
        async with self.machine.locks[id(_model)]:
            return await super(LockedAsyncEvent, self).trigger(_model, *args, **kwargs)


@add_state_features(AsyncTimeout)
class CustomMachine(AsyncMachine):

    event_cls = LockedAsyncEvent

    def __init__(self, *args, **kwargs):
        self.locks = {}
        super(CustomMachine, self).__init__(*args, **kwargs)


class Model:
    async def on_enter_completed(self):
        logging.debug("ENTERED COMPLETE")

    async def on_enter_waiting(self):
        logging.debug("ENTERED WAITING")


async def main():
    model = Model()

    states = [
        'start',
        {'name': 'waiting', 'timeout': 0.5, 'on_timeout': 'timeout_waiting'},
        'completed',
    ]

    transitions = [{'trigger': 'timeout_waiting',
                    'source': '*',
                    'dest': 'waiting',
                    'conditions': 'is_waiting'}]
    machine = CustomMachine(model, states, transitions=transitions, initial='start')
    machine.add_transition('wait', 'start', 'waiting')
    machine.add_transition('complete', 'waiting', 'completed')

    await model.wait()
    await asyncio.sleep(0.5)
    await model.complete()

logging.basicConfig(level=logging.DEBUG)
loop = asyncio.get_event_loop()
loop.create_task(main())
loop.run_forever()
loop.close()

Ultimately, it might be better to get rid of the shielding in AsyncTimeout (I don't see how, though) or maybe wait for the shielded task to finish before processing enter.

rgov commented

If you hold the lock during the execution of all the callbacks, then if a callback transitions to another state, you will deadlock, won't you?

> If you hold the lock during the execution of all the callbacks, then if a callback transitions to another state, you will deadlock, won't you?

Yup, pretty much. Unfortunately, there is no asyncio RLock. The discussions I found were along the lines of "when you need it, you are not doing it right." You can avoid the deadlock by executing the event in a new task:

import asyncio

from transitions.extensions.asyncio import AsyncMachine, AsyncTimeout, AsyncEvent
from transitions.extensions.states import add_state_features

import logging


class LockedAsyncEvent(AsyncEvent):

    async def trigger(self, _model, *args, **kwargs):
        # same per-model lock as in the first example above
        self.machine.locks[id(_model)] = self.machine.locks.get(id(_model), asyncio.Lock())
        async with self.machine.locks[id(_model)]:
            return await super(LockedAsyncEvent, self).trigger(_model, *args, **kwargs)


@add_state_features(AsyncTimeout)
class CustomMachine(AsyncMachine):

    event_cls = LockedAsyncEvent

    def __init__(self, *args, **kwargs):
        self.locks = {}
        super(CustomMachine, self).__init__(*args, **kwargs)


class Model:

    async def on_enter_waiting(self):
        print("entered")
        asyncio.create_task(self.to_completed())  # this would deadlock if awaited


async def main():
    model = Model()

    states = [
        'start',
        'waiting',
        'completed',
    ]
    machine = CustomMachine(model, states, initial='start')

    await model.to_waiting()
    await asyncio.sleep(0.1)  # give the background completion task time to run
    print(model.state)  # now prints 'completed'

logging.basicConfig(level=logging.DEBUG)
asyncio.run(main())

I investigated the process a bit further and the problem appears when timeout AND new event trigger happen in the same context (no cancellation triggered) and both hit the asynchronous state exit functions at the same time. Unfortunately, change_state cannot be locked since it is possible that exit/enter callbacks contain event triggers. Simulating an RLock could help here, but there is no hierarchical relationship between tasks. It is possible to override the task factory to create a hierarchical structure (ref). As far as I can tell, creating such a factory could mess with the user's code though (if the user also used a custom factory). If anyone has an idea about how to conceptually solve this issue, please let me know.
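For reference, a task-reentrant lock can be simulated as long as re-entry happens in the same task. A sketch (hypothetical helper, not part of transitions) that also makes the limitation concrete, since ownership is keyed on asyncio.current_task() and does not carry over to child tasks:

import asyncio

class SimulatedRLock:
    """Reentrant asyncio lock keyed on the current task; illustrative only."""

    def __init__(self):
        self._lock = asyncio.Lock()
        self._owner = None  # the task currently holding the lock
        self._depth = 0     # how many times the owner has acquired it

    async def __aenter__(self):
        me = asyncio.current_task()
        if self._owner is me:
            self._depth += 1  # re-entrant acquisition by the owning task
            return self
        await self._lock.acquire()
        self._owner, self._depth = me, 1
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self._depth -= 1
        if self._depth == 0:
            self._owner = None
            self._lock.release()

# the catch: asyncio.create_task() starts a NEW task, so a callback executed
# in a child task is not recognized as the owner and would still deadlock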

The only simple solution I see here is to queue events by passing queued=True to the constructor to prevent this race condition. Using an unconditional to_waiting should also be avoided because it is valid from every source state. Using unless='is_completed' assures that if the timeout event and the trigger event happen simultaneously, the timeout event is aborted when processed, without the need for ignore_invalid_trigger:

import asyncio

from transitions.extensions.asyncio import AsyncMachine, AsyncTimeout
from transitions.extensions.states import add_state_features


@add_state_features(AsyncTimeout)
class CustomMachine(AsyncMachine):
    pass


class Model:
    async def on_enter_completed(self):
        print('entered completed state')

    async def on_enter_waiting(self):
        print('entered waiting state')

    async def timeout_triggered(self):
        # print('timeout triggered')
        await self.wait()


async def main():

    states = [
        'start',
        {'name': 'waiting', 'timeout': 0.5, 'on_timeout': 'timeout_triggered'},
        'completed',
    ]

    model = Model()
    machine = CustomMachine(model, states=states, initial='start', queued=True)
    machine.add_transition('wait', '*', 'waiting', unless='is_completed')
    machine.add_transition('complete', 'waiting', 'completed')

    await model.wait()
    await asyncio.sleep(0.502)  # fiddle around to check the varying output
    await model.complete()
    print(model.state)

asyncio.run(main())