Concurrently executing a command and pushing an event leads to deadlock
sdebionne opened this issue · 5 comments
We have a device server (DS) that, when started, continuously pushes events with push_change_event in the context of an ancillary thread. When the Stop command is executed, the ancillary thread is cleaned up, but sometimes the thread is already executing push_change_event when the Stop command is processed. My understanding is that the command takes a lock and then push_change_event tries to take it as well, which gives:
Exception in thread Thread-1:
Traceback (most recent call last):
  File "/home/debionne/miniconda3/envs/pytango/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "dead_lock.py", line 26, in run
    self._device.push_change_event("noise", self._device.read_noise())
PyTango.DevFailed: DevFailed[
DevError[
    desc = Not able to acquire serialization (dev, class or process) monitor
  origin = TangoMonitor::get_monitor
  reason = API_CommandTimedOut
severity = ERR]
Setting the SerialModel to NO_SYNC fixes the issue, but it's not a good option.
The following code shows the issue:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import time
import threading
import numpy
from tango import DevState
from tango.server import Device
from tango.server import attribute, command


class DeadLock(Device):
    noise = attribute(label="Noise",
                      dtype=((int,),),
                      max_dim_x=1024, max_dim_y=1024)

    class PushingThread(threading.Thread):
        def __init__(self, device):
            # NOTE: "_stop" shadows an internal method of threading.Thread
            # (see the P.S. further down); kept as in the original report.
            self._stop = False
            self._device = device
            super(DeadLock.PushingThread, self).__init__()
            self.timestamp = time.time()

        def run(self):
            # push_change_event() needs the Tango monitor on every iteration
            while self._stop is False:
                self._device.push_change_event("noise", self._device.read_noise())
                self.timestamp = time.time()

    def init_device(self):
        Device.init_device(self)
        self._lock = threading.Condition()
        self._stop = False
        self.set_change_event('noise', True, False)
        self._pushing_event_thread = self.PushingThread(self)
        self._pushing_event_thread.start()

    def read_noise(self):
        # integers in [1, 1000], like the deprecated random_integers(1000, ...)
        return numpy.random.randint(1, 1001, size=(100, 100))

    @command
    def start(self):
        self.set_state(DevState.ON)

    @command
    def stop(self):
        with self._lock:
            self._pushing_event_thread._stop = True
            self._lock.notify()
        # join() runs while the command still holds the Tango monitor
        self._pushing_event_thread.join()
        self.set_state(DevState.OFF)


if __name__ == "__main__":
    DeadLock.run_server()
Hi @sdebionne
Indeed there is a temporary deadlock with this code.
push_change_event() takes the Tango Monitor lock, which is also taken when a command is executed or an attribute is read or written.
push_change_event() tries to lock the Tango monitor with a timeout (3200 ms, I think), so if it does not succeed before this timeout expires, you get the "Not able to acquire serialization (dev, class or process) monitor" exception.
Setting the serial model to NO_SYNC disables the Tango Monitor. As you wrote, it would not be a good idea, because you would then need to handle all the mutex protection yourself.
Please note that if you wrap push_change_event in a try/except block and, before executing the Stop command, increase the timeout on the client side to more than 3.2 seconds, the Stop command should succeed and the thread won't crash (it will probably report an error, but it should exit cleanly).
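The timeout behaviour and the try/except workaround can be modelled without Tango. The sketch below is a simplified stand-in, not cppTango's implementation: a plain `threading.Lock` acquired with a timeout plays the role of the Tango monitor, the hypothetical `MonitorTimeout` exception stands in for the `API_CommandTimedOut` DevFailed, and the timeout is shortened to 0.2 s. The pusher catches the failure, re-checks its stop flag and exits cleanly, while the command returns only after the monitor timeout has elapsed.

```python
import threading
import time


class MonitorTimeout(Exception):
    """Hypothetical stand-in for DevFailed with reason API_CommandTimedOut."""


def stop_while_pushing(monitor_timeout_s=0.2):
    """Model a Stop command that joins the pusher while holding the monitor."""
    monitor = threading.Lock()            # stands in for the Tango monitor
    stop = threading.Event()              # the pusher's stop flag
    entering_push = threading.Event()     # pusher has passed its stop check
    command_running = threading.Event()   # the command now holds the monitor
    errors = []

    def push_event():
        # Like push_change_event(): take the monitor, give up after a timeout.
        if not monitor.acquire(timeout=monitor_timeout_s):
            raise MonitorTimeout("Not able to acquire serialization monitor")
        monitor.release()

    def pusher():
        while not stop.is_set():
            entering_push.set()
            command_running.wait()  # force the interleaving from the report
            try:
                push_event()
            except MonitorTimeout as exc:
                # Without this except clause the thread would die, as Thread-1 did.
                errors.append(str(exc))

    thread = threading.Thread(target=pusher)
    thread.start()
    entering_push.wait()

    start = time.monotonic()
    with monitor:                         # the Stop command runs under the monitor
        command_running.set()
        stop.set()
        thread.join()                     # returns only after push_event() times out
    return errors, time.monotonic() - start


errors, elapsed = stop_while_pushing()
print(errors, f"command took ~{elapsed:.1f} s")
```

The command's duration is roughly the monitor timeout, which is why the client's timeout has to exceed 3.2 s for the real Stop command to succeed; in PyTango the client-side timeout can be raised with `DeviceProxy.set_timeout_millis()`.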
Hi @bourtemb,
Thanks for looking into this. I am not too familiar with the Tango internals; is there any chance that this limitation could be lifted? I believe that executing a command that (asynchronously) generates an event is a pretty common use case. Could push_change_event() be decoupled from the Tango Monitor lock, maybe by introducing an event queue?
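The event-queue idea can be sketched in plain Python. This only illustrates the proposal; it is not how cppTango works or would implement it. The producer thread just puts snapshots on a `queue.Queue`, and a separate, hypothetical dispatcher thread takes the monitor (again modelled by a `threading.Lock`) to publish them. Because the producer never waits for the monitor, a command holding the monitor can join it straight away; the dispatcher simply resumes once the command returns.

```python
import queue
import threading
import time


def stop_with_event_queue():
    monitor = threading.Lock()      # stands in for the Tango monitor
    events = queue.Queue()          # hypothetical queue between producer and publisher
    stop = threading.Event()
    delivered = []

    def producer():
        n = 0
        while not stop.is_set():
            events.put(n)           # never touches the monitor
            n += 1
            time.sleep(0.001)

    def dispatcher():
        while True:
            item = events.get()
            if item is None:        # sentinel: shut down
                return
            with monitor:           # publishing is still serialized by the monitor
                delivered.append(item)

    prod = threading.Thread(target=producer)
    disp = threading.Thread(target=dispatcher)
    prod.start()
    disp.start()
    time.sleep(0.05)                # let some events flow

    start = time.monotonic()
    with monitor:                   # the Stop command runs under the monitor
        stop.set()
        prod.join()                 # safe: the producer never waits for the monitor
    command_time = time.monotonic() - start

    events.put(None)                # dispatcher drains the backlog, then exits
    disp.join()
    return delivered, command_time


delivered, command_time = stop_with_event_queue()
print(len(delivered), "events delivered; stop took", round(command_time, 3), "s")
```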
Hi @sdebionne
One way to work around this is to call join() in another thread instead of in the stop command handler. That does change the semantics of the command slightly: it becomes "please stop soon" rather than "stop now, and only reply once you are sure things are stopped".
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import threading
import numpy
from tango import DevState
from tango.server import Device
from tango.server import attribute, command


class LiveLock(Device):
    noise = attribute(label="Noise",
                      dtype=((int,),),
                      max_dim_x=1024, max_dim_y=1024)

    class PushingThread(threading.Thread):
        def __init__(self, device):
            super().__init__()
            self.stopped = False  # not "_stop": that name is used by threading.Thread
            self.device = device

        def run(self):
            while not self.stopped:
                self.device.push_change_event("noise", self.device.read_noise())

    def init_device(self):
        super().init_device()
        self.set_change_event('noise', True, False)
        self._pushing_event_thread = self.PushingThread(self)
        self._pushing_event_thread.start()

    def read_noise(self):
        # integers in [1, 1000], like the deprecated random_integers(1000, ...)
        return numpy.random.randint(1, 1001, size=(100, 100))

    @command
    def start(self):
        self.set_state(DevState.ON)

    @command
    def stop(self):
        if not self._pushing_event_thread.stopped:
            # Hand the join off to a helper thread so the command returns
            # (and releases the Tango monitor) straight away.
            one_shot_stopper = threading.Thread(target=self.stop_pushing)
            one_shot_stopper.start()

    def stop_pushing(self):
        self._pushing_event_thread.stopped = True
        self._pushing_event_thread.join()
        self.set_state(DevState.OFF)


if __name__ == "__main__":
    LiveLock.run_server()
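Why handing the join to a helper thread helps can be checked with a stdlib model. This is a simplification, not Tango itself: a `threading.Lock` with a 0.2 s timeout stands in for the Tango monitor, and the function and variable names are made up for illustration. The command only starts the `stop_pushing` helper and returns, releasing the monitor, so the pusher's next timed acquire succeeds and the helper's `join()` completes without any timeout error.

```python
import threading
import time


def deferred_stop(monitor_timeout_s=0.2):
    monitor = threading.Lock()            # stands in for the Tango monitor
    stop = threading.Event()
    timeouts = []
    state = {"value": "ON"}

    def pusher():
        while not stop.is_set():
            # Like push_change_event(): timed acquisition of the monitor.
            if monitor.acquire(timeout=monitor_timeout_s):
                monitor.release()
            else:
                timeouts.append("monitor timeout")
            time.sleep(0.001)

    thread = threading.Thread(target=pusher)

    def stop_pushing():                   # mirrors LiveLock.stop_pushing
        stop.set()
        thread.join()
        state["value"] = "OFF"

    thread.start()
    time.sleep(0.02)

    start = time.monotonic()
    with monitor:                         # the stop command runs under the monitor...
        stopper = threading.Thread(target=stop_pushing)
        stopper.start()                   # ...but only hands off the join
    command_time = time.monotonic() - start

    stopper.join()                        # "please stop soon" completes later
    return timeouts, state["value"], command_time


timeouts, state, command_time = deferred_stop()
print(timeouts, state, round(command_time, 3))
```

The trade-off is the one described above: a client that reads the state immediately after the command may still see ON for a short while.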
P.S. When I was trying out your code, I discovered that the threading.Thread class has a method called _stop, so setting self._stop = False in PushingThread overwrites it. That causes an exception during join. I used self.stopped instead.
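A related way to avoid clashing with `threading.Thread` internals is to keep the stop request in a `threading.Event` under a public, descriptive name. The sketch below is generic Python, not PyTango API; `push` is a hypothetical callable standing in for `push_change_event`. `Event.wait()` with a timeout also gives the loop a pause that ends early as soon as a stop is requested.

```python
import threading
import time


class PushingThread(threading.Thread):
    """Worker whose stop flag cannot shadow threading.Thread internals."""

    def __init__(self, push):
        super().__init__()
        self.push = push                          # hypothetical stand-in for push_change_event
        self.stop_requested = threading.Event()   # public name, no leading underscore

    def run(self):
        while not self.stop_requested.is_set():
            self.push()
            self.stop_requested.wait(0.001)       # short pause that ends early on stop

    def request_stop(self):
        self.stop_requested.set()


pushed = []
worker = PushingThread(lambda: pushed.append(time.monotonic()))
worker.start()
time.sleep(0.02)
worker.request_stop()
worker.join(timeout=1.0)
print(worker.is_alive(), len(pushed) > 0)
```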
Hi @sdebionne, I agree that this limitation is inconvenient, and I think we should investigate why the Tango Monitor lock needs to be taken when executing a method like push_change_event.
If we can remove this limitation, we'll try to do it in a future major release of cppTango.
@ajoubertza Strange that I did not get that error; thanks for taking the time to report it. It's so easy to fall into this kind of trap with Python... Well, that's the opinion of a C++ developer :-)