Promise-based tools to manage asynchronous tasks in Crystal
Hello guys! ;)
This little baby is still under heavy development. You can see my roadmap below!
This will be updated with the release of a stable version
The purpose of Async is to offer a collection of useful tools to handle asynchronous tasks in Crystal.
It comes with:
- Promise, a wrapper allowing the user to do complex actions in only one line, such as
  - Waiting for a task to be completed
  - Running tasks in the background and getting their results later
  - Assigning callbacks on success / error for an asynchronous task
  - and many, many more! ;)
- Pool, a tool based on a classic thread pool architecture. It allows the user to
  - launch multiple asynchronous workers, always ready to execute tasks
  - add tasks to the pool. Workers will pick them up and execute them asynchronously in the background!
  - control the execution of MANY jobs simultaneously with different methods
  - avoid relaunching, for example, a Thread each time you want to run an action in the background: here, the workers are launched only ONCE and keep picking up different tasks!
Every class comes with three different implementations, each using a different type of worker:
- Fibers: native Crystal green threads, the ones you can invoke using spawn
- Threads: as Crystal doesn't natively support threads for the moment, this is based on C bindings of pthread
- Process: an entire process, with a lighter memory
Feel free to contribute, or to send tips !
And don't hesitate to give a star if you like it, of course!
- Add the dependency to your shard.yml:
dependencies:
  async:
    github: LeChatErrant/async
- Run shards install
The documentation is not ready for now, as the project is still in development and subject to changes
https://lechaterrant.github.io/async/
require "async"
include Async
Promises are still in development! ;) I'm currently working on callbacks, and their Thread version will come soon
Promise is a wrapper around an asynchronous task. This task can be handled with a Crystal Fiber, a Thread, or a Process, respectively with FiberPromise, ThreadPromise, and ProcessPromise.
It is built on the Promise model of JavaScript (ES6) and supports multiple actions on a task
Every Promise has the same API (see Documentation); see below for an example using FiberPromise
Keep in mind that here, we're working with FiberPromise, so the asynchronous task is launched in a Fiber. With a ThreadPromise, it would have been a Thread, and with a ProcessPromise, an entire Process
Using bracket notation
promise = FiberPromise.new(->{ puts "Hello" })
# Notice that creating a Promise immediately launches the wrapped Proc
Using do/end notation
promise = FiberPromise.new(->(arg : String) do
  puts arg
end, "Hello")
# Notice that arguments are given after the Proc
# Your code won't compile if you forget an argument
From an existing function
def say(arg : String)
  puts arg
end
proc_say = ->say(String)
promise = FiberPromise.new(proc_say, "Hello")
# Or simply
promise = FiberPromise.new(->say(String), "Hello")
await blocks the execution until the given Proc is finished
await FiberPromise.new(-> do
  puts "time for a nap!"
  sleep 2.seconds
  puts "zzz..."
  sleep 2.seconds
  puts "I'm awake! :)"
end)
puts "I'm after await"
It's useful when you need to wait for a specific task before your program continues
Waiting for an already resolved Promise won't have any effect
promise = FiberPromise.new(->{ puts "hello" })
await promise
Try to display a promise : you'll get its state!
puts promise # #<Async::FiberPromise:object_id> PENDING
puts promise.state # PENDING
# .get will return its state too until resolved
puts promise.get # PENDING
A promise is PENDING while it is being executed, RESOLVED when it has finished, and REJECTED if an error was raised
When the promise is resolved, you can access its value
promised_value = await promise
puts promised_value # returned value
puts promise.get # returned value
puts promise.state # RESOLVED
puts promise # #<Async::FiberPromise:object_id> RESOLVED
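The state transitions described above can be sketched with nothing but Crystal's standard library. What follows is a minimal, hypothetical MiniPromise: the class name, the State enum and the await method are assumptions made for illustration, not the Async shard's actual implementation. It only shows how a task launched in a Fiber might move from pending to resolved or rejected, and how a blocking await could be built on a Channel.

```crystal
# Hypothetical sketch, NOT the Async shard's code.
enum State
  Pending
  Resolved
  Rejected
end

class MiniPromise(T)
  getter state : State = State::Pending
  @value : T | Exception | Nil = nil
  @done = Channel(Nil).new

  # The task is scheduled in a fiber as soon as the promise is created.
  def initialize(task : Proc(T))
    spawn do
      begin
        @value = task.call
        @state = State::Resolved
      rescue ex
        # A raised error becomes the promise's value.
        @value = ex
        @state = State::Rejected
      ensure
        @done.close # wakes every fiber blocked in #await
      end
    end
  end

  # Blocks the calling fiber until the task has finished.
  # Returns immediately if the task is already done.
  def await : T | Exception | Nil
    @done.receive?
    @value
  end
end

promise = MiniPromise(Int32).new(->{ 40 + 2 })
puts promise.await # 42
puts promise.state # Resolved

failed = MiniPromise(Int32).new(->{ "abc".to_i }) # String#to_i raises ArgumentError
puts failed.await.class # ArgumentError
puts failed.state       # Rejected
```

Closing the channel rather than sending on it means any number of fibers can await the same promise, and awaiting an already finished promise has no effect beyond returning its value.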
Crystal can't infer the type of the returned value at compile time. For example, the following code won't compile
value = await FiberPromise.new(->{ "I am a String" })
puts value.split " " # Compile time error, as Crystal is not sure that value is a String
You can fix this problem by enforcing the type with the .as method
value = await FiberPromise.new(->{ resolve "I am a String" })
puts value.as(String).split " "
But be careful! If the returned value was not a String, your code will crash
Prefer using a typed await instead of the .as notation
value = await String, FiberPromise.new(->{ resolve "I am a String and I know it!" })
puts value.split " "
Of course, you can use the keyword return inside a Promise, and return different types of values, or multiple values at the same time
conditional_proc = ->(toggle : Bool) do
  if toggle
    return "I received true! :)"
  end
  return false, "I received false... :("
end
puts await FiberPromise.new(conditional_proc, true) # I received true! :)
puts await FiberPromise.new(conditional_proc, false) # {false, "I received false... :("}
Errors can be thrown inside a Promise, and are caught for you : the return value will be the Exception raised
promise = FiberPromise.new(->{
  raise "Oh, no!"
})
value = await promise
puts value # Oh, no!
puts value.class # Exception
When you're inside a promise, prefer using resolve and reject instead of return and raise.
Basically, they do the same thing, but indicate you're handling a Promise, and not something else
conditional_proc = ->(toggle : Bool) do
  if toggle
    resolve "I received true! :)"
  end
  resolve false, "I received false... :("
end
puts await FiberPromise.new(conditional_proc, true) # I received true! :)
puts await FiberPromise.new(conditional_proc, false) # {false, "I received false... :("}
reject works the same way raise works, so you can reject either a String or an Exception
promise = FiberPromise.new(->{
  reject "Oh, no!"
})
value = await promise
puts value # Oh, no!
puts value.class # Exception
promise = FiberPromise.new(->{
  reject Exception.new("Oh, no!")
})
value = await promise
puts value # Oh, no!
puts value.class # Exception
Promise callbacks are still under development! Don't use them for the moment, as they're not stable yet.
Callbacks are pieces of code which will be executed AFTER your Promise, asynchronously. You can add a callback like this :
FiberPromise.new(->{
  # do something
}).then(->{
  # will be executed once the Promise is RESOLVED
})
.then is for adding callbacks on RESOLVE
You can use .catch for adding callbacks on REJECT
FiberPromise.new(->{
  reject "oh no!"
}).catch(->(e : Exception) {
  puts "#{e.message}"
})
You can have callbacks on RESOLVE and REJECT simultaneously, to execute code depending on the promise state.
promise = FiberPromise.new(->{
  # Do something
})
promise.then(->{ "RESOLVED" })
promise.catch(->{ "REJECTED" })
Notice that adding a callback on a state will override the older one on the same state
With .finally, you can add a callback which will be called in any case
FiberPromise.new(->{
  # Do something
}).finally(->{
  # Will be called whether the Promise was RESOLVED or REJECTED
})
Adding a callback returns a new Promise object
It means you can chain callbacks, like this :
FiberPromise.new(->{
  # Do something
}).then(->{
  "RESOLVED"
}).catch(->{
  "REJECTED"
}).finally(->{
  "ANY CASE"
})
If, for example, an error is thrown, every THEN block will be skipped until a CATCH or FINALLY block is found
FiberPromise.new(->{
  reject "Oh no!"
}).then(->{
  "I won't be called if an error is thrown..."
}).catch(->{
  "Gotcha!"
})
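The skipping rule for chained callbacks can be illustrated with a small, purely synchronous model. The Outcome struct below is hypothetical, and its on_resolve / on_reject methods merely stand in for .then / .catch; none of this is the Async shard's API, and real callbacks run asynchronously. It only shows why each callback returning a new object makes chaining possible, and how a pending error travels past the success callbacks until an error callback handles it.

```crystal
# Hypothetical sketch, NOT the Async shard's code.
struct Outcome
  getter value : String
  getter? rejected : Bool

  def initialize(@value : String, @rejected : Bool = false)
  end

  # Stand-in for .then: runs the block only if no error is pending,
  # otherwise passes the pending error along untouched.
  def on_resolve(& : String -> String) : Outcome
    return self if rejected?
    Outcome.new(yield value)
  rescue ex
    # An error raised by the block turns the chain into a rejected one.
    Outcome.new(ex.message || "error", rejected: true)
  end

  # Stand-in for .catch: runs the block only if an error is pending,
  # and the chain continues as resolved afterwards.
  def on_reject(& : String -> String) : Outcome
    return self unless rejected?
    Outcome.new(yield value)
  end
end

result = Outcome.new("Oh no!", rejected: true)
  .on_resolve { |v| "never called: #{v}" }
  .on_resolve { |v| "never called either: #{v}" }
  .on_reject { |e| "Gotcha! (#{e})" }
  .on_resolve { |v| "#{v} then continued" }

puts result.value # Gotcha! (Oh no!) then continued
```

Because every step returns a fresh Outcome instead of mutating the previous one, the order of the chain alone decides which callbacks run.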
With this, you can totally control your asynchronous flow :) enjoy!
Async offers different kinds of worker pools:
- FiberPool
- ThreadPool (In development)
- ProcessPool (Not implemented yet)
Every pool has the same API (see Documentation)
Here is an example using a FiberPool, but this works with any kind of pool!
Keep in mind that here, a worker is a Crystal Fiber. With a ThreadPool, it would have been a Thread, and with a ProcessPool, an entire Process
pool = FiberPool.new(3) # Create and launch a pool with 3 workers
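To make the worker-pool idea concrete, here is a minimal sketch built only on Crystal's standard library (spawn and Channel). MiniPool and its push / finish methods are hypothetical names chosen for this illustration. This is not how FiberPool is implemented, and its push takes a block rather than a Proc followed by arguments. The point is simply that the workers are spawned once and then keep picking jobs from a shared queue.

```crystal
# Hypothetical sketch, NOT the Async shard's FiberPool.
class MiniPool
  @jobs = Channel(Proc(Nil)).new(32) # buffered job queue
  @done = Channel(Nil).new

  def initialize(@workers : Int32)
    @workers.times do
      spawn do
        # Each worker is launched once and loops until the queue
        # is closed and fully drained.
        while job = @jobs.receive?
          job.call
        end
        @done.send(nil)
      end
    end
  end

  # Queues a job; any idle worker will pick it up.
  def push(&job : ->)
    @jobs.send(job)
  end

  # Closes the queue, then blocks until every worker has finished
  # the remaining jobs and exited. The pool can't be used afterwards.
  def finish
    @jobs.close
    @workers.times { @done.receive }
  end
end

pool = MiniPool.new(3)
results = Channel(Int32).new(10)

5.times { |i| pool.push { results.send(i * i) } }
pool.finish

sum = 0
5.times { sum += results.receive }
puts sum # 30
```

In this sketch, pushing a job after finish raises Channel::ClosedError, which mirrors the idea that a destroyed pool can no longer be used.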
You can add jobs in multiple ways:
- From a block, using do/end notation
pool.push(-> do
  puts "hello"
end)
- From a block, using bracket notation
pool.push(->(i : Int32) { puts i }, 12)
# Notice that arguments are given after the Proc!
# If you forget an argument, your code won't compile
- From an existing function
def my_function(x : Int32, y : Int32)
  sleep 1.seconds
  puts x
  sleep 3.seconds
  puts y
  "hello"
end
my_function_proc = ->my_function(Int32, Int32)
pool.push(my_function_proc, 1, 2)
# Or simply
pool.push(->my_function(Int32, Int32), 1, 2)
- You can add a job with the wait_for method too. It'll block the execution until a worker has picked up and executed this job
pool.wait_for(->{
  puts "Let's sleep a bit"
  sleep 3.seconds
  puts "I'm done with sleeping!"
})
# Execution will continue once "I'm done with sleeping!" has been displayed
Async gives you several ways to control your worker pool:
- The wait method, blocking the execution until every job has been executed
pool.wait
# Execution will continue once every job is finished
- The finish method, blocking the execution until every job has been executed, and then killing all workers. Notice that you can't use the pool after this, as it's a way to destroy it
pool.finish
# Execution is blocked until every job is finished and the workers are killed
- The stop method. Once stop has been called, the pool will finish all currently executing jobs, then kill every worker. Notice that all the pending jobs will be lost! It's useful when you want to stop the pool without executing all queued tasks. stop is not a blocking call, but you can't use the pool after this, as it's a way to destroy it
pool.stop
# Execution is not blocked: already started jobs finish in the background, pending jobs are lost, and fibers are killed in the background
- The terminate method, killing all workers instantaneously, without finishing any job!
# Unfortunately, not implemented for the moment... Sorry <3
- The finalize method is used to destroy the pool: it will finish every job in the background, then kill the workers. It's the equivalent of the finish method, but without blocking the execution. As it's a way to destroy the pool, you can't use it after this
pool.finalize
- FiberPool
  - Trying to take any form of procs as arguments (templated Job class)
  - Simple generic pool with queued jobs
  - Sleeping fibers, able to be awoken by the FiberPool
  - Fully asynchronous functioning
  - "Joining" fibers at FiberPool destruction. Offering a way to "kill" fibers manually
  - Return value of fibers
  - wait and wait_for methods (respectively, a blocking call waiting for ALL jobs to finish, and a blocking call waiting for the job given as parameter to finish)
  - documented API
- ThreadPool
  - Abstract class above FiberPool and ThreadPool, so that users can swap fibers for threads
  - Thread class, an encapsulation of pthread_t
  - Mutex class, an encapsulation of pthread_mutex_t
  - ConditionVariable class, an encapsulation of pthread_cond_t
  - SafeQueue, wrapping Deque and Mutex
  - ThreadChannel, wrapping SafeQueue and ConditionVariable
  - ThreadWorker, the worker launched in each thread at ThreadPool creation! It sleeps while no jobs are in the ThreadChannel, and is awoken when a job is pushed to the ThreadPool
  - Threads joining
  - Threads return value
- ProcessPool
  - Roadmap to be defined!
- FiberPromise
  - Launching generic job in fiber at creation
  - State and return value
  - await blocking method implementation
  - typed await
  - error throwing
  - .then and .catch
  - chaining .then and .catch
  - .finally
  - arguments depending on the callback (possibility to take 0 arguments for a .catch, for example)
  - adding multiple callbacks on the same event
  - resolve and reject keywords
  - documented code
  - specs
- ThreadPromise
  - Roadmap to be defined!
- ProcessPromise
  - Roadmap to be defined!
- Fork it (https://github.com/LeChatErrant/async/fork)
- Create your feature branch (git checkout -b my-new-feature)
- Commit your changes (git commit -am 'Add some feature')
- Push to the branch (git push origin my-new-feature)
- Create a new Pull Request
- LeChatErrant - creator and maintainer