Utility lib to work with Node.js streams and Rx.js.
- Node.js >= 10.x
- Rx.js 6.x
- Zero dependencies
- TypeScript support
const { getStreamByObservable } = require('rxdable')
const readableStream = getStreamByObservable(observable)
const { createReadStream } = require('fs')
const { subscribeToStream } = require('rxdable')
const { Observable } = require('rxjs')
function fileRead(file, encoding = 'utf8') {
return new Observable(subscriber => {
return subscribeToStream(
createReadStream(file, encoding),
subscriber
)
})
}
const { getObservableByStream } = require('rxdable')
const observable = getObservableByStream(readableStream)
WARNING: This function will create an Observable ables to be subscribed just one time.
Single operator:
const { count } = require('rxjs/operators')
const { getStreamByOperator } = require('rxdable')
const countStream = getStreamByOperator(count())
Multiple operators:
const { map } = require('rxjs/operators')
const { getStreamByOperator } = require('rxdable')
const firstUppercasedLetterStream = getStreamByOperator(
map(value => value.toString()),
map(value => value.substr(0, 1)),
map(value => value.toUpperCase())
)
No operators (passthrough):
const { getStreamByOperator } = require('rxdable')
const passthroughStream = getStreamByOperator()
const { createWriteStream } = require('fs')
const { pipeObservableToStream } = require('rxdable')
const { Observable } = require('rxjs')
function fileWrite(file, encoding = 'utf8') {
return observable => {
return new Observable(subscriber => {
return pipeObservableToStream(
observable,
createWriteStream(file, encoding),
subscriber
)
})
}
}
const { getOperatorByStream } = require('rxdable')
const operator = getOperatorByStream(transformStream)
WARNING: This function will create an Operator ables to be used just one time.