- Elixir >= 1.7
- OTP >= 21.0
You can configure path for the mnesia database at config/config.exs, default
path is '.mnesia/#{Mix.env}/#{node()}'
There are two options how messages will be handled when you ack them:
delete- deletes message from the storemark- marks messages with provided status
To configure this behaviour simply add
config :simple_queue, :queue,
mark_completed: true,
mark_with: :completed
Queue also supports different store which implements SQ.Store behaviour
config :simple_queue, :queue,
store: MyStore
Default store is mnesia
- Clone the project,
git clone https://github.com/sysashi/simple_queue.git - Inside project's root run
mix do deps.get, compile - Create mnesia database
mix amnesia.create -d SQ.Mnesia.Database --disk(otherwise messages will be stored in memory)
To run tests, simply execute mix test from the root of the projects
Queue is a process which you can put under your supervision tree, e.g.
Supervisor.start_link(
[
{SQ.Queue, [name: MyQueue]}
],
strategy: :one_for_one,
name: MyApp.Supervisor
)
or use with something like DynamicSupervisor
Store should support atomic and isolated transactions. (default store, mnesia, does) to avoid race conditions.
Using single process for queue is not a great idea, in our case GenServer does not support backpressure (it does in theory, and that is processes mailbox), so realistically one would implement an interface similar to poolboy, distributing load across multiple processes (queues)
Since queue is gated behind a process which only processes synchronous calls, multiple consumers(processes) should not be a problem.
There is a simple interface SQ to test program in the iex.
After you have created the database, execute iex -S mix run to get into the repl.
iex> SQ.create! # creates a single named queue process
Available commands:
iex> SQ.add(message) # => :okiex> SQ.get() # => {message_id, message} | :empty,iex> SQ.ack(message_id) # => :ok | {:error, :not_found}iex> SQ.reject(message_id) # => :ok | {:error, :not_found}iex> SQ.purge() # => resets queue of the processiex> SQ.purge(table: true) # => same as above plus purges database table
Messages that have not been acknowledged will persists across node and process restarts
Rejected messages will be inserted at the rear of the queue and in the store again, given new id.