RhysU/jobserver

Adjust sentinels so this does not hang

RhysU opened this issue · 2 comments

RhysU commented

See if the sentinel handling can be amended so that this does not hang:

from typing import Callable
from jobserver import Blocked, Future, Jobserver


def work(jobserver: Jobserver, func: Callable[..., int], head, *rest) -> int:
    if not rest:
        return func(head)

    while True:
        try:
            print(head, rest)
            future = jobserver.submit(
                work,
                args=(jobserver, func, *rest),
                timeout=0.5
            )
            return func(head) + future.result()
        except Blocked:
            pass


if __name__ == '__main__':
    jobserver = Jobserver(context="forkserver", slots=2)

    future = jobserver.submit(
        fn=work,
        args=(jobserver, len, [1], [1, 2], [1, 2, 3], [1, 2, 3, 4]),
        timeout=0.5
    )

    future.when_done(print, "Done")
    print(future.result())
    assert future.result() == 10
RhysU commented

Root cause of the hang is that in each subprocess the getstate/setstate logic elides all of the sentinels at the higher recursive levels. That is, getstate is too aggressive.

Possibly no on my prior hunch-- entirely removing the custom getstate/setstate does not seem to resolve it. That is,

diff --git a/jobserver.py b/jobserver.py
index b893332..619b8b6 100644
--- a/jobserver.py
+++ b/jobserver.py
@@ -344,17 +344,17 @@ class Jobserver:
         # Tracks outstanding Futures (and wait-able sentinels)
         self._future_sentinels = {}  # type: typing.Dict[Future, int]
 
-    def __getstate__(self) -> typing.Tuple:
-        """Get instance state without exposing in-flight Futures."""
-        # Required because Futures can be neither copied nor pickled.
-        # Without custom handling of Futures, submit(...) would fail
-        # whenever an instance is part of an argument to a sub-Process.
-        return self._context, self._slots, {}
+    #def __getstate__(self) -> typing.Tuple:
+    #    """Get instance state without exposing in-flight Futures."""
+    #    # Required because Futures can be neither copied nor pickled.
+    #    # Without custom handling of Futures, submit(...) would fail
+    #    # whenever an instance is part of an argument to a sub-Process.
+    #    return self._context, self._slots, {}
 
-    def __setstate__(self, state: typing.Tuple) -> None:
-        """Set instance state."""
-        assert isinstance(state, tuple) and len(state) == 3
-        self._context, self._slots, self._future_sentinels = state
+    #def __setstate__(self, state: typing.Tuple) -> None:
+    #    """Set instance state."""
+    #    assert isinstance(state, tuple) and len(state) == 3
+    #    self._context, self._slots, self._future_sentinels = state
 
     def __copy__(self) -> "Jobserver":
         """Shallow copies return the original Jobserver unchanged."""