/durable-queue

a disk-backed queue for clojure

Primary LanguageClojureEclipse Public License 1.0EPL-1.0

Clojars Project cljdoc badge CircleCI

This library implements a disk-backed task queue, allowing for queues that can survive processes dying, and whose size is bounded by available disk rather than memory. It is a small, purely-Clojure implementation focused entirely on the in-process use case, meaning that it is both simpler and more easily embedded than network-aware queue implementations such as Kafka and ActiveMQ.

usage

[factual/durable-queue "0.1.5"]

To interact with queues, first create a queues object by specifying a directory in the filesystem and an options map:

> (require '[durable-queue :refer :all])
nil
> (def q (queues "/tmp" {}))
#'q

This allows us to put! and take! tasks from named queues. take! is a blocking read, and will only return once a task is available or, if a timeout is defined (in milliseconds), once the timeout elapses:

> (take! q :foo 10 :timed-out!)
:timed-out!
> (put! q :foo "a task")
true
> (take! q :foo)
< :in-progress | "a task" >
> (deref *1)
"a task"

Notice that the task has a value describing its progress, and a value describing the task itself. We can get the task descriptor by dereferencing the returned task. Note that since the task is persisted to disk and anything on disk may be corrupted, this involves a checksum which may fail and throw an IOException. Any software which wants to be robust to all failure modes should always dereference within a try/catch clause.

Calling take! removed the task from the queue, but just because we've taken the task doesn't mean we've completed the action associated with it. In order to make sure the task isn't retried on restart, we must mark it as complete!.

> (put! q :foo "another task")
true
> (take! q :foo)
< :in-progress | "another task" >
> (complete! *1)
true

If our task fails and we want to re-enqueue it to be tried again, we can instead call (retry! task). Tasks which are marked for retry are added to the end of the current queue.

To get a description of the current state of the queue, we can use stats, which returns a map of queue names onto various counts:

> (stats q)
{:enqueued 2,
 :retried 0,
 :completed 1,
 :in-progress 1,
 :num-slabs 1,
 :num-active-slabs 1}
field description
:enqueued the number of tasks which have been enqueued via put!, including any pre-existing tasks on-disk when the queues were initialized
:retried the number of tasks which have been retried via retry!
:completed the number of tasks which have been completed via complete!
:in-progress the number of tasks which have been consumed via take!, but are not yet complete
:num-slabs the number of underlying files which are being used to store tasks
:num-active-slabs the number of underlying files which are currently open and mapped into memory

pod usage

Build pod:

bb pod

In the project root is a durable-queue which uses the pod uberjar and could be use as pod executable.

In a babashka script:

(require '[babashka.pods :as pods])

(pods/load-pod "./durable-queue")

(require '[pod.flexiana.durable-queue :as dq])

(def qp "./tmp/queue")

(dq/stats qp)

(dq/take! qp :ex 10 :timeout)

(dq/put! qp :ex {:a 1 :b 2})

Differences between the functions in the clojure library and those in the pod of the same name:

  • the first parameter is not the queue object but the path where the queue object is saved
  • the take! function removes the object from the queue (involves running the complete! function)

configuring the queues

queues can be given a number of different options, which can affect its performance and correctness.

By default, it is assumed all tasks are idempotent. This is necessary, since the process can die at any time and leave an in-progress task in an undefined state. If your tasks are not idempotent, a :complete? predicate can be defined which, on instantiation of the queues object, will scan through all pre-existing task descriptors and remove those for which the predicate returns true.

A complete list of options is as follows:

name description
:complete? a predicate for identifying already completed tasks, defaults to always returning false
:max-queue-size the maximum number of elements that can be in the queue before put! blocks, defaults to Integer/MAX_VALUE
:slab-size The size, in bytes, of the backing files for the queue. The size of a serialized task cannot be larger than this size, defaults to 64mb.
:fsync-put? Whether an fsync should be performed for each put!. Defaults to true.
:fsync-take? Whether an fsync should be performed for each take!. Defaults to false.
:fsync-threshold The maximum number of writes (puts, takes, retries, completes) that can be performed before an fsync is performed.
:fsync-interval The maximum amount of time, in milliseconds, that can elapse before an fsync is performed.

Disabling :fsync-put? will risk losing tasks if a process dies. Disabling :fsync-take? increases the chance of a task being re-run when a process dies. Disabling both will increase throughput of the queue by at least an order of magnitude (in the default configuration, ~1.5k tasks/sec on rotating disks and ~6k tasks/sec on SSD, with fsync completely disabled ~100k tasks/sec independent of hardware).

Writes can be batched using fsync-threshold and/or fsync-interval, or by explicitly calling (durable-queue/fsync q). Setting the fsync-threshold to 10 will allow for ~25k tasks/sec on SSD, and still enforces a small upper boundary on how much data can be lost when the process dies. An exception will be thrown if both per-task and batch sync options are set.

license

Copyright © 2015 Factual Inc

Distributed under the Eclipse Public License 1.0