Problems when running PyWPS with Dask in async and sync modes
In the context of PAVICS, we are observing that the PyWPS process queues of our services (which are backed by a Postgres database) regularly fill up with stuck processes (in the "started" state, but not progressing in terms of their completion percentage).
Our first assumption was that the problem is at the PyWPS level, but after a careful analysis I have established that it is rather related to dask, xarray and netcdf, which are used by our processes.
A process gets stuck when it is called in async mode and follows a (successful) process that was called in sync mode by the same server instance. This is important because a series of exclusively sync calls, or alternatively a series of exclusively async calls, does NOT trigger the problem. It's only when the server needs to operate in both modes, with code using the libraries mentioned above, that the issue arises.
To be more precise, I have been able to reproduce and isolate two different types of crash in our environment, and I'm not certain to what extent they are related. One thing is clear: they are both triggered by the same sync + async sequence described above.
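For illustration, such a sequence boils down to two plain WPS Execute requests against the same server, the first synchronous and the second asynchronous. A minimal sketch with the `requests` library (endpoint URL, process identifier and inputs are placeholders):

```python
import requests

WPS_URL = "http://localhost:5000/wps"  # placeholder PyWPS endpoint

params = {
    "service": "WPS",
    "version": "1.0.0",
    "request": "Execute",
    "identifier": "my_process",  # placeholder process
    "DataInputs": "x=42",        # placeholder input
}

# 1) Synchronous Execute: the server runs the process in the request thread
#    and returns the final ExecuteResponse directly.
requests.get(WPS_URL, params=params)

# 2) Asynchronous Execute: storeExecuteResponse/status tell PyWPS to fork a
#    worker and return a statusLocation to poll; this is the call that gets
#    stuck after a preceding sync call.
requests.get(WPS_URL, params=dict(params, storeExecuteResponse="true", status="true"))
```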
The first problem only occurs when the server runs through gunicorn, and it hangs exactly here: https://github.com/pydata/xarray/blob/7a65d59637efb0c78d5f657f3178ffb332a54dfb/xarray/backends/common.py#L157, when trying to save a NetCDF file.
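Simplified, the code path that ends up hanging is nothing exotic; our processes do something along these lines (paths, chunking and the computation itself are illustrative):

```python
import xarray as xr

# Illustrative sketch of the kind of code our processes run inside the
# PyWPS _handler (file names and chunking are hypothetical).
ds = xr.open_dataset("input.nc", chunks={"time": 10})  # dask-backed dataset
result = ds.mean(dim="time")                           # lazy dask computation
# The async-after-sync hang shows up while writing the result back to disk:
result.to_netcdf("output.nc")
```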
After investigating this problem for a while, and because by then the principal suspect was clearly Dask (which makes sense, given that this whole setup uses no less than THREE different layers of multiprocessing: gunicorn, PyWPS and Dask!), I had the idea of refactoring our Dask scheduler setup. In our current code we use the default scheduler, so I simply introduced a dask.distributed.Client connecting to an externally running dask scheduler, at the place in the code that seems to make the most sense: the beginning of the PyWPS process _handler. This experiment led to the discovery of a second problem, which I have described in dask/distributed#5334 because it is easier to isolate and reproduce.
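Schematically, the refactoring amounts to something like this (the process is trimmed down to the relevant part, and the scheduler address is a placeholder):

```python
from dask.distributed import Client
from pywps import Process


class MyProcess(Process):  # hypothetical process, reduced to the relevant part
    def __init__(self):
        super().__init__(self._handler, identifier="my_process", title="My process")

    def _handler(self, request, response):
        # Connect to the externally running dask scheduler right at the start
        # of the handler; the address is a placeholder.
        client = Client("tcp://dask-scheduler:8786")
        try:
            # ... the usual xarray/dask work, ending with a to_netcdf() call ...
            pass
        finally:
            client.close()
        return response
```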
Given that it's related to other similar existing issues in the dask.distributed project, this second problem has a potential fix: setting multiprocessing.set_start_method to spawn instead of its default, fork (at least on Linux). This, however, leads to pickle-related problems, because the process and the WPS request and response classes contain some hard-to-serialize objects. Among many things, I have tried to replace pickle with dill, which is supposedly more powerful, to no avail.
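For the record, the potential fix itself is a one-liner that has to run before any worker process is started, for example at import time of the WSGI application:

```python
import multiprocessing

# Use "spawn" instead of the Linux default "fork"; force=True because
# gunicorn/PyWPS may already have touched the default start method.
multiprocessing.set_start_method("spawn", force=True)
```

The catch, as described above, is that with spawn the process, request and response objects have to be pickled to reach the child process, which is exactly where the serialization errors show up.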
So in the end we are stuck with these problems (or maybe it's a single problem?), and my only mitigation for now is to make birdy, the client used to talk to our WPS processes, async-only, which is clearly not an ideal solution.
In our Rook WPS we are using clisops with xarray and dask. In our production deployment we run async processes on a Slurm cluster, which gets around the multiprocessing in pywps. With Slurm it seems to work ok ... I have not seen stuck jobs ...
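For context, if this relies on PyWPS's drmaa-based scheduler extension (my assumption; the actual Rook deployment may differ), the relevant switch is in the same [processing] section discussed below:

```ini
[processing]
# Hand async jobs over to the cluster scheduler (e.g. Slurm via drmaa)
# instead of forking worker processes inside the PyWPS service.
mode = scheduler
```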
There is also another issue with multiprocessing, see #508.
@cehbrecht I suggest adding a new dask processing backend to see if it solves some of our issues.
The rough plan would be to have a new mode option and new config values:
```ini
[processing]
mode = "dask"
dask_n_workers = "10"
dask_scheduler = "http://xxx.xxx.x.x"
```
At initialization, pywps would connect to the scheduler if it exists, otherwise create a new one. Then a new DaskProcessing class would instruct PyWPS on how to start and cancel processes using the dask backend.
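To make the idea concrete, here is a very rough, hypothetical sketch of such a backend; the Processing base class, the import paths and the self.job attributes are assumed to mirror the existing multiprocessing backend, and none of these names are existing PyWPS API:

```python
from dask.distributed import Client, LocalCluster

from pywps import configuration as config
from pywps.processing.basic import Processing  # assumed location of the base class


class DaskProcessing(Processing):
    """Hypothetical dask backend for starting/cancelling PyWPS jobs."""

    def start(self):
        scheduler = config.get_config_value("processing", "dask_scheduler")
        if scheduler:
            # Connect to an externally running scheduler ...
            client = Client(scheduler)
        else:
            # ... or spin up a local cluster with the configured worker count.
            n_workers = int(config.get_config_value("processing", "dask_n_workers"))
            client = Client(LocalCluster(n_workers=n_workers))
        # Run the job on the dask cluster instead of a forked subprocess; keep
        # the future around so cancel() can act on it.
        self._future = client.submit(self.job.run)

    def cancel(self):
        self._future.cancel()
```

One open question with this sketch: client.submit still has to serialize the job (process, request and response objects), so the pickling problems described earlier in this issue would need to be solved for a distributed scheduler as well.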