- Elixir >= 1.7
- OTP >= 21.0
You can configure the path for the Mnesia database in config/config.exs.
The default path is '.mnesia/#{Mix.env}/#{node()}'
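As a rough illustration, Mnesia itself reads its storage directory from its standard `dir` application parameter. The fragment below assumes simple_queue relies on that parameter; the README does not name the exact key, so treat it as a sketch and check the project's own config before copying it.

```elixir
# config/config.exs
use Mix.Config

# Assumption: the database path is set through Mnesia's standard `dir`
# parameter. The value is a charlist and mirrors the default path above.
config :mnesia,
  dir: '.mnesia/#{Mix.env()}/#{node()}'
```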
There are two options for how messages are handled when you ack them:
- delete - deletes the message from the store
- mark - marks the message with the provided status
To configure this behaviour, add:
config :simple_queue, :queue,
mark_completed: true,
mark_with: :completed
The queue also supports any other store that implements the SQ.Store
behaviour:
config :simple_queue, :queue,
store: MyStore
The default store is Mnesia.
- Clone the project:
git clone https://github.com/sysashi/simple_queue.git
- Inside the project's root, run:
mix do deps.get, compile
- Create the Mnesia database:
mix amnesia.create -d SQ.Mnesia.Database --disk
(otherwise messages will be stored in memory only)
To run the tests, execute mix test
from the root of the project.
The queue is a process that you can place in your supervision tree, e.g.
Supervisor.start_link(
[
{SQ.Queue, [name: MyQueue]}
],
strategy: :one_for_one,
name: MyApp.Supervisor
)
or start it dynamically with something like DynamicSupervisor.
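A minimal sketch of the DynamicSupervisor variant follows. The module `QueueStarterSketch` and its function names are hypothetical and exist only for illustration; the only parts taken from this README are `SQ.Queue` and its `name` option. The queue module is passed as an argument so the helper can be tried with a stand-in process.

```elixir
defmodule QueueStarterSketch do
  # Hypothetical helper module, not part of simple_queue.

  # Starts a DynamicSupervisor registered under a local name.
  def start_supervisor do
    DynamicSupervisor.start_link(strategy: :one_for_one, name: __MODULE__.Supervisor)
  end

  # Starts one queue process on demand. `queue_mod` defaults to SQ.Queue,
  # which is assumed to accept the same `[name: ...]` argument shown above.
  def start_queue(name, queue_mod \\ SQ.Queue) do
    DynamicSupervisor.start_child(__MODULE__.Supervisor, {queue_mod, [name: name]})
  end
end
```

With the project compiled, `QueueStarterSketch.start_supervisor()` followed by `QueueStarterSketch.start_queue(MyQueue)` would be expected to return `{:ok, pid}` for the new queue process.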
The store should support atomic and isolated transactions to avoid race conditions (the default store, Mnesia, does).
Using a single process for the queue is not a great idea. In our case GenServer does not support backpressure (in theory it does, through the process mailbox), so realistically one would implement an interface similar to poolboy, distributing load across multiple processes (queues).
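One simple way to spread load over several queue processes is sketched below. It uses hash-based routing over a fixed set of names, which is a simpler technique than poolboy's checkout model, and it is not something the project ships. `QueuePoolSketch`, the `sq_queue_N` names and the routing key are all hypothetical, and the README does not show how the `SQ` interface addresses a specific named queue, so only the supervision and routing side is covered.

```elixir
defmodule QueuePoolSketch do
  # Hypothetical helper, not part of simple_queue.

  # Builds a fixed list of queue names, e.g. [:sq_queue_1, :sq_queue_2].
  def queue_names(size) when is_integer(size) and size > 0 do
    for i <- 1..size, do: :"sq_queue_#{i}"
  end

  # Child specs with unique ids, so that several SQ.Queue processes can
  # live under the same supervisor. Requires SQ.Queue to be loaded.
  def child_specs(names) do
    for name <- names do
      Supervisor.child_spec({SQ.Queue, [name: name]}, id: name)
    end
  end

  # Picks a queue deterministically from a routing key.
  def pick(names, key) do
    Enum.at(names, :erlang.phash2(key, length(names)))
  end
end
```

The list returned by `QueuePoolSketch.child_specs/1` can be passed to `Supervisor.start_link/2` in place of the single `{SQ.Queue, [name: MyQueue]}` entry. Because `pick/2` is deterministic, messages with the same key always land on the same queue.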
Since the queue is gated behind a process that only handles synchronous calls, multiple consumers (processes) should not be a problem.
There is a simple interface, SQ,
for testing the program in iex.
After you have created the database, execute iex -S mix run
to get into the REPL.
iex> SQ.create! # creates a single named queue process
Available commands:
iex> SQ.add(message) # => :ok
iex> SQ.get() # => {message_id, message} | :empty
iex> SQ.ack(message_id) # => :ok | {:error, :not_found}
iex> SQ.reject(message_id) # => :ok | {:error, :not_found}
iex> SQ.purge() # => resets the queue of the process
iex> SQ.purge(table: true) # => same as above plus purges database table
Messages that have not been acknowledged will persist across node and process restarts.
Rejected messages are inserted at the rear of the queue and into the store again, and are given a new id.
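The ack and reject semantics described above can be modelled with a small in-memory structure. This is an illustrative model only, not the project's implementation: `QueueModelSketch` is a hypothetical module, ids are plain integers, nothing is persisted, and ack always deletes (the "delete" option).

```elixir
defmodule QueueModelSketch do
  # Illustrative in-memory model; not simple_queue's implementation.
  defstruct pending: :queue.new(), unacked: %{}, next_id: 1

  def new, do: %__MODULE__{}

  # Appends a message at the rear and assigns it the next id.
  def add(%__MODULE__{} = q, message) do
    pending = :queue.in({q.next_id, message}, q.pending)
    %{q | pending: pending, next_id: q.next_id + 1}
  end

  # Takes the front message and remembers it until it is acked or rejected.
  def get(%__MODULE__{} = q) do
    case :queue.out(q.pending) do
      {{:value, {id, message}}, rest} ->
        {{id, message}, %{q | pending: rest, unacked: Map.put(q.unacked, id, message)}}

      {:empty, _} ->
        {:empty, q}
    end
  end

  # Acknowledging removes the message for good.
  def ack(%__MODULE__{} = q, id) do
    case Map.fetch(q.unacked, id) do
      {:ok, _message} -> {:ok, %{q | unacked: Map.delete(q.unacked, id)}}
      :error -> {{:error, :not_found}, q}
    end
  end

  # Rejecting re-adds the message at the rear under a new id.
  def reject(%__MODULE__{} = q, id) do
    case Map.fetch(q.unacked, id) do
      {:ok, message} -> {:ok, add(%{q | unacked: Map.delete(q.unacked, id)}, message)}
      :error -> {{:error, :not_found}, q}
    end
  end
end
```

In this model, adding "a" and "b", taking "a" and rejecting it means the next two `get/1` calls return "b" first and then "a" under a fresh id.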