Inspired by Enumerable#lazy coupled with Enumerator::Lazy, and the Async.js library, I thought it would be cool to have syntax like urls.async.map{ |url| http_get(url) }.
require 'benchmark'
sync = Benchmark.measure do
  5.times.each{ sleep 0.1 }
end
async = Benchmark.measure do
  5.times.async.each{ sleep 0.1 }
end
puts sync, async
# 0.000000 0.000000 0.000000 ( 0.500782)
# 0.000000 0.010000 0.010000 ( 0.102763)
The implementation was based on Enumerable#lazy, introduced with Ruby 2.0. Enumerator::Async follows a similar approach, where the enumerator is passed into the constructor.
enum = ('a'..'z').each
# the following are equivalent
Enumerator::Async.new(enum)
enum.async
To get the enumerator back from the async enumerator, simply call sync or to_enum like so:
enum.async.sync == enum
# => true
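To make the wrapping idea concrete, here is a minimal sketch of an enumerator wrapper that runs each block call in its own thread. It is an illustration only, not the gem's source: the class name ThreadedEnum and its three methods are hypothetical, and it assumes one thread per element with no pool.

```ruby
# Hypothetical illustration only; this is not the async_enum implementation.
class ThreadedEnum
  def initialize(enum)
    @enum = enum
  end

  # Hand back the wrapped enumerator unchanged.
  def sync
    @enum
  end

  # Run the block once per element, each call in its own thread,
  # then wait for all threads to finish.
  def each(&block)
    threads = @enum.map { |item| Thread.new(item, &block) }
    threads.each(&:join)
    self
  end

  # Like each, but collect every thread's return value in input order.
  def map(&block)
    @enum.map { |item| Thread.new(item, &block) }.map(&:value)
  end
end

letters = ('a'..'e').each
wrapped = ThreadedEnum.new(letters)

wrapped.sync.equal?(letters)
# => true
wrapped.map { |c| sleep 0.01; c.upcase }
# => ["A", "B", "C", "D", "E"]
```

Thread#value joins the thread before returning its result, so map keeps the input order even when the threads finish out of order.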
Every operation on the async enumerator affects the contained enumerator in the same way. For instance:
enum.with_index.to_a
# => [ ['a', 0], ['b', 1], ['c', 2] ... ]
enum.async.with_index.to_a
# => [ ['a', 0], ['b', 1], ['c', 2] ... ]
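One way such pass-through behaviour could be achieved is by forwarding unknown methods to the wrapped enumerator and re-wrapping any enumerator that comes back. The sketch below assumes that approach; the class name ForwardingEnum is hypothetical, and the gem itself may well do this differently.

```ruby
# Hypothetical wrapper, not the gem's code: forward enumerator methods to
# the wrapped enumerator and re-wrap results that are enumerators.
class ForwardingEnum
  def initialize(enum)
    @enum = enum
  end

  # Hand back the wrapped enumerator.
  def sync
    @enum
  end

  def method_missing(name, *args, &block)
    return super unless @enum.respond_to?(name)

    result = @enum.public_send(name, *args, &block)
    # Keep chained enumerators wrapped so further calls stay forwarded.
    result.is_a?(Enumerator) ? self.class.new(result) : result
  end

  def respond_to_missing?(name, include_private = false)
    @enum.respond_to?(name, include_private) || super
  end
end

wrapped = ForwardingEnum.new(('a'..'c').each)
wrapped.with_index.to_a
# => [["a", 0], ["b", 1], ["c", 2]]
```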
Async methods can be chained just like typical enumerator methods:
enum.async.each{ sleep(0.1) }.each{ sleep(0.1) }
The method Enumerable#async was added so that every collection can be processed in parallel:
[ 0, 1, 2 ].async.each{ |i| puts i }
(0..2).async.map(&:to_i)
# or chain to your heart's content
(0..5).reject(&:even?).reverse.async.with_index.map{ |x, index| x + index }
To limit the thread pool size, you can pass an optional parameter to async. Suppose for performance reasons you want to use a maximum of 4 threads:
(0..100).async(4).each do
  # use bandwidth
end
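For a feel of what a bounded pool involves, here is a rough standard-library sketch in which a fixed number of worker threads pull items from a shared queue. The helper name pooled_map is hypothetical and not part of async_enum, and it assumes Ruby 2.3 or later for Queue#close.

```ruby
# Hypothetical helper, not part of async_enum: map over items using at
# most `limit` worker threads. Assumes Ruby 2.3+ for Queue#close.
def pooled_map(items, limit)
  queue = Queue.new
  items.each_with_index { |item, index| queue << [item, index] }
  queue.close # once closed, pop returns nil when the queue is empty

  results = []
  lock = Mutex.new

  workers = Array.new(limit) do
    Thread.new do
      # Each worker keeps taking [item, index] pairs until none are left.
      while (pair = queue.pop)
        item, index = pair
        value = yield(item)
        # Guard the shared array so writes are safe on any Ruby runtime.
        lock.synchronize { results[index] = value }
      end
    end
  end

  workers.each(&:join)
  results
end

pooled_map(0..4, 2) { |n| n * n }
# => [0, 1, 4, 9, 16]
```

Storing each result at its original index keeps the output in input order, regardless of which worker finishes first.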
When programming concurrently, nasty bugs can come up because some operations aren't atomic. For instance, incrementing a variable with x += 1 will not necessarily work as expected, because it is a read followed by a write and another thread can run in between. Use a lock when encountering these types of errors.
require 'async_enum'
count = 0
mutex = Mutex.new
('a'..'z').async.each do
  mutex.synchronize do
    count += 1
  end
end
count
# => 26
There used to be a lock DSL syntax, but it ruined using async_enum in scoped method calls.
To install it, add it to your Gemfile:
# Gemfile
gem 'async_enum'
To run the demos, clone the project and run with the library included in the load path:
$ git clone git@github.com:aj0strow/async_enum
$ cd async_enum
$ ruby -I lib demos/sleep.rb
- Yoshida Tetsuya (@yoshida-eth0) fixed the fiber error issue, and made the gem work with hashes.
Please report errors and feel free to contribute and improve things!