/rxdable

Utility lib to work with Node.js streams and Rx.js

Primary language: TypeScript · License: MIT

rxdable

npm version Build Status Coverage Status Dependencies Status JavaScript Style Guide

Utility lib to work with Node.js streams and Rx.js.

  • Node.js >= 10.x
  • Rx.js 6.x
  • Zero dependencies
  • TypeScript support

Observable to Readable stream

const { getStreamByObservable } = require('rxdable')

const readableStream = getStreamByObservable(observable)

Readable stream to Observable

const { createReadStream } = require('fs')
const { subscribeToStream } = require('rxdable')
const { Observable } = require('rxjs')

function fileRead(file, encoding = 'utf8') {
  return new Observable(subscriber => {
    return subscribeToStream(
      createReadStream(file, encoding),
      subscriber
    )
  })
}

Readable stream to Observable (faster way)

const { getObservableByStream } = require('rxdable')

const observable = getObservableByStream(readableStream)

WARNING: This function creates an Observable that can be subscribed to only once.

Operator to Transform stream

Single operator:

const { count } = require('rxjs/operators')
const { getStreamByOperator } = require('rxdable')

const countStream = getStreamByOperator(count())

Multiple operators:

const { map } = require('rxjs/operators')
const { getStreamByOperator } = require('rxdable')

const firstUppercasedLetterStream = getStreamByOperator(
  map(value => value.toString()),
  map(value => value.substr(0, 1)),
  map(value => value.toUpperCase())
)

No operators (passthrough):

const { getStreamByOperator } = require('rxdable')

const passthroughStream = getStreamByOperator()

Writable/Duplex/Transform stream to operator

const { createWriteStream } = require('fs')
const { pipeObservableToStream } = require('rxdable')
const { Observable } = require('rxjs')

function fileWrite(file, encoding = 'utf8') {
  return observable => {
    return new Observable(subscriber => {
      return pipeObservableToStream(
        observable,
        createWriteStream(file, encoding),
        subscriber
      )
    })
  }
}

Writable/Duplex/Transform stream to operator (faster way)

const { getOperatorByStream } = require('rxdable')

const operator = getOperatorByStream(transformStream)

WARNING: This function creates an operator that can be used only once.