RxRuby and concurrency
gizmomogwai opened this issue · 11 comments
There is already ticket #71, but I am stumbling over something else.
- When I create an Observable with e.g. RxRuby::Observable.timer(0, 1) the events of the stream are each in an own thread.
- From the documentation and sourcecode I could not find out what I have to do to create an own scheduler that is usable with observe_on. I tried to spy into it by providing an object with a method_missing implementation, but when I implemented schedule_recursive_with_state in my scheduler it was not called.
- Furthermore I could not find out how to transport all of the events created by the timer to one defined thread.
Observable.timer()
uses DefaultScheduler
by default which creates new thread for each event. If you want timer events to be in the same thread, you might want to pass the CurrentThreadScheduler
to the method like this; Observable.timer(0, 1, RxRuby::CurrentTheadScheduler.instance)
.
Does this solve some of your problems?
Is there a scheduler that transports the events to a thread != the main thread?
e.g. (taken from the java api):
Observable.just("one", "two", "three", "four", "five")
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(/* an Observer */);
or https://github.com/ReactiveX/RxJava/blob/1.x/src/main/java/rx/schedulers/ExecutorScheduler.java
I tried to implement an own Scheduler but failed. The code is in the lines of this:
class MyScheduler
def initialize(looper)
@looper = looper
end
def schedule_recursive_with_state(state, due_time, action)
return @looper.enqueue(action)
end
end
Looper is a tread that works on a Queue, enqueue would put the action to this queue (ignoring the due_time and state for now) and then action should be called from the Looper-Thread.
Could you give me some pointers on what i miss? I set up the whole chain like this:
s = MyScheduler.new(looper)
RxRuby::Observable.timer(0, 1).observe_on(s).subscribe(->(x){looper.enqueue(x)}, ->(e){puts "error #{e}"}, ->(){puts "finished"})
sleep(10)
I see what you mean 😄
s = MyScheduler.new(looper)
RxRuby::Observable.timer(0, 1).observe_on(s).subscribe(->(x){looper.enqueue(x)}, ->(e){puts "error #{e}"}, ->(){puts "finished"})
sleep(10)
Could you give me what result(output) or error you got from the code?
Hi @minamimonji, thanks for following up. I prepared this stripped down version, perhaps you could have a look. It tries to mimic java Executors.
require 'rx_ruby'
# simple thing that takes stuff from a queue and runs it
class Executor
def initialize
@queue = Queue.new
@t = Thread.new do
while true
begin
next_runnable = @queue.pop
puts "calling invoke on #{next_runnable}"
next_runnable.invoke
rescue Exception => e
puts e.backtrace
puts e
end
end
end
end
def enqueue(runnable)
@queue.push(runnable)
end
end
class T
def method_missing(name, *args, &block)
puts "called #{name} with #{args}"
end
end
# that is my scheduler that is connected to an executor
class MyScheduler
def initialize(executor)
@executor = executor
end
def schedule_recursive_with_state(state, action)
dt = Time.now.to_i
#############################
# usually i would expect to pass in state here as second parameter!
res = RxRuby::ScheduledItem.new(self, T.new, dt, &action)
@executor.enqueue(res)
end
end
executor = Executor.new
scheduler = MyScheduler.new(executor)
#event every second
source = RxRuby::Observable.timer(0, 1)
#the events should be handled in my executor -> observe_on with a std. subscriber at the end
source.observe_on(scheduler).subscribe(->(x){puts "onEvent #{x}"}, ->(e){puts "error #{e}"}, ->(){puts "finished"})
sleep(10)
Here I have 2 problems:
- I do not what to do with the call MyScheduler.action that is intercepted from method_missing.
- I cannot find out how to call the stuff, that is passed into the scheduler.
I tried more
require 'rx_ruby'
# simple thing that takes stuff from a queue and runs it
class Executor
def initialize
@queue = Queue.new
@t = Thread.new do
puts "ttt"
puts "ExecutorThread #{Thread.current}"
while true
begin
next_runnable = @queue.pop
puts "calling invoke on #{next_runnable}"
next_runnable.invoke
rescue Exception => e
puts e.backtrace
puts e
end
end
end
end
def enqueue(runnable)
@queue.push(runnable)
end
end
# that is my scheduler that is connected to an executor
class MyScheduler
def initialize(executor)
@executor = executor
end
def schedule_recursive_with_state(state, action)
dt = Time.now.to_i
res = RxRuby::ScheduledItem.new(self, T.new, dt, &action)
@executor.enqueue(res)
end
end
executor = Executor.new
scheduler = MyScheduler.new(executor)
#event every second
source = RxRuby::Observable.timer(0, 1)
#the events should be handled in my executor -> observe_on with a std. subscriber at the end
source.map do |x|
puts "in map #{Thread.current}"
x
end.observe_on(scheduler).subscribe(->(x){puts "onEvent #{x} in thread #{Thread.current}"}, ->(e){puts "error #{e}"}, ->(){puts "finished"})
sleep(10)
the debug output shows, that there are more events produced, the first event already arrived in the correct thread in the subscriber, but after this, something goes wrong.
could you please help me out here?
@gizmomogwai Thanks for more info.
I haven't had time to look into it this weekend. I would take a look at it in a couple of days.
@minamimonji thank ... i appreciate your efforts (a lot!!!)
@gizmomogwai Could you run your code above with MyScheduler
below?
class MyScheduler
include RxRuby::Scheduler
def initialize(executor)
@executor = executor
end
def schedule_with_state(state, action)
dt = Time.now.to_i
res = RxRuby::ScheduledItem.new(self, state, dt, &action)
@executor.enqueue(res)
RxRuby::Subscription.empty
end
end
The main differences are including the existing RuRuby::Scheduler
to let it handle recursion, and implementing schedule_with_state
instead of schedule_recursive_with_state
.
@minamimonji thanks a lot ... your suggestion solves my problem!
One last question: In this example one would have to return a real subscription to support unsubscribe.
What should be done in this case to be compliant with rx_ruby's philosophy?
@gizmomogwai You might want to clean up objects, or threads when unsubscribe
is called. In this case the thread in the Executor
should be killed. You can write a subscription in MyScheduler
like this,
def schedule_with_state(state, action)
#...
@executor.enqueue(res)
RxRuby::Subscription.create{ @executor.terminate } # code in the block called when unsubscribe is called.
# Note that you have to define Executor#terminate method to kill the thread `@t` in the `Executor`.
end
Wow ... thanks a lot ... that really helps!