/node-pipes

Primary LanguageJavaScript

node-pipes

A node.js module for Easy chaining operations, a lightweight ETL:

$ npm install node-pipes

Then import it into your project:

//Initilze The Node Pipes
var Pipes = require('node-pipes').init();

//Instance
var pipes = new Pipes();

//Sample data
var data = [
	{
		name: 'zahir',
		age: 33
	},
	{
		name: 'carlo',
		age: 30
	},
	{
		name: 'David',
		age: 25
	},
	{
		name: 'PA',
		age: 27
	},
	{
		name: 'Cyrille',
		age: 37
	},
	{
		name: 'JB',
		age: 31
	}
];

//ENJOY WITH PIPES

Simple Handlet

pipes
  
  .streamFrom(data)

  .handler(function(message){
  	//handle each input message
  	console.log('Current Message', message);
  })   

  .run()
 ;

Filtrage

pipes
  
  .streamFrom(data)

  .filter(function(message){
  	return message.age >= 30
  }) 
  
  .handler(function(message){
  	//Only messages that age >= 30 are handled
  	console.log('Only age >=30', message);
  })   
  
  .run()

 ;

Aggregation Computation

pipes
  
  .streamFrom(data)
  
  //Each 3 messages
  .aggregate('count:3', function(dataList){
  	console.log('Count ', dataList, '\n');
  })
  
  .run()

 ;

Windowing

pipes
  
  .streamFrom(data)
  
  //WINDOWING 
  .aggregate('time:300', function(dataList){
  		console.log('Grouped Data each 300 ms ', dataList, '\n');
  })
  
  .run()

 ;

Streaming Twitter

pipes
  
  .streamFromTwitter({
  	username: 'your user name',
  	password: 'your password',
  	track: '#airbus'
  })  

  .handler(function(tweet){
  	console.log('received tweet', tweet);  	
  })

Stream from CSV file

pipes
  
  .streamFromCsv('data.csv', {schema: ["name", 'age', 'sexe'], separator: ';'})  

  .handler(function(person){
  	console.log('name ==> :', person.name);  	
  })

;