Add `liftEither` for `ObservableEither`
anthonyjoeseph opened this issue · 3 comments
anthonyjoeseph commented
I often end up needing a way to use OperatorFunction's on the right case of ObservableEither's
E.g. filter or takeUntil for ObservableEither. Most operators, really
const liftEither = <E, A, B>(
f: (a: r.Observable<A>) => r.Observable<B>
): ((a: OBE.ObservableEither<E, A>) => OBE.ObservableEither<E, B>) =>
flow(OB.separate, ({left, right}) =>
r.merge(pipe(left, ro.map(E.left)), pipe(right, f, ro.map(E.right)))
)
I can make a P.R. if desired, but I'm not sure if this is the best way to do this.
anthonyjoeseph commented
Here's a simple example usage
import {pipe} from 'fp-ts/pipeable'
import * as r from 'rxjs'
import * as E from 'fp-ts/Either'
import * as OB from 'fp-ts-rxjs/lib/Observable'
import * as OBE from 'fp-ts-rxjs/lib/ObservableEither'
const isOdd = (a: number): boolean => a % 2 !== 0
const fa: OBE.ObservableEither<never, number> = pipe(
r.from([1, 2, 3]),
OBE.rightObservable
)
const fb: OBE.ObservableEither<never, number> = pipe(
fa,
liftEither(OB.filter(isOdd))
)
// fb will emit E.right(1), E.right(3)
(edit: a bit more terse)
anthonyjoeseph commented
Just a note - this solution is not ideal, because the input observable is subscribed to twice due to the r.merge
call. The redundant emissions are filtered out, but it can make debugging difficult
anthonyjoeseph commented
Here's a solution without redundant subscriptions:
export const liftEither = <Err, A, B>(
f: (a: r.Observable<A>) => r.Observable<B>
) => (obs: OBE.ObservableEither<Err, A>): OBE.ObservableEither<Err, B> => {
const subjA = new r.Subject<A>()
const subjB = new r.Subject<Err>()
obs.subscribe(
E.fold<Err, A, void>(
(err) => subjB.next(err),
(val) => subjA.next(val)
)
)
return r.merge(pipe(subjA, f, OB.map(E.right)), pipe(subjB, OB.map(E.left)))
}
(edit: a little more terse)