ZQ – is a multi-producer-single-consumer durable queue based on the PostgreSQL. All operations are performed by calling stored procedures, so it is a reasonable choice if you already have a PostgreSQL in your zoo and need a simple queue solution.
Inspired by pgq.
In nutshell work with queue (after it was created) is as simple as:
- Enqueue data
- Open batch of desired size
- Consume data
- Close or cancel batch
- If batch was closed (for instance in case of successful data processing):
- data will be removed from queue
- next batch will be started from item followed by last item of closed batch
- If batch was canceled (like in case of temporary impediments of data processing):
- data stays in the queue
- next batch will be started from same item
- If batch was closed (for instance in case of successful data processing):
-
Create queue:
zq.create_queue(queue_name TEXT) : BOOLEAN
- Tries to create a queue "queue_name"
- Returns
true
if queue was successfully created - Returns
false
if queue is already exists
-
Delete queue:
zq.drop_queue(queue_name TEXT) : BOOLEAN
- Tries to delete queue "queue_name"
- Returns
true
if queue was successfully deleted - Returns
false
if queue is not exists
-
Enqueue data:
zq.enqueue(queue_name TEXT, data TEXT[]) : VOID
- Inserts items from data array as events in the queue 'queue_name' preserving order
- Throws an exception if given queue is not exists
-
Consume data:
zq.dequeue(queue_name TEXT) : TABLE
- Returns content of the current batch for the queue "queue_name" a table with
following columns:
"timestamp" TIMESTAMP
"data" TEXT
- Throws an exception if given queue is not exists
- Throws an exception if there is batch was not created for given queue
- Returns content of the current batch for the queue "queue_name" a table with
following columns:
-
Create new batch:
zq.open_batch(queue_name TEXT, max_batch_size INT) : INT
- Creates new batch of maximum size "max_batch_size" for the queue "queue_name", if there are new events
- Returns actual size of the newly created batch (<="max_batch_size") or 0 if no new events are available and batch was not created
- Throws an exception if given queue is not exists
- Throws an exception if "max_batch_size" is a non-positive integer
- Throws an exception if non-committed batch for given queue is already exists
-
Commit current batch:
zq.close_batch(queue_name TEXT) : VOID
- Commits previously created batch for the queue "queue_name"
- Throws an exception if given queue is not exists
- Throws an exception if there is no batch was created for given queue
-
Cancel current batch:
zq.cancel_batch(queue_name TEXT) : VOID
- Cancel previously created batch for the queue "queue_name"
- Throws an exception if given queue is not exists
- Throws an exception if there is no batch was created for given queue