python-streamz/streamz

Execution terminates early with Dask.distributed

WilliamCVan opened this issue · 4 comments

    from time import sleep
    import timeit

    from dask.distributed import Client
    from streamz import Stream

    def inc(x):
        sleep(1)  # simulate actual work
        return x + 1

    if __name__ == '__main__':
        client = Client()
        source = Stream()
        source.scatter().map(inc).buffer(5).gather().sink(print)  # numbers 5-10 get cut off and are not printed

        starttime = timeit.default_timer()
        for i in range(10):
            source.emit(i)

        print("The time difference is :", timeit.default_timer() - starttime)

        client.close()
        print("done")

I'm having an issue where the last emits get cut off, so numbers 5-10 are never printed. The script exits even though there are still items in the buffer. How do I keep execution from reaching print("done") until all numbers have been processed? I'm getting this error because of the early termination:

1
2
3
4
The time difference is : 1.1211735000000012
distributed.worker - WARNING - Heartbeat to scheduler failed
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x0000025263D613C8>>, <Future finished exception=CommClosedError('in <closed TCP>: Stream is closed')>)
Traceback (most recent call last):
  File "C:\Users\Admin\.conda\envs\PyTorchENV\lib\site-packages\distributed\comm\tcp.py", line 184, in read
    n_frames = await stream.read_bytes(8)
tornado.iostream.StreamClosedError: Stream is closed
...
done

Streamz supports backpressure. In your pipeline it is provided by the buffer node, which pauses accepting inputs whenever it has five items in flight. As soon as it has fewer, a new item is accepted; and once there is nothing left to emit, your emit loop ends even though items are still being processed, which is why the script reaches print("done") early.

The simplest solution in your situation is to wait (e.g., time.sleep) long enough for everything to finish.
Alternatively, omit the buffer step, so that each emit waits for its event to exit the pipeline.
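
For the second option, a minimal sketch of the same pipeline without buffer; each emit then blocks until its result has been gathered and printed, so nothing is left in flight when the loop ends (at the cost of the parallelism the buffer was giving you):

    from time import sleep

    from dask.distributed import Client
    from streamz import Stream

    def inc(x):
        sleep(1)  # simulate actual work
        return x + 1

    if __name__ == '__main__':
        client = Client()
        source = Stream()
        # no buffer: backpressure makes each emit wait for its event to exit the pipeline
        source.scatter().map(inc).gather().sink(print)

        for i in range(10):
            source.emit(i)  # returns only after i + 1 has been printed

        client.close()
        print("done")  # reached only after all ten results are out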

To do something more sophisticated, you would need to either use the async API, or make use of reference counting.

The time.sleep is too slow, and removing the buffer would make things execute synchronously, which is also a no-go. I wanted to add parallel execution if possible to reduce wait times. The reference counting was mentioned for Kafka, although I am not using it.

If I switched to async API, that would pretty much mean not using the Streamz library...for my example above at least?

The time.sleep is too slow

I'm not sure I follow, but you can have your pipeline output to some other object you can watch, and wait on that, like:

    watchlist = []
    source.scatter().map(inc).buffer(5).gather().sink(watchlist.append)
    ...
    while len(watchlist) < 10:
        time.sleep(0.01)

The reference counting was mentioned for Kafka, although I am not using it.

Yes, it was developed for that use case, but doesn't require it.

If I switched to async API, that would pretty much mean not using the Streamz library...for my example above at least?

Streamz does support async and is implemented with async internally.
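
You can keep using streamz; the pipeline just gets driven from a coroutine instead of a blocking loop. A rough sketch of your example, loosely following the asynchronous examples in the streamz documentation and assuming Stream(asynchronous=True) with an asynchronous Dask Client; note that with a buffer, awaiting emit only waits until the buffer accepts the item, so you still wait for the results to drain before closing:

    import asyncio
    from time import sleep

    from dask.distributed import Client
    from streamz import Stream

    def inc(x):
        sleep(1)  # simulate actual work (runs on a Dask worker)
        return x + 1

    async def main():
        client = await Client(asynchronous=True)
        source = Stream(asynchronous=True)
        results = []
        source.scatter().map(inc).buffer(5).gather().sink(results.append)

        for i in range(10):
            # with a buffer, this only awaits until the buffer has room
            await source.emit(i)

        # wait for the buffer to drain before shutting down
        while len(results) < 10:
            await asyncio.sleep(0.01)

        await client.close()
        print(results)
        print("done")

    if __name__ == '__main__':
        asyncio.run(main())

This keeps the buffered parallelism while letting you decide exactly when the program is allowed to finish.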

Thanks for the sleep/watchlist example, I think I'll go with that solution for my use case.