tango-controls/TangoTickets

Concurrently executing a command and pushing event leads to dead lock

sdebionne opened this issue · 5 comments

We have a device server (DS) that, once started, continuously pushes events with push_change_event from an ancillary thread. The Stop command cleans up that thread, but sometimes the thread is already executing push_change_event when Stop is processed. My understanding is that the command takes a lock and push_change_event then tries to take it as well, which gives:

Exception in thread Thread-1:
Traceback (most recent call last):
  File "/home/debionne/miniconda3/envs/pytango/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "dead_lock.py", line 26, in run
    self._device.push_change_event("noise", self._device.read_noise())
PyTango.DevFailed: DevFailed[
DevError[
    desc = Not able to acquire serialization (dev, class or process) monitor
  origin = TangoMonitor::get_monitor
  reason = API_CommandTimedOut
severity = ERR]

Setting the SerialModel to NO_SYNC fixes the issue but it's not a good option.

The following code shows the issue:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import time
import threading
import numpy
from tango import DevState
from tango.server import run
from tango.server import Device
from tango.server import attribute, command

class DeadLock(Device) :
    
    noise = attribute(label="Noise",
                  dtype=((int,),),
                  max_dim_x=1024, max_dim_y=1024)
    
    class PushingThread(threading.Thread):
        def __init__(self, device):
            self._stop = False
            self._device = device
            super(DeadLock.PushingThread, self).__init__()
            self.timestamp = time.time()
        def run(self):
            while self._stop is False:
                self._device.push_change_event("noise", self._device.read_noise())
                self.timestamp=time.time()

    def init_device(self):
        Device.init_device(self)
        
        self._lock = threading.Condition()
        self._stop = False
        
        self.set_change_event('noise', True, False)
        
        self._pushing_event_thread = self.PushingThread(self)
        self._pushing_event_thread.start()
    
    def read_noise(self):
        return numpy.random.random_integers(1000, size=(100, 100))
    
    @command
    def start(self):    
        self.set_state(DevState.ON)
    
    @command
    def stop(self):
        with self._lock:
            self._pushing_event_thread._stop=True
            self._lock.notify()
        self._pushing_event_thread.join()
        self.set_state(DevState.OFF)

if __name__ == "__main__":
    DeadLock.run_server()

Hi @sdebionne

Indeed there is a temporary deadlock with this code.
push_change_event() is taking the Tango Monitor lock which is also taken when executing a command or reading/writing an attribute.
push_change_event() is trying to lock the Tango monitor with a timeout (3200ms I think) so if it doesn't succeed before the end of this timeout, you will get this "Not able to acquire serialization (dev, class or process) monitor" exception.

Setting serial model to NO_SYNC disables the Tango Monitor. As you wrote, it would not be a good idea to do that because you would need to handle all the mutex protections yourself.

Please note that if you wrap push_change_event in a try/except block and increase the timeout on your client side (to more than 3.2 seconds) before executing the stop command, the stop command should succeed and the thread won't crash (it will probably report an error, but it should exit cleanly).
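
To make the mechanism concrete, here is a minimal stdlib-only model of the situation, not Tango code. It assumes a plain threading.Lock standing in for the Tango monitor; MonitorTimeout, push_event, Pusher and stop_command are hypothetical names, and the timeout is shortened from the roughly 3.2 s mentioned above so the sketch runs quickly. It shows the pushing thread timing out on the monitor while the command holds it, and exiting cleanly because the error is caught:

```python
import threading
import time

# Stand-in for the Tango monitor: one lock shared by commands and event pushes.
monitor = threading.Lock()
MONITOR_TIMEOUT = 0.3  # seconds; hypothetical, shortened for the demo


class MonitorTimeout(Exception):
    """Hypothetical stand-in for the DevFailed (API_CommandTimedOut) error."""


def push_event():
    """Model of push_change_event(): needs the monitor, gives up after a timeout."""
    if not monitor.acquire(timeout=MONITOR_TIMEOUT):
        raise MonitorTimeout("Not able to acquire serialization monitor")
    monitor.release()


class Pusher(threading.Thread):
    def __init__(self):
        super().__init__()
        self.stop_requested = threading.Event()
        self.errors = []

    def run(self):
        while not self.stop_requested.is_set():
            try:
                push_event()
            except MonitorTimeout as exc:
                # Record the failure and re-check the stop flag instead of crashing.
                self.errors.append(exc)
            time.sleep(0.001)  # simulated work between two pushes


def stop_command(pusher):
    """Model of the Stop command: it runs with the monitor held, then joins."""
    with monitor:
        time.sleep(0.1)  # the pusher blocks on the monitor in the meantime
        pusher.stop_requested.set()
        pusher.join()  # returns only once the pusher's acquire has timed out


pusher = Pusher()
pusher.start()
stop_command(pusher)
print("thread alive:", pusher.is_alive(), "timeouts:", len(pusher.errors))
```

In this model the join completes only because the timed acquire eventually fails; with an untimed acquire the two threads would wait on each other forever. The client-side timeout matters for the same reason: the command cannot reply until the pushing thread has given up on the monitor.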

Hi @bourtemb,

Thanks for looking into this. I am not too familiar with the Tango internals. Is there any chance that this limitation could be lifted? I believe that executing a command that (asynchronously) generates an event is a pretty common use case. Could push_change_event() be decoupled from the Tango Monitor lock, maybe by introducing an event queue?

Hi @sdebionne

One way to work around this is to wait on join in another thread, instead of trying to do it in the stop command handler. That does change the semantics of the command slightly - it becomes "please stop soon", rather than "stop now, and only reply once you are sure things are stopped".

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import threading
import numpy
from tango import DevState
from tango.server import Device
from tango.server import attribute, command


class LiveLock(Device) :

    noise = attribute(label="Noise",
                      dtype=((int,),),
                      max_dim_x=1024, max_dim_y=1024)

    class PushingThread(threading.Thread):
        def __init__(self, device):
            super().__init__()
            self.stopped = False
            self.device = device

        def run(self):
            while not self.stopped:
                self.device.push_change_event("noise", self.device.read_noise())

    def init_device(self):
        super().init_device()
        self.set_change_event('noise', True, False)
        self._pushing_event_thread = self.PushingThread(self)
        self._pushing_event_thread.start()

    def read_noise(self):
        return numpy.random.random_integers(1000, size=(100, 100))

    @command
    def start(self):
        self.set_state(DevState.ON)

    @command
    def stop(self):
        if not self._pushing_event_thread.stopped:
            one_shot_stopper = threading.Thread(target=self.stop_pushing)
            one_shot_stopper.start()

    def stop_pushing(self):
        self._pushing_event_thread.stopped = True
        self._pushing_event_thread.join()
        self.set_state(DevState.OFF)


if __name__ == "__main__":
    LiveLock.run_server()

P.S. When I was trying out your code, I discovered that the threading.Thread class has a method called _stop, so setting self._stop = False in PushingThread overwrites it. That causes an exception during join. I used self.stopped instead.
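
One way to sidestep this kind of name clash altogether is to keep the stop flag in a threading.Event under a public name. The following is a minimal sketch using only the standard library; the push callable is a hypothetical stand-in for the push_change_event call, and the 10 ms pause is an arbitrary choice:

```python
import threading
import time


class PushingThread(threading.Thread):
    def __init__(self, push):
        super().__init__()
        # Public attribute names, so nothing private on threading.Thread is shadowed.
        self.stop_requested = threading.Event()
        self.push = push  # hypothetical callable standing in for push_change_event

    def run(self):
        while not self.stop_requested.is_set():
            self.push()
            # Pause up to 10 ms, but wake immediately when stop is requested.
            self.stop_requested.wait(0.01)

    def stop(self):
        self.stop_requested.set()


pushed = []
worker = PushingThread(lambda: pushed.append(time.time()))
worker.start()
time.sleep(0.05)
worker.stop()
worker.join(timeout=1.0)
print("alive:", worker.is_alive(), "pushes:", len(pushed))
```

An Event also lets the loop sleep between pushes without delaying shutdown, since wait() returns as soon as the flag is set.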

Hi @sdebionne, I agree that this limitation is inconvenient, and I think we should investigate why the Tango Monitor lock needs to be taken when executing a method like push_change_event.
If we can remove this limitation, we'll try to do so in a future cppTango major release.

@ajoubertza Strange that I did not get that error. Thanks for taking the time to report it. It's so easy to fall into this kind of trap with Python... Well, that's the opinion of a C++ developer :-)