Adjust sentinels so this does not hang
RhysU opened this issue · 2 comments
RhysU commented
See if the sentinel handling can be amended so that this does not hang:
from typing import Callable
from jobserver import Blocked, Future, Jobserver
def work(jobserver: Jobserver, func: Callable[..., int], head, *rest) -> int:
if not rest:
return func(head)
while True:
try:
print(head, rest)
future = jobserver.submit(
work,
args=(jobserver, func, *rest),
timeout=0.5
)
return func(head) + future.result()
except Blocked:
pass
if __name__ == '__main__':
jobserver = Jobserver(context="forkserver", slots=2)
future = jobserver.submit(
fn=work,
args=(jobserver, len, [1], [1, 2], [1, 2, 3], [1, 2, 3, 4]),
timeout=0.5
)
future.when_done(print, "Done")
print(future.result())
assert future.result() == 10
RhysU commented
Root cause of the hang is that in each subprocess the getstate/setstate logic elides all of the sentinels at the higher recursive levels. That is, getstate is too aggressive.
RhysU commented
Possibly no on my prior hunch-- entirely removing the custom getstate/setstate does not seem to resolve it. That is,
diff --git a/jobserver.py b/jobserver.py
index b893332..619b8b6 100644
--- a/jobserver.py
+++ b/jobserver.py
@@ -344,17 +344,17 @@ class Jobserver:
# Tracks outstanding Futures (and wait-able sentinels)
self._future_sentinels = {} # type: typing.Dict[Future, int]
- def __getstate__(self) -> typing.Tuple:
- """Get instance state without exposing in-flight Futures."""
- # Required because Futures can be neither copied nor pickled.
- # Without custom handling of Futures, submit(...) would fail
- # whenever an instance is part of an argument to a sub-Process.
- return self._context, self._slots, {}
+ #def __getstate__(self) -> typing.Tuple:
+ # """Get instance state without exposing in-flight Futures."""
+ # # Required because Futures can be neither copied nor pickled.
+ # # Without custom handling of Futures, submit(...) would fail
+ # # whenever an instance is part of an argument to a sub-Process.
+ # return self._context, self._slots, {}
- def __setstate__(self, state: typing.Tuple) -> None:
- """Set instance state."""
- assert isinstance(state, tuple) and len(state) == 3
- self._context, self._slots, self._future_sentinels = state
+ #def __setstate__(self, state: typing.Tuple) -> None:
+ # """Set instance state."""
+ # assert isinstance(state, tuple) and len(state) == 3
+ # self._context, self._slots, self._future_sentinels = state
def __copy__(self) -> "Jobserver":
"""Shallow copies return the original Jobserver unchanged."""