weka/easypy

SynchronizationCoordinator - improve exception when sync points time out

idanarye opened this issue · 4 comments

from easypy.sync import SYNC, wait
from easypy.concurrency import MultiObject


def foo(i, _sync=SYNC):
    wait(i / 10)
    _sync.wait_for_everyone(timeout=0.01)

MultiObject(range(2)).call(foo)

Running this snippet results in:

MultiException[BrokenBarrierError]: 
┬╼ MultiException[BrokenBarrierError] ╾────────────────────────────────────────╮
│   BrokenBarrierError raised from concurrent invocation (x2/2)                │
│   Timestamp: 2019-12-23T19:30:44.926592                                      │
├───┬╼ BrokenBarrierError ╾────────────────────────────────────────────────────╮
│   │   Timestamp: 2019-12-23T19:30:44.838183                                  │
│   │   Context: int<M000>, host=IdanArye, thread_ident=140063947085640, threadname=Thread-198
│   │   Traceback:                                                             │
│   │     /opt/pyenv/versions/3.4.3/lib/python3.4/concurrent/futures/thread.py:54 .............................................. run >> result = self.fn(*self.args, **self.kwargs)
│   │     /opt/pyenv/versions/teka_3_6_wekapp/lib/python3.4/site-packages/easypy/concurrency.py:527 .... _run_with_exception_logging >> return func(*args, **kwargs)
│   │     /opt/pyenv/versions/teka_3_6_wekapp/lib/python3.4/site-packages/easypy/sync.py:535 ............................... wrapper >> self.abandon()
│   │     /opt/pyenv/versions/teka_3_6_wekapp/lib/python3.4/site-packages/easypy/sync.py:472 ............................... abandon >> self.barrier.wait()
│   │     /opt/pyenv/versions/3.4.3/lib/python3.4/threading.py:607 ............................................................ wait >> self._enter() # Block while the barrier drains.
│   │     /opt/pyenv/versions/3.4.3/lib/python3.4/threading.py:631 .......................................................... _enter >> raise BrokenBarrierError
│   ╰────────────────────────────────────────────────────╼ BrokenBarrierError ╾╯
├───┬╼ BrokenBarrierError ╾────────────────────────────────────────────────────╮
│   │   Timestamp: 2019-12-23T19:30:44.926066                                  │
│   │   Context: int<M001>, host=IdanArye, thread_ident=140063692905288, threadname=Thread-200
│   │   Traceback:                                                             │
│   │     /opt/pyenv/versions/3.4.3/lib/python3.4/concurrent/futures/thread.py:54 .............................................. run >> result = self.fn(*self.args, **self.kwargs)
│   │     /opt/pyenv/versions/teka_3_6_wekapp/lib/python3.4/site-packages/easypy/concurrency.py:527 .... _run_with_exception_logging >> return func(*args, **kwargs)
│   │     /opt/pyenv/versions/teka_3_6_wekapp/lib/python3.4/site-packages/easypy/sync.py:535 ............................... wrapper >> self.abandon()
│   │     /opt/pyenv/versions/teka_3_6_wekapp/lib/python3.4/site-packages/easypy/sync.py:472 ............................... abandon >> self.barrier.wait()
│   │     /opt/pyenv/versions/3.4.3/lib/python3.4/threading.py:607 ............................................................ wait >> self._enter() # Block while the barrier drains.
│   │     /opt/pyenv/versions/3.4.3/lib/python3.4/threading.py:631 .......................................................... _enter >> raise BrokenBarrierError
│   ╰────────────────────────────────────────────────────╼ BrokenBarrierError ╾╯
╰────────────────────────────────────────╼ MultiException[BrokenBarrierError] ╾╯

MultiException[BrokenBarrierError]. The first thread does not wait (waits 0 / 10 == 0 seconds), and times out waiting for the second thread. When the second thread reaches the sync point - the barrier is already broken.

We should not expect this to succeed, of course - it did timed out - but we should improve the exception to explain what's going on and maybe show the threads that haven't reached the sync point yet?

Two challenges:

  1. We get a BrokenBarrierException when we time out and when we reach a broken barrier. How do we distinguish between the two cases?
  2. Do we format the list of late threads in the SynchronizationCoordinator or in asynchronous? The former covers us with cases that don't use easypy for parallelism, but the later is easier to implement because asynchronous already has full knowledge of the participating threads and SynchronizationCoordinator only knows how many there are, and adding that new knowledge to it may be costly.

Seems to me like asynchronous should catch the MultiException[Broken...], and having access to the sync_ object (it could look for it in the kwargs), be able to transform this execption into a new SynchronizationFailure exception.

I don't think using something other than easypy for parallelism is realistic... ;) though honestly, I don't think it's likely that the SyncCoordinator would be used apart from the rest of easypy. And it doesn't seem worth it to make it "standalone".

By the time MultiExceptionBrokenBarrierError] is created, we are too late - since all threads have reached the sync point, and we can't tell which ones have lingered.

Maybe we could do it in _abandon_when_done?

aren't you able to track them in the SyncCoord. object? (wait_for_everyone)

It's possible - though it would increase the memory complexity from O(1) to O(n). Then again - the memory complexity of the code that uses it is already O(n)...

Maybe I can do something else though - make each object know if it started waiting before or after the first timeout induced BrokenBarrier, and only store the late threads once they reached the sync point. This does leave a small room for races (late thread reaches the sync point after the timeout but before it was marked), but:

  1. The chance for this is so low, we might as well ignore it.
  2. We can make

Another thing I want to consider is to subclass Barrier and override _wait() to return a different error on a timeout (and maybe mark that the barrier was broken due to a timeout?). Then again - the only other way a barrier can be broken is if we abort it, so it might be easier to do the special marking there...