ReactiveX/RxRuby

RxRuby and concurrency

gizmomogwai opened this issue · 11 comments

There is already ticket #71, but I am stumbling over something else.

  1. When I create an Observable with e.g. RxRuby::Observable.timer(0, 1), each event of the stream arrives on its own thread.
  2. From the documentation and source code I could not work out how to write my own scheduler that can be used with observe_on. I tried to spy on the calls by passing an object with a method_missing implementation, but when I then implemented schedule_recursive_with_state in my scheduler, it was never called.
  3. Furthermore, I could not find out how to move all of the events created by the timer onto one defined thread.

Observable.timer() uses DefaultScheduler by default, which creates a new thread for each event. If you want the timer events to stay on the same thread, you can pass the CurrentThreadScheduler to the method like this: Observable.timer(0, 1, RxRuby::CurrentThreadScheduler.instance).
Does this solve some of your problems?

Is there a scheduler that transports the events to a thread != the main thread?
For example (taken from the Java API):

Observable.just("one", "two", "three", "four", "five")
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(/* an Observer */);

or https://github.com/ReactiveX/RxJava/blob/1.x/src/main/java/rx/schedulers/ExecutorScheduler.java

I tried to implement my own scheduler but failed. The code is along these lines:

class MyScheduler
  def initialize(looper)
    @looper = looper
  end

  def schedule_recursive_with_state(state, due_time, action)
    return @looper.enqueue(action)
  end
end

Looper is a thread that works on a Queue. enqueue puts the action on this queue (ignoring due_time and state for now), and the action should then be called from the Looper thread.
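
The Looper itself is not shown in the thread. A minimal sketch of what such a class could look like, using only Ruby's standard library, is below. The class body, the `thread` reader, and the `stop` method are assumptions made for illustration, not the original poster's code.

```ruby
# Hypothetical Looper: one worker thread that drains a Queue of callables.
class Looper
  attr_reader :thread

  def initialize
    @queue = Queue.new
    @thread = Thread.new do
      # Queue#pop blocks until an item is available; nil is used as the stop signal.
      while (action = @queue.pop)
        action.call
      end
    end
  end

  # Hand a callable over to the looper thread.
  def enqueue(action)
    @queue.push(action)
    self
  end

  # Ask the worker to finish and wait for it.
  def stop
    @queue.push(nil)
    @thread.join
  end
end

looper = Looper.new
ran_on = Queue.new
looper.enqueue(-> { ran_on.push(Thread.current) })
worker = ran_on.pop # blocks until the action has run
looper.stop
puts "ran on looper thread: #{worker.equal?(looper.thread)}"
```

Whatever is enqueued has to respond to `call`, so plain values such as the timer's integers would have to be wrapped in a lambda first.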

Could you give me some pointers on what I am missing? I set up the whole chain like this:

s = MyScheduler.new(looper)
RxRuby::Observable.timer(0, 1).observe_on(s).subscribe(->(x){looper.enqueue(x)}, ->(e){puts "error #{e}"}, ->(){puts "finished"})
sleep(10)

I see what you mean 😄

Could you tell me what output or error you got from that code?

Hi @minamimonji, thanks for following up. I prepared this stripped-down version; perhaps you could have a look. It tries to mimic Java Executors.

require 'rx_ruby'

# simple thing that takes stuff from a queue and runs it
class Executor
  def initialize
    @queue = Queue.new
    @t = Thread.new do
      while true
        begin
          next_runnable = @queue.pop
          puts "calling invoke on #{next_runnable}"
          next_runnable.invoke
        rescue Exception => e
          puts e.backtrace
          puts e
        end
      end
    end
  end
  def enqueue(runnable)
    @queue.push(runnable)
  end
end

class T
  def method_missing(name, *args, &block)
    puts "called #{name} with #{args}"
  end
end

# that is my scheduler that is connected to an executor
class MyScheduler
  def initialize(executor)
    @executor = executor
  end

  def schedule_recursive_with_state(state, action)
    dt = Time.now.to_i
#############################
# usually i would expect to pass in state here as second parameter!
    res = RxRuby::ScheduledItem.new(self, T.new, dt, &action)
    @executor.enqueue(res)
  end
end
executor = Executor.new
scheduler = MyScheduler.new(executor)

#event every second
source = RxRuby::Observable.timer(0, 1)
#the events should be handled in my executor -> observe_on with a std. subscriber at the end
source.observe_on(scheduler).subscribe(->(x){puts "onEvent #{x}"}, ->(e){puts "error #{e}"}, ->(){puts "finished"})

sleep(10)

Here I have 2 problems:

  1. I do not know what to do with the call to MyScheduler.action that is intercepted by method_missing.
  2. I cannot find out how to call the stuff that is passed into the scheduler.

I tried some more:

require 'rx_ruby'

# simple thing that takes stuff from a queue and runs it
class Executor
  def initialize
    @queue = Queue.new
    @t = Thread.new do
      puts "ttt"
      puts "ExecutorThread #{Thread.current}"
      while true
        begin
          next_runnable = @queue.pop
          puts "calling invoke on #{next_runnable}"
          next_runnable.invoke
        rescue Exception => e
          puts e.backtrace
          puts e
        end
      end
    end
  end
  def enqueue(runnable)
    @queue.push(runnable)
  end
end


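# same helper as in the previous listing; MyScheduler below still calls T.new
class T
  def method_missing(name, *args, &block)
    puts "called #{name} with #{args}"
  end
end

# that is my scheduler that is connected to an executor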
class MyScheduler
  def initialize(executor)
    @executor = executor
  end

  def schedule_recursive_with_state(state, action)
    dt = Time.now.to_i
    res = RxRuby::ScheduledItem.new(self, T.new, dt, &action)
    @executor.enqueue(res)
  end
end
executor = Executor.new
scheduler = MyScheduler.new(executor)

#event every second
source = RxRuby::Observable.timer(0, 1)
#the events should be handled in my executor -> observe_on with a std. subscriber at the end
source.map do |x|
  puts "in map #{Thread.current}"
  x
end.observe_on(scheduler).subscribe(->(x){puts "onEvent #{x} in thread #{Thread.current}"}, ->(e){puts "error #{e}"}, ->(){puts "finished"})

sleep(10)

The debug output shows that more events are produced. The first event already arrives on the correct thread in the subscriber, but after that something goes wrong.

Could you please help me out here?

@gizmomogwai Thanks for the additional info.
I haven't had time to look into it this weekend. I will take a look at it in a couple of days.

@minamimonji thanks ... I appreciate your efforts (a lot!!!)

@gizmomogwai Could you run your code above with MyScheduler below?

class MyScheduler
  include RxRuby::Scheduler
  def initialize(executor)
    @executor = executor
  end

  def schedule_with_state(state, action)
    dt = Time.now.to_i
    res = RxRuby::ScheduledItem.new(self, state, dt, &action)
    @executor.enqueue(res)
    RxRuby::Subscription.empty
  end
end

The main differences are that it includes the existing RxRuby::Scheduler module, which handles the recursion, and that it implements schedule_with_state instead of schedule_recursive_with_state.
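
To illustrate why implementing only the primitive can be enough: a mixin can derive schedule_recursive_with_state from schedule_with_state, so the including class only has to supply the latter. The sketch below is a toy model using only the standard library. `MiniScheduler`, `QueueScheduler`, and `shutdown` are hypothetical names, and this is not RxRuby's actual implementation.

```ruby
# Hypothetical stand-in for a scheduler mixin (NOT RxRuby's real code):
# recursion is built on top of the single primitive schedule_with_state.
module MiniScheduler
  def schedule_recursive_with_state(state, action)
    step = lambda do |_scheduler, current|
      # The action gets the current state plus a callback that schedules the next step.
      action.call(current, ->(next_state) { schedule_with_state(next_state, step) })
    end
    schedule_with_state(state, step)
  end
end

class QueueScheduler
  include MiniScheduler

  def initialize
    @queue = Queue.new
    @worker = Thread.new do
      # nil is the stop signal.
      while (job = @queue.pop)
        job.call
      end
    end
  end

  # The only method the including class has to provide.
  def schedule_with_state(state, action)
    @queue.push(-> { action.call(self, state) })
  end

  def shutdown
    @queue.push(nil)
    @worker.join
  end
end

scheduler = QueueScheduler.new
steps = []
threads = []
done = Queue.new

# Count from 0 to 2, re-scheduling each step through the mixin.
scheduler.schedule_recursive_with_state(0, lambda do |n, again|
  steps << n
  threads << Thread.current
  n < 2 ? again.call(n + 1) : done.push(:finished)
end)

done.pop # wait for the last step
scheduler.shutdown
puts "steps: #{steps.inspect}"
```

Every step goes through schedule_with_state, so all of them run on the one worker thread.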

@minamimonji thanks a lot ... your suggestion solves my problem!
One last question: In this example one would have to return a real subscription to support unsubscribe.
What should be done in this case to be compliant with rx_ruby's philosophy?

@gizmomogwai You might want to clean up objects or threads when unsubscribe is called. In this case, the thread in the Executor should be killed. You can return a subscription from MyScheduler like this:

def schedule_with_state(state, action)
  #...
  @executor.enqueue(res)
  # Executor#terminate is not shown here; you have to define it so that it kills the Executor's thread `@t`.
  # The block passed to Subscription.create runs when unsubscribe is called.
  RxRuby::Subscription.create { @executor.terminate }
end
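
The thread does not show Executor#terminate. One possible shape for it, as a minimal sketch using only the standard library, is below. The `terminate` body, the `running?` helper, and the `Job` struct are assumptions made for illustration.

```ruby
# Executor from the listings above, reduced to the essentials,
# plus a hypothetical terminate method.
class Executor
  def initialize
    @queue = Queue.new
    @t = Thread.new do
      loop do
        runnable = @queue.pop # blocks until work arrives
        runnable.invoke
      end
    end
  end

  def enqueue(runnable)
    @queue.push(runnable)
  end

  # Assumed implementation: kill the worker thread and wait until it is gone.
  def terminate
    @t.kill
    @t.join
  end

  # Hypothetical helper so callers can check the worker's state.
  def running?
    @t.alive?
  end
end

# Hypothetical runnable: anything that responds to invoke will do.
Job = Struct.new(:block) do
  def invoke
    block.call
  end
end

executor = Executor.new
done = Queue.new
executor.enqueue(Job.new(-> { done.push(:ran) }))
result = done.pop # wait until the job has run
executor.terminate
puts "job result: #{result}, still running: #{executor.running?}"
```

Thread#kill stops the worker even in the middle of a job. If queued work should finish first, pushing a stop marker onto the queue and joining the thread would be a gentler alternative.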

Wow ... thanks a lot ... that really helps!