boazsegev/iodine

Async task on subscribe

EugeneIstomin opened this issue · 3 comments

Hello,

is subscribe actions belongs to async-runner model, or it is only sync, here is the example

Not working in parallel:

Iodine.subscribe(:events) do |ch, msg|
  if Iodine.master?  
    Iodine.run do  
      p 11111
      sleep 5
      p 22222
    end
  end
end

Working in parallel but threads:

Iodine.subscribe(:events) do |ch, msg|
  if Iodine.master?  
    @thr = Thread.new do  
      p 11111
      sleep 5
      p 22222
    end
  end
end

Hi @EugeneIstomin ,

I am not sure what you mean by async in your specific case.

Iodine.run will schedule a task / proc to be performed later. However, that task is still performed within an iodine worker thread, so when you call sleep 5 that task will block the worker thread. If you have only one thread, that task will block iodine completely.

Consider the following as a example:

module AsyncMe
  @thread = nil
  @queue = Queue.new
  @performed_once_and_only_once = Mutex.new
  def self.validate_subscription
    return unless @performed_once_and_only_once.try_lock
    Iodine.subscribe(:events, &AsyncMe.method(:async_task))
    @thread = Thread.new do
      while(true) do
        (@queue.shift).call
      end
    end
  end

  def self.async_task(ch, msg)
    unless Iodine.master?
      Iodine.unsubscribe(:events)
      return
    end
    @queue << Proc.new do
      p Process.pid, 1111
      sleep 5
      p 5555
      Iodine.run { p "done" }
    end
    p "scheduled"
  end
end

# forking mode
Iodine.on_state(:enter_master, &AsyncMe.method(:validate_subscription))
# non-forking mode
Iodine.on_state(:on_start, &AsyncMe.method(:validate_subscription))

As you can see, there's no need to spawn a different thread per task (which is a security risk), instead spawn additional worker thread(s) and manage a queue (cleanup code is missing from the example).

The Iodine.run is used to defer a task within the iodine reactor, which might be useful for small tasks or for breaking CPU heavy tasks into smaller chunks (preventing a thread being blocked)... but it's not designed to run CPU heavy tasks.

Good Luck!

Thanks, works great.
One problem i discovered is a USR1 trapping:

current handler ( old_sigusr1 = Signal.trap("SIGUSR1") { old_sigusr1.call if old_sigusr1.respond_to?(:call) }) do not kill the older threads, and trapping directly in thread do not run old_sigusr1

    def subscriptionCaller
      Iodine.subscribe("#{ENV['memexDomain']}::calls", &Messagebus.method(:asyncTask))
      @thread = Thread.new do
        Signal.trap("USR1") {@thread.kill}
        while(true) do
          (@queue.shift).call
        end
      end
    end

Yes, the USR1 signal is used by Iodine to signal a hot-restart (the master process shuts down and respawns the workers).

If you're trying to run cleanup code, consider using Iodine.on_state(:on_finish) { ... } or Iodine.on_state(:start_shutdown) { ... }.

My example might become:

module AsyncMe
  @thread = nil
  @queue = Queue.new
  @performed_once_and_only_once = Mutex.new
  @flag = true
  def self.validate_subscription
    return unless @performed_once_and_only_once.try_lock
    Iodine.subscribe(to: "*", match: :redis, &AsyncMe.method(:async_task))
    @thread = Thread.new do
      while(@flag) do
        (@queue.shift).call
      end
    end
  end

  def self.async_task(ch, msg)
    unless Iodine.master?
      Iodine.unsubscribe(to: "*", match: :redis)
      return
    end
    @queue << Proc.new do
      p Process.pid, 1111
      sleep 5
      p 5555
      Iodine.run { p "done" }
    end
    p "scheduled"
  end

  def self.cleanup
    return unless @thread
    @flag = false
    @queue << Proc.new { puts "Async worker done." }
    @thread.join
    puts "Async worker finished." 
  end
end

# forking mode
Iodine.on_state(:enter_master, &AsyncMe.method(:validate_subscription))
# non-forking mode
Iodine.on_state(:on_start, &AsyncMe.method(:validate_subscription))
# cleanup
Iodine.on_state(:on_finish, &AsyncMe.method(:cleanup))