Yet another master-worker lib for Python
Why not the asyncio lib?
asyncio uses only one core because of the GIL and is not meant to be multiprocess. I needed a multiprocess master-worker.
Why not concurrent.futures?
I needed to pre-configure the libraries each worker uses. Those libraries have time-intensive initialization, so I didn't want to re-initialize them every time a task is pushed.
Why?
Yes.
Will the lib grow to something more versatile?
Oh, we've got plans...
Master-Worker consists of (surprise!) the Master and Worker classes.
Tasks are
- Basically, what the Worker is actually meant to do.
- Formally, a pair/tuple `(task_id, task_argument)` travelling between the Master and Worker processes.
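For instance, a dispatched task might look like the pair below (the payload is purely illustrative; since it crosses a process boundary, it should presumably be picklable):

```python
task = (42, 'https://example.com/page-to-process')  # (task_id, task_argument)
```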
Worker is used in three closely related meanings:
- The abstract Worker class, which defines basic internal routines like event looping.
- The (user-defined) implementation of Worker, defining pre-work initialization and how tasks are executed (the `run_task(task)` method).
- The process which holds an instance of the Worker class.
Master is the class responsible for spawning workers, dispatching tasks, retrieving task results and load balancing (hah!).
There is also an additional piece named TaskMonitor, which is used to... well... monitor the execution process.
IMPORTANT:
The current implementation of the DumbPipelineBox lib is dumb:
ALL Workers MUST be of one and the same Worker subclass (not the same instances, of course).
When a Master is created, it is told what type (implementation) of Worker to use.
Then, one must add workers via the `Master.add_worker(**kwargs)` method, which implies:
- Registering all the necessary internal stuff
- Starting a subprocess
- Initializing the specified Worker subclass with the `**kwargs` passed
- Starting the Worker's event loop, `Worker.run()`
IMPORTANT: The `**kwargs` mentioned must not contain any unpicklable data such as threads, functions or references to main-process objects.
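A quick way to check whether the kwargs you are about to pass will survive the trip to the subprocess is to try pickling them yourself. A minimal sketch; the parameter names here are made up for illustration:

```python
import pickle

worker_kwargs = dict(model_path='/models/big.bin', batch_size=8)  # plain data: fine
# worker_kwargs = dict(callback=lambda x: x)  # a function: pickle would fail here

pickle.dumps(worker_kwargs)  # raises if anything inside is unpicklable
```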
To define your own Worker:
- Import the Worker class from lib.worker.
- Overload the `Worker.__init__()` method if you need to define the worker's pre-work init.
- Overload `Worker.run_task(argument)` to define the worker's logic. Use `return` (not `yield`) to return the result of the task.

For example:
```python
class MyWorkerSubclass(Worker):

    def __init__(self, my_parameter_a, my_parameter_b, ..., **kwargs):
        # Always call the super()
        super(MyWorkerSubclass, self).__init__(**kwargs)
        # Any class-specific init you need to do before any work starts
        # ...

    def run_task(self, argument: object) -> object:
        # Some working activity
        # ...
        return work_result  # or None :)
```
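To make that skeleton concrete, here is a hedged sketch of what a real subclass might look like: the heavy object (a compiled regex standing in for an expensive-to-initialize library) is built once in `__init__` and reused for every task. All names below (`GrepWorker`, `pattern`, `ignore_case`) are invented for illustration:

```python
import re

from lib.worker import Worker


class GrepWorker(Worker):
    """Counts pattern matches in a chunk of text (illustrative only)."""

    def __init__(self, pattern, ignore_case=False, **kwargs):
        super(GrepWorker, self).__init__(**kwargs)
        # Expensive one-time setup lives here, not in run_task()
        flags = re.IGNORECASE if ignore_case else 0
        self._regex = re.compile(pattern, flags)

    def run_task(self, argument: object) -> object:
        # argument is the task payload pushed by the Master
        return len(self._regex.findall(argument))
```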
- Import the Master class from lib.master and initialize it with your user-defined Worker class.
- Add workers via `Master.add_worker(**kwargs)`; the kwargs will be passed to your user-defined Worker's `__init__(**kwargs)`.
- Add tasks via `Master.add_task(argument)`.
- Run the tasks with `master.run()`, which yields `(task_id, result)` pairs (the task_id is generated automatically).
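The `add_task` flow from the steps above would look something like this (a minimal sketch built only from the calls documented here; exact behaviour may differ):

```python
from lib.master import Master

with Master(MyWorkerSubclass) as master:
    master.add_worker(my_parameter_a='...', my_parameter_b='...')

    # Queue tasks up front; the Master generates the task_ids
    master.add_task('my_argument_1')
    master.add_task('my_argument_2')

    for task_id, result in master.run():
        print(task_id, result)
```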
A complete example, passing the task arguments directly to `run()`:
```python
with Master(MyWorkerSubclass) as master:
    # Add workers
    master.add_worker(my_parameter_a='...', my_parameter_b='...')
    master.add_worker(my_parameter_a='...', my_parameter_b='...')

    for task_id, result in master.run('my_argument_1', 'my_argument_2', 'my_argument_3', ...):
        # master.run yields each (task_id, result) pair as soon as the task is done;
        # if a task raises, the exception is returned as the result
        ...
```
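Because a failed task hands back the exception object as its result (per the comment above), the consuming loop can branch on the result's type. A sketch of that pattern, to be placed inside the `with` block:

```python
for task_id, result in master.run('my_argument_1', 'my_argument_2'):
    if isinstance(result, Exception):
        print('task %s failed: %r' % (task_id, result))
    else:
        print('task %s -> %r' % (task_id, result))
```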
For examples see examples :)