This is a Node.js module which provides a DSL for simplifying asynchronous flow programming in Node.js.
Let example explain:
var Class = require('js-class'),
flow = require('js-flow');
var MyItem = Class({
constructor: function (num) {
this.val = num;
},
transform: function (next) {
next(Math.floor(this.val * 100));
}
});
var MyClass = Class({
run: function (done) {
flow.steps()
.next('prepare')
.next('generate')
.next('execute')
.next('cleanup')
.with(this)
.run(done);
},
prepare: function (next) {
fs.mkdir('someplace', next);
},
generate: function (next) {
flow.times(100).do(function (n, next) {
next(new MyItem(n * Math.random()));
}).map().with(this).run(function (err, items) {
this.items = items;
next();
});
},
execute: function (next) {
flow.each(this.items)
.map('&transform')
.reduce(function (val, item, next) {
next(val + item);
})
.with(this)
.run(100, function (err, result) {
fs.writeFile('someplace/result.txt', result, next);
});
},
cleanup: function (next) {
fs.rmdir('someplace', next);
}
});
new MyClass().run();
npm install js-flow
var flow = require('js-flow');
flow.each()
Or
new flow.Each()
Then the following options can be chained (the order is not important):
flow.each().in(Array or Object)
flow.each(Array or Object)
Both Array
and Object
are supported.
If it is an Array
, index and value in each pair is enumerated.
If it is an Object
, key and value in each pair is enumerated.
flow.each().in(...).do(Function or String)
flow.each().in(...).withIndex().do(...)
Specify the iterator.
For Function
, the arguments is like:
function iterator([arguments passed from run,] [index/key,] value, next)
If the final run
method is passed a number of arguments, they will be forward to the iterator. "index/key" is present if withIndex
is used.
If String
is provided instead of Function
, it represents the method name in context provided by with
. In above example, names are used.
If String
is prefixed with &, the method of iterated object is invoked.
Map/Reduce can also be used:
flow.each().in(...).map(Function or String).run(function (err, results) {})
flow.each().in(...).map(...).reduce(Function or String).run(initailVal, function (err, result) { })
flow.each().in(...).reduce(Function or String).run(initialVal, function (err, result) { })
When map
is used, the callback passed to run
accepts the 2nd parameter with the results ofmap
. If reduce
is used, the arguments passed to run
is used as initial value for reduce
.
Also compatible methods as in javascript Array
:
flow.each(...).some(Function or String).run(function (err, true/false) { })
flow.each(...).every(Function or String).run(function (err, true/false) { })
The maximum number of concurrent enumeration can be limited using:
flow.each(...).concurrent(3)
flow.each(...).series() // this is equivalent to concurrent(1)
flow.each(...).concurrent() // allow unlimited concurrentcy
flow.steps()
.next(Function or String) // do is alias of next
.next(...)
.run([args], function (err) { })
flow.parallel()
.do(Function or String)
.do(...)
.run([args], function (err) { })
For sequential execution, when chain
is used, the result from previous step can be passed as arguments to next step:
flow.steps()
.chain()
.next(function (next) { next('abcd'); })
.next(function (str, next) { next(str + '1'); })
.run(function (err, result) { }) // here result is 'abcd1'
For parallel execution, concurrency can be controlled the same way as Array/Object enumeration.
This is simple and straightforward:
flow.times(10).do(Function or String)
Same concurrency control as above.
flow.loop().do(Function or String) // infinite loop
flow.loop().while(condition function).do() // a while () { } loop
flow.while(condition function).do() // same as above
flow.loop().do().while(...) // a do {} while() loop
flow.loop().while().do().while(...) // useless, but a while () { } while () loop
In any iteration function (Loop, Enumeration, Times etc), passing string break
as the first argument of next
function can terminate the loop/enumeration.
For all function passed in, bind
is not necessary as .with(context)
will ensure the functions are invoked with correct context.
flow....onsuccess(successFunc).run(callback)
flow....onerror(errorFunc).run(callback)
successFunc
and errorFunc
are invoked synchronously before invoking callback
.
Arguments passed to successFunc
doesn't including Error
which is usually the first arguments for callbacks,
and errorFunc
is only passed single argument which is an instance of Error
.
Create directories in parallel
flow.each(['dir1', 'dir2', 'dir3']) // .each([...]) is short for .each().in([...])
.do(fs.mkdir)
.aggregateErrors()
.run(function (err) {
if (Array.isArray(err)) { // some errors happened
...
} else if (err) {
...
} else {
...
}
});
Create directories in series
flow.each(['dir1', 'dir1/dir2', 'dir1/dir2/dir3'])
.series() // equivalent to .concurrent(1)
.do('mkdir') // use method name here, .with must be used
.with(fs)
.run(function (err) {
if (err) { // operation stops on the first error
...
} else {
...
}
});
Send requests in parallel
var Request = Class({
...
send: function (callback) { // callback is function (err, response)
...
}
});
flow.each([req1, req2, req3, ... reqN])
.concurrent(4) // allow max 4 requests in parallel
.map('&send')
.aggregateErrors()
.run(function (err, responses) {
// responses is an array of corresponding responses
});
Reduce responses
flow.each([req1, req2, req3, ... reqN])
.concurrent(4) // allow max 4 requests in parallel
.map('&send')
.reduce(function (value, response, next) {
next(null, value + response.value);
})
.run(100, function (err, value) { // initial value for reduce is 100
...
});
Arguments passed to run
flow.steps()
.next(function (factor1, factor2, next) {
...
})
.next(function (factor1, factor2, next) {
})
.run(12, 15, function (err) { ... }); // for all steps, factor1 = 12, factor2 = 15
Chained steps
flow.steps()
.next(function (factor1, factor2, next) {
...
next(null, 120);
})
.next(function (factor1, factor2, factor3, next) {
...
next(null, factor1 + factor2 + factor3);
})
.run(12, 15, function (err, result) { ... }); // for all steps, factor1 = 12, factor2 = 15
Times
flow.times(10)
.do(function (n, next)) {
... // n will be 0 - 9
}
.run(...);
While loop
flow.loop()
.while(function (next) {
next(true/false);
})
.do(function (next) {
...
})
.run(...);
Do-while loop
flow.loop()
.do(function (next) {
...
})
.while (function (next) {
next(true/false)
})
.run(...);
MIT/X11 License