noxdafox/pebble

pebble.ProcessFuture still running after TimeoutError is raised

marxin opened this issue · 13 comments

I can't cancel a running pebble.ProcessFuture after a timeout is reached:

#!/usr/bin/env python3

from pebble import ProcessPool
from concurrent.futures import wait, FIRST_COMPLETED

import os
import subprocess
import time

def run():
    subprocess.check_output('md5sum /dev/random', shell=True)

with ProcessPool(max_workers=4) as pool:
    print('new ProcessPool')
    futures = []
    for i in range(4):
        futures.append(pool.schedule(run, timeout=1))
    print('md5sum started')
    time.sleep(3)
    for f in futures:
        print(f)
        f.cancel()
        print('Cancel: %s' % str(f))
    time.sleep(3)
    print('After cancelation:')
    for f in futures:
        print(f)

$ ./pe.py
new ProcessPool
md5sum started
<ProcessFuture at 0x7fc92b5a4d90 state=finished raised TimeoutError>
Cancel: <ProcessFuture at 0x7fc92b5a4d90 state=finished raised TimeoutError>
<ProcessFuture at 0x7fc929d2c130 state=finished raised TimeoutError>
Cancel: <ProcessFuture at 0x7fc929d2c130 state=finished raised TimeoutError>
<ProcessFuture at 0x7fc929d2c1f0 state=finished raised TimeoutError>
Cancel: <ProcessFuture at 0x7fc929d2c1f0 state=finished raised TimeoutError>
<ProcessFuture at 0x7fc929d2c2b0 state=finished raised TimeoutError>
Cancel: <ProcessFuture at 0x7fc929d2c2b0 state=finished raised TimeoutError>
After cancelation:
<ProcessFuture at 0x7fc92b5a4d90 state=finished raised TimeoutError>
<ProcessFuture at 0x7fc929d2c130 state=finished raised TimeoutError>
<ProcessFuture at 0x7fc929d2c1f0 state=finished raised TimeoutError>
<ProcessFuture at 0x7fc929d2c2b0 state=finished raised TimeoutError>

and

$ ps ax | grep md5
19160 pts/1    R      0:18 md5sum /dev/random
19161 pts/1    R      0:18 md5sum /dev/random
19162 pts/1    R      0:18 md5sum /dev/random
19163 pts/1    R      0:19 md5sum /dev/random

What you observe is the expected behaviour. There are two issues with your code.

Firstly, you are cancelling an already terminated future. Cancelling an already finished future is basically a no-op.
In your code you schedule 4 jobs with a 1-second timeout each. Then you wait 3 seconds (enough for all jobs to time out) and only then do you cancel them.
You can see the futures are in the correct state:
<ProcessFuture at 0x7fc92b5a4d90 state=finished raised TimeoutError>
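
To illustrate the no-op, here is a minimal sketch using the standard library's concurrent.futures.Future rather than pebble itself. It assumes pebble.ProcessFuture treats finished futures the same way; the future is driven by hand only to reach the "finished raised TimeoutError" state shown above.

```python
from concurrent.futures import Future

# Build a future by hand and move it to the finished state,
# with a TimeoutError stored as its outcome.
future = Future()
future.set_running_or_notify_cancel()
future.set_exception(TimeoutError("task timed out"))

# cancel() on a finished future changes nothing and reports failure.
cancelled = future.cancel()
print(cancelled, future.cancelled(), future.done())
```

The future keeps its TimeoutError after the call, which matches the unchanged state printed before and after f.cancel() in the report.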

Secondly, you are launching a new process via the subprocess module inside the pool worker. This is the easiest way to leak those processes. As the pool worker process is terminated after 1 second, its child is left orphaned and keeps running. The pool only tracks its own processes, not any other resource a worker function creates. It is the developer's duty to clean up any persistent resource a worker initializes.
Moreover, nesting multiple levels of processes is considered a bad pattern, as it can easily lead to process leaks.
There is no value in using subprocess within a pool of processes. The subprocess module already allows you to interrupt a running command. If you want to execute multiple commands asynchronously via the subprocess module, the best way is either to control them in the main loop or to use multithreading.
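
As a small sketch of the subprocess module interrupting a command on its own, subprocess.run accepts a timeout and kills the child when it expires. The sleeping Python child below is only a stand-in for a long-running external program.

```python
import subprocess
import sys

# Stand-in for a long-running external command.
command = [sys.executable, "-c", "import time; time.sleep(30)"]

try:
    # run() kills the child and waits for it when the timeout expires,
    # then raises TimeoutExpired in the caller.
    subprocess.run(command, timeout=0.5)
    timed_out = False
except subprocess.TimeoutExpired:
    timed_out = True

print(timed_out)
```

Note that with shell=True the timeout applies to the shell, so commands the shell starts may need separate handling.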

What you observe is the expected behaviour. There are two issues with your code.

Thank you for the helpful answer.

Firstly, you are cancelling an already terminated future. Cancelling an already finished future is basically a no-op.
In your code you schedule 4 jobs with a 1-second timeout each. Then you wait 3 seconds (enough for all jobs to time out) and only then do you cancel them.
You can see the futures are in the correct state:
<ProcessFuture at 0x7fc92b5a4d90 state=finished raised TimeoutError>

That's what I expected: .cancel() is not needed.

Secondly, you are launching a new process via the subprocess module inside the pool worker. This is the easiest way to leak those processes. As the pool worker process is terminated after 1 second, its child is left orphaned and keeps running. The pool only tracks its own processes, not any other resource a worker function creates. It is the developer's duty to clean up any persistent resource a worker initializes.
Moreover, nesting multiple levels of processes is considered a bad pattern, as it can easily lead to process leaks.

Well, it seems to me a quite natural approach that is commonly used on Unix systems.
My Pebble usage is about running parallel tasks (for which I chose ProcessPool), where some of these tasks need to execute an external program (that's why I use subprocess). Moreover, I chose ProcessPool because I need an immediate cancel option for futures. Am I right that a ThreadPool future can't be easily cancelled?

There is no value in using subprocess within a pool of processes. The subprocess module already allows you to interrupt a running command. If you want to execute multiple commands asynchronously via the subprocess module, the best way is either to control them in the main loop or to use multithreading.

OK, so is the solution for my task to use p = subprocess.Popen and call p.terminate() from a termination signal handler? Or should I use os.killpg(os.getpid(), signal.SIGTERM) from such a signal handler?

I'm all ears for a proper solution for my workload.
Thanks!
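
On the os.killpg idea: killpg takes a process group ID, so passing os.getpid() only works if the caller happens to be a group leader. A hedged, POSIX-only sketch of one alternative is to start the external program in its own session with start_new_session=True, which makes its PID double as its process group ID, and then signal that group. The SCRIPT string is a hypothetical stand-in for a command that spawns a helper of its own.

```python
import os
import signal
import subprocess
import sys

# Stand-in for an external program that starts a helper process of its own.
SCRIPT = (
    "import subprocess, sys, time\n"
    "subprocess.Popen([sys.executable, '-c', 'import time; time.sleep(60)'])\n"
    "print('ready', flush=True)\n"
    "time.sleep(60)\n"
)

# start_new_session=True runs setsid() in the child, so the child becomes
# the leader of a new process group whose ID equals child.pid.
child = subprocess.Popen(
    [sys.executable, "-c", SCRIPT],
    stdout=subprocess.PIPE,
    text=True,
    start_new_session=True,
)
child.stdout.readline()  # wait until the helper has been started

# Signal the whole group: the child and the helper it spawned.
os.killpg(child.pid, signal.SIGTERM)
returncode = child.wait(timeout=10)
child.stdout.close()
print(returncode)
```

A negative return code means the child was ended by that signal number.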

Am I right that ThreadPool future can't be easily canceled?

It cannot, as explained for instance here: #23 (comment).
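
A minimal sketch of the limitation with the standard library's ThreadPoolExecutor (not pebble's ThreadPool, which is assumed to be bound by the same constraint): once a call is running in a thread, cancel() returns False and the call runs to completion. The job function and the two events are illustrative helpers.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()
release = threading.Event()

def job():
    started.set()    # tell the main thread the call is now running
    release.wait()   # block until the main thread lets us finish
    return "done"

with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(job)
    started.wait()
    # The call is already running, so it cannot be cancelled.
    cancelled = future.cancel()
    release.set()
    result = future.result()

print(cancelled, result)
```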

You can apply two strategies here, neither of which requires external modules such as pebble.

The first would be a polling routine in your main loop. As subprocess.Popen returns an object which can be checked for completion, you can simply use it to check whether or not your command has finished or timed out.

Some pseudocode as example:

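import subprocess
import time

TIMEOUT = 600

# command_list and process_result are placeholders you need to provide
processes = []
start = time.time()
now = start

for command in command_list:
    # add any further Popen arguments you need here
    processes.append(subprocess.Popen(command))

# stop polling as soon as everything has finished or the timeout is reached
while processes and now - start < TIMEOUT:
    for process in processes[:]:
        if process.poll() is not None:
            # process has completed its job on time
            processes.remove(process)
            process_result(process)

    time.sleep(0.2)
    now = time.time()

if processes:
    pending = [process.pid for process in processes]
    print("Processes %s still pending, terminating them" % pending)
    for process in processes:
        process.terminate()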

The second would rely on a thread pool and handle the timeout in the worker itself:

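import concurrent.futures
import subprocess
from subprocess import TimeoutExpired

TIMEOUT = 600
futures = []

# command_list and process_result are placeholders you need to provide
def worker_function(command, timeout):
    # add any further Popen arguments you need here
    process = subprocess.Popen(command)
    try:
        return process.communicate(timeout=timeout)
    except TimeoutExpired:
        # stop the command and reap it so it does not linger as a zombie
        process.terminate()
        process.wait()
        raise

with concurrent.futures.ThreadPoolExecutor() as pool:
    for command in command_list:
        futures.append(pool.submit(worker_function, command, TIMEOUT))

    for future in futures:
        try:
            process_result(future.result())
        except TimeoutExpired:
            print("Timeout while processing a command")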

In both cases the logic would be a bit more complicated if you'd like a different timeout for each process, but not by much: just replace the processes list with a dictionary that also holds the desired timeout.

The first approach is pretty lightweight as it does not require any additional resources to run your commands. Nevertheless, its logic is a bit more complex.
The second approach needs to spawn a few threads, which is an additional burden on your workstation. Its logic is a bit cleaner though.
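
The per-process timeout variant of the polling loop could look roughly like the sketch below. The commands dictionary, its "fast" and "slow" entries and the outcome dictionary are made up for illustration; each running process is mapped to its own deadline.

```python
import subprocess
import sys
import time

# Hypothetical commands, each paired with its own timeout in seconds.
commands = {
    "fast": ([sys.executable, "-c", "pass"], 5.0),
    "slow": ([sys.executable, "-c", "import time; time.sleep(30)"], 0.5),
}

# Map every running process to its name and absolute deadline.
running = {}
for name, (command, timeout) in commands.items():
    process = subprocess.Popen(command)
    running[process] = (name, time.monotonic() + timeout)

outcome = {}
while running:
    for process, (name, deadline) in list(running.items()):
        if process.poll() is not None:
            # finished before its own deadline
            outcome[name] = "finished"
            del running[process]
        elif time.monotonic() >= deadline:
            # deadline passed: stop the command and reap it
            process.terminate()
            process.wait()
            outcome[name] = "timed out"
            del running[process]
    time.sleep(0.05)

print(outcome)
```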

Thank you very much for your answer.

You can apply two strategies here, neither of which requires external modules such as pebble.

What a pity, I really like the API provided by pebble. But let me describe my use case and why the suggested solutions do not fit it precisely.

My workload:

  1. start parallel jobs in order to utilize all CPU cores
  2. some jobs use only Python code, but some of them run external programs
  3. wait for a first successful job
  4. immediately terminate all running jobs (including Python code and also external programs)
  5. continue

The first would be a polling routine in your main loop. As subprocess.Popen returns an object which can be checked for completion, you can simply use it to check whether or not your command has finished or timed out.

This does not cover my Python jobs that do not need to run a sub-process.

The second would rely on a thread pool and handle the timeout in the worker itself:

This is problematic as I want the capability to terminate running jobs (here Threads can't be terminated).

In both cases the logic would be a bit more complicated if you'd like a different timeout for each process, but not by much: just replace the processes list with a dictionary that also holds the desired timeout.

Different timeouts are not an issue for me; I'll use a single one for all jobs.

The first approach is pretty lightweight as it does not require any additional resources to run your commands. Nevertheless, its logic is a bit more complex.
The second approach needs to spawn a few threads, which is an additional burden on your workstation. Its logic is a bit cleaner though.

I see, but I still don't see a solution that covers all my needs.
Thanks.

I am afraid there is no solution for your problem that can be handled with a single framework.

I would suggest using pebble.ProcessPool to run your Python workloads concurrently and the subprocess.Popen polling technique to control the external programs. You can then handle the termination of the two pools together once the stop signal is triggered.

Be wary that mixing subprocess and multiprocessing in the same program is considered dangerous, especially in Unix environments, due to the way fork handles child process creation.
Example:
https://bugs.python.org/issue25829

I would suggest using pebble.ProcessPool to run your Python workloads concurrently and the subprocess.Popen polling technique to control the external programs.

Do you mean calling subprocess.Popen inside a pebble.ProcessPool worker? If so, should I rather register a signal handler that calls os.kill(pid_from_popen), or is it better to send the PID to the main process, which will then do the termination?

You can then handle the termination of the two pools together once the stop signal is triggered.

Be wary that mixing subprocess and multiprocessing in the same program is considered dangerous, especially in Unix environments, due to the way fork handles child process creation.
Example:
https://bugs.python.org/issue25829

Good to know!

I've tried a little demo:

#!/usr/bin/env python3

from pebble import ProcessPool
from concurrent.futures import wait, FIRST_COMPLETED

import os
import subprocess
import time
import signal
import datetime

popen_pid = None

def gt():
    return str(datetime.datetime.now())

def signal_handler(signum, frame):
    global popen_pid
    os.kill(popen_pid, signum)
    print('%s: killing: %d' % (gt(), popen_pid))

def run():
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)
    global popen_pid
    try:
        p = subprocess.Popen('md5sum /dev/random', shell=True)                
        popen_pid = p.pid
        print('%s: create PID: %d' % (gt(), popen_pid))
        p.communicate()
    except Exception as e:
        print(e)
        raise

with ProcessPool(max_workers=4) as pool:
    print('%s New ProcessPool:' % gt())
    futures = []
    for i in range(4):
        futures.append(pool.schedule(run, timeout=1))
    print('%s sleep(3):' % gt())
    time.sleep(3)
    print('%s after sleep:' % gt())
    for f in futures:
        print(f)

print('%s: at the end' % gt())

And I see:

2020-04-22 14:11:08.530282 New ProcessPool:
2020-04-22 14:11:08.535995 sleep(3):
2020-04-22 14:11:08.539071: create PID: 24516
2020-04-22 14:11:08.539189: create PID: 24515
2020-04-22 14:11:08.540193: create PID: 24518
2020-04-22 14:11:08.540494: create PID: 24517
2020-04-22 14:11:09.537216: killing: 24515
2020-04-22 14:11:11.539053 after sleep:
<ProcessFuture at 0x7f09dd71e250 state=finished raised TimeoutError>
<ProcessFuture at 0x7f09dd771fd0 state=running>
<ProcessFuture at 0x7f09dd771f10 state=running>
<ProcessFuture at 0x7f09dd771dc0 state=running>
2020-04-22 14:11:12.641871: killing: 24516
2020-04-22 14:11:15.644228: killing: 24517
2020-04-22 14:11:18.648170: killing: 24518
2020-04-22 14:11:21.761921: at the end

As seen, it takes a long time to terminate a process. It's likely related to this join:

process.join(3)

Am I doing something wrong in the signal handler?
Thanks.

    print('%s: killing: %d' % (gt(), popen_pid))

Calling sys.exit(1) right after this line in the signal handler works for me. I hope it's the proper fix.
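
A stripped-down sketch of that fix without pebble, assuming POSIX signal semantics: the worker's SIGTERM handler forwards the stop request to its child and then calls sys.exit(1), so the worker leaves its blocking wait immediately instead of staying alive until it is joined. The WORKER script and its names are illustrative only.

```python
import subprocess
import sys
import textwrap

# Source of a stand-in worker: it starts an external command, installs a
# SIGTERM handler and then blocks waiting for the command.
WORKER = textwrap.dedent("""
    import signal, subprocess, sys

    child = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])

    def handler(signum, frame):
        child.terminate()  # forward the stop request to the external command
        sys.exit(1)        # abort the blocking wait so the worker exits at once

    signal.signal(signal.SIGTERM, handler)
    print("ready", flush=True)
    child.wait()
""")

worker = subprocess.Popen(
    [sys.executable, "-c", WORKER], stdout=subprocess.PIPE, text=True
)
worker.stdout.readline()  # wait until the handler is installed
worker.terminate()        # sends SIGTERM on POSIX
exit_code = worker.wait(timeout=10)
worker.stdout.close()
print(exit_code)
```

Without the sys.exit(1) call the handler would return and the worker would resume waiting, which fits the multi-second delays in the log above.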

And there's a working example for multiprocessing.Queue:

#!/usr/bin/env python3

from pebble import ProcessPool
from concurrent.futures import wait, FIRST_COMPLETED
from multiprocessing import Queue, Manager

import os
import subprocess
import time
import signal
import datetime
import sys

popen_pid = None

def gt():
    return str(datetime.datetime.now())

def run(queue):
    try:
        p = subprocess.Popen('md5sum /dev/random', shell=True)                
        popen_pid = p.pid
        queue.put(popen_pid)
        print('%s: create PID: %d' % (gt(), popen_pid))
        p.communicate()
    except Exception as e:
        print(e)
        raise

with ProcessPool(max_workers=4) as pool:
    m = Manager()
    pid_queue = m.Queue()
    print('%s New ProcessPool:' % gt())
    futures = []
    for i in range(4):
        futures.append(pool.schedule(run, [pid_queue], timeout=1))
    print('%s sleep 2:' % gt())
    time.sleep(2)
    print('%s after sleep:' % gt())
    for f in futures:
        print(f)
    while not pid_queue.empty():
        pid = pid_queue.get()
        print('%s: killing: %d' % (gt(), pid))
        os.kill(pid, signal.SIGTERM)

print('%s: at the end' % gt())

2020-04-22 14:42:07.930466 New ProcessPool:
2020-04-22 14:42:07.935961 sleep 2:
2020-04-22 14:42:07.939869: create PID: 27305
2020-04-22 14:42:07.962820: create PID: 27307
2020-04-22 14:42:07.983714: create PID: 27310
2020-04-22 14:42:07.983889: create PID: 27312
2020-04-22 14:42:09.937035 after sleep:
<ProcessFuture at 0x7fa5c28c96a0 state=finished raised TimeoutError>
<ProcessFuture at 0x7fa5c28c9790 state=finished raised TimeoutError>
<ProcessFuture at 0x7fa5c28c9850 state=finished raised TimeoutError>
<ProcessFuture at 0x7fa5c28c9910 state=finished raised TimeoutError>
2020-04-22 14:42:09.947932: killing: 27305
2020-04-22 14:42:09.948127: killing: 27307
2020-04-22 14:42:09.948299: killing: 27310
2020-04-22 14:42:09.948461: killing: 27312
2020-04-22 14:42:10.049040: at the end

I hope we can close the issue.
Thanks for the brainstorming!

I meant to suggest using pebble.ProcessPool for jobs that can be solved natively in Python, and the subprocess.Popen polling loop for those that are external programs.

They would act as two separate "pools" which you can terminate according to your needs.
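
For the external-program "pool", a rough sketch of the "first successful job wins, terminate the rest" requirement with a Popen polling loop might look like this. The function name run_until_first_success and the sample commands are hypothetical, and the pebble side for pure Python jobs is not shown.

```python
import subprocess
import sys
import time

def run_until_first_success(commands, timeout, poll_interval=0.05):
    """Start all commands, return the index of the first one that exits
    with status 0 (or None), and terminate whatever is still running."""
    processes = [subprocess.Popen(command) for command in commands]
    deadline = time.monotonic() + timeout
    winner = None
    try:
        while winner is None and time.monotonic() < deadline:
            codes = [process.poll() for process in processes]
            if 0 in codes:
                winner = codes.index(0)
            elif all(code is not None for code in codes):
                break  # everything finished and nothing succeeded
            else:
                time.sleep(poll_interval)
    finally:
        # Stop and reap every command that is still running.
        for process in processes:
            if process.poll() is None:
                process.terminate()
            process.wait()
    return winner

commands = [
    [sys.executable, "-c", "import time; time.sleep(30)"],   # too slow
    [sys.executable, "-c", "import sys; sys.exit(3)"],       # fails
    [sys.executable, "-c", "import time; time.sleep(0.2)"],  # succeeds
]
winner = run_until_first_success(commands, timeout=10)
print(winner)
```

The same stop condition can then be used to cancel the pending futures of the Python pool, so both groups end together.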

odin- commented

And there's a working example for multiprocessing.Queue:

So this totally solved a similar issue I was having. Thank you :-)