pipelines is a library that helps webapp developers create async processing pipelines quickly and with little effort. The motivation:
"The dev should only be worried about defining coroutines that DO the actual processing"
The central idea behind pipelines is a series of producer-consumer relationships between connected nodes.
A processing pipeline is represented as a directed graph of nodes, where each node represents a processor. Each processor has an associated coroutine that performs some arbitrary operation. This operation could be anything, from reversing a string or querying a database to sending raw data to a cluster for processing.
The Plumber class is responsible for building producer-consumer connections between different Processors. It creates the pipeline from an input graph and instantiates the Processors.
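To make the producer-consumer idea concrete, here is a minimal sketch that uses only asyncio.Queue from the standard library. It is not how pipelines itself is implemented; the function names and the _STOP sentinel are hypothetical and exist only for this illustration.

```python
import asyncio

# Hypothetical sentinel that marks the end of the stream (not part of pipelines).
_STOP = object()

async def produce(out_q: asyncio.Queue) -> None:
    # Producer node: pushes a few strings downstream, then signals completion.
    for word in ("abc", "hello"):
        await out_q.put(word)
    await out_q.put(_STOP)

async def reverse_node(in_q: asyncio.Queue, out_q: asyncio.Queue) -> None:
    # Consumer of in_q and producer for out_q: reverses each string it receives.
    while True:
        item = await in_q.get()
        if item is _STOP:
            await out_q.put(_STOP)
            return
        await out_q.put(item[::-1])

async def collect(in_q: asyncio.Queue) -> list:
    # Terminal consumer: gathers everything that reaches the end of the chain.
    results = []
    while True:
        item = await in_q.get()
        if item is _STOP:
            return results
        results.append(item)

async def run_chain() -> list:
    # Each queue is one producer-consumer connection between two nodes.
    q1, q2 = asyncio.Queue(), asyncio.Queue()
    _, _, results = await asyncio.gather(
        produce(q1), reverse_node(q1, q2), collect(q2)
    )
    return results

if __name__ == "__main__":
    print(asyncio.run(run_chain()))
```

In this sketch every edge of the graph is a queue, and every node is a coroutine that reads from its input queue and writes to its output queue. Wiring those queues together by hand is the part the Plumber is meant to take off the developer's plate.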
I highly recommend using a virtualenv for using/testing the library at this stage.
Assuming you have already sourced your venv, cd into the root directory of the repo and run:
(venv) $ pip install -e .
(venv) $ python -c "from pipelines.processor import Processor; print(Processor.__doc__)"
If everything worked fine, the docstring for the Processor class should be printed.
As an example (albeit a bad one, as it doesn't highlight the good side of asyncio), let's set up a pipeline that simply reverses a string.
Let's define it in a class to group related coros together:
from pipelines.processor import Processor
from pipelines.plumber import Plumber
import asyncio

class StringFuncs:
    @classmethod
    async def reverse(cls, self:Processor=None, q_elt:tuple=None):
        return q_elt[0][::-1]

# This coro shall be called as StringFuncs.reverse

The arguments self and q_elt have to be the first two arguments in the coroutine signature. They hold, respectively, a reference to the Processor instance the coro is associated with and the tuple containing the input to the coro.
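To see what the signature means in practice, the coroutine can be called directly, outside any pipeline. The sketch below drops the Processor type hint so that it runs without the library installed, and passes None for self purely for illustration; inside a real pipeline the Processor instance would be supplied instead.

```python
import asyncio

class StringFuncs:
    @classmethod
    async def reverse(cls, self=None, q_elt: tuple = None):
        # q_elt is a tuple; this coro only looks at its first entry.
        return q_elt[0][::-1]

# Called through the class, so cls is bound automatically.
# None stands in for the Processor instance (illustration only).
result = asyncio.run(StringFuncs.reverse(None, ("hello",)))
print(result)
```

Note that the input arrives wrapped in a tuple even when there is a single value, which is why the coroutine indexes q_elt[0].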
We need a way to provide input to the pipeline. This is achieved by creating a node of type InputProcessor. However, the developer does not need to worry about this. The only change that we need to keep in mind is the coroutine signature.
Let's define an input coroutine that generates random hex strings and add it to the StringFuncs class.
class StringFuncs:
    ...
    @classmethod
    async def input_coro(cls, self:Processor=None, output_q:asyncio.Queue=None):
        # This coroutine generates 20 random input strings and populates the
        # output_q of whatever node it runs on.
        import uuid
        acc = [ str(uuid.uuid4()) for _ in range(20) ]
        for i in acc:
            await output_q.put(i)
            # unnecessary but async
            await asyncio.sleep(0)

class StringFuncs:
    ...
    @classmethod
    async def output_coro(cls, self, q_elt):
        print('output~> ', q_elt)

The pipeline is represented by a graph like so -
input_d = {
    'nodes': {
        'inp': { 'coro': StringFuncs.input_coro },
        'rev': { 'coro': StringFuncs.reverse },
        'out': { 'coro': StringFuncs.output_coro },
    },
    'graph': {
        'inp': ('rev', 'out'),  # output of node 'inp' ~> 'rev' and 'out'
        'rev': ('out', ),       # and so on...
        'out': None,
    },
}

Now that the graph defining the pipeline is built, we need to instantiate it using the Plumber. The Plumber takes two arguments: the graph dict and a coro_map, which is a function that maps the coro value in the nodes dict to the appropriate function object, i.e. it maps

input_d['nodes']['inp']['coro'] ~> StringFuncs.input_coro

In our toy application, it can be trivially defined as -

coro_map = lambda x: x

And so we can build and run the pipeline as follows -
_t = Plumber(input_d, coro_map=lambda x: x)
_t.create_pipeline()

output~> ('afcaae36-213f-46ff-bdb0-ab417fef65c9', '9c56fef714ba-0bdb-ff64-f312-63eaacfa')
output~> ('81456b84-efb1-4791-baa9-c9555a70bfbd', 'dbfb07a5559c-9aab-1974-1bfe-48b65418')
output~> ('8a480d0f-6f3c-4733-92f9-ae5cfa1748d9', '9d8471afc5ea-9f29-3374-c3f6-f0d084a8')
...

The example code can be found in demos/readme_demo.py
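The identity coro_map used above works because the nodes dict already holds function objects. If the graph were loaded from configuration instead, the coro values might be plain strings, and coro_map would have to resolve them. One possible shape for that is a registry lookup, sketched below. CORO_REGISTRY and the string name "reverse" are hypothetical and not part of the library.

```python
import asyncio

class StringFuncs:
    @classmethod
    async def reverse(cls, self=None, q_elt=None):
        return q_elt[0][::-1]

# Hypothetical registry from string names to coroutine functions.
CORO_REGISTRY = {"reverse": StringFuncs.reverse}

def coro_map(name):
    # Resolve the 'coro' value of a node to a function object,
    # failing loudly on names that were never registered.
    try:
        return CORO_REGISTRY[name]
    except KeyError:
        raise ValueError(f"unknown coroutine name: {name!r}") from None

# A nodes dict whose 'coro' values are strings rather than function objects.
nodes = {"rev": {"coro": "reverse"}}
resolved = coro_map(nodes["rev"]["coro"])
```

Assuming the Plumber calls coro_map once per node as described above, such a function would be passed as coro_map=coro_map in place of the lambda.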
# To test the Plumber class
$ python test_plumber.py

The pipeline set up in test_plumber.py is -
graph LR;
inp((inp))-->n1((n1));
n1((n1))-->n2((n2));
n1((n1))-->n3((n3));
n2((n2))-->n3((n3));
n2((n2))-->n4((n4));
n3((n3))-->n4((n4));
where,
- inp ~> input node that generates a configurable number of random strings
- n1 ~> reverses strings
- n2 ~> converts strings to uppercase
- n3 ~> converts strings to lowercase
- n4 ~> output coroutine with aggregate_input set to False

Note that there might be some asyncio.sleep()s in a few coros.
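For comparison with the input_d format shown earlier, the edges of the diagram can be written as a 'graph' dict. This is a transcription of the diagram only; whether test_plumber.py uses exactly this dict is an assumption. The predecessors helper is hypothetical and simply shows which nodes receive more than one input.

```python
# Edges copied from the diagram, in the same shape as input_d['graph'].
test_graph = {
    "inp": ("n1",),
    "n1": ("n2", "n3"),
    "n2": ("n3", "n4"),
    "n3": ("n4",),
    "n4": None,  # sink node, no outgoing edges
}

def predecessors(graph):
    # Invert the adjacency mapping: for each node, list the nodes feeding it.
    preds = {node: [] for node in graph}
    for src, dsts in graph.items():
        for dst in dsts or ():
            preds[dst].append(src)
    return preds

preds = predecessors(test_graph)
print(preds["n3"])  # nodes feeding n3
print(preds["n4"])  # nodes feeding n4
```

Both n3 and n4 have two upstream nodes, which is where the choice between aggregated and non-aggregated input becomes relevant.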
The purpose of the demo is to show the flexibility in setting up general pipelines based on user input.
Prerequisites - socketio, aiohttp
To run the demo, navigate to the demos directory and execute the following. The server is bound to port 8080.
# To start the server
$ python server.py
# To start the client
$ python client.py

Most of the coroutines have an asyncio.sleep() call to simulate an IO bound wait...
Refer to the wiki https://github.com/theboxahaan/pipelines/wiki/Processor
- Add doomsdayQueues for clean task cancellation
- Finalise Plumber design
- Think about input specifications. Graph representations look good RN
- Write a pipelines viewer if I get time.
- Write Input Rigs i.e. class with no inputQs and only outputs
- Add signal handler to handle script exit
- Add tests for Processor and Plumber
- Pass args through input ?
- Write a proper mechanism for getting function object from string
- Plumber is the only class that interacts with the established context. Need to find a way to make context variables available to Processor instances.
- Make a TypeVar for Queues
- Write cleanup coros -- first introduce types of Queues
- Write a demo with aiohttp or something...
- Add option for non-aggregated input for multi-input Processors -- is there a need for this ??
- Add an Event lock on Processors to control pipelines.
- Backpressure testing ??? How do I do that ? - Issue Raised
- Write documentation
- Enable running different nodes on different machines - distribute the graph
- Add optional dependencies aredis for the distributed setup using extras_require in setup.py
- Add a proper demo for both cases
- Add performance metrics based on the number of elements in q per-second (Is that a good metric 🤔 )
- Add an example to spawn an FL network w/ configurable number of workers
- Write an NCCL backend for this to be used w/ distributed training
- Write an
