Orchestra

This package is in prerelease and is currently unmaintained and unsupported; use it at your own risk.

A framework for distributing work over many machines, integrated with Node.js workers to utilise many threads per machine.

const orchestra = new Client(
    network,
    workspace,
);

console.log(`Creating job...`);

// Add a job to the network and respect the queue
const job = await orchestra.queue({ script: 'add.js', params: { a: 1, b: 2 } });

console.log(`Job ${job.id} added to queue`);

// Get a promise for the result of the job
const result = await orchestra.result(job.id);

console.log(`Job complete!`, { result });
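
Because result(job.id) returns a promise, it can be awaited whether the job finishes before or after you ask for it. A minimal sketch of that bookkeeping, assuming a simple map from job id to waiting callers; ResultRegistry, complete() and fail() are hypothetical names, not part of Orchestra, and the library's own implementation may differ.

```typescript
// Hypothetical sketch (not Orchestra's code): one promise per job id that
// settles whether result() is called before or after the job completes.
type Settled<T> = { ok: true; value: T } | { ok: false; error: unknown };

class ResultRegistry<T> {
  private waiters = new Map<string, Array<(s: Settled<T>) => void>>();
  private settled = new Map<string, Settled<T>>();

  // Returns a promise for the job's outcome.
  result(id: string): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const deliver = (s: Settled<T>) => (s.ok ? resolve(s.value) : reject(s.error));
      const done = this.settled.get(id);
      if (done) {
        deliver(done); // the job already finished: settle immediately
        return;
      }
      const list = this.waiters.get(id) ?? [];
      list.push(deliver); // otherwise wait for complete() or fail()
      this.waiters.set(id, list);
    });
  }

  // Called when the worker reports a result.
  complete(id: string, value: T): void {
    this.settle(id, { ok: true, value });
  }

  // Called when the job fails.
  fail(id: string, error: unknown): void {
    this.settle(id, { ok: false, error });
  }

  private settle(id: string, s: Settled<T>): void {
    this.settled.set(id, s);
    for (const deliver of this.waiters.get(id) ?? []) deliver(s);
    this.waiters.delete(id);
  }
}

const registry = new ResultRegistry<number>();
const early = registry.result("job-1"); // awaited before the job completes
registry.complete("job-1", 3);
const late = registry.result("job-1"); // awaited after the job completes
// both early and late resolve to 3
```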

Or, if you need something to be processed immediately, you can bypass the job queue using the play() method:

const orchestra = new Client(
    network,
    workspace,
);

console.log(`Creating job...`);

// Instruct the network to execute add.js on the next available worker and return the result
const result = await orchestra.play({ script: 'add.js', params: { a: 1, b: 2 } });

console.log(`Job complete!`, { result });
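
One way to picture the difference between queue() and play() is two lists that a free worker draws from, with play() jobs taken first. This sketch assumes exactly that; JobQueue and take() are hypothetical, and the README does not specify how Orchestra orders jobs internally.

```typescript
// Hypothetical model of "respect the queue" vs "bypass the queue".
// Assumption: play() jobs are taken before any queue() jobs.
interface Job {
  id: number;
  script: string;
}

class JobQueue {
  private immediate: Job[] = []; // submitted with play()
  private queued: Job[] = []; // submitted with queue()
  private nextId = 1;

  queue(script: string): Job {
    const job = { id: this.nextId++, script };
    this.queued.push(job);
    return job;
  }

  play(script: string): Job {
    const job = { id: this.nextId++, script };
    this.immediate.push(job);
    return job;
  }

  // Called when a worker becomes free: immediate jobs first, then FIFO order.
  take(): Job | undefined {
    return this.immediate.shift() ?? this.queued.shift();
  }
}

const q = new JobQueue();
q.queue("add.js"); // id 1
q.queue("add.js"); // id 2
q.play("add.js"); // id 3, jumps ahead of the queued jobs

const order: number[] = [];
while (true) {
  const job = q.take();
  if (!job) break;
  order.push(job.id);
}
// order is [3, 1, 2]
```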

If the script needs to send data back while it is still running, you can use orchestra.stream():

console.log(`Creating job...`);
const job = await orchestra.queue({ script: 'add.js', params: { a: 1, b: 2 } });

// create a stream to receive data from the Worker's write() method (see below)
const stream = await orchestra.stream(job.id);

stream.on('data', (chunk) => {
    console.log(`Received data from job ${job.id}`, chunk);
});

stream.on('end', () => {
    console.log(`Job ${job.id} complete!`);
});
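
If you would rather receive everything a job wrote as one array than handle each chunk as it arrives, the 'data' and 'end' events can be wrapped in a promise. This sketch assumes only that the stream exposes on('data', listener) and on('end', listener) as in the example; ChunkStream and collect are hypothetical names. If the real stream also reports errors, a listener that rejects the promise would be needed too.

```typescript
// Minimal shape assumed from the example above (hypothetical, not Orchestra's type).
interface ChunkStream<T> {
  on(event: "data", listener: (chunk: T) => void): unknown;
  on(event: "end", listener: () => void): unknown;
}

// Buffer every chunk written by the worker and resolve once the job ends.
function collect<T>(stream: ChunkStream<T>): Promise<T[]> {
  return new Promise<T[]>((resolve) => {
    const chunks: T[] = [];
    stream.on("data", (chunk) => chunks.push(chunk));
    stream.on("end", () => resolve(chunks));
  });
}

// Usage (hypothetical): const chunks = await collect(await orchestra.stream(job.id));
```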

Installation

Add the npm package to your project:

npm install @nazaire/orchestra --save

or

yarn add @nazaire/orchestra

Overview / Setup

On your master machine:

  • Create an instance of Composer

On every machine that you want to do work on:

  • Create an instance of Instrument

On any machines that you need to allocate work from:

  • Create an instance of Client

For example, let's take a basic web app.

  • Server machine (only one): Has an instance of Composer and Client
  • Worker machine (many): Has an instance of Instrument

When a request comes in to the server, it can add a job to the network using the Client instance.

The Composer will then distribute the job to an Instrument on one of the worker machines.

Alternatively, if you only need one machine:

  • Single machine: Has an instance of Composer, Instrument and Client

If you have multiple servers, you can instead run the Composer on a master worker machine:

  • Server (many): Has an instance of Client
  • Master worker (only one): Has an instance of Composer (and an Instrument if you want work to be done on the master worker too)
  • Slave worker (many): Has an instance of Instrument
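
In every layout above there is only ever one Composer. A small sketch that models each process as a set of roles and checks a proposed layout against that rule; Role, parseRoles and validateNetwork are hypothetical helpers, not part of Orchestra, and requiring at least one Instrument is this sketch's own assumption (without one, no work would ever run).

```typescript
// Hypothetical model of a deployment: each process is a set of roles.
type Role = "composer" | "instrument" | "client";

const ROLES: readonly Role[] = ["composer", "instrument", "client"];

// Parse a comma-separated role list such as "composer,client".
function parseRoles(spec: string): Set<Role> {
  const roles = new Set<Role>();
  for (const raw of spec.split(",")) {
    const name = raw.trim();
    if (name === "") continue;
    if (!(ROLES as readonly string[]).includes(name)) {
      throw new Error(`Unknown role: ${name}`);
    }
    roles.add(name as Role);
  }
  return roles;
}

// A network needs exactly one Composer; this sketch also requires an Instrument.
function validateNetwork(processes: Set<Role>[]): void {
  const composers = processes.filter((p) => p.has("composer")).length;
  if (composers !== 1) {
    throw new Error(`Expected exactly one Composer, found ${composers}`);
  }
  if (!processes.some((p) => p.has("instrument"))) {
    throw new Error("No Instrument: nothing would ever run the jobs");
  }
}

// The web-app layout: one server (Composer + Client) and two worker machines.
validateNetwork([
  parseRoles("composer,client"),
  parseRoles("instrument"),
  parseRoles("instrument"),
]);
```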

Components

Network

Establish a network of machines. Provided options:

  • MemNetwork - A network that exists in one process (useful for testing or single machine setups)
  • RedisNetwork - A network that runs across many machines and uses Redis as a message broker

import { Network } from '@nazaire/orchestra';

const network = new Network.MemNetwork();

// if you are using a network that requires an async connection
// (such as the RedisNetwork in the example below), you also need to run:
// await network.connect();

// network needs to be passed to all the major components (composer, instrument, client)
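
A Network is essentially a message bus shared by the Composer, Instruments and Clients. The sketch below shows an in-process publish/subscribe bus of the kind a single-process network could be built on; InProcessBus and the "job:request" channel name are hypothetical and say nothing about the channels or message format Orchestra actually uses.

```typescript
// Hypothetical in-process publish/subscribe bus (not Orchestra's MemNetwork).
// publish() delivers a message synchronously to every current subscriber.
type Handler = (message: unknown) => void;

class InProcessBus {
  private channels = new Map<string, Set<Handler>>();

  // Register a handler and return a function that removes it again.
  subscribe(channel: string, handler: Handler): () => void {
    const handlers = this.channels.get(channel) ?? new Set<Handler>();
    this.channels.set(channel, handlers);
    handlers.add(handler);
    return () => {
      handlers.delete(handler);
    };
  }

  // Deliver a message and report how many handlers received it.
  publish(channel: string, message: unknown): number {
    const handlers = this.channels.get(channel);
    if (!handlers) return 0;
    handlers.forEach((handler) => handler(message));
    return handlers.size;
  }
}

// Usage: a "composer" listening for job requests published by a "client".
const bus = new InProcessBus();
const received: unknown[] = [];
const unsubscribe = bus.subscribe("job:request", (msg) => received.push(msg));
bus.publish("job:request", { script: "add.js", params: { a: 1, b: 2 } });
unsubscribe();
```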

Workspace

A workspace is a directory that contains the code that will be run on the workers. All worker machines need to have access to this directory as they will invoke the scripts directly.

import { Workspace } from '@nazaire/orchestra';

const workspace = new Workspace(
    // this must be an absolute path: use __dirname (CommonJS) or
    // dirname(fileURLToPath(import.meta.url)) (ES modules) to get the current file's directory,
    // or process.cwd() to get the directory the process was started in
    '/path/to/workspace',
);

// the workspace needs to be passed to the client and instrument
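
Because worker machines invoke scripts from the workspace directly, a script name such as add.js has to be resolved against the workspace root. A sketch of that resolution with Node's path module, which also rejects names that would escape the directory; resolveScript is a hypothetical helper, not an Orchestra API.

```typescript
import { isAbsolute, relative, resolve, sep } from "node:path";

// Hypothetical helper: map a script name such as "add.js" to an absolute file
// inside the workspace root, refusing names that point outside of it.
function resolveScript(workspaceRoot: string, script: string): string {
  if (!isAbsolute(workspaceRoot)) {
    throw new Error(`Workspace path must be absolute: ${workspaceRoot}`);
  }
  const fullPath = resolve(workspaceRoot, script);
  const rel = relative(workspaceRoot, fullPath);
  // "" is the root itself, ".." or "../x" is outside, an absolute rel is another drive (Windows)
  if (rel === "" || rel === ".." || rel.startsWith(`..${sep}`) || isAbsolute(rel)) {
    throw new Error(`Script is outside the workspace: ${script}`);
  }
  return fullPath;
}

console.log(resolveScript("/srv/app/scripts", "add.js")); // "/srv/app/scripts/add.js" on POSIX systems
```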


Composer

The composer is responsible for managing the queue of work to be done and distributing it to the workers. There should only be one Composer per network.


import { Composer } from '@nazaire/orchestra';

const composer = new Composer(
    network,
);

// The composer will start accepting job requests as soon as it's created and will distribute them to the instruments
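
How the Composer chooses an Instrument is not documented here. As an illustration only, this sketch dispatches queued jobs in FIFO order to whichever Instrument has the most free worker slots, and leaves jobs queued when every slot is busy; dispatch and InstrumentState are hypothetical names.

```typescript
// Hypothetical composer-style dispatch step, not Orchestra's algorithm.
interface InstrumentState {
  id: string;
  maxWorkers: number;
  running: number;
}

// Assign jobs (removed from the front of `queue`) to instruments with spare capacity.
function dispatch<J>(queue: J[], instruments: InstrumentState[]): Array<[J, string]> {
  const assignments: Array<[J, string]> = [];
  while (queue.length > 0) {
    // Pick the instrument with the most free slots (the first one wins ties).
    let best: InstrumentState | undefined;
    for (const inst of instruments) {
      const free = inst.maxWorkers - inst.running;
      if (free > 0 && (!best || free > best.maxWorkers - best.running)) best = inst;
    }
    if (!best) break; // every worker is busy: remaining jobs stay queued
    const job = queue.shift() as J;
    best.running += 1;
    assignments.push([job, best.id]);
  }
  return assignments;
}

const instruments: InstrumentState[] = [
  { id: "a", maxWorkers: 2, running: 0 },
  { id: "b", maxWorkers: 1, running: 0 },
];
const jobs = ["j1", "j2", "j3", "j4"];
const assigned = dispatch(jobs, instruments);
// j1 -> a, j2 -> a, j3 -> b; "j4" stays in the queue
```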

Instrument

The instrument runs on the main thread of each machine and is responsible for receiving work from the Composer and distributing it to its workers.

import { Instrument } from '@nazaire/orchestra';

const instrument = new Instrument(
    network,
    workspace,
    4, // max number of workers (usually the number of cores on the machine)
);

// there's nothing to do here, the instrument will start working as soon as it's created
// it listens for work from the Composer and spawns Workers to do the work
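
The max-workers argument caps how many jobs an Instrument runs at once. The same idea in plain TypeScript is a concurrency limiter: at most limit tasks run, and the rest wait for a slot. This is a hypothetical sketch (ConcurrencyLimiter is not an Orchestra class); it uses async functions where the Instrument would use Node.js worker threads.

```typescript
// Hypothetical limiter: runs at most `limit` tasks at once and queues the rest.
class ConcurrencyLimiter {
  private readonly limit: number;
  private active = 0;
  private readonly waiting: Array<() => void> = [];

  constructor(limit: number) {
    if (!Number.isInteger(limit) || limit < 1) {
      throw new Error("limit must be a positive integer");
    }
    this.limit = limit;
  }

  async run<T>(task: () => Promise<T>): Promise<T> {
    if (this.active < this.limit) {
      this.active += 1; // a slot is free: take it
    } else {
      // All slots are busy: wait until a finishing task hands its slot over.
      await new Promise<void>((resolve) => this.waiting.push(resolve));
    }
    try {
      return await task();
    } finally {
      const next = this.waiting.shift();
      if (next) next(); // hand the slot directly to the next waiting task
      else this.active -= 1; // nobody is waiting: free the slot
    }
  }
}
```

Handing the slot straight to the next waiter, rather than decrementing and letting it re-acquire, keeps a newly arriving task from overtaking one that has been waiting.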

Worker

This is a helper class that should be used within the worker script to retrieve the params and notify the instrument when the work is complete.


import { Worker } from '@nazaire/orchestra';

const worker = new Worker();

const params = worker.params;

console.log(`Doing work...`, { params });


// (optional) the worker can also emit data during execution
worker.write({ message: `Doing addition: ${params.a} + ${params.b}` });
// a Client can use orchestra.stream(job.id) to receive this data

const result = params.a + params.b;

worker.return(result);
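
The Worker helper implies two kinds of messages flowing back to the Instrument: any number of write() chunks, then one final return(). A hypothetical sketch of such a protocol as a discriminated union, with an error case added as an assumption; the message shapes and handleMessage are illustrative, not Orchestra's wire format.

```typescript
// Hypothetical messages a worker script could send to its Instrument.
// "write" may occur any number of times; "return" or "error" ends the job.
type WorkerMessage<R> =
  | { type: "write"; data: unknown }
  | { type: "return"; result: R }
  | { type: "error"; message: string };

interface JobOutcome<R> {
  chunks: unknown[]; // everything passed to write(), in order
  result?: R; // the value passed to return()
  error?: string;
  done: boolean;
}

// Fold one message into the job's state, rejecting anything after completion.
function handleMessage<R>(state: JobOutcome<R>, msg: WorkerMessage<R>): JobOutcome<R> {
  if (state.done) throw new Error(`Unexpected "${msg.type}" message after the job finished`);
  switch (msg.type) {
    case "write":
      return { ...state, chunks: [...state.chunks, msg.data] };
    case "return":
      return { ...state, result: msg.result, done: true };
    case "error":
      return { ...state, error: msg.message, done: true };
    default: {
      const unreachable: never = msg; // compile-time exhaustiveness check
      throw new Error(`Unknown message: ${JSON.stringify(unreachable)}`);
    }
  }
}

// The add.js worker above would produce this sequence:
let job: JobOutcome<number> = { chunks: [], done: false };
job = handleMessage(job, { type: "write", data: { message: "Doing addition: 1 + 2" } });
job = handleMessage(job, { type: "return", result: 3 });
```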

Client

The Client is an interface that can add work to the network.

import { Client } from '@nazaire/orchestra';

const orchestra = new Client(
    network,
    workspace,
);

console.log(`Creating job...`);

const result = await orchestra.play({ script: 'add.js', params: { a: 1, b: 2 } });

console.log(`Job complete!`, { result });

TypeScript

Orchestra supports fully typed job params and results.

// define the params and results of each script in a type
type MyScripts = {
    'add.js': {
        params: {
            a: number;
            b: number;
        };
        result: number;
    };
};

const workspace = new Workspace<keyof MyScripts, MyScripts>(
    '/path/to/workspace',
);

const orchestra = new Client<typeof workspace>(
    network,
    workspace,
);

// this method is now fully typed (the params and the result)
const result = await orchestra.play({
    script: 'add.js',
    params: { a: 1, b: 2 },
}); // play() returns Promise<number>, so result is a number
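
The typing pattern described here can be reproduced in a few lines: a map from script name to { params, result } constrains the argument and fixes the resolved type. TypedClient and its runner callback are hypothetical stand-ins for Orchestra's Client, used only to show the mechanism. In this sketch MyScripts is declared with type rather than interface, because only type aliases get the implicit index signature needed to satisfy the ScriptMap constraint.

```typescript
// Hypothetical stand-in for a typed client, showing only the typing mechanism.
type ScriptMap = Record<string, { params: unknown; result: unknown }>;

// Executes a script by name; in this sketch it is an ordinary async function.
type Runner<S> = (script: keyof S, params: unknown) => Promise<unknown>;

class TypedClient<S extends ScriptMap> {
  private readonly run: Runner<S>;

  constructor(run: Runner<S>) {
    this.run = run;
  }

  // K ties the script name to its params type and to the resolved result type.
  async play<K extends keyof S>(job: { script: K; params: S[K]["params"] }): Promise<S[K]["result"]> {
    // The cast is the one place where runtime data is trusted to match the map.
    return (await this.run(job.script, job.params)) as S[K]["result"];
  }
}

type MyScripts = {
  "add.js": { params: { a: number; b: number }; result: number };
};

// A local runner instead of a real network: it only knows how to add.
const client = new TypedClient<MyScripts>(async (_script, params) => {
  const { a, b } = params as { a: number; b: number };
  return a + b;
});

const sum = client.play({ script: "add.js", params: { a: 1, b: 2 } }); // Promise<number>

// @ts-expect-error: params.b must be a number for "add.js"
client.play({ script: "add.js", params: { a: 1, b: "2" } });
```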

The Worker class can also be typed.

// path/to/workspace/add.js

import { Worker } from '@nazaire/orchestra';

// import the workspace from wherever it is defined in your project
// (a type-only import is enough, since it is only used with typeof)
import type { workspace } from '...';

const worker = new Worker<typeof workspace, 'add.js'>();

const params = worker.params; // { a: number, b: number }

const result = params.a + params.b;

// this method is now fully typed and expects a number
worker.return(result);

Examples

Basic Addition (TypeScript)

This example uses a worker to perform basic addition. The directory structure is as follows:

- src/
    - server.ts
    - worker.ts
    - orchestra.ts
    - scripts/
        - add.ts

add.ts - the worker script

This script runs inside a Node.js worker thread spawned by the Instrument.
Note that this is a .ts (TypeScript) file, but the client refers to it as add.js, the name of the compiled file that exists at runtime.

// src/scripts/add.ts

import { Worker } from '@nazaire/orchestra';
// type-only import: the compiled worker script does not load orchestra.ts at runtime
import type { workspace } from '../orchestra.js';

// we pass the workspace type as a generic param to the Worker class
// this will ensure worker.params and worker.return are correctly typed
const worker = new Worker<typeof workspace, 'add.js'>();

worker.return(worker.params.a + worker.params.b);

orchestra.ts - shared configuration

This file is shared between the worker and the server processes. It provides the instances of Network, Workspace and Client.

// src/orchestra.ts

import { Client, Network, Workspace } from "@nazaire/orchestra";
import { Redis } from "ioredis";
import { dirname, join } from "path";
import { fileURLToPath } from "url";

export const network = new Network.RedisNetwork(
  {
    publisher: new Redis(),
    subscriber: new Redis(),
  },
  {
    // debug: true,
  }
);

// here we specify the types of the available scripts in the workspace
export type Scripts = {
  "add.js": {
    params: { a: number, b: number };
    result: number;
  };
};

const __dirname = dirname(fileURLToPath(import.meta.url));
const workspacePath = join(__dirname, "./scripts");

export const workspace = new Workspace<keyof Scripts, Scripts>(workspacePath);

export const orchestra = new Client(network, workspace);

server.ts - the server process

This is the server process. In this example, it's simply setting up the Composer and adding a new job to the network every 2 seconds.

// src/server.ts

import { Composer } from "@nazaire/orchestra";
import { network, orchestra } from './orchestra.js';

console.log(`Starting server...`);

await network.connect();

new Composer(network, {
    // debug: true,
});


console.log("Orchestra ready!");

setInterval(async () => {
    const job = await orchestra.queue({
        script: "add.js",
        params: { a: 1, b: 2 },
    });

    console.log("Added job to queue", job);

    try {
        // result() returns a Promise that will resolve with the result once the job is complete
        const value = await orchestra.result(job.id);

        console.log("Job succeeded", value);
    } catch (error) {
        console.error("Job failed", error);
    }
}, 2_000);

worker.ts - the worker process

This is the worker process's entry point. It sets up an Instrument that performs work as instructed by the Composer.

// src/worker.ts

import { Instrument } from '@nazaire/orchestra';
import { network, workspace } from './orchestra.js';

console.log(`Starting worker...`);

await network.connect();

console.log("Orchestra ready!");

new Instrument(
    network,
    workspace,
    1, // the number of workers to run on this machine (usually the number of cores available)
    {
        debug: true,
    }
);

// the Instrument will now spawn workers when it receives work

Conclusion

Once you've built the source code, you can start the server with node dist/server.js and a worker process with node dist/worker.js.

In this example, there should only ever be ONE server process (as the server houses the Composer instance), and you can spawn as many worker processes as you need.

Adjust the max worker count in the Instrument arguments to utilise as many threads as suitable for the machine the process is running on.
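
A common starting point is one worker per core. A sketch for deriving that number at runtime with Node's os module; defaultWorkerCount and its reserve parameter are hypothetical, not part of Orchestra. os.availableParallelism() was added in Node.js 19.4 and 18.14, so the sketch falls back to os.cpus().length on older versions.

```typescript
import * as os from "node:os";

// Hypothetical helper for choosing the Instrument's worker count.
// os.availableParallelism() returns an estimate of how much parallelism the process
// should use; older Node.js versions fall back to the logical CPU count.
function defaultWorkerCount(reserve = 0): number {
  const cores =
    typeof os.availableParallelism === "function"
      ? os.availableParallelism()
      : os.cpus().length;
  // Optionally keep `reserve` cores free for other work, but never go below one worker.
  return Math.max(1, cores - reserve);
}

// e.g. new Instrument(network, workspace, defaultWorkerCount());
console.log(`Using ${defaultWorkerCount()} workers`);
```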