Async task on subscribe
EugeneIstomin opened this issue · 3 comments
Hello,
Do subscribe callbacks belong to the async-runner model, or are they sync only? Here is an example.
This does not run in parallel:
Iodine.subscribe(:events) do |ch, msg|
  if Iodine.master?
    Iodine.run do
      p 11111
      sleep 5
      p 22222
    end
  end
end
This runs in parallel, but uses threads:
Iodine.subscribe(:events) do |ch, msg|
  if Iodine.master?
    @thr = Thread.new do
      p 11111
      sleep 5
      p 22222
    end
  end
end
Hi @EugeneIstomin,
I am not sure what you mean by async in your specific case.
Iodine.run will schedule a task / proc to be performed later. However, that task is still performed within an iodine worker thread, so when you call sleep 5, that task will block the worker thread. If you have only one thread, that task will block iodine completely.
Consider the following as an example:
module AsyncMe
  @thread = nil
  @queue = Queue.new
  # never unlocked, so the setup below runs at most once per process
  @performed_once_and_only_once = Mutex.new
  def self.validate_subscription
    return unless @performed_once_and_only_once.try_lock
    Iodine.subscribe(:events, &AsyncMe.method(:async_task))
    # a single background thread drains the queue, outside the iodine worker threads
    @thread = Thread.new do
      while(true) do
        (@queue.shift).call
      end
    end
  end
  def self.async_task(ch, msg)
    # only the master process handles events
    unless Iodine.master?
      Iodine.unsubscribe(:events)
      return
    end
    # the blocking work is queued for the background thread
    @queue << Proc.new do
      p Process.pid, 1111
      sleep 5
      p 5555
      Iodine.run { p "done" }
    end
    p "scheduled"
  end
end
# forking mode
Iodine.on_state(:enter_master, &AsyncMe.method(:validate_subscription))
# non-forking mode
Iodine.on_state(:on_start, &AsyncMe.method(:validate_subscription))
As you can see, there's no need to spawn a new thread per task (which is a security risk). Instead, spawn additional worker thread(s) and manage a queue (cleanup code is missing from the example).
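For more than one background thread, the same idea can be sketched as a small fixed-size pool in plain Ruby. TaskPool, schedule and shutdown are hypothetical names introduced for illustration; nothing here is part of iodine.

```ruby
# A fixed number of threads draining one shared queue.
# Hypothetical helper for illustration; not an iodine API.
class TaskPool
  STOP = Object.new # sentinel telling a worker to exit

  def initialize(size)
    @queue = Queue.new
    @threads = Array.new(size) do
      Thread.new do
        until (task = @queue.shift).equal?(STOP)
          begin
            task.call
          rescue StandardError => e
            # a failing task should not take the worker thread down
            warn "task failed: #{e.message}"
          end
        end
      end
    end
  end

  # Queue a block to be run by one of the pool threads.
  def schedule(&task)
    @queue << task
    self
  end

  # Let the workers finish queued tasks, then stop and join them.
  def shutdown
    @threads.size.times { @queue << STOP }
    @threads.each(&:join)
  end
end
```

Because the stop sentinels are queued behind any pending tasks, shutdown waits for the work already scheduled instead of dropping it.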
Iodine.run is used to defer a task within the iodine reactor, which might be useful for small tasks or for breaking CPU-heavy tasks into smaller chunks (preventing a thread from being blocked)... but it's not designed to run CPU-heavy tasks.
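As a rough sketch of the "smaller chunks" idea: each step processes one slice and then re-schedules the remainder instead of looping to the end. sum_in_chunks is a hypothetical helper, and the scheduler is passed in as defer so the sketch does not depend on iodine being loaded; with iodine, the assumption is that defer would simply wrap Iodine.run.

```ruby
# Sums `numbers` one chunk at a time, handing control back to the
# scheduler between chunks. `defer` is any callable that accepts a block
# and runs it later.
# Assumed wiring with iodine: defer = ->(&blk) { Iodine.run(&blk) }
def sum_in_chunks(numbers, chunk_size, defer, total = 0, &on_done)
  chunk = numbers.first(chunk_size)
  rest = numbers.drop(chunk_size)
  total += chunk.sum

  if rest.empty?
    on_done.call(total)
  else
    # schedule the next chunk rather than processing it right now
    defer.call { sum_in_chunks(rest, chunk_size, defer, total, &on_done) }
  end
end
```

Between chunks the thread is free to run other scheduled tasks, which is the benefit described above. The total amount of CPU work stays the same.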
Good Luck!
Thanks, works great.
One problem I discovered is with USR1 trapping: the current handler ( old_sigusr1 = Signal.trap("SIGUSR1") { old_sigusr1.call if old_sigusr1.respond_to?(:call) } ) does not kill the older threads, and trapping directly in the thread does not run old_sigusr1:
def subscriptionCaller
  Iodine.subscribe("#{ENV['memexDomain']}::calls", &Messagebus.method(:asyncTask))
  @thread = Thread.new do
    Signal.trap("USR1") {@thread.kill}
    while(true) do
      (@queue.shift).call
    end
  end
end
Yes, the USR1 signal is used by Iodine to signal a hot-restart (the master process shuts down and respawns the workers).
If you're trying to run cleanup code, consider using Iodine.on_state(:on_finish) { ... } or Iodine.on_state(:start_shutdown) { ... }.
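On the trapping question itself: in Ruby, Signal.trap installs a process-wide handler and returns the previous one, so calling it inside Thread.new replaces the existing handler rather than adding a per-thread one. If a custom handler is still wanted, a minimal sketch of chaining to the previous handler could look like this (chain_trap is a hypothetical helper, not an iodine API, and it assumes a Unix-like platform):

```ruby
# Installs `handler` for `signal` and still calls whatever handler was
# registered before it. Signal.trap returns the previous handler, which
# is a String such as "DEFAULT" when none was set.
def chain_trap(signal, &handler)
  previous = Signal.trap(signal) do
    handler.call
    previous.call if previous.respond_to?(:call)
  end
end

# Example use (hypothetical): flag a shutdown without losing the old handler.
# chain_trap("USR1") { $stop_requested = true }
```

Handlers should stay tiny, for example setting a flag, since several operations (such as locking a Mutex) are not allowed in trap context.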
My example might become:
module AsyncMe
  @thread = nil
  @queue = Queue.new
  @performed_once_and_only_once = Mutex.new
  @flag = true
  def self.validate_subscription
    return unless @performed_once_and_only_once.try_lock
    Iodine.subscribe(to: "*", match: :redis, &AsyncMe.method(:async_task))
    @thread = Thread.new do
      while(@flag) do
        (@queue.shift).call
      end
    end
  end
  def self.async_task(ch, msg)
    unless Iodine.master?
      Iodine.unsubscribe(to: "*", match: :redis)
      return
    end
    @queue << Proc.new do
      p Process.pid, 1111
      sleep 5
      p 5555
      Iodine.run { p "done" }
    end
    p "scheduled"
  end
  def self.cleanup
    return unless @thread
    @flag = false
    @queue << Proc.new { puts "Async worker done." }
    @thread.join
    puts "Async worker finished."
  end
end
# forking mode
Iodine.on_state(:enter_master, &AsyncMe.method(:validate_subscription))
# non-forking mode
Iodine.on_state(:on_start, &AsyncMe.method(:validate_subscription))
# cleanup
Iodine.on_state(:on_finish, &AsyncMe.method(:cleanup))