tc39/proposal-observable

Alternative observation model proposal

dead-claudia opened this issue ยท 11 comments

@zenparsing If I'm framing this correctly, here are the major concerns I've, or others, found:

  • Currently, exception handling exclusively falls on the observable's author. observer.next(value) throws if subscriber.next(value) throws, which is really bad for error handling.
  • Making observer.next, etc. return a promise is an issue because allocation is really slow, and will bog down everything, which is suboptimal for highly reactive APIs, and will result in a ton of otherwise unnecessary promise allocations. It also carries similar issues to the above.
  • Subscription is very explicit, and prone to memory leaks if the user doesn't close them.
  • It's very difficult to create or use any dedicated syntax for observables, yet there already is one for every other primary collection type, with only async iterables being at stage 3:
    • Single immediate: values (core to the language)
    • Multiple immediate: iterables (for ... of)
    • Single future: promises (async/await)
    • Multiple future: async iterables (for await ... of)
  • Neither proposal has any syntax assist, and it would be exceptionally difficult to create any sort of syntax for observables, especially syntax that could be composed in any meaningful way.

I do feel the only real alternative that solves all of the issues being complained about here may involve pretty much trashing the observable paradigm altogether (despite how nice it might seem), and borrow some ideas from Rust.

Here's my proposed solution: use async iterators instead, and make APIs pull-based using raw callback adapters where necessary. I know this seems a bit radical, but you can implement observable APIs as pull-based iterators. You request event notifications instead of simply receiving them as they come. Rust's futures operate similarly: they're zero-cost and demand-driven, with a callback assist. They require minimal heap allocation (technically none beyond the optimal solution), yet they're also inherently lazy. How would such a thing translate into JavaScript? Our upcoming for await loops and async iterators may do the trick, by declaratively creating procedural pipelines, with many combinators for free.


First, let's start out with a basic pull-based event iterator. This attaches a handler immediately, but only resolves events when necessary. If next is never called, nothing observable ever happens beyond maybe a memory leak.

// Generic wrapper for any DOM event target, easily modified for
// Node.js event emitters
function EventState(target, name) {
    this.target = target
    this.name = name
    this.resolve = undefined
    this.event = undefined
    this.handler = event => {
        const resolve = this.resolve
        if (resolve === undefined) {
             this.event = event
        } else {
            this.resolve = undefined
            resolve({done: false, value: event})
        }
    }
    this.target.addEventListener(this.name, this.handler)
}

class EventIterator {
    // Initialize the iterator and event listener.
    constructor(elem, name) {
        this._ = new EventState(elem, name, opts)
    }

    [Symbol.asyncIterator]() { return this }
    poll() { return this._ !== undefined && this._.event !== undefined }

    // Wait for a new event
    next() {
        const data = this._
        if (data === undefined) {
            return Promise.reject(new TypeError("iterator closed"))
        } else if (data.event !== undefined) {
            const event = data.event
            data.event = undefined
            return Promise.resolve({done: false, value: event})
        } else {
            return new Promise(resolve => data.resolve = resolve)
        }
    }

    // Clear the newly out of scope event listener, to prevent memory leaks.
    return() {
        const data = this._
        if (data === undefined) {
            return Promise.reject(new TypeError("iterator closed"))
        }
        data.target.removeEventListener(this._name, this._handler)
        this._ = undefined
    }
}

function fromEvent(elem, name) {
    return new EventIterator(elem, name)
}

It may seem complex on the surface, but this is literally a low-level, hand-rolled async iterator listening to an event, not some sort of subscription-capable observable. What are the benefits to this approach, using async iterators?

Pros:

  • Far simpler to set up from the user's end. (The library author's end is a bit less consistent)
  • Implementation and usage is highly decoupled, leading to much more modular code.
  • No new native constructors are required to implement it.
  • Only a single consumer needs designed for, due to the lack of subscription.
  • Syntax already exists for it, thus no new critical operators like map or filter are absolutely required for it, and pipelines can be much more declarative as a result.
    • A Lodash-like utility library could provide a wrapper with those kinds of operators.
  • Pull-based means fewer redundant and/or troublesome calls. (Makes filter, etc. less obligatory)
  • Producers can easily be merged together with a simple yield* statement.
  • Error semantics are already very well defined and are offloaded by default to the user to handle. (iter.throw is rarely useful in practice.)
  • Concurrent handling is much more explicit within individual pipelines.
  • Easier to break up long tasks.
  • Async iterators are abortable with syntax assist, and can even be aborted implicitly by breaking a for await loop; observables are only unsubscribable, with only explicit calls and implicit new Observable propagation possible.
  • Resource usage is scoped to for await loops, making for a very nice and consistent experience.
  • Lower level abstraction results in less overall memory overhead.

Cons:

  • Library writer's experience is rather inconsistent.
  • Gets somewhat complicated quickly when writing event handler adapters. (Mitigated with the observe function defined later on.)
  • You don't get to see every value a callback emits.
  • Multicast requires an external tee function. (Which is thankfully fairly straightforward.)
  • Calls are not synchronous by default, resulting in heap-allocated continuations. (This is mitigated by the need-based treatment of this.)
  • May result in multiple very long-lived pending promises on startup.
  • Not very reactive at all. (It waits instead.)
  • Not suitable for high-frequency, processing-heavy changes. (These often require a raw callback, anyways.)
  • Lower level abstraction with potentially less of an ecosystem, so some more complex things will need written from scratch.

Here's some examples of how requesting instead of receiving could prove very useful, and leverage existing stuff much better (events aren't inherently just push-based):

  • Simple logging of click events:

    // RxJS observables
    Rx.Observable.fromEvent(elem, "click").forEach(event => {
        console.log(event)
    })
    
    // Async iterators
    for await (const event of fromEvent(elem, "click")) {
        console.log(event)
    }
  • Throttling comes pretty naturally, requiring no special function or operator for async iterators, but require one for observables. (The observable throttle operator is left as an exercise for the reader, since it's complicated.)

    // Observables
    Rx.Observable.fromEvent(elem, "scroll")
        .throttle(100 /* ms */)
        .forEach(event => {
            console.log(event)
        })
    
    // Async iterators
    const throttle = makeThrottle(100 /* ms */)
    for await (const event of fromEvent(elem, "scroll")) {
        await throttle()
        console.log("scroll detected")
    }
    
    // Async iterator utility
    function makeThrottle(delay) {
        let last = 0
        return () => {
            if (!last) { last = Date.now(); return }
            const prev = last + delay
            const current = last = Date.now()
            if (current <= prev) return
            return new Promise(resolve => setTimeout(resolve, current - prev))
        }
    }
  • Debouncing is similarly easy with async iterators, yet similarly complicated with observables. Note that it filters the events, because not all events need debounced. Oh, and just for kicks, this particular example includes multiple parallel debouncing timeouts with a single iterator, something a bit inelegant with observables.

    // Observables
    const events = Rx.Observable.fromEvent(elem, "scroll")
    
    events.debounce(100 /* ms */).forEach(event => {
        console.log("fast scroll detected")
    })
    
    events.debounce(200 /* ms */).forEach(event => {
        console.log("slow scroll detected")
    })
    
    // Async iterators
    const debounce100 = makeDebounce(100 /* ms */)
    const debounce200 = makeDebounce(200 /* ms */)
    for await (const event of fromEvent(elem, "scroll")) {
        await Promise.race([
            debounce100().then(() => console.log("fast scroll detected"))
            debounce200().then(() => console.log("slow scroll detected"))
        ])
    }
    
    // Async iterator utility
    function makeDebounce(delay) {
        let last = 0
        let timer, callback
        return async () => {
            if (last + delay < (last = Date.now())) {
                clearTimeout(timer)
                timer = setTimeout(callback, delay)
                return false
            }
            return new Promise(resolve => {
                timer = setTimeout(callback = () => {
                    timer = callback = undefined
                    resolve(true)
                }, delay)
            })
        }
    }
  • Concatenation is fairly simple for either, but the async iterator version is pretty trivial.

    // Observables
    function zip(observable1, observable2) {
        return new Observable(observer => {
            let sub = observable1.subscribe({
                next: v => observer.next(v),
                error: v => observer.error(v),
                complete() {
                    sub = observable2.subscribe({
                        next: v => observer.next(v),
                        error: v => observer.error(v),
                        complete: v => { sub = undefined; observer.complete(v) },
                    })
                },
            })
            return () => { if (sub != null) sub.unsubscribe() }
        })
    }
    
    // Async iterators
    async function *zip(iter1, iter2) {
        yield* iter1
        yield* iter2
    }
  • Filtering is generally straightforward, but no function is needed for the async iterator version. Mosts other pure operators/combinators are similarly easier to mirror with async iterators, often just leveraging existing syntax. It's also more flexible.

    // Observables
    Rx.Observable.fromEvent(document, 'mousemove')
        .map(e => ({x: e.clientX, y: e.clientY})
        .filter(pos => pos.x === pos.y)
        .forEach(pos => {
            console.log(`mouse at ${pos.x}, ${pos.y}`)
        })
    
    // Async iterators, no extra operators
    for await (const e in fromEvent(document, "mousemove")) {
        const pos = {x: e.clientX, y: e.clientY}
        if (pos.x === pos.y) console.log(`mouse at ${pos.x}, ${pos.y}`)
    }
  • A basic tee function. Observables don't need it because of their multiple-subscription model, but async iterators do. It's simple enough to write, though.

    function tee(iter) {
        iter = iter[Symbol.asyncIterator]()
        let state = "active"
        let result
    
        async function *teeIter(own, other) {
            while (state === "active") {
                if (own.length) {
                    yield own.shift()
                } else {
                    try {
                        const {done, value} = await iter.next()
                        if (done) {
                            state = "done"
                            result = value
                        } else {
                            other.push(value)
                            yield value
                        }
                    } catch (e) {
                        state = "error"
                        result = e
                    }
                }
            }
    
            if (state === "error") throw result
            yield* own
            return result
        }
    
        const left = []
        const right = []
        return [teeIter(left, right), teeIter(right, left)]
    }

Resource management can be handled by using for await and yielding the resource. The language will handle everything automatically as well, so no cleanup is even needed. Consider this example:

// Observables with RxJS
const fs = require('fs')
const Rx = require('rx')

function appendAsync(fd, buffer) {
    return Rx.fromNodeCallback(callback => fs.write(fd, buffer, callback))
}

function openFile(path, flags) {
  var fd = fs.openSync(path, flags)
  return Rx.Disposable.create(() => fs.closeSync(fd))
}

Rx.Observable.using(
    () => openFile('temp.txt', 'w+'),
    fd =>
        Rx.Observable.range(0, 10000)
            .map(v => Buffer(v))
            .flatMap(buffer => appendAsync(fd, buffer))
).subscribe()

// Async iterators
const fsp = require('fs-promise')

async function *openFile(path, flags) {
  const fd = await fsp.open(path, flags)
  try { yield fd } finally { await fsp.close(fd) }
}

;(async () => {
    for await (const fd of openFile('temp.txt', 'w+')) {
        for (let i = 0; i < 10000; i++) {
            await fsp.write(fd, Buffer.alloc(i))
        }
    }
})()

One key difference between observables and async iterators is that parallel observables can be created in the global scope with ease, while async iterators require separate immediately-invoked async function expressions to be created in parallel. This boilerplate is only at creation time, and you can consider the outer IIAFE as a declarative procedural pipeline instead of the imperative functional observable chain.

// Observables
Rx.Observable.fromEvent(document, 'scroll')
    .forEach(event => /* ... */)

Rx.Observable.fromEvent(document, 'mousemove')
    .forEach(event => /* ... */)

// Async iterators
;(async () => {
    for await (const event of fromEvent(document, 'scroll')) {
        // ...
    }
})()

;(async () => {
    for await (const event of fromEvent(document, 'mousemove')) {
        // ...
    }
})()

Adapting a handler is not hard to set up for either, although unlike with observables, it's not quite trivial to do with async iterators in this case. With async iterators, you have to choose whether to buffer the data or not when adapting a handler (you'd want to buffer I/O reads, but likely not UI events), whereas with observables, buffering never enters the equation. Async iterators are a lower level abstraction, and it shows very well in this case. Note that the below factory could easily be made into a common library abstraction, to gain back the advantage of the concise Observable constructor.

// Observables
function fromHandler(add, remove) {
    return new Observable(observer => {
        const context = add(value => { observer.next(value) })
        return () => remove(context)
    })
}

// Async iterators
class ObserveContext {
    constructor(observe) { this._ = observe }

    next(value) {
        if (this._._closed) return
        if (!this._._resolveNext(value)) {
            this._._pushLast(value)
            this._._waiting = true
        }
    },

    throw(value) {
        return next(Promise.reject(value))
    },

    return(value) {
        if (this._._closed) return
        this._._done = true
        if (!this._._resolveNext(value)) {
            this._._pushLast(value)
            this._._waiting = true
        }
    }
}

class ObserveBase {
    constructor(init) {
        this._remove = remove
        this._closed = this._waiting = this._done = false
        this._last = this._resolve = undefined
        this._cleanup = init(new ObserveContext(this))
    }

    [Symbol.asyncIterator]() { return this }
    poll() { return !this._closed && this._waiting }

    next() {
        if (this._closed) {
            return Promise.reject(new TypeError("iterator closed"))
        } else if (this._hasLast()) {
            return Promise.resolve({
                value: this._nextLast(),
                done: !this._waiting && this._done,
            })
        } else {
            return new Promise(resolve => this._pushResolve(resolve))
        }
    }

    return() {
        if (this._closed) {
            return Promise.reject(new TypeError("iterator closed"))
        }
        if (typeof this._cleanup === "function") (0, this._cleanup)()
        this._last = this._resolve = this._cleanup = undefined
        this._closed = true
    }
}

class ObserveBuffered extends ObserveBase {
    constructor(add, remove) {
        super(add, remove)
        this._last = []
        this._resolve = []
    }

    _resolveNext(value) {
        if (!this._resolve.length) return false
        this._resolve.shift()(value)
        return true
    }

    _pushLast(value) { this._last.push(value) }
    _pushResolve(resolve) { this._resolve.push(resolve) }

    _nextLast() {
        if (this._last.length <= 1) this._waiting = false
        return this._last.shift()
    }
}

class ObserveRaw extends ObserveBase {
    _resolveNext(value) {
        if (this._resolve === undefined) return false
        const resolve = this._resolve
        this._resolve = undefined
        resolve(value)
        return true
    }

    _pushLast(value) { this._last = value }
    _pushResolve(resolve) { this._resolve = resolve }

    _nextLast() {
        const last = this._last
        this._waiting = false
        this._last = undefined
        return last
    }
}

function observe(init) {
    return new ObserveRaw(init)
}

observe.buffer = init => {
    return new ObserveBuffered(init)
}

function fromHandler(add, remove) {
    return observe(observer => {
        const context = add(value => { observer.next(value) })
        return () => remove(context)
    })
}

With that helper, you could shim EventIterator as this, in either observable or async iterator form (the options are to allow the action prevented and/or propagation stopped before the next tick:

function fromEvent(target, name, opts = undefined) {
    return observe(context => {
        function callback(event) {
            if (typeof opts !== "object" && opts !== null) {
                if (opts.preventDefault) event.preventDefault()
                if (opts.stopImmediatePropagation) event.stopImmediatePropagation()
                if (opts.stopPropagation) event.stopPropagation()
            }
           context.next(event)
       }
       target.addEventListener(name, callback, opts)
       return () => target.removeEventListener(name, callback, opts)
   })
}

fromEvent.buffer = (target, name, opts = undefined) {
    return observe.buffer(context => {
        function callback(event) {
            if (typeof opts !== "object" && opts !== null) {
                if (opts.preventDefault) event.preventDefault()
                if (opts.stopImmediatePropagation) event.stopImmediatePropagation()
                if (opts.stopPropagation) event.stopPropagation()
            }
           context.next(event)
       }
       target.addEventListener(name, callback, opts)
       return () => target.removeEventListener(name, callback, opts)
   })
}

Node.js events could be similarly shimmed:

function fromEvent(ee, name) {
    return observe(context => {
        const resolve = value => context.next(value)
        ee.on(name, resolve)
        return () => ee.removeListener(name, resolve)
    })
}

fromEvent.buffer = (ee, name) => {
    return observe.buffer(context => {
        const resolve = value => context.next(value)
        ee.on(name, resolve)
        return () => ee.removeListener(name, resolve)
    })
}

Putting it all together, here's an Angular example I pulled from here (with modification:

// RxJS Observables
function searchWikipedia(term) {
  return rx.Observable.fromPromise($http({
      url: "http://en.wikipedia.org/w/api.php?&callback=JSON_CALLBACK",
      method: "jsonp",
      params: {
        action: "opensearch",
        search: encodeURI(term),
        format: "json"
      }
  }))
}

observeOnScope($scope, 'search')
    .throttle(1000)
    .map(change => change.newValue || "")
    .distinctUntilChanged()
    .flatMapLatest(searchWikipedia)
    .safeApply($scope, result => { $scope.data = result })
    .subscribe();

// Async iterators
const search = awaitLatest(term => $http({
    url: "http://en.wikipedia.org/w/api.php?&callback=JSON_CALLBACK",
    method: "jsonp",
    params: {
        action: "opensearch",
        search: encodeURI(term),
        format: "json",
    },
}))

const throttle = makeThrottle(1000)
let last
for await (const {newValue} of observeOnScope($scope, 'search')) {
    await throttle()
    const value = newValue || ""
    if (value !== last) {
        last = value
        const result = await search(value)
        if (!safeApply($scope, () => $scope.data = result)) break
    }
}

// Async iterator utilities
function awaitLatest(init) {
    let request, result
    return async value => {
        const first = request == null
        request = init(value).then(data => result = data)
        return first ? request : Promise.resolve(result)
    }
}

function safeApply($scope, func) {
    if ($scope.$$destroyed) return false
    if ($scope.$$phase || $scope.$root.$$phase) {
        func()
    } else {
        $scope.apply(() => { func() })
    }
    return true
}

I think you're looking for this: https://github.com/tc39/proposal-async-iteration

Observables are just a different type with different uses.

@isiahmeadows yes, that's definitely async iterators - it's already stage 3 and according to Chrome people implementation is happening right now.

@Blesh @benjamingr

Yes, I'm aware of the differences. I'm just proposing idioms (async generator + callbacks) that would serve most of the same use cases as this proposal, with just an additional standard library function with a similar API to Observables. To clarify, here's what I'm proposing instead (I've edited my original proposal a bit after some thinking):

  1. Add this new standard library class + related interface on top of the async iterator proposal:

    // TypeScript syntax here
    class Observer<T> {
        // Buffering is optional and not the default.
        constructor(init: (context: ObserverContext<T>) => ObserverCleanup?, buffer: boolean = false);
        closed: boolean; // whether the observer is closed
        poll(): boolean; // whether there's any value waiting
        next(): Promise<IteratorResult<T>>;
        throw(error: Error): Promise<never>;
        return<R>(value?: R): Promise<IteratorResult<R>>;
        [Symbol.asyncIterator](): this;
    }
    
    type ObserverCleanup = () => void;
    interface ObserverContext<T> {
        next(value?: T): void;
        throw(error?: Error): void;
        return(value?: any): void;
    }

    These are designed to adapt immediate callbacks where necessary to the async iterator world. The context acts as an intermediary to the returned iterator, although there's a few things to note:

    1. context.next calls don't translate one-to-one to promise resolutions/rejections. The iterator might not have any outstanding requests for data.
    2. context.return is like context.next, but it returns done: true instead for the next iterator.next call.
    3. The returned close function is called by the first of context.return and iterator.return's subsequent next call, since they force a similar state transition.
    4. Buffering isn't the default, since user interface interactions rarely need completely buffered, and most external I/O features a pull-based interface (networking is the exception here).

    The difference between new Observer(init) and new Observer(init, true) is that the constructor only memoizes the last passed value, while Observer.buffer stores all incoming data in a backing buffer.

whether these need buffered or not (I/O needs buffered, but click events don't) is why I have a distinction between buffered and unbuffered iterators

The API is nearly identical to the observable API, but returns an async iterator instead. [Here's a non-mutating prollyfill for my proposal](https://gist.github.com/isiahmeadows/da874c2e2cb8c20ed9d54e8914a29bd4).
  1. Adjust idioms to leverage async iterators instead. As my above examples above show, Promises, closures, and standard async control flow can easily implement most observable/stream operators, often with simpler implementations than the observable equivalent for a similar footprint. If you do need parallelism, it's pretty easy to write a utility with the for await syntax:

    async function forAwaitParallel(iter, func) {
        let result = Promise.resolve()
        for await (const value of iter) {
            const p = (async () => func(value))()
            result = result.then(() => p)
        }
        return result
    }

What I found while pondering concerns regarding with the stage 1 proposal is that, for many purposes, requesting events instead of reacting to them results in 1. decreased memory/CPU pressure, and 2. usually not much more boilerplate at the call site given the right abstraction (often less). So my usage of async iterators for inherently reactive work is not too dissimilar from futures-rs's streams in Rust, and I did in fact take some inspiration from that library in general.

requesting events instead of reacting to them results in 1. decreased memory/CPU pressure

How is it if we have the overhead of creating a promise on every loop? and this also means it can never be sync, unlike observables. How would you have operators like .map?

@satya164

How is it if we have the overhead of creating a promise on every loop?

  • You can't get spammed with frequent event notifications when you have to request each individual notification (a boon for things like scroll events, which you frequently need to debounce or throttle). Besides, for super-high-frequency events like pointer events or animation frames, you're likely to just use raw callbacks instead, because even observables would be too slow.
  • Observables are usually still too slow for high-frequency input changes.
  • The promise is created when the request is, not when the response is. The response allocation is for iterator results, and promises are automatically resolved if an object is ready.
  • For what it's worth, adapting eager callbacks to be lazy is a lot simpler than creating an observable that even encapsulates the creation process, just implementation-wise. It would also be much easier to implement efficiently. My proposed prolyfill makes quite a few temporary objects.
  • Either way, there's still a lot of indirection. Chaining observables creates a giant directional graph, not a simple array, and nobody optimizes observables for straight-line use. Promises are similar, but are much more frequently used for sequential logic, and thus are slightly better optimized for that.

and this also means it can never be sync, unlike observables.

That is a known caveat, which makes this unsuitable for hot loops, but that's why I made the DOM event adapter take options for each event method. (DOM APIs are already trending down this path, anyways, because it's simpler and easier to optimize from an implementation standpoint.) Here's a basic unoptimized implementation.

function fromEvent(target, name, opts = undefined) {
    return new Observer(context => {
        function callback(event) {
            if (typeof opts !== "object" && opts !== null) {
                if (opts.preventDefault) event.preventDefault()
                if (opts.stopImmediatePropagation) event.stopImmediatePropagation()
                if (opts.stopPropagation) event.stopPropagation()
            }
           context.next(event)
       }
       target.addEventListener(name, callback, opts)
       return () => target.removeEventListener(name, callback, opts)
   }, opts != null && !!opts.buffered)
}

I'll note that pretty much every DOM event has no other mutable methods on it besides those three, so it shouldn't matter much in practice.

How would you have operators like .map?

Operators like .map would work similarly to how they would with iterators today:

async function *map(iter, func) {
    for await (const value of iter) yield func(value)
}

async function *filter(iter, func) {
    for await (const value of iter) {
        if (await func(value)) yield value
    }
}

The only reason at this point there isn't such a Lodash-like utility library is because nobody has written one. It's not hard to write, at least for Underscore-worthy speed.

Observables are usually still too slow for high-frequency input changes.

Source?

Either way, this issue seems orthogonal to the purpose of the proposal, which is to propose a push-based observable type... Not an async iterable, which is a different proposal.

@Blesh

Observables are usually still too slow for high-frequency input changes.

  1. s/usually/often/ and s/input/UI/. I'm more thinking of very high-frequency changes like mouse tracking and real-time web sockets, which generally use raw callbacks, anyways.
  2. I'm considering more the common case of low-to-mid frequency updates, which aren't performance critical. Spammy events usually get rate-limited anyways, so it's not that much of a problem.

Either way, this issue seems orthogonal to the purpose of the proposal, which is to propose a push-based observable type... Not an async iterable, which is a different proposal.

I'm thinking of ways to use a pull-based mechanism for wrapping push-based APIs, and I'm experimenting with what such an API might look like, thus my proposal. (I'm somewhat invested, but I'm not going to cry in some corner if it gets ultimately rejected. ๐Ÿ˜„)

I've updated my gist to be much tidier and more speccy with examples.

I'm going to close this short term, so I can toy with it a little better, so maybe I can figure out a more elegant way of dealing with a few things. (My proposal's API has a few smaller outstanding issues.)

I'm more thinking of very high-frequency changes like mouse tracking

Why do you think observables are slow for that? Do you have any examples where they are slow and not usable?

and real-time web sockets, which generally use raw callbacks, anyways.

Sounds unlikely that an observable be slower than a socket coming over the network? You have any examples?

@satya164 I've closed this and withdrawn my proposal. It didn't come out quite as intended. (It's actually worse for some key areas observables succeed, despite the sugar API. Syntax can only do so much.)

I'm more thinking of very high-frequency changes like mouse tracking and real-time web sockets, which generally use raw callbacks, anyways

@isiahmeadows FWIW, to dispell this notion, we use Observables (Rx 2, which is much, much slower than Rx 5) for web sockets and mouse movements in very very busy dashboard UIs at Netflix. I've never seen observable be a perf bottleneck in these scenarios, even with a slower version of RxJS.