EMP provides a simple and effective way to accelerate your Python code. Under the hood, EMP uses Python's native multiprocessing package and Ray as backends, which are named pymp and ray respectively in EMP's API.
Generally, pymp is more stable but can be slower when the input or output objects of a process are large, while ray handles large objects faster but can be unstable. Choose the backend that suits your workload.
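One rough way to judge whether objects count as "large" is to measure their pickled size, since Python's multiprocessing package pickles arguments and return values when passing them between processes. The sketch below uses only the standard library; `pickled_size` is a hypothetical helper, not part of EMP, and it assumes the pymp backend inherits this serialization cost.

```python
import pickle


def pickled_size(obj):
    """Return the number of bytes needed to pickle obj (highest protocol)."""
    return len(pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL))


small = 42
large = list(range(100_000))

# The larger the pickled payload, the more time is spent moving data
# between processes instead of doing useful work.
print(pickled_size(small))
print(pickled_size(large))
```

If the per-item payload is large, it may be worth trying the ray backend and comparing timings on your own data.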
pip install PyEMP
With emp.mapper, users can easily convert a function, or a class with a __call__ method, into a parallelized version.
import emp
@emp.mapper(backend='pymp')
def multiply(x):
    return x * 2
results = list(multiply(range(100)))
assert results == [x * 2 for x in range(100)]
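To make the calling convention concrete, here is a minimal sketch of a decorator with the same shape: the decorated function takes an iterable and yields one result per input. This is not EMP's implementation. `sketch_mapper` and `num_workers` are hypothetical names, and a standard-library thread pool stands in for the process-based backends so the snippet stays self-contained.

```python
import functools
from concurrent.futures import ThreadPoolExecutor


def sketch_mapper(num_workers=4):
    """Hypothetical stand-in for emp.mapper; NOT EMP's implementation."""
    def decorator(func):
        @functools.wraps(func)
        def mapped(inputs):
            # Executor.map yields results in the same order as the inputs.
            with ThreadPoolExecutor(max_workers=num_workers) as pool:
                yield from pool.map(func, inputs)
        return mapped
    return decorator


@sketch_mapper(num_workers=4)
def multiply(x):
    return x * 2


results = list(multiply(range(100)))
assert results == [x * 2 for x in range(100)]
```

A real process-based backend additionally has to serialize the function and its inputs, which is one reason the backend choice matters.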
import emp
@emp.mapper(backend='pymp')
class Multiply:
    def __init__(self, mul):
        self.mul = mul

    def __call__(self, x):
        return x * self.mul

multiply = Multiply(2)
results = list(multiply(range(100)))
assert results == [x * 2 for x in range(100)]
3. Execute with the distributed package Ray as the backend.
# ...
@emp.mapper(backend='ray')
# ...
# ...
@emp.mapper(backend='pymp', num_proc=64)
# ...
# ...
@emp.mapper(backend='pymp', chunk_size=100)
# ...
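The source does not spell out what chunk_size does. A common meaning, assumed here, is that consecutive inputs are grouped into batches of that size and each batch is sent to a worker as one unit, which reduces per-item communication overhead. The hypothetical helper `chunked` below sketches that grouping with the standard library only.

```python
from itertools import islice


def chunked(iterable, chunk_size):
    """Yield lists of up to chunk_size consecutive items (hypothetical helper)."""
    iterator = iter(iterable)
    while True:
        chunk = list(islice(iterator, chunk_size))
        if not chunk:
            return
        yield chunk


chunks = list(chunked(range(250), 100))
print([len(c) for c in chunks])  # [100, 100, 50]
```

Under this assumption, larger chunks mean fewer transfers but coarser load balancing across workers.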
import emp
@emp.mapper(backend='pymp', report_interval=10, report_newline=True)
def multiply(x):
    return x * 2
results = list(multiply(range(100)))
Outputs:
[2020-11-02 22:15:38] [multiply] progress [1 / 100]
[2020-11-02 22:15:38] [multiply] progress [10 / 100]
[2020-11-02 22:15:38] [multiply] progress [20 / 100]
[2020-11-02 22:15:38] [multiply] progress [30 / 100]
[2020-11-02 22:15:38] [multiply] progress [40 / 100]
[2020-11-02 22:15:38] [multiply] progress [50 / 100]
[2020-11-02 22:15:38] [multiply] progress [60 / 100]
[2020-11-02 22:15:38] [multiply] progress [70 / 100]
[2020-11-02 22:15:38] [multiply] progress [80 / 100]
[2020-11-02 22:15:38] [multiply] progress [90 / 100]
[2020-11-02 22:15:38] [multiply] progress [100 / 100]
If report_newline=False (which is the default setting), all report messages are printed on a single line.
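The reporting pattern in the log above (a message after the first item, then after every report_interval items) can be sketched as a generator wrapper. `with_progress` is a hypothetical helper, not EMP's code, and using a carriage return to keep messages on one line when report_newline is false is an assumption about how single-line output might be achieved.

```python
import io
import sys
from datetime import datetime


def with_progress(results, total, name, report_interval=10,
                  report_newline=False, stream=None):
    """Yield items from results, writing a progress message periodically."""
    stream = stream or sys.stdout
    # Assumption: "\r" rewinds to the start of the line so messages overwrite.
    end = "\n" if report_newline else "\r"
    for done, result in enumerate(results, start=1):
        if done == 1 or done % report_interval == 0:
            stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            stream.write(f"[{stamp}] [{name}] progress [{done} / {total}]{end}")
        yield result


# Write into an in-memory buffer so the messages can be inspected.
buffer = io.StringIO()
doubled = list(with_progress((x * 2 for x in range(100)), total=100,
                             name="multiply", report_interval=10,
                             report_newline=True, stream=buffer))
print(buffer.getvalue(), end="")
```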
import emp
import time
import random
@emp.mapper(backend='pymp', ordered=False)
def multiply(x):
    time.sleep(random.random())
    return x * 2
results = list(multiply(range(10)))
print(results) # probably unordered
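The difference between ordered and unordered results can be seen with the standard library alone. In this sketch a thread pool stands in for EMP's backends: `Executor.map` returns results in input order, while `as_completed` returns them as tasks finish. Whether EMP implements ordered=False the same way is an assumption.

```python
import random
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def multiply(x):
    # Random delay so that tasks finish in an unpredictable order.
    time.sleep(random.random() * 0.05)
    return x * 2


with ThreadPoolExecutor(max_workers=4) as pool:
    # Ordered: results come back in the order of the inputs.
    ordered = list(pool.map(multiply, range(10)))
    # Unordered: results come back in completion order.
    futures = [pool.submit(multiply, x) for x in range(10)]
    unordered = [future.result() for future in as_completed(futures)]

print(ordered)
print(unordered)  # same values, possibly in a different order
```

Unordered collection lets a consumer start on fast results without waiting for a slow earlier item, at the cost of losing the positional match with the inputs.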
import emp
@emp.mapper(backend='pymp')
def multiply(x):
    return x * 2

results = []
for result in multiply(range(10)):
    results.append(result)
With emp.iterator, users can easily convert an iterator into a parallelized version. When the new iterator is invoked, it runs in another process and yields its results one by one to the current process.
import emp
@emp.iterator(backend='pymp')
def read_file(filepath):
    for line in open(filepath):
        yield line
results = list(read_file('temp.txt'))
# ...
@emp.iterator(backend='ray')
# ...
# ...
@emp.iterator(backend='pymp', chunk_size=100)
# ...
# ...
@emp.iterator(backend='pymp', prefetch_size=10)
# ...
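The source does not define prefetch_size. A plausible reading, assumed here, is the number of items the producer may compute ahead of the consumer. The sketch below shows that idea with a bounded queue; `prefetched` is a hypothetical helper, and a background thread stands in for the separate process that emp.iterator uses.

```python
import queue
import threading

_DONE = object()  # sentinel marking the end of the stream


def prefetched(iterable, prefetch_size=10):
    """Consume iterable in a background thread, buffering up to prefetch_size items."""
    buffer = queue.Queue(maxsize=prefetch_size)

    def produce():
        try:
            for item in iterable:
                buffer.put(item)  # blocks while the buffer is full
        finally:
            # Always signal the end so the consumer does not wait forever.
            # This sketch does not forward producer errors to the consumer.
            buffer.put(_DONE)

    threading.Thread(target=produce, daemon=True).start()
    while True:
        item = buffer.get()
        if item is _DONE:
            return
        yield item


def fake_lines():
    # Stand-in for reading lines from a file.
    for i in range(5):
        yield f"line {i}\n"


lines = list(prefetched(fake_lines(), prefetch_size=2))
print(lines)
```

With this design the producer stays at most prefetch_size items ahead, which bounds memory use while still overlapping production with consumption.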