Simple worker handling (in a cluster or just as multi threaded) in NodeJs.
Any exported worker function can be awaited in the main script. Since it it using the clusters and Promises behind the scene, you may just use it to handle stuff multithreaded. Or orchestrate a worker pool.
$ npm i runworker
- init the worker script
- have the worker script
export
ormodule.exports
functions/promises to be callable from the master script - use those exported functions in your master script off the worker object
Uses import
and import()
, this is require free code (no magic constants and so on). You have to import from 'runworker/esm'
. Starting the script with node --experimental-modules ????.mjs
or node -r esm ????.mjs
.
import {runWorker, isMaster} from 'runworker/esm';
if (isMaster)(async () => {
let worker = await runWorker('..path..to..worker/worker.mjs');
let result = await worker.test('asd'); // execute worker's exported fn
console.log(result);
worker.kill();
})();
export let test = str => str + '!';
process.stdin.resume(); //so the worker will not close instantly and will be able to communicate
... or any worker function.
Is require()
based. It has one advantage: the worker path can be relative.
const {runWorker, isMaster} = require('runworker');
if (isMaster)(async () => {
let worker = await runWorker('..path..to..worker/worker.mjs');
let result = await worker.test('asd'); // execute worker's exported fn
console.log(result);
worker.kill();
})();
module.exports.test = str => str + '!';
process.stdin.resume(); //so the worker will not close instantly and will be able to communicate
... or any worker function.
Naturally, you can use the Promise chain syntax instead of await
const {runWorker, isMaster} = require('runworker');
if (isMaster) {
let newWorker = null;
runWorker('..path..to..worker/worker.mjs') // returns worker
.then( worker => newWorker = worker ) // save worker, returns worker
.then( worker => worker.test('asd') ) // execute worker's exported fn, returns result
.then( console.log ) // logs result, returns nothing
.then( newWorker.kill ); // calls kill, returns nothing
}
... and worker as above.
Since the clustering works by forking the running process, the master only script part has to be differentiated.
And since the worker object additions are based on Promises
, we need to check if we are within the master proccess (import/require isMaster
from this module) and wrap our code into an async closure.
if (isMaster)(async () => {
// code ..
})();
The long form is if(isMaster) { let x = async()=>{ /* code .. */ }; x(); }
The boolean constants isMaster
and isWorker
can be imported/required as well. They are streight from the internal cluster object.
runMaster(masterPathname)
can be used, to have your start script include the master script (taking care of the if (isMaster)
part). This is a Promise
to await
and catch errors on as well.
Export any function to be proxied to the master script's worker object.
Exported functions may be of type: normal function, async function or promise. They may only return serializeable values (JSON.stringify compatible).
Exported variables will not be proxied, use a custom getter or settter function. The reason not it is not implemented using observables: If the value would be send through, the master script could have triggered a follow up function and the value might not yet have been set on the worker (race conditions).
Is used in the master script.
runWorker(workerPathname)
-- needs to be required/imported to initialize a worker, returns a Promise to await, resolves as Cluster Worker with a few additions (mainly, the proxies to the exported functions - see additional members below)!
Note:
runWorker(workerPathname [, useModeFast = true [, enableRespawn = false]])
runWorker(workerPathname, { useModeFast: true, enableRespawn: false })
(with option object, properties are optional)- has a
useModeFast
that toggles 2 ways of loading the worker script:- TRUE: is faster: no IPC call to load script, but blocks longer on weak cpus
- FALSE: is slower but does perform better on weak single-core / dual-cores: tells the fork after loading (using the IPC) to load the worker resulting in less blocking
- and has an
enableRespawn
that can be used to automatically listen toon('error' ...)
and check for.exitedAfterDisconnect
to run a new worker and triggers.on('respawn', function(newWorker){ let old = this; ... })
on the worker object.
- has a
runWorker()
returned worker gets additional members that can be used within the master on the worker object:
...()
-- all the exported methods proxied (as Promises) from the worker module.sendToWorker(key, message)
-- To send a custom message to the worker. Usually you would use the proxied methods. The worker can useprocess.on('customKey', (msg)=>...)
to process it..workerPathname
-- To retrive the loaded worker script file path- all the usual Cluster Worker methods and properties apply
.process.pid
can be used to access the unique worker id
Note: Multiple workers can be instantiated from different or the same module scripts.
cluster
- can be required/imported in the master script as well from the runworker module
to use cluster.on(..)
cluster.workers{}
holds all workers (worker pool), Nodejs API: cluster.workers - get the length: Object.keys(cluster.workers).length
From the worker script.
sendToMaster(customKey, customMessage)
-- can be required/imported in the worker to send an event to the master. The master will be able to useworkerObj.on('customKey', (msg)=>...)
to receive it.
check out the examples.mjs/
and examples.js/
folders to see some code. The usecase.*
files are rather complete in setting up a cluster. The minimalistic versions are not - they lack catching errors.
using runWorker(..)
- master: spins off a fork, sets the script file path as property (is not used other then to provide the user with the file path to be able to kick off another worker when it died)
- master: waits for the fork to have started
- master: tells (messages) the fork to load the worker script
- worker: answers (messages back) with a list of functions and how they are nested
- master: creates proxy functions using promises of the same object structure (to message a worker what function to call)
- master: resolves the promise with the worker object
now you are free to await the proxy functions on the worker object or listen to events: .on(..)
The normal cluster module and its IPC is used, so no speed difference in communication. BUT: the spin up time can be faster/slower - since the specific worker module file is loaded, when the fork has started and is ready to communicate.