GenStage is a specification for exchanging events between producers and consumers.
This project currently provides the following functionality:
-
GenStage
(docs) - a behaviour for implementing producer and consumer stages -
ConsumerSupervisor
(docs) - a supervisor designed for consuming events from GenStage and starting a child process per event
You may also be interested in two other projects built on top of GenStage:
-
Flow for building computational flows using map-reduce, partitions, windows, and more that run concurrently. See the documentation for Flow or José Valim's keynote at ElixirConf 2016 introducing the main concepts behind GenStage and Flow
-
Broadway for building concurrent and multi-stage data ingestion and data processing pipelines to consume events from Amazon SQS, RabbitMQ, and others. See Broadway's documentation or José Valim's introduction to Broadway
Examples for using GenStage and ConsumerSupervisor can be found in the examples directory:
-
ProducerConsumer - a simple example of setting up a pipeline of
A -> B -> C
stages and having events flowing through it -
ConsumerSupervisor - an example of how to use one or more
ConsumerSupervisor
as a consumer to a producer that works as a counter -
GenEvent - an example of how to use
GenStage
to implement an alternative toGenEvent
that leverages concurrency and provides more flexibility regarding buffer size and back-pressure -
RateLimiter - an example of performing rate limiting in a GenStage pipeline
GenStage requires Elixir v1.5. Just add :gen_stage
to your list of dependencies in mix.exs:
def deps do
[{:gen_stage, "~> 1.0"}]
end
Same as Elixir.