multipipe is a Python utility that lets you build pipelines of functions and run them over any iterable (e.g., lists, generators) in parallel using multiprocessing. multipipe is built on top of multiprocess.
Requires python>=3.8. Install it with:

```bash
pip install multipipe
```
```python
from multipipe import Multipipe

def add(x):
    return x + 1

def mul(x):
    return x * 2

# Functions are applied in order: first add, then mul
pipe = Multipipe([ add, mul ])
pipe(range(10))
```

Output:

```
[ 2, 4, 6, 8, 10, 12, 14, 16, 18, 20 ]
```
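Conceptually, each element of the input passes through `add` and then `mul`, so the result is `(x + 1) * 2`. The sketch below illustrates that idea with only the standard library's `multiprocessing.Pool`; `run_pipeline` and `apply_all` are hypothetical names, and this is not multipipe's actual implementation. It assumes the "fork" start method where available (POSIX) so that functions defined in the main script are visible to worker processes.

```python
from functools import partial
from multiprocessing import get_all_start_methods, get_context
from typing import Callable, Iterable, List


def apply_all(funcs: List[Callable], x):
    """Apply the functions left to right: funcs[0] first, funcs[-1] last."""
    for f in funcs:
        x = f(x)
    return x


def run_pipeline(funcs: List[Callable], data: Iterable, processes: int = 2) -> list:
    """Hypothetical stand-in for Multipipe: run the whole chain on each item in a worker."""
    # Assumption: prefer "fork" where available so workers inherit the main script's
    # functions; otherwise fall back to the platform's default start method.
    method = "fork" if "fork" in get_all_start_methods() else None
    ctx = get_context(method)
    with ctx.Pool(processes=processes) as pool:
        # partial(apply_all, funcs) is picklable because every function is module-level.
        # Pool.map returns results in the same order as the input.
        return pool.map(partial(apply_all, funcs), data)


def add(x):
    return x + 1


def mul(x):
    return x * 2


if __name__ == "__main__":
    print(run_pipeline([add, mul], range(10)))
```

Order matters: `[mul, add]` would compute `2 * x + 1` instead.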
To pass extra arguments to your functions, fix them with `functools.partial`:
```python
from multipipe import Multipipe
from functools import partial

def add(x, y):
    return x + y

def mul(x, y):
    return x * y

# partial fixes y, leaving a one-argument function for each step
pipe = Multipipe([ partial(add, y=1), partial(mul, y=2) ])
pipe(range(10))
```

Output:

```
[ 2, 4, 6, 8, 10, 12, 14, 16, 18, 20 ]
```
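Each `partial` fixes `y` and leaves a one-argument callable, which is the shape a pipeline step needs. The sketch below checks this without multipipe; `add_one` and `double` are illustrative names. It also shows that a partial of a module-level function survives a pickle round-trip, which is what the standard `multiprocessing` module requires to send it to worker processes (multipipe builds on multiprocess, which serializes with dill, so its constraints may differ).

```python
import pickle
from functools import partial


def add(x, y):
    return x + y


def mul(x, y):
    return x * y


add_one = partial(add, y=1)
double = partial(mul, y=2)

# Each partial behaves like a one-argument function, so they chain like add/mul above
result = [double(add_one(x)) for x in range(10)]
print(result)

# A partial of a module-level function pickles by reference plus its bound arguments
restored = pickle.loads(pickle.dumps(add_one))
print(restored(41))
```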
In this example, we lazily read data from a JSONL file, run a pipeline of functions on it lazily, and write the results to a new JSONL file. This lets you process huge files without loading their entire content into memory at once.
```python
from multipipe import Multipipe
from unified_io import read_jsonl, write_jsonl

# Create a pipeline of functions
pipe = Multipipe([ ... ])

# Read a JSONL file line by line as a generator, i.e., lazily
in_data = read_jsonl("path/to/input/file.jsonl", generator=True)

# This is still a generator:
# the pipeline is executed lazily
out_data = pipe(in_data, generator=True)

# Write a JSONL file while consuming the generator that runs the pipeline
write_jsonl(out_data, "path/to/output/file.jsonl")
```
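The same lazy read, transform, write flow can be approximated with the standard `json` module when unified_io is not at hand. `read_jsonl_lazy`, `transform_lazy`, `write_jsonl_lazy` and `add_one` are hypothetical helpers, and the transform runs sequentially in the current process rather than through Multipipe. The point is only that every stage is a generator, so roughly one record is held in memory at a time.

```python
import json
import os
import tempfile
from typing import Callable, Iterable, Iterator, List


def read_jsonl_lazy(path: str) -> Iterator[dict]:
    """Yield one parsed JSON object per non-empty line; the file is never loaded whole."""
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            if line.strip():
                yield json.loads(line)


def transform_lazy(records: Iterable[dict], funcs: List[Callable]) -> Iterator[dict]:
    """Apply each function in order to every record, one record at a time."""
    for record in records:
        for f in funcs:
            record = f(record)
        yield record


def write_jsonl_lazy(records: Iterable[dict], path: str) -> int:
    """Write records as JSON Lines while consuming the generator; return the count."""
    count = 0
    with open(path, "w", encoding="utf-8") as f:
        for record in records:
            f.write(json.dumps(record) + "\n")
            count += 1
    return count


def add_one(record: dict) -> dict:
    # Hypothetical step: return a new record with "value" incremented
    return {**record, "value": record["value"] + 1}


# Demo with temporary files standing in for the real input and output paths
tmp_dir = tempfile.mkdtemp()
in_path = os.path.join(tmp_dir, "input.jsonl")
out_path = os.path.join(tmp_dir, "output.jsonl")
with open(in_path, "w", encoding="utf-8") as f:
    for i in range(3):
        f.write(json.dumps({"value": i}) + "\n")

# Nothing is read until write_jsonl_lazy starts pulling records through the chain
written = write_jsonl_lazy(transform_lazy(read_jsonl_lazy(in_path), [add_one]), out_path)
print(written, list(read_jsonl_lazy(out_path)))
```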
Would you like to see other features implemented? Please open a feature request.

Would you like to contribute? Please drop me an email.

multipipe is open-source software licensed under the MIT license.