Concurrency and streams utilities.
This library embeds multiple modules for dealing with concurrency and Elixir streams. Including:
If available in Hex, the package can be installed
by adding concur
to your list of dependencies in mix.exs
:
def deps do
[
{:concur, "~> 0.1.0"}
]
end
The Concur.Semaphore
provides a full semaphores implementation for it to be
used by either concurrent or synchronous code. It contains the two semaphore
primitives: wait
and signal
. Here's an example:
alias Concur.Semaphore, as: Sem
sem = Sem.new(1)
# Process 1
Sem.wait() # => Proceeds to obtain the semaphore and continue executing
# Process 2
Sem.wait() # => Blocks until the semaphore is unblocked
# Process 1
Sem.signal() # => Unblocks semaphore. Process 2 continues executing
The Concur.BufferedStream
provides way to map enumerable values in advance up to a certain amount of values (buffer_size: 4
option).
These values can also be loaded asynchronously (with async?: true
option), meaning we don't have to wait for the previous value to be computed to calculate the next one.
alias Concur.BufferedStream, as: BS
[1, 2, 3]
|> BufferedStream.map(fn i ->
# really long function that will take time to compute
# which will be executed in advance for up to 10 values
end, buffer_size: 10)
|> Enum.map(&IO.inspect/1)
[1, 2, 3]
|> BufferedStream.map(fn i ->
# ...
end, buffer_size: 10, async?: true)
The Concur.Stateful.Queue
module allows for a stateful FIFO behavior.
alias Concur.Stateful.Queue
{:ok, queue} = Queue.start()
Queue.push(queue, 1)
Queue.push(queue, 2)
Queue.push(queue, 3)
Queue.pop(queue) # => {:value, 1}
Queue.pop(queue) # => {:value, 2}
Queue.pop(queue) # => {:value, 3}
Queue.pop(queue) # => :empty
See LICENSE