pip install concurrent-helper
- Simplest and powerful, very easy to use, only 2 core functions.
- Works well both on
Python2
andPython3
. - Support for multiple concurrent modes:
thread pool, process pool and independent multi-processes
. - Support the mode of
Message Queue + Service
. - Multiple progress bar display modes.
import concurrent_helper
import os
def init(gpu_id):
os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id)
def work(task_id, gpu_id=None):
if gpu_id is not None:
os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id)
print("{}: I am working on {} for {}".format(
os.getpid(),
os.environ.get("CUDA_VISIBLE_DEVICES"),
task_id)
)
return task_id * 2
total_gpu_num = 3
todos = [(x, x % total_gpu_num) for x in range(10)]
rtvs = concurrent_helper.run_with_concurrent(work, todos, "thread")
rtvs = concurrent_helper.run_with_concurrent(work, todos, "process")
rtvs = concurrent_helper.run_with_concurrent(work, todos, "x-process")
rtvs = concurrent_helper.run_with_message_queue(
init, list(range(3)), # start 3 services
work, list(range(10)) # has 10 tasks to do
)
def run_with_concurrent(
func,
args_list,
concurrent_type="thread", # ["single", "thread", "process", "x-process"]
concurrent_num=1,
show_process="print", # ["", "tqdm", "print"]
show_interval=0.01,
):
pass
Run a function by concurrent mode.
concurrent_type
:
Param Value | Description |
---|---|
single | like normal for-loop |
thread | thread pool |
process | process pool |
x-process | independent multi-processes |
Warning:
Arrocding to this issue: agronholm/pythonfutures#29, there is a bug in
concurrent.futures
of Python2. The relevant fix upstream uses Python 3 features and cannot be backported.
This bug only happen when child-process killed by system (for exapmle, memory overflow). If you encounter this problem, use the
x-process
instead ofprocess
when you are using Python2.
show_process
:
Param Value | Description |
---|---|
"" | don't show process |
tqdm | use tqdm style process bar |
print process bar info |
Warning:
Please note that tqdm is not thread safe, use print if you need the guarantee of thread safe.
show_interval
:
Param Value | Description |
---|---|
>= 1 | update progress bar by every N task |
< 1 | update progress bar by percentage |
def run_with_message_queue(
init_func,
init_args_list, # concurrent_num == len(init_args_list)
func,
args_list,
show_process="print", # ["", "tqdm", "print"]
show_interval=0.01,
):
pass
Run function by Message Queue + Service
mode.
Fist, start N (
N=len(init_args_list)
) services, these services will inited byinit_func
.After that, these services will obtain M (
M=len(args_list)
) tasks from message queue and run these byfunc
.
Why we need Message Queue + Service
mode?
In order to maximize resource utilization (like GPU), we should to start a certain number of services according to the number of resources. Then, these services will obtain tasks from the message queue and run them.
import concurrent_helper
import os
def init(gpu_id):
os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id)
def work(task_id, gpu_id=None):
if gpu_id is not None:
os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id)
print("{}: I am working on {} for {}".format(
os.getpid(),
os.environ.get("CUDA_VISIBLE_DEVICES"),
task_id)
)
return task_id * 2
total_gpu_num = 3
todos = [(x, x % total_gpu_num) for x in range(5)]
rtvs = concurrent_helper.run_with_concurrent(work, todos, "thread", 3)
print("----")
rtvs = concurrent_helper.run_with_concurrent(work, todos, "process", 3)
print("----")
rtvs = concurrent_helper.run_with_concurrent(work, todos, "x-process", 3, "tqdm")
print("----")
rtvs = concurrent_helper.run_with_message_queue(
init, list(range(3)),
work, list(range(5))
)
print(rtvs)
outputs:
37059: I am working on 0 for 0
37059: I am working on 1 for 1
37059: I am working on 2 for 2
[ 1/5 ] ...... Fns work with thread ...... in 0.0001 seconds.
37059: I am working on 0 for 3
37059: I am working on 1 for 4
[ 2/5 ] ...... Fns work with thread ...... in 0.0001 seconds.
[ 3/5 ] ...... Fns work with thread ...... in 0.0003 seconds.
[ 4/5 ] ...... Fns work with thread ...... in 0.0002 seconds.
[ 5/5 ] ...... Fns work with thread ...... in 0.0001 seconds.
>>>>>> Fns 5 work with thread total use 0.0020 seconds.
----
37063: I am working on 0 for 0
37064: I am working on 1 for 1
37064: I am working on 0 for 3
37065: I am working on 2 for 2
37063: I am working on 1 for 4
[ 1/5 ] ...... Fns work with process ...... in 0.0003 seconds.
[ 2/5 ] ...... Fns work with process ...... in 0.0003 seconds.
[ 3/5 ] ...... Fns work with process ...... in 0.0000 seconds.
[ 4/5 ] ...... Fns work with process ...... in 0.0000 seconds.
[ 5/5 ] ...... Fns work with process ...... in 0.0006 seconds.
>>>>>> Fns 5 work with process total use 0.0126 seconds.
----
37066: I am working on 0 for 0
37067: I am working on 1 for 1
37068: I am working on 2 for 2
37069: I am working on 0 for 3
37070: I am working on 1 for 4
[work / x-process]: 100%|█████████████████| 5/5 [00:00<00:00, 346.26it/s]
----
37074: I am working on 0 for 0
37075: I am working on 1 for 1
37076: I am working on 2 for 2
[ 1/1 ] ...... Fns work with x-process ...... in 0.0003 seconds.
>>>>>> Fns 1 work with x-process total use 0.0085 seconds.
[ 1/1 ] ...... Fns work with x-process ...... in 0.0004 seconds.
[ 1/5 ] ...... Fns work with run_with_message_queue ...... in 0.0090 seconds.
>>>>>> Fns 1 work with x-process total use 0.0090 seconds.
[ 1/1 ] ...... Fns work with x-process ...... in 0.0003 seconds.
>>>>>> Fns 1 work with x-process total use 0.0087 seconds.
[ 2/5 ] ...... Fns work with run_with_message_queue ...... in 0.0093 seconds.
[ 3/5 ] ...... Fns work with run_with_message_queue ...... in 0.0090 seconds.
37077: I am working on 0 for 3
37078: I am working on 1 for 4
[ 1/1 ] ...... Fns work with x-process ...... in 0.0003 seconds.
>>>>>> Fns 1 work with x-process total use 0.0061 seconds.
[ 4/5 ] ...... Fns work with run_with_message_queue ...... in 0.0063 seconds.
[ 1/1 ] ...... Fns work with x-process ...... in 0.0003 seconds.
>>>>>> Fns 1 work with x-process total use 0.0060 seconds.
[ 5/5 ] ...... Fns work with run_with_message_queue ...... in 0.0061 seconds.
>>>>>> Fns 5 work with run_with_message_queue total use 0.0182 seconds.
[0, 2, 4, 6, 8]
- [DONE] Test codes.
- [DONE] Detail docs & English describe about
run_with_message_queue
& More code examples. - [DONE] Add params
show_process, show_interval
torun_with_message_queue
. - [DONE] Remove
raise_exception
param, it will be default action.