vuejs/vue-rx

Vue Composition API and vue-rx

ebeloded opened this issue · 12 comments

As the launch day of Vue 3.0 with the Composition API approaches, it's worth discussing how vue-rx will fit into the new paradigm.

My understanding is that one can only rely on the Options API to use vue-rx. Is there a Composition API alternative in the works?

It should be trivial to write a composition function like useObservable() that internally subscribes to an observable and returns a Vue ref.

As for DOM event streams, something like this should also be doable:

setup() {
  const { handler: inc, stream: inc$ } = useDOMStream('click')
  const count = useObservable(inc$.pipe(
    map(() => 1),
    startWith(0),
    scan((total, change) => total + change)
  ))
  return {
    count,
    inc
  }
}

I ran across this thread while thinking through the integration of RxJS. The only issue seems to be the initial value of the ref, for both Observable and BehaviorSubject. I'm not sure what the best way to handle this is. I'm using the Vue 2 Composition API plugin, where readonly isn't available as a function, hence the use of computed.

import { computed, ref, Ref } from '@vue/composition-api'
import { Observable, BehaviorSubject, Subscription } from 'rxjs'

export interface VueObservable<T> {
    ref: Readonly<Ref<Readonly<T> | null>>
    subscription: Subscription
}

export function useObservable<T>(s: Observable<T>, onComplete = () => { }, onError = (err: Error) => { }) {
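    // Start empty; the cast is needed because ref<T>(null) does not type-check under strict null checks
    const valueRef = ref(null) as Ref<T | null>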
    const sub = s.subscribe({
        next: value => valueRef.value = value,
        complete: onComplete,
        error: onError
    })
    const ro = computed(() => valueRef.value)
    return { ref: ro, subscription: sub }
}

export interface VueBehaviorSubject<T> {
    ref: Ref<T | null>
    subscription: Subscription
}

export function useBehaviorSubject<T>(bs: BehaviorSubject<T>, onComplete = () => { }, onError = (err: Error) => { }) {
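    // Start empty; the cast is needed because ref<T>(null) does not type-check under strict null checks
    const valueRef = ref(null) as Ref<T | null>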
    valueRef.value = bs.value
    const sub = bs.subscribe({
        next: value => valueRef.value = value,
        complete: onComplete,
        error: onError
    })
    return { ref: valueRef, subscription: sub }
}

I also came across this thread a few months ago and after testing various approaches, I finally settled on the approach below. I wanted to keep the registration of subscription at the component level to ensure proper unsubscribing (as it is done currently).

For the v-stream part, I initially thought about keeping the directive, but it's actually trivial to emulate the main functionality (not all use cases, I'm sure), at a slight cost in syntax heaviness but with a greatly simplified implementation. The useDOMEvent function below returns both the "plus$" stream and the "plus" callback (which simply pushes event values down the "plus$" stream). If you want a ref, you just chain the useObservable function (see the example below).

Usage is as follows:

<template lang="pug">
  button(@click="plus")
  br
  pre {{ count }}
</template>
<script>
import Vue from 'vue';
import { useObservable, useDOMEvent } from 'composables/observables';
import { map, scan, startWith } from 'rxjs/operators';
export default Vue.extend({
  setup() {
    // plus$ is the event stream; plus is the click handler that feeds it
    const { subject: plus$, callback: plus } = useDOMEvent();
    const count = useObservable(plus$.pipe(
      map(() => 1),
      startWith(0),
      scan((total, change) => total + change)
    ));
    return {
      count,
      plus
    };
  }
});
</script>

Implementation:

import { ref, Ref, onBeforeUnmount } from '@vue/composition-api';
import { Observable, Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

function subscribeTo<T>(
  observable: Observable<T>,
  next?: (value: T) => void,
  error?: (err: any) => void,
  complete?: () => void
) {
  const unsubscribe$ = new Subject<void>();
  const subscription = observable
    .pipe(takeUntil(unsubscribe$))
    .subscribe(next, error, complete);
  onBeforeUnmount(() => {
    unsubscribe$.next();
    unsubscribe$.complete();
  });

  return subscription;
}

export function useObservable<T>(
  observable: Observable<T>,
  defaultValue?: T
): Ref<T> {
  const handler = ref(defaultValue) as Ref<T>;
  subscribeTo(
    observable,
    value => {
      handler.value = value;
    },
    error => {
      throw error;
    }
  );

  return handler;
}

export function useSubscription<T>(
  observable: Observable<T>,
  next?: (value: T) => void,
  error?: (err: any) => void,
  complete?: () => void
) {
  return subscribeTo(observable, next, error, complete);
}

export function useDOMEvent() {
  const subject = new Subject();
  return {
    subject,
    callback: (event: Event) => {
      subject.next(event);
    }
  };
}

not3 commented

@kevin-courbet

export function useDOMEvent() {
  const subject = new Subject();
  return {
    subject,
    callback: (event: Event) => {
      subject.next(event);
    }
  };
}

const { subject: plus$, callback: plus } = useDOMEvent();

When the subject receives an error from the piped stream, it stops working for good!

plus$.error(someError)

Maybe the v-stream part is still important.
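
One way to limit the failure mode described above, sketched without any dependencies: do the fallible per-event work before the value enters the stream, so the shared subject never receives an error notification. `safeCallback`, `SinkLike`, and the parameter names are hypothetical; `sink` is any object with a `next` method, which an RxJS `Subject` should satisfy. Within RxJS itself, catching inside an inner pipeline with `catchError` is the more common approach.

```typescript
// Structural stand-in for the receiving end of a Subject.
interface SinkLike<T> {
  next(value: T): void;
}

// Hypothetical helper: maps a raw event to a stream value outside the stream,
// so a throwing `project` is reported through `onError` instead of
// terminating whatever sits behind `sink`.
export function safeCallback<E, T>(
  sink: SinkLike<T>,
  project: (event: E) => T,
  onError: (err: unknown) => void
): (event: E) => void {
  return event => {
    let value: T;
    try {
      value = project(event);
    } catch (err) {
      onError(err);
      return; // drop this event; later events still reach the sink
    }
    sink.next(value);
  };
}
```

The returned function can be bound in the template the same way as the `callback` from `useDOMEvent`.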

@kevin-courbet

function subscribeTo<T>(
  observable: Observable<T>,
  next?: (value: T) => void,
  error?: (err: any) => void,
  complete?: () => void
) {
  const unsubscribe$ = new Subject<void>();
  const subscription = observable
    .pipe(takeUntil(unsubscribe$))
    .subscribe(next, error, complete);
  onBeforeUnmount(() => {
    unsubscribe$.next();
    unsubscribe$.complete();
  });

  return subscription;
}

Why not:

function subscribeTo<T>(
  observable: Observable<T>,
  next?: (value: T) => void,
  error?: (err: any) => void,
  complete?: () => void
) {
  const subscription = observable
    .subscribe(next, error, complete);
  onBeforeUnmount(() => {
    subscription.unsubscribe();
  });

  return subscription;
}

@kevin-courbet it can also be a wrapper from a util library:

setup() {
  const page = ref(0)
  const products = ref([])
  const text = from(page).pipe(map(x => 'page: ' + x)) // can work like watch or computed; this lib provides `from`

  from(page).pipe(filter(x => x > 0 && x < 10), fetchProducts()).subscribe(products)

  return {
    page,
    text,
    products
  }
}

Or extend ref to implement the Observer and Observable interfaces:

export const count = ref(0)
count.pipe(map(count => count * 2)).subscribe(c => console.log(c))

Pseudo-implementation:

import { Observer, Observable, Subject } from "rxjs";
import { ref as R, Ref as Rf, watch, onBeforeUnmount } from 'vue';

type Ref<T = any> = Rf<T> & Observer<T> & Observable<T>
// and something like
export function ref(value?: unknown): Ref {
    const or = R(value)  as Ref
    const beha = new Subject()
    or.next = beha.next.bind(beha)
    or.pipe = beha.pipe.bind(beha)
    or.subscribe = beha.subscribe.bind(beha)
    watch(or, val => beha.next(val))
    onBeforeUnmount(() => beha.unsubscribe())
    return or
}

With this, Vue's reactivity system becomes compatible with RxJS.

@mdbetancourt Thanks for the tips, I started from your advice for the implementation.

@ebeloded feel free to try it if you need it: https://github.com/NOPR9D/vue-next-rx

regou commented

Should we use a separate library, like vue and vue-next, or support both Vue 2 and Vue 3 in the current library with two sets of documentation?
@yyx990803

Should we use a separate library, like vue and vue-next, or support both Vue 2 and Vue 3 in the current library with two sets of documentation?
@yyx990803

You can use https://github.com/antfu/vue-demi

qvga commented

const searchTerm = ref("")

new Observable(obs => watchEffect(() => obs.next(searchTerm)))
  .pipe(
    map(s => s.value),
    debounceTime(180),
    distinctUntilChanged(),
  )
  .subscribe()

Use watchEffect on any ref.

It should be trivial to write a composition function like useObservable() that internally subscribes to an observable and returns a Vue ref.

As for DOM event streams, something like this should also be doable:

setup() {
  const { handler: inc, stream: inc$ } = useDOMStream('click')
  const count = useObservable(inc$.pipe(
    map(() => 1),
    startWith(0),
    scan((total, change) => total + change)
  ))
  return {
    count,
    inc
  }
}

How would useObservable handle unsubscribe when the component is destroyed?
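
One possible answer, in line with the `subscribeTo` helper earlier in the thread: register the teardown with a lifecycle hook at the moment of subscribing. The sketch below is dependency-free and uses structural stand-ins; `bindToRef`, `SourceLike`, `RefLike`, `TeardownLike`, and `onCleanup` are hypothetical names, and it assumes an RxJS `Observable` and a Vue ref fit these shapes.

```typescript
// Structural stand-ins so the sketch runs without Vue or RxJS installed.
interface TeardownLike {
  unsubscribe(): void;
}
interface SourceLike<T> {
  subscribe(observer: { next: (value: T) => void }): TeardownLike;
}
interface RefLike<T> {
  value: T;
}

// Hypothetical helper. `onCleanup` is whatever registers a teardown callback;
// inside setup() that would be Vue's onBeforeUnmount.
export function bindToRef<T>(
  source: SourceLike<T>,
  target: RefLike<T>,
  onCleanup: (fn: () => void) => void
): TeardownLike {
  const subscription = source.subscribe({
    next: value => {
      target.value = value;
    },
  });
  // Tie the subscription's lifetime to the component's.
  onCleanup(() => subscription.unsubscribe());
  return subscription;
}
```

Inside a component this might be called as `bindToRef(count$, count, onBeforeUnmount)`, where `count$` and `count` are an observable and a ref of your own.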

It should be trivial to write a composition function like useObservable() that internally subscribes to an observable and returns a Vue ref.

It should, it is, and it has been done. For anyone who comes to this project looking for a way to use RxJS Observables with Vue 3 and/or the Composition API: use useObservable from @vueuse/rxjs (as recommended in #141 (comment) and #148 (comment)). It's very simple to set up, works like a charm, and the project is actively maintained by none other than antfu.

@yyx990803 maybe it would make sense to officially deprecate this project, especially now that Vue 2 is no longer maintained? We could add a deprecation notice to the README with a recommendation to use VueUse. This would potentially resolve #148 and #152 as well.