Execution terminates early with Dask.distributed
WilliamCVan opened this issue · 4 comments
from time import sleep
import timeit

from dask.distributed import Client
from streamz import Stream

def inc(x):
    sleep(1)  # simulate actual work
    return x + 1

if __name__ == '__main__':
    client = Client()
    source = Stream()
    source.scatter().map(inc).buffer(5).gather().sink(print)  # numbers 5-10 get cut off and not printed

    starttime = timeit.default_timer()
    for i in range(10):
        source.emit(i)
    print("The time difference is :", timeit.default_timer() - starttime)

    client.close()
    print("done")
I'm having an issue where the last emits are getting cut off, so numbers 5-10 are not printed at all. The call exits even though there are still items in the buffer. How do I prevent execution from reaching print("done") until all numbers are processed? I'm getting this error due to the early termination:
1
2
3
4
The time difference is : 1.1211735000000012
distributed.worker - WARNING - Heartbeat to scheduler failed
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x0000025263D613C8>>, <Future finished exception=CommClosedError('in <closed TCP>: Stream is closed')>)
Traceback (most recent call last):
File "C:\Users\Admin\.conda\envs\PyTorchENV\lib\site-packages\distributed\comm\tcp.py", line 184, in read
n_frames = await stream.read_bytes(8)
tornado.iostream.StreamClosedError: Stream is closed
...
done
Streamz supports backpressure. In your pipeline this is provided by the buffer node, which pauses accepting inputs whenever it has five items in flight. As soon as it has fewer, a new item is accepted; when there are no more, your emit loop ends.
The simplest solution in your situation is to wait (e.g., with time.sleep) long enough for the in-flight items to finish.
Alternatively, omit the buffer step, so that your input waits on each event exiting the pipeline.
To do something more sophisticated, you would need to either use the async API, or make use of reference counting.
The time.sleep is too slow, and removing the buffer would cause things to execute synchronously, which is also a no-go. I wanted to add parallel execution if possible to reduce wait times. The reference counting was mentioned for Kafka, although I am not using it.
If I switched to async API, that would pretty much mean not using the Streamz library...for my example above at least?
The time.sleep is too slow
I'm not sure I follow, but you can have your pipeline output to some other object you can watch and wait on, like:
watchlist = []
source.scatter().map(inc).buffer(5).gather().sink(watchlist.append)
...
while len(watchlist) < 10:
    time.sleep(0.01)
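The polling pattern above can be sketched without streamz at all. This is a minimal, self-contained illustration in which a ThreadPoolExecutor stands in for the scatter/buffer/gather pipeline (the names and the pool itself are my own, not streamz API):

```python
import time
from concurrent.futures import ThreadPoolExecutor

def inc(x):
    time.sleep(0.05)  # simulate actual work
    return x + 1

watchlist = []  # sink target we can watch and wait on

with ThreadPoolExecutor(max_workers=5) as pool:  # plays the role of buffer(5)
    for i in range(10):
        # each result lands in watchlist as its task completes
        pool.submit(lambda x=i: watchlist.append(inc(x)))

    # poll until all 10 results have arrived, instead of exiting early
    while len(watchlist) < 10:
        time.sleep(0.01)

print(sorted(watchlist))  # prints [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
```

The point is only the last loop: the main thread blocks on the length of the watched container, so nothing downstream (the timing print, client.close(), "done") runs before every result has been appended.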
The reference counting was mentioned for Kafka, although I am not using it.
Yes, it was developed for that use case, but doesn't require it.
If I switched to async API, that would pretty much mean not using the Streamz library...for my example above at least?
Streamz does support async, and is implemented with async internally.
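As a streamz-free sketch of what the async route buys you: when the producer awaits its outstanding work before moving on, "done" cannot print early. Here an asyncio.Semaphore plays the role of the five-slot buffer; all names are my own illustration, not streamz API:

```python
import asyncio

results = []

async def inc(x, buffer: asyncio.Semaphore):
    async with buffer:             # occupies one of the 5 buffer slots
        await asyncio.sleep(0.01)  # simulate actual work
        results.append(x + 1)

async def main():
    buffer = asyncio.Semaphore(5)  # plays the role of buffer(5)
    tasks = [asyncio.create_task(inc(i, buffer)) for i in range(10)]
    # awaiting the in-flight work here is what prevents the
    # "done" line from printing before all items are processed
    await asyncio.gather(*tasks)
    print("done")

asyncio.run(main())
```

The same shape applies to an async streamz pipeline: emits happen inside the event loop, and the coroutine awaits completion of the stream's work before tearing down the client.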
Thanks for the sleep watchlist example, I think I'll go with that solution for my use case