geopython/pywps

More robust queue handling

As requested in the issue #455 we need to improve the queue handling.

I'm thinking about creating a PyWPS daemon, which would be responsible for starting (spawning?) processes in the background, taking requests from the database table.

We already introduced a service to handle the job queue in #505.

I am currently working on removing stalled jobs from the queue. Stalled jobs are identified by a timeout parameter (1 day or so); I can terminate them and set their status to "failed". In addition, the status document of the job, which the client polls, needs to be updated. At this point I'm stuck: the information needed was in the pywps_stored_requests table, but it was removed when the process was started.

Here is the current work:
master...cehbrecht:dev-cancel
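
The timeout check itself could look roughly like the sketch below. It is not taken from the branch above: the `jobs` table layout, the column names and the status strings are assumptions made up for illustration, and it only flips the status in the database. Terminating the process and rewriting the status document are left out.

```python
import sqlite3
import time

# Hypothetical status values and timeout; not the actual PyWPS constants.
STATUS_STARTED = "started"
STATUS_FAILED = "failed"
STALL_TIMEOUT = 24 * 60 * 60  # one day, in seconds


def make_demo_db():
    """Create an in-memory database with a simplified, made-up jobs table."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE jobs ("
        "uuid TEXT PRIMARY KEY, status TEXT, time_start REAL, message TEXT)"
    )
    return conn


def fail_stalled_jobs(conn, now=None, timeout=STALL_TIMEOUT):
    """Mark jobs that have been 'started' for longer than `timeout` as failed.

    Returns the uuids of the jobs that were marked.
    """
    now = time.time() if now is None else now
    cutoff = now - timeout
    rows = conn.execute(
        "SELECT uuid FROM jobs WHERE status = ? AND time_start < ? ORDER BY uuid",
        (STATUS_STARTED, cutoff),
    ).fetchall()
    stalled = [row[0] for row in rows]
    conn.executemany(
        "UPDATE jobs SET status = ?, message = ? WHERE uuid = ?",
        [(STATUS_FAILED, "stalled: timeout exceeded", uuid) for uuid in stalled],
    )
    conn.commit()
    return stalled
```

A cron job or a periodic task could call `fail_stalled_jobs` on the real database; the returned uuids are the jobs whose status documents would still need to be updated.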

@jachym @davidcaron what would you recommend to solve this issue? Update the database and keep the stored request?

I would also add one of the recent requests:

  • Being able to "kill" a process
  • Being able to manually restart a failed process

A process can fail not only for subjective reasons (e.g. bad input data) but also for objective ones (e.g. a full disk).

At the moment, the process request is stored only in the table pywps_requests. Once the process is started, it is deleted from this table and there is no way to restore it.

It would be good to change this mechanism so that we keep a record of the full request.

What solution would you propose? Copy the request information to the jobs table? Or add a status flag (new/done) to the request table?

I implemented the function pop_first_stored at a time when there was no worker, and most requests never ended up in the pywps_stored_requests table.

The problem I fixed was that failed requests were not removed from the pywps_stored_requests table, so they were still counted when the number of stored requests was checked. The queue filled up quickly when requests were failing.

I think, like you said @cehbrecht, we should either add a status flag to the stored_requests table (but this feels a bit weird, because the jobs table is the one holding the status) or add the request information to the jobs table for Execute requests. I think the latter would make more sense, as I feel the purpose of the stored_requests table is to act as a message queue.

What information do you need @cehbrecht in the request, that is not currently stored in the jobs table?

We need to create a WPS response object to write the status document:

wps_response=job.wps_response)

def _update_status(self, status, message, status_percentage, clean=True):

This is the template used:
https://github.com/geopython/pywps/blob/master/pywps/templates/1.0.0/execute/main.xml

Ok, I see... maybe without doing a database migration, we could:

  • change the pop_first_stored function to not delete the stored request and instead set the status to WPS_STATUS.STARTED in the jobs table using a join on the uuid
  • change the get_process_counts function to take this into account (count stored requests that have their status as WPS_STATUS.ACCEPTED only, or something along those lines)

This way the stored request would still be available. We could delete the row in stored_requests when the process succeeds or fails eventually.
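
The two changes above could be sketched as follows. This is only an illustration against a simplified, made-up schema (two tables keyed by `uuid`, plain string statuses); the function names mirror the ones mentioned above, but the bodies are not the PyWPS implementation, and `remove_stored` is a hypothetical helper.

```python
import sqlite3

# Hypothetical status values standing in for WPS_STATUS.ACCEPTED / STARTED.
ACCEPTED, STARTED = "accepted", "started"


def pop_first_stored(conn):
    """Return the oldest waiting request as (uuid, request) without deleting it.

    The matching row in jobs is set to STARTED so the request is not
    picked up a second time. Returns None when nothing is waiting.
    """
    row = conn.execute(
        "SELECT s.uuid, s.request FROM stored_requests s "
        "JOIN jobs j ON j.uuid = s.uuid "
        "WHERE j.status = ? ORDER BY s.rowid LIMIT 1",
        (ACCEPTED,),
    ).fetchone()
    if row is None:
        return None
    conn.execute("UPDATE jobs SET status = ? WHERE uuid = ?", (STARTED, row[0]))
    conn.commit()
    return row


def count_stored(conn):
    """Count only the stored requests whose job is still waiting (ACCEPTED)."""
    return conn.execute(
        "SELECT COUNT(*) FROM stored_requests s "
        "JOIN jobs j ON j.uuid = s.uuid WHERE j.status = ?",
        (ACCEPTED,),
    ).fetchone()[0]


def remove_stored(conn, uuid):
    """Delete the stored request once its job has succeeded or failed."""
    conn.execute("DELETE FROM stored_requests WHERE uuid = ?", (uuid,))
    conn.commit()
```

Note that the select-then-update in `pop_first_stored` is not atomic across several connections; with more than one worker it would need a transaction or a conditional update to avoid the race conditions discussed in #455.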

I suggest we use the flag and eventually remove the request once it has completed successfully (I hope this is in line with @davidcaron).

Currently I would also favor setting a flag in the stored_requests table. Changing the database should not be an issue. I can modify the code in this direction.

IMHO this issue (and the proposed solution) is related to #491, so the two could be done together.

huard commented

I'd like to revive this issue. There is a discussion planned:
Thursday, April 22 · 9:00 – 10:00am (EST)
Video call link: https://meet.google.com/nnv-ujjp-xao

Objective:

  • Reach shared understanding of issue
  • Tentative roadmap

Agenda:

  • Introduction to this issue (@jachym, @cehbrecht ?) - 10 min
  • Requirements for potential solutions (@tomkralidis I suspect we might want to account for pygeoapi here) - 10 min
  • Options - 30 min
  • Next steps - 10 min

I've tried to identify the various issues that are tied to this topic.

Relevant issues:

  • Failed processes not cleaned-up: #448
  • Dismiss operation: #417
  • Load with concurrency: #346
  • Process prioritization: #491
  • multiprocessing: #508
  • Celery: #237
  • #461

We also have reports that mixing sync and async processes can cause the server to hang.

Relevant PRs:

  • Queue race conditions: #455

Some material:

  • The default job queue in pywps (simply made) has the main issue that stalled jobs fill up the queue and no new jobs will be accepted.
  • Workaround: a cron job running a job-queue cleaner?
  • Design goal: to get started with pywps we don't want much infrastructure ... but in production we need something robust. It would be ok to have 2 or more implementations to achieve this goal.
  • pywps is already prepared for different job queue implementations. By default it is using multiprocessing. We also have a slurm implementation to get around the default queue issues. See here: https://github.com/geopython/pywps/tree/pywps-4.4/pywps/processing
  • Alternative job queues:

huard commented

Meeting Notes

Present: @cehbrecht, @tlvu, @aulemahal, @dbyrns, @cjauvin, @huard

  • In production, we're regularly having issues with queues filling up.
  • It's possible that some processes hang forever, and in this case there is no clean-up mechanism. Jobs in the queue are triggered when processes either complete or fail. There is no mechanism, e.g. a timeout, to detect and clean up processes that hang.
  • It's important to keep PyWPS simple to launch. The default mechanism should not require additional dependencies or complex setup.
  • A more robust queue management system could be implemented, but as an optional plugin, not the default mechanism.
  • In production, PyWPS can be operated from "application deployment and execution services" that handle the job queue themselves. For example, it could be that pygeoapi does the queue management for PyWPS. So one question is whether the queue manager should be in PyWPS or external.
  • No clear preference for plugin queue manager, but should be "not too complicated" and supported by a strong community.

Action items

  • Create unit tests that clearly expose the issues we see in production. #600
  • Get more information on timelines and implementation plans for pygeoapi/PyWPS interactions
  • Explore and document queue management libraries

Hello,

Maybe a naive solution would be a pure Python daemon that serves HTTP and is made accessible through a proxy. In that case the daemon can keep good control of its sub-processes. This daemon can run as any regular user.

This also makes pywps easy to test as a standalone HTTP server.

Best regards.

Hello,

I ran some successful tests with a daemon accessible through a proxy. I did not change the code extensively; I just checked whether the current code can be turned into an HTTP server. The proxy configuration must handle wpsoutput and redirect the WPS requests to the server. The code can be found in my repository:
gschwind/PyWPS@058271d
and an example of usage:
gschwind/PyWPS@6f5fcb9

As we can see, the current modifications are minimal. The code does not solve any of the issues yet, but it makes it possible to do so later.

Best regards

Hi @cehbrecht, @tlvu, @aulemahal, @dbyrns, @cjauvin, @huard

I think I have a general problem understanding async processes in PyWPS. I mainly use async processes and have an NGINX/Gunicorn setup, where I configure the maximum number of parallel processes based on the available CPUs. Now in production I also experience issues with jobs that get submitted and accepted, but are then never processed. I assume this is a queue problem. Jobs get added to the queue when I exceed my maximum number of parallel processes, but are then left in the (default in-memory SQLite) database. It seems that finished jobs are not removed from the queue, so pending jobs are never picked up and stay at 0% progress forever while remaining accepted. My questions are:

  • Do I need the joblauncher script in this case (not using an external scheduler)?
  • Is pyWPS supposed to pick up pending jobs from the queue or do you need to run an extra scheduler for this?

Thanks

Hello @geotom

The in-memory SQLite database is a separate issue; see #495.

Hello,

I triggered the case where my WPS no longer accepts requests because earlier requests were killed before they could terminate cleanly. In my case this happened in an out-of-memory situation, but it can easily be triggered by a reboot or kill -9. The issue is that the request status is updated by the process itself, so if the process is killed before it writes that it has finished, the request stays in that state forever and counts as a running process. I don't think adding a flag to the database can solve this issue.

Note that my previous suggestion to use a daemon does not solve the entire issue; for example, a reboot may leave unfinished requests. I found two heuristics to fix the issue in a more or less safe way. In the daemon case, we can generate a daemon session uuid, and when the daemon restarts, all unfinished requests that do not belong to the current session ID get marked as finished.
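
A minimal sketch of this session heuristic, assuming a hypothetical `jobs` table with a `session_id` column (neither the column nor the status strings exist in PyWPS as far as this thread shows):

```python
import sqlite3
import uuid

# Hypothetical status values, for illustration only.
RUNNING, FINISHED = "running", "finished"


def new_session_id():
    """Generate a fresh id that identifies one run of the daemon."""
    return str(uuid.uuid4())


def close_orphaned_jobs(conn, session_id):
    """Mark running jobs that belong to another (or no) session as finished.

    Returns the number of jobs that were updated.
    """
    cur = conn.execute(
        "UPDATE jobs SET status = ? "
        "WHERE status = ? AND (session_id IS NULL OR session_id != ?)",
        (FINISHED, RUNNING, session_id),
    )
    conn.commit()
    return cur.rowcount
```

The daemon would call `new_session_id()` once at startup, run `close_orphaned_jobs` before accepting requests, and stamp every job it launches with the same id.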

The second heuristic applies when there is no daemon. When pywps runs out of slots for a new request, it can check whether the pid of each running request is still alive and, if not, mark that request as failed. This solution is safe but not fully accurate, because Linux can reuse pids; but at least if the pid is not present, we are sure that the process has finished. To make it more accurate, I thought of tagging the process via /proc/pid/environ, but this requires execve and therefore a change in the way sub-processes are spawned. I could also read /proc/pid/cmdline and check whether it differs from our /proc/self/cmdline, in which case the pid no longer belongs to one of our processes (fork preserves cmdline, I guess). Note that this solution may leave failed processes in the running state for a long time, because the check is only triggered by a new request arriving when no slot is available. Ideally this heuristic would be triggered each time the user requests the status of the process, which is not possible in the current implementation of the status.
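
The pid check without the tag can be sketched like this. It is POSIX-only (on Windows `os.kill` with signal 0 has a different meaning), the function names are made up for illustration, and it does not guard against pid reuse:

```python
import os


def pid_alive(pid):
    """Return True if a process with this pid currently exists (POSIX only)."""
    if pid <= 0:
        return False
    try:
        # Signal 0 performs the existence and permission checks
        # without actually delivering a signal.
        os.kill(pid, 0)
    except ProcessLookupError:
        return False  # no such process
    except PermissionError:
        return True  # the process exists but belongs to another user
    return True


def find_dead_jobs(running):
    """Given {uuid: pid} for jobs recorded as running, return the uuids
    whose pid no longer exists."""
    return [uuid for uuid, pid in running.items() if not pid_alive(pid)]
```

The uuids returned by `find_dead_jobs` are the requests that could be marked as failed to free their slots.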

I will try to implement the last method without the tag, which should solve 99% of the issue, I guess.

Following my previous comment, I opened the related pull request #659.

Best regards