Simple API for using web workers with RxJS observables
- Simple
fromWorker
function from main thread side - Fully RxJS interfaces allowing both main thread and worker thread streaming
- Error handling across the thread boundaries is propagated
- Under the hood
materialize
anddematerialize
is used as a robust transport of streaming errors
- Under the hood
- Automatic handling of worker termination on main thread unsubscription of observable
- Framework agnostic - while the demo uses Angular, the only dependencies are rxjs so React or Vue or plain old js is completely compatible
- Fully compatible with Webpack worker-plugin
- Therefore compatible with Angular webworker bundling which uses this
- Class interface based worker creation (should be familiar API for Angular developers)
- Unopinionated on stream switching behavior, feel free to use
mergeMap
,switchMap
orexhaustMap
in your worker if the input stream outputs multiple items that generate their own stream of results - Built in interfaces for handling
Transferable
parts of message payloads so large binaries can transferred efficiently without copying - See Transferable section for usage - Automatic destruction of worker on unsubscription of output stream, this allows for smart cancelling of computation
using
switchMap
operator, or parallelisation of computation withmergeMap
- Worker Pool strategy - maximise the throughput of units of work by utilising all cores on the host machine
https://cloudnc.github.io/observable-webworker
https://dev.to/zakhenry/observable-webworkers-with-angular-8-4k6
- Observable Web Workers, a deep dive into a realistic use case
- Parallel computation in the browser with observable webworkers
Install the npm package: observable-webworker
# with npm
npm install observable-webworker
# or with yarn
yarn add observable-webworker
// src/readme/hello.ts
import { fromWorker } from 'observable-webworker';
import { of } from 'rxjs';
const input$ = of('Hello from main thread');
fromWorker<string, string>(() => new Worker('./hello.worker', { type: 'module' }), input$).subscribe(message => {
console.log(message); // Outputs 'Hello from webworker'
});
// src/readme/hello.worker.ts
import { DoWork, ObservableWorker } from 'observable-webworker';
import { Observable } from 'rxjs';
import { map } from 'rxjs/operators';
@ObservableWorker()
export class HelloWorker implements DoWork<string, string> {
public work(input$: Observable<string>): Observable<string> {
return input$.pipe(
map(message => {
console.log(message); // outputs 'Hello from main thread'
return `Hello from webworker`;
}),
);
}
}
You must export your worker class (export class ...
) from the file if you're using a minifier. If you don't, your
class will be removed from the bundle, causing your worker to do nothing!
You'll probably need to export the class anyway as you are unit testing it right?!
If decorators is not something you use regularly and prefer direct functions, simply
use the runWorker
function instead.
// src/readme/hello-no-decorator.worker.ts#L5-L16
class HelloWorker implements DoWork<string, string> {
public work(input$: Observable<string>): Observable<string> {
return input$.pipe(
map(message => {
console.log(message); // outputs 'Hello from main thread'
return `Hello from webworker`;
}),
);
}
}
runWorker(HelloWorker);
If either your input or output (or both!) streams are passing large messages to or from the worker, it is highly
recommended to use message types that implement the Transferable
interface (ArrayBuffer
, MessagePort
, ImageBitmap
).
Bear in mind that when transferring a message to a webworker that the main thread relinquishes ownership of the data.
Recommended reading:
- https://developer.mozilla.org/en-US/docs/Web/API/Transferable
- https://developer.mozilla.org/en-US/docs/Web/API/Worker/postMessage
To use Transferable
s with observable-worker, a slightly more complex interface is provided for both sides of the
main/worker thread.
If the main thread is transferring Transferable
s to the worker, simply add a callback to the fromWorker
function
call to select which elements of the input stream are transferable.
// src/readme/transferable.main.ts#L7-L11
return fromWorker<ArrayBuffer, string>(
() => new Worker('./transferable.worker', { type: 'module' }),
input$,
input => [input],
);
If the worker is transferring Transferable
s to the main thread simply implement DoTransferableWork
, which will
require you to add an additional method selectTransferables?(output: O): Transferable[];
which you implement to select
which elements of the output object are Transferable
.
Both strategies are compatible with each other, so if for example you're computing the hash of a large ArrayBuffer
in
a worker, you would only need to use add the transferable selector callback in the main thread in order to mark the
ArrayBuffer
as being transferable in the input. The library will handle the rest, and you can just use DoWork
in the
worker thread, as the return type string
is not Transferable
.
If you have a large amount of work that needs to be done, you can use the fromWorkerPool
function to automatically
manage a pool of workers to allow true concurrency of work, distributed evenly across all available cores.
The worker pool strategy has the following features
- Work can be provided as either
Observable
,Array
, orIterable
- Concurrency is limited to
navigation.hardwareConcurrency - 1
to keep the main core free.- This is a configurable option if you know you already have other workers running
- Workers are only created when there is need for them (work is available)
- Workers are terminated when there is no more work, freeing up threads for other processes
- for
Observable
, work is considered remaining while the observable is not completed - for
Array
, work remains while there are items in the array - for
Iterable
, work remains while the iterator is notresult.done
- for
- Workers are kept running while work remains, preventing unnecessary downloading of the worker script
- Custom observable flattening operator can be passed, allowing for custom behaviour such as correlating the output
order with input order
- default operator is
mergeAll()
, which means the output from the webworker(s) is output as soon as available
- default operator is
In this simple example, we have a function that receives an array of files and returns an observable of the MD5 sum hashes of those files. For simplicity we're passing the primitives back and forth, however in reality you are likely to want to construct your own interface to define the messages being passed to and from the worker.
// src/readme/worker-pool.main.ts
import { Observable } from 'rxjs';
import { fromWorkerPool } from 'observable-webworker';
export function computeHashes(files: File[]): Observable<string> {
return fromWorkerPool<File, string>(() => new Worker('./worker-pool-hash.worker', { type: 'module' }), files);
}
// src/readme/worker-pool-hash.worker.ts
import * as md5 from 'js-md5';
import { DoWorkUnit, ObservableWorker } from 'observable-webworker';
import { Observable } from 'rxjs';
import { map } from 'rxjs/operators';
@ObservableWorker()
export class WorkerPoolHashWorker implements DoWorkUnit<File, string> {
public workUnit(input: File): Observable<string> {
return this.readFileAsArrayBuffer(input).pipe(map(arrayBuffer => md5(arrayBuffer)));
}
private readFileAsArrayBuffer(blob: Blob): Observable<ArrayBuffer> {
return new Observable(observer => {
if (!(blob instanceof Blob)) {
observer.error(new Error('`blob` must be an instance of File or Blob.'));
return;
}
const reader = new FileReader();
reader.onerror = err => observer.error(err);
reader.onload = () => observer.next(reader.result as ArrayBuffer);
reader.onloadend = () => observer.complete();
reader.readAsArrayBuffer(blob);
return () => reader.abort();
});
}
}
Note here that the worker class implements DoWorkUnit<File, string>
. This is different to before where we implemented
DoWork
which had the slightly more complex signature of inputting an observable and outputting one.
If using the fromWorkerPool
strategy, you must only implement DoWorkUnit
as it relies on the completion of the
returned observable to indicate that the unit of work is finished processing.