socketry/async-websocket

Thread pool / defer work

kstole opened this issue · 10 comments

Is there a built-in mechanism for performing long-running work in a thread pool, similar to Celluloid's defer? https://www.rubydoc.info/github/celluloid/celluloid/Celluloid:defer

I'm asking this for slack-ruby/slack-ruby-client#237

There is nothing stopping you from making a thread and running work on it. However, I guess you are looking for a better solution. Can we make a short example, even just pseudo code, so I can better understand your requirements?

Here is how you make sub-tasks in async:

require 'async'

puts "Sleep sort:"

numbers = 100.times.collect{rand}

Async.run do |task|
	numbers.each do |value|
		task.async do |subtask|
			subtask.sleep value
			puts value
		end
	end
end

puts "Done :)"

You can call Async.run at any time to create a sub-task. Try replacing your defer calls with it. However, you'll want to use async non-blocking IO inside the task. The other alternative is to make a Thread, but that's trickier to get right.
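To make the Thread alternative concrete, here is a minimal sketch using only Ruby's standard library. `run_in_thread` is a hypothetical helper name, not part of async or Celluloid. One reason threads are harder to get right is error handling: an exception raised inside a thread only reaches the caller when it calls `join` or `value`.

```ruby
# Hypothetical helper: run a blocking block on its own thread.
def run_in_thread
  Thread.new do
    # Suppress the default stderr report; the error is re-raised by #value.
    Thread.current.report_on_exception = false
    yield
  end
end

worker = run_in_thread do
  sleep 0.1 # stands in for a blocking call
  :done
end

# Thread#value waits for the thread and returns the block's result.
puts worker.value

failing = run_in_thread { raise ArgumentError, "boom" }

begin
  failing.value # re-raises the exception from the thread
rescue ArgumentError => error
  puts "worker failed: #{error.message}"
end
```

If nothing ever calls `join` or `value`, the failure goes unnoticed by the caller, so a real bot would need some way to report errors from the thread itself.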

Thanks! I'll do some experimenting with that.

The use case is responding to Slack commands in slack-ruby-bot. The issue is that if the bot receives a Slack command that takes a while to respond to, other commands go unanswered during that time. My solution with Celluloid was to immediately hand the computation off to a thread from the pool.

class BotCommand < SlackRubyBot::Commands::Base
  command 'longrunning' do |client, data, match|
    Celluloid.defer do
      sleep 3 # perform long-running operation
      client.say(channel: data.channel, text: 'command successful')
    end
  end
end

As discussed in slack-ruby/slack-ruby-client#237, this would ideally be built into the commands interface so that all command execution is asynchronous and the explicit defer call is unnecessary.

Until I or someone else creates async-slack, you are going to be stuck with 3 different concurrency backends, which makes it tricky to write general code.

At least for async, you can replace it like so:

class BotCommand < SlackRubyBot::Commands::Base
  command 'longrunning' do |client, data, match|
    Async.run do |task|
      task.sleep 3 # perform long-running operation
      client.say(channel: data.channel, text: 'command successful')
    end
  end
end

But there is a small chance this will block the event loop, because it depends on how that callback is invoked.

You'll also want to use async-http-faraday to make the HTTP invocation asynchronous too.

require 'async/http/faraday'
Faraday.default_adapter = :async_http

Hey sorry for the late response. Finally got around to testing this. I couldn't get your suggestion to do what I want.

Using this example command with async-websocket for concurrency and Async.run, I call this command and then call it again while the first is still running. It should respond to both commands but doesn't.

command 'longrunning' do |client, data, match|
  count = match[:expression]
  Async.run do |task|
    5.times do |i|
      client.say(channel: data.channel, text: "Command #{count} - Iteration #{i}")
      sleep 1
    end
    client.say(channel: data.channel, text: "Command #{count} - Finished")
  end
end

[Screenshot: async-websocket]

By comparison, when I use celluloid-io for concurrency and Celluloid.defer rather than Async.run, I get the correct result.

[Screenshot: celluloid]

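You need to use task.sleep. A plain sleep blocks the thread running the event loop, so no other task can run until it returns.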

Hmm, so using task.sleep works for my little demo, but in real testing I'm still not getting the desired result. Long-running commands (such as calling external APIs) still block other commands until they return. I suppose this relates to your comment about async/http/faraday, but that's inconvenient as my API clients use HTTParty. It sounds like Celluloid.defer must work differently, allowing any HTTP call to execute without blocking.

Celluloid.defer probably runs it on a different thread.
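As a rough illustration of why a separate thread helps with blocking code, the sketch below times several blocking sleeps that each run on their own thread. It uses only the standard library; `time_blocking_jobs` is a hypothetical helper, and `sleep` stands in for a blocking HTTP call. Ruby releases its global lock while a thread sleeps or waits on IO, so the waits overlap instead of adding up.

```ruby
# Run `count` blocking jobs, one thread each, and return the elapsed seconds.
def time_blocking_jobs(count, seconds)
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  threads = Array.new(count) { Thread.new { sleep seconds } }
  threads.each(&:join)
  Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
end

elapsed = time_blocking_jobs(3, 0.2)
puts format("3 jobs of 0.2s each took %.2fs in total", elapsed)
```

The total should be close to the duration of a single job rather than the sum of all three.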

There is no silver bullet. Running 1 request per thread will only scale so far.

I've been considering adding a work/thread pool library to async, but it's low priority at the moment because it's mostly just a crutch for blocking code.
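For reference, a work pool of the kind mentioned here can be sketched with nothing but the standard library. This is not part of async: `SimpleThreadPool` and its `defer` method are hypothetical names chosen for illustration, and the sketch assumes a fixed number of worker threads pulling jobs from a shared queue.

```ruby
# Hypothetical minimal pool: a fixed set of threads pulling jobs from a queue.
class SimpleThreadPool
  def initialize(size)
    @jobs = Queue.new
    @workers = Array.new(size) do
      Thread.new do
        # Queue#pop returns nil once the queue is closed and drained.
        while (job = @jobs.pop)
          begin
            job.call
          rescue StandardError => error
            # Keep the worker alive if a single job fails.
            warn "job failed: #{error.message}"
          end
        end
      end
    end
  end

  # Schedule a block to run on a worker thread; returns immediately.
  def defer(&block)
    @jobs << block
    self
  end

  # Stop accepting jobs and wait for the queued ones to finish.
  def shutdown
    @jobs.close
    @workers.each(&:join)
  end
end

pool = SimpleThreadPool.new(4)
results = Queue.new

8.times do |i|
  pool.defer do
    sleep 0.1 # stands in for a blocking HTTP call
    results << i
  end
end

pool.shutdown
puts "completed #{results.size} jobs"
```

A fixed pool caps the number of concurrent blocking calls, which is the trade-off against spawning one thread per request.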

Does HTTParty allow you to replace the underlying implementation?

Not sure, but it's not a complete blocker. I could switch to Faraday with some effort. Unfortunately I don't have a lot of time to dedicate to investigating this. Thanks for the quick responses though!

Here is a basic implementation of defer for async:

#!/usr/bin/env ruby

require 'async'
require 'async/io/notification'

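# Run the given block on a separate thread without blocking the event loop.
def defer(*args, &block)
	Async do
		# A notification that a task can wait on without blocking other tasks.
		notification = Async::IO::Notification.new
		
		thread = Thread.new(*args) do |*arguments|
			block.call(*arguments)
		ensure
			# Wake the waiting task even if the block raises.
			notification.signal
		end
		
		# Suspend only this task until the thread signals completion.
		notification.wait
		thread.join
	end
end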

Async do
	10.times do
		defer do
			puts "I'm going to sleep"
			sleep 1
			puts "I'm going to wake up"
		end
	end
end

The implementation for deferred work is given above, so I'm going to close this issue. If it's popular enough, maybe it can become a gem or be implemented in async directly.