SynchronizationCoordinator - improve exception when sync points time out
idanarye opened this issue · 4 comments
```python
from easypy.sync import SYNC, wait
from easypy.concurrency import MultiObject

def foo(i, _sync=SYNC):
    wait(i / 10)
    _sync.wait_for_everyone(timeout=0.01)

MultiObject(range(2)).call(foo)
```
Running this snippet results in:
```
MultiException[BrokenBarrierError]:
┬╼ MultiException[BrokenBarrierError] ╾────────────────────────────────────────╮
│ BrokenBarrierError raised from concurrent invocation (x2/2) │
│ Timestamp: 2019-12-23T19:30:44.926592 │
├───┬╼ BrokenBarrierError ╾────────────────────────────────────────────────────╮
│ │ Timestamp: 2019-12-23T19:30:44.838183 │
│ │ Context: int<M000>, host=IdanArye, thread_ident=140063947085640, threadname=Thread-198
│ │ Traceback: │
│ │ /opt/pyenv/versions/3.4.3/lib/python3.4/concurrent/futures/thread.py:54 .............................................. run >> result = self.fn(*self.args, **self.kwargs)
│ │ /opt/pyenv/versions/teka_3_6_wekapp/lib/python3.4/site-packages/easypy/concurrency.py:527 .... _run_with_exception_logging >> return func(*args, **kwargs)
│ │ /opt/pyenv/versions/teka_3_6_wekapp/lib/python3.4/site-packages/easypy/sync.py:535 ............................... wrapper >> self.abandon()
│ │ /opt/pyenv/versions/teka_3_6_wekapp/lib/python3.4/site-packages/easypy/sync.py:472 ............................... abandon >> self.barrier.wait()
│ │ /opt/pyenv/versions/3.4.3/lib/python3.4/threading.py:607 ............................................................ wait >> self._enter() # Block while the barrier drains.
│ │ /opt/pyenv/versions/3.4.3/lib/python3.4/threading.py:631 .......................................................... _enter >> raise BrokenBarrierError
│ ╰────────────────────────────────────────────────────╼ BrokenBarrierError ╾╯
├───┬╼ BrokenBarrierError ╾────────────────────────────────────────────────────╮
│ │ Timestamp: 2019-12-23T19:30:44.926066 │
│ │ Context: int<M001>, host=IdanArye, thread_ident=140063692905288, threadname=Thread-200
│ │ Traceback: │
│ │ /opt/pyenv/versions/3.4.3/lib/python3.4/concurrent/futures/thread.py:54 .............................................. run >> result = self.fn(*self.args, **self.kwargs)
│ │ /opt/pyenv/versions/teka_3_6_wekapp/lib/python3.4/site-packages/easypy/concurrency.py:527 .... _run_with_exception_logging >> return func(*args, **kwargs)
│ │ /opt/pyenv/versions/teka_3_6_wekapp/lib/python3.4/site-packages/easypy/sync.py:535 ............................... wrapper >> self.abandon()
│ │ /opt/pyenv/versions/teka_3_6_wekapp/lib/python3.4/site-packages/easypy/sync.py:472 ............................... abandon >> self.barrier.wait()
│ │ /opt/pyenv/versions/3.4.3/lib/python3.4/threading.py:607 ............................................................ wait >> self._enter() # Block while the barrier drains.
│ │ /opt/pyenv/versions/3.4.3/lib/python3.4/threading.py:631 .......................................................... _enter >> raise BrokenBarrierError
│ ╰────────────────────────────────────────────────────╼ BrokenBarrierError ╾╯
╰────────────────────────────────────────╼ MultiException[BrokenBarrierError] ╾╯
```
We get a `MultiException[BrokenBarrierError]` because the first thread does not wait (it waits `0 / 10 == 0` seconds), so it times out waiting for the second thread. When the second thread reaches the sync point, the barrier is already broken.
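To check that this behaviour comes from `threading.Barrier` itself rather than from easypy, here is a small standard-library reproduction. It is a sketch: `reproduce_with_plain_barrier` is a hypothetical helper, and the sleeps mirror the `i / 10` from the snippet. Both threads end up with the same exception type, whether they caused the timeout or merely arrived late:

```python
import threading
import time


def reproduce_with_plain_barrier():
    """Replays the issue's scenario with threading.Barrier alone (no easypy).

    Thread 0 arrives immediately and times out after 0.01s, which breaks the
    barrier; thread 1 arrives about 0.1s later and finds it already broken.
    """
    barrier = threading.Barrier(2)
    errors = {}  # thread index -> name of the exception type it saw

    def worker(i):
        time.sleep(i / 10)  # thread 0 sleeps 0s, thread 1 sleeps 0.1s
        try:
            barrier.wait(timeout=0.01)
        except threading.BrokenBarrierError as e:
            errors[i] = type(e).__name__

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return errors
```

On a reasonably idle machine this should return `{0: 'BrokenBarrierError', 1: 'BrokenBarrierError'}`, i.e. nothing in the exception itself says which thread timed out and which one was late.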
We should not expect this to succeed, of course (it did time out), but we should improve the exception to explain what's going on, and maybe show which threads haven't reached the sync point yet.
Two challenges:
- We get a `BrokenBarrierError` both when we time out and when we reach an already broken barrier. How do we distinguish between the two cases?
- Do we format the list of late threads in the `SynchronizationCoordinator` or in `asynchronous`? The former also covers cases that don't use easypy for parallelism, but the latter is easier to implement, because `asynchronous` already has full knowledge of the participating threads while `SynchronizationCoordinator` only knows how many there are, and adding that knowledge to it may be costly.
Seems to me like `asynchronous` should catch the `MultiException[Broken...]` and, since it has access to the `_sync` object (it could look for it in the kwargs), transform this exception into a new `SynchronizationFailure` exception.
I don't think using something other than easypy for parallelism is realistic... ;) Honestly, though, I don't think the SyncCoordinator is likely to be used apart from the rest of easypy, and it doesn't seem worth making it "standalone".
By the time the `MultiException[BrokenBarrierError]` is created, we are too late: all threads have already reached the sync point, so we can't tell which ones lingered.
Maybe we could do it in `_abandon_when_done`?
Aren't you able to track them in the SyncCoord. object (in `wait_for_everyone`)?
It's possible, though it would increase the memory complexity from `O(1)` to `O(n)`. Then again, the memory complexity of the code that uses it is already `O(n)`...
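A minimal sketch of the `O(n)` option, assuming a hypothetical `TrackingCoordinator` built directly on `threading.Barrier` (it is not easypy's actual `SynchronizationCoordinator`): each thread records its name before waiting, and the first thread to see the break snapshots that list, so every failing thread, late ones included, reports who had arrived when the barrier broke. `SyncPointBroken` is also hypothetical; it subclasses `BrokenBarrierError` so existing `except` clauses keep working.

```python
import threading


class SyncPointBroken(threading.BrokenBarrierError):
    """Hypothetical error that says who had reached the sync point."""


class TrackingCoordinator:
    """Hypothetical coordinator that remembers arrivals (O(n) memory)."""

    def __init__(self, parties):
        self.parties = parties
        self._lock = threading.Lock()
        self._arrived = []     # names of threads that reached the current sync point
        self._at_break = None  # snapshot of _arrived taken when the barrier broke
        # The barrier action runs once per successful round, after all parties
        # have arrived and before any is released, so clearing here is safe.
        self._barrier = threading.Barrier(parties, action=self._arrived.clear)

    def wait_for_everyone(self, timeout=None):
        with self._lock:
            self._arrived.append(threading.current_thread().name)
        try:
            self._barrier.wait(timeout)
        except threading.BrokenBarrierError:
            with self._lock:
                if self._at_break is None:
                    # The first thread to see the break records who had made it.
                    self._at_break = list(self._arrived)
                arrived = self._at_break
            raise SyncPointBroken(
                "%d of %d parties had reached the sync point when it broke: %s"
                % (len(arrived), self.parties, ", ".join(arrived))
            ) from None
```

The per-round list is what costs `O(n)`; the snapshot is taken only once because a broken barrier stays broken until someone calls `reset()`.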
Maybe I can do something else, though: make each object know whether it started waiting before or after the first timeout-induced `BrokenBarrier`, and only store the late threads once they reach the sync point. This does leave a small window for races (a late thread reaches the sync point after the timeout but before the barrier was marked), but:
- The chance of this is so low, we might as well ignore it.
- We can make
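A rough standard-library sketch of this idea, assuming that "started waiting after the break" can be approximated by checking `Barrier.broken` just before calling `wait()`. `LateArrivalBarrier` and `run_issue_scenario` are hypothetical names. Nothing is stored until a thread arrives at an already broken barrier, and the gap between the check and the `wait()` call is exactly the small race described in the discussion:

```python
import threading
import time


class LateArrivalBarrier:
    """Hypothetical wrapper that records only threads arriving after a break."""

    def __init__(self, parties):
        self._barrier = threading.Barrier(parties)
        self._lock = threading.Lock()
        self.late = []  # names of threads that found the barrier already broken

    def wait(self, timeout=None):
        # Racy by design: the barrier may break right after this check, in
        # which case this thread is not recorded as late.
        if self._barrier.broken:
            with self._lock:
                self.late.append(threading.current_thread().name)
        return self._barrier.wait(timeout)


def run_issue_scenario():
    """Mimics the snippet from the issue with plain threads."""
    barrier = LateArrivalBarrier(2)

    def worker(i):
        time.sleep(i / 10)  # thread 0 arrives at once, thread 1 after ~0.1s
        try:
            barrier.wait(timeout=0.01)
        except threading.BrokenBarrierError:
            pass  # both threads fail; we only care about who was late

    threads = [threading.Thread(target=worker, args=(i,), name="worker-%d" % i)
               for i in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return barrier.late
```

In the issue's scenario this should report only `worker-1` as late, while the thread whose own timeout broke the barrier is not listed.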
Another thing I want to consider is subclassing `Barrier` and overriding `_wait()` to raise a different error on a timeout (and maybe mark that the barrier was broken due to a timeout?). Then again, the only other way a barrier can be broken is if we `abort` it, so it might be easier to do the special marking there...
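A sketch of the "special marking" alternative, under a few assumptions. `_wait()` is a private CPython method, so this version overrides the public `abort()` and `wait()` instead. `MarkingBarrier`, `SyncTimeout`, `SyncAborted` and `try_wait` are all hypothetical names. Two caveats: per the `threading` docs, `reset()` also makes waiting threads raise `BrokenBarrierError`, which this sketch does not mark; and a late thread arriving at a timeout-broken barrier also gets `SyncTimeout`, so on its own this separates "timeout vs. abort", not "I timed out vs. I arrived late".

```python
import threading


class SyncTimeout(threading.BrokenBarrierError):
    """Hypothetical: the barrier broke because some party's wait() timed out."""


class SyncAborted(threading.BrokenBarrierError):
    """Hypothetical: the barrier broke because abort() was called explicitly."""


class MarkingBarrier(threading.Barrier):
    """Barrier that remembers whether it was broken by an explicit abort()."""

    def __init__(self, parties, action=None, timeout=None):
        super().__init__(parties, action, timeout)
        self.aborted = False

    def abort(self):
        # Mark before breaking, so waiters woken by the abort see the flag.
        self.aborted = True
        super().abort()

    def wait(self, timeout=None):
        try:
            return super().wait(timeout)
        except threading.BrokenBarrierError:
            # Assumption from the discussion: if nobody aborted, a timeout did it.
            if self.aborted:
                raise SyncAborted() from None
            raise SyncTimeout() from None


def try_wait(barrier, timeout):
    """Returns 'passed', 'timeout' or 'aborted' depending on how wait() ended."""
    try:
        barrier.wait(timeout)
        return "passed"
    except SyncTimeout:
        return "timeout"
    except SyncAborted:
        return "aborted"
```

Because both new exceptions subclass `BrokenBarrierError`, code that already catches the original error keeps working while newer code can tell the two cases apart.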