/Observable

Playing around with observables and reactive JavaScript

Primary Language: JavaScript

Observable

Playing around with observables and reactive JavaScript

var Observable = require("../");
var assert = require('assert');
var sinon = require('sinon');

describe("Observable", function () {
    // Shared fixture, rebuilt before every test:
    //   obsv     - a fresh Observable created via Observable.create()
    //   listener - a sinon spy used as the subscriber callback
    //   clock    - the sinon fake-timer handle; tests call clock.tick() to
    //              fire pending setTimeout callbacks synchronously
    var obsv, listener;
    var clock;

    beforeEach(function () {
        clock = sinon.useFakeTimers();
        obsv = Observable.create();
        listener = sinon.spy();
    });

    // Restore the real timers after EVERY test. The fake clock is installed in
    // beforeEach, so restoring only once (in `after`) would leave every clock
    // but the last one installed and leak fake-timer state between tests.
    afterEach(function () {
        clock.restore();
    });

    // Baseline: a value pushed with send() is delivered to a "data" subscriber.
    it("should be able to send and subscribe to data", function () {
        var greeting = "hello";

        obsv.on("data", listener);
        obsv.send(greeting);

        assert.ok(listener.calledWith(greeting));
    });

    // map(): a subscriber on the object returned by map() sees the mapper's
    // return value instead of the raw value sent to the source.
    it("should able to map values", function () {
        var increment = function (x) {
            return x + 1;
        };
        var mapped = obsv.map(increment);

        mapped.on("data", listener);
        obsv.send(50);

        assert.ok(listener.calledWith(51));
    });

    // filter(): only values for which the predicate is truthy reach the subscriber.
    it("should be able to filter values", function () {
        var isAboveHundred = function (x) {
            return x > 100;
        };
        var filtered = obsv.filter(isAboveHundred);

        filtered.on("data", listener);

        // The second send is issued on the return value of the first, so the
        // chainability of send() is exercised as well.
        var afterFirstSend = obsv.send(50);
        afterFirstSend.send(200);

        assert.ok(listener.calledOnce);
        assert.ok(listener.calledWith(200));
    });

    // flatten(): an array sent to the source is delivered one element per "data" event.
    it("should be able to flatten", function () {
        var expectedItems = [1, 2, 3];
        var flattened = obsv.flatten();

        flattened.on("data", listener);
        obsv.send([1, 2, 3]);

        assert.strictEqual(listener.callCount, 3);
        expectedItems.forEach(function (item) {
            assert.ok(listener.calledWith(item));
        });
    });

    // flatMap(): the array returned by the mapper is delivered element by element.
    it("should be able to flatMap", function () {
        var toChars = function (str) {
            return str.split("");
        };
        var expectedChars = ["s", "t", "r"];
        var flatMapped = obsv.flatMap(toChars);

        flatMapped.on("data", listener);
        obsv.send("str");

        assert.strictEqual(listener.callCount, 3);
        expectedChars.forEach(function (ch) {
            assert.ok(listener.calledWith(ch));
        });
    });

    // Async map: the mapper returns nothing and instead delivers its result
    // later through out.send(). clock.tick() fires the pending timer so the
    // assertion can run synchronously.
    it("should be able to async map", function () {
        var incrementLater = function (x, out) {
            setTimeout(function () {
                out.send(x + 1);
            }, 1);
        };
        var mapped = obsv.map(incrementLater);

        mapped.on("data", listener);
        obsv.send(10);
        clock.tick(100);

        assert.ok(listener.calledWith(11));
    });

    // Async flatMap: the mapper hands an array to out.send() from a timer;
    // each element is expected to arrive as its own "data" event.
    it("should be able to async flatMap", function () {
        var splitLater = function (str, out) {
            setTimeout(function () {
                out.send(str.split(""));
            }, 1);
        };
        var expectedChars = ["o", "k"];
        var flatMapped = obsv.flatMap(splitLater);

        flatMapped.on("data", listener);
        obsv.send("ok");
        clock.tick(100);

        assert.strictEqual(listener.callCount, 2);
        expectedChars.forEach(function (ch) {
            assert.ok(listener.calledWith(ch));
        });
    });

    // combine(): values sent to a source observable show up on the combined one.
    // Only the first of the two sources emits in this test.
    it("should be able to combine", function () {
        var first = Observable.create();
        var second = Observable.create();
        var combined = Observable.combine([first, second]);

        combined.on("data", listener);

        var afterFirstSend = first.send(1);
        afterFirstSend.send(2);

        assert.strictEqual(listener.callCount, 2);
        [1, 2].forEach(function (value) {
            assert.ok(listener.calledWith(value));
        });
    });

    // pipe(): every signal kind raised on the source ("data", "end", "error")
    // is expected to be re-raised on the destination observable.
    describe("piping", function(){
        it("should be able to pipe 'data' signals", function(){
            var source = Observable.create();
            var destination = Observable.create();

            source.pipe(destination);
            destination.on("data", listener);
            source.send("hello");

            assert.strictEqual(listener.callCount, 1);
            assert.ok(listener.calledWith("hello"));
        });

        it("should be able to pipe 'end' signals", function(){
            var source = Observable.create();
            var destination = Observable.create();

            source.pipe(destination);
            destination.on("end", listener);
            source.end();

            assert.ok(listener.calledOnce);
        });

        it("should be able to pipe 'error' signals", function(){
            var source = Observable.create();
            var destination = Observable.create();

            source.pipe(destination);
            destination.on("error", listener);
            source.error();

            assert.ok(listener.calledOnce);
        });
    });
    // end(): subscribers to "end" are notified exactly once, both on the
    // observable itself and on observables derived from it.
    describe("ending an observable", function(){
        it("should be able to announce when done", function(){
            obsv.on("end", listener);

            var afterSends = obsv.send(1).send(2);
            afterSends.end();

            assert.strictEqual(listener.callCount, 1);
            assert.ok(listener.calledOnce);
        });

        it("should pass the 'end' signal to all mutations too", function(){
            var increment = function(x){
                return x + 1;
            };
            var derived = obsv.flatten().map(increment);

            derived.on("end", listener);
            obsv.end();

            assert.strictEqual(listener.callCount, 1);
            assert.ok(listener.calledOnce);
        });
    });

    // error(): "error" subscribers receive the error payload together with the
    // observable on which error() was called.
    describe("error handling", function(){
        it("should be able to send error signals", function(){
            var message = "error text";

            obsv.on("error", listener);
            obsv.error(message);

            assert.strictEqual(listener.callCount, 1);
            assert.ok(listener.calledWith(message, obsv));
        });

        it("should pass the 'error' signal to all mutations too", function(){
            var message = "error text";
            var increment = function(x){
                return x + 1;
            };
            var derived = obsv.flatten().map(increment);

            derived.on("error", listener);
            obsv.error(message);

            // Even on the derived observable, the second argument is expected
            // to be the observable where the error was raised.
            assert.strictEqual(listener.callCount, 1);
            assert.ok(listener.calledWith(message, obsv));
        });
    });
});