
Asynchronous Flow Programming Library for Node.js

Primary LanguageJavaScriptOtherNOASSERTION

Build Status Coverage Status

Asynchronous Flow Programming Library for Node.js

This is a Node.js module which provides a DSL for simplifying asynchronous flow programming in Node.js.

Let example explain:

var Class = require('js-class'),
	flow = require('js-flow');

var MyItem = Class({
    constructor: function (num) {
    	this.val = num;

    transform: function (next) {
    	next(Math.floor(this.val * 100));

var MyClass = Class({
	run: function (done) {

    prepare: function (next) {
    	fs.mkdir('someplace', next);

    generate: function (next) {
    	flow.times(100).do(function (n, next) {
        	next(new MyItem(n * Math.random()));
        }).map().with(this).run(function (err, items) {
        	this.items = items;

	execute: function (next) {
            .reduce(function (val, item, next) {
            	next(val + item);
            .run(100, function (err, result) {
            	fs.writeFile('someplace/result.txt', result, next);

    cleanup: function (next) {
    	fs.rmdir('someplace', next);

new MyClass().run();


npm install js-flow
var flow = require('js-flow');

Supported Flows

Array/Object Enumeration



new flow.Each()

Then the following options can be chained (the order is not important):

flow.each().in(Array or Object)
flow.each(Array or Object)

Both Array and Object are supported. If it is an Array, index and value in each pair is enumerated. If it is an Object, key and value in each pair is enumerated.

flow.each().in(...).do(Function or String)

Specify the iterator. For Function, the arguments is like:

function iterator([arguments passed from run,] [index/key,] value, next)

If the final run method is passed a number of arguments, they will be forward to the iterator. "index/key" is present if withIndex is used.

If String is provided instead of Function, it represents the method name in context provided by with. In above example, names are used. If String is prefixed with &, the method of iterated object is invoked.

Map/Reduce can also be used:

flow.each().in(...).map(Function or String).run(function (err, results) {})
flow.each().in(...).map(...).reduce(Function or String).run(initailVal, function (err, result) { })
flow.each().in(...).reduce(Function or String).run(initialVal, function (err, result) { })

When map is used, the callback passed to run accepts the 2nd parameter with the results ofmap. If reduce is used, the arguments passed to run is used as initial value for reduce.

Also compatible methods as in javascript Array:

flow.each(...).some(Function or String).run(function (err, true/false) { })
flow.each(...).every(Function or String).run(function (err, true/false) { })

The maximum number of concurrent enumeration can be limited using:

flow.each(...).series()		// this is equivalent to concurrent(1)
flow.each(...).concurrent()	// allow unlimited concurrentcy

Sequential/Parallel Execution

	.next(Function or String)		// do is alias of next
    .run([args], function (err) {  })

    .do(Function or String)
    .run([args], function (err) { })

For sequential execution, when chain is used, the result from previous step can be passed as arguments to next step:

    .next(function (next) { next('abcd'); })
    .next(function (str, next) { next(str + '1'); })
    .run(function (err, result) { })  // here result is 'abcd1'

For parallel execution, concurrency can be controlled the same way as Array/Object enumeration.


This is simple and straightforward:

flow.times(10).do(Function or String)

Same concurrency control as above.


flow.loop().do(Function or String)			// infinite loop
flow.loop().while(condition function).do()	// a while () { } loop
flow.while(condition function).do()			// same as above
flow.loop().do().while(...)					// a do {} while() loop
flow.loop().while().do().while(...)			// useless, but a while () { } while () loop

In any iteration function (Loop, Enumeration, Times etc), passing string break as the first argument of next function can terminate the loop/enumeration.

For all function passed in, bind is not necessary as .with(context) will ensure the functions are invoked with correct context.

Extra callbacks


successFunc and errorFunc are invoked synchronously before invoking callback. Arguments passed to successFunc doesn't including Error which is usually the first arguments for callbacks, and errorFunc is only passed single argument which is an instance of Error.


Create directories in parallel

flow.each(['dir1', 'dir2', 'dir3'])     // .each([...]) is short for .each().in([...])
    .run(function (err) {
        if (Array.isArray(err)) {   // some errors happened
        } else if (err) {
        } else {

Create directories in series

flow.each(['dir1', 'dir1/dir2', 'dir1/dir2/dir3'])
    .series()                       // equivalent to .concurrent(1)
    .do('mkdir')                    // use method name here, .with must be used
    .run(function (err) {
        if (err) {                  // operation stops on the first error
        } else {

Send requests in parallel

var Request = Class({
    send: function (callback) {     // callback is function (err, response)

flow.each([req1, req2, req3, ... reqN])
    .concurrent(4)                  // allow max 4 requests in parallel
    .run(function (err, responses) {
        // responses is an array of corresponding responses

Reduce responses

flow.each([req1, req2, req3, ... reqN])
    .concurrent(4)                  // allow max 4 requests in parallel
    .reduce(function (value, response, next) {
        next(null, value + response.value);
    .run(100, function (err, value) {   // initial value for reduce is 100

Arguments passed to run

    .next(function (factor1, factor2, next) {
    .next(function (factor1, factor2, next) {

    .run(12, 15, function (err) { ... });   // for all steps, factor1 = 12, factor2 = 15

Chained steps

    .next(function (factor1, factor2, next) {
        next(null, 120);
    .next(function (factor1, factor2, factor3, next) {
        next(null, factor1 + factor2 + factor3);
    .run(12, 15, function (err, result) { ... });   // for all steps, factor1 = 12, factor2 = 15


    .do(function (n, next)) {
        ... // n will be 0 - 9

While loop

    .while(function (next) {
    .do(function (next) {

Do-while loop

    .do(function (next) {
    .while (function (next) {


MIT/X11 License