
This is a proof-of-concept for using observables over a network. It uses the WAMP protocol (Thruway) with websockets, but can work with any transport supported by WAMP.

Goals of this project:

  • Provide "Hot Observables" or Subjects that work seamlessly over a network
  • Provide "Cold Observables" that work seamlessly over a network
  • Model every aspect of the underlying tech with Rx components
  • Support any language that has a WAMP client


Download the zip file and then:

  composer install



This uses Thruway's publish and subscribe under the hood, but exposes it as a single channel (Subject) that can be subscribed to or observed on.

//Client 1
$remote    = new ReactiveNet("ws://", 'realm1'); //WAMP router
$scheduler = new EventLoopScheduler(\EventLoop\getLoop());

//Emit every second on the channel ''
$intervalObserver = $remote->channel('');
    ->subscribe($intervalObserver, $scheduler);

//Client 2
$remote    = new ReactiveNet("ws://", 'realm1');
$scheduler = new EventLoopScheduler(\EventLoop\getLoop());

//Subscribe to just one stream
$obs1 = $remote->channel('');

$obs1->subscribeCallback(function ($x) {
    echo $x, PHP_EOL;

##Calls (Cold Observable)

Uses Thruway's RPC with progress to create a cold observable

$remote    = new ReactiveNet("ws://", 'realm1');
$scheduler = new EventLoopScheduler(\EventLoop\getLoop());

//Client 1 - Register Call
$remote->register("some.progress.test2", function ($args) use ($scheduler) {
    return Observable::interval(1000, $scheduler)->take($args[2]);
        function ($x) {
            echo "registered", PHP_EOL;

//Client 2 - Make Call
$remote->call('some.progress.test', [1, 2, 3])
        function ($result) {
            echo $result, PHP_EOL;
        function (Exception $e) {
            echo $e->getMessage();
        function () {
            echo "completed", PHP_EOL;


    var remote = new ReactiveNet("ws://", 'realm1');

    var obs1 ='');

    //Subscribe to just one stream
    obs1.subscribe(function (x) {

    //Make Call'some.progress.test', [1, 2, 3]).subscribe(
            function (result) {
            function (err) {
            function () {


  • Write tests
  • Handle reconnecting
  • Better error handling
  • Dispose stuff
  • Rename call and register.

See more in the examples

Similar project ReactiveSocket