gcanti/fp-ts-rxjs

Add `liftEither` for `ObservableEither`

anthonyjoeseph opened this issue · 3 comments

I often end up needing a way to use OperatorFunction's on the right case of ObservableEither's

E.g. filter or takeUntil for ObservableEither. Most operators, really

const liftEither = <E, A, B>(
  f: (a: r.Observable<A>) => r.Observable<B>
): ((a: OBE.ObservableEither<E, A>) => OBE.ObservableEither<E, B>) =>
  flow(OB.separate, ({left, right}) =>
    r.merge(pipe(left, ro.map(E.left)), pipe(right, f, ro.map(E.right)))
  )

I can make a P.R. if desired, but I'm not sure if this is the best way to do this.

Here's a simple example usage

import {pipe} from 'fp-ts/pipeable'
import * as r from 'rxjs'
import * as E from 'fp-ts/Either'
import * as OB from 'fp-ts-rxjs/lib/Observable'
import * as OBE from 'fp-ts-rxjs/lib/ObservableEither'

const isOdd = (a: number): boolean => a % 2 !== 0

const fa: OBE.ObservableEither<never, number> = pipe(
  r.from([1, 2, 3]),
  OBE.rightObservable
)
const fb: OBE.ObservableEither<never, number> = pipe(
  fa,
  liftEither(OB.filter(isOdd))
)
// fb will emit E.right(1), E.right(3)

(edit: a bit more terse)

Just a note - this solution is not ideal, because the input observable is subscribed to twice due to the r.merge call. The redundant emissions are filtered out, but it can make debugging difficult

Here's a solution without redundant subscriptions:

export const liftEither = <Err, A, B>(
  f: (a: r.Observable<A>) => r.Observable<B>
) => (obs: OBE.ObservableEither<Err, A>): OBE.ObservableEither<Err, B> => {
  const subjA = new r.Subject<A>()
  const subjB = new r.Subject<Err>()
  obs.subscribe(
    E.fold<Err, A, void>(
      (err) => subjB.next(err),
      (val) => subjA.next(val)
    )
  )
  return r.merge(pipe(subjA, f, OB.map(E.right)), pipe(subjB, OB.map(E.left)))
}

(edit: a little more terse)