DDTH's library to interact with various queue implementations.
Project home: https://github.com/DDTH/ddth-queue
I move between projects, and different projects suit different queues. I need a simple, unified API to interact with various queue backends, plus extra functionality such as committing a queue item when done, re-queuing an item if needed, and finding orphan queue items (items that have not been committed for a long period). This library was written to fill that need.
Latest release version: 0.5.1. See RELEASE-NOTES.md.
Maven dependency: if only a subset of ddth-queue functionality is used, choose the corresponding dependency artifact(s) to reduce the number of unused jar files.
ddth-queue-core: ddth-queue interfaces and in-memory (using java.util.Queue) implementations:
<dependency>
<groupId>com.github.ddth</groupId>
<artifactId>ddth-queue-core</artifactId>
<version>0.5.1</version>
</dependency>
ddth-queue-disruptor: includes ddth-queue-core and LMAX Disruptor dependencies:
<dependency>
<groupId>com.github.ddth</groupId>
<artifactId>ddth-queue-disruptor</artifactId>
<version>0.5.1</version>
<type>pom</type>
</dependency>
ddth-queue-jdbc: includes ddth-queue-core and ddth-dao-jdbc dependencies:
<dependency>
<groupId>com.github.ddth</groupId>
<artifactId>ddth-queue-jdbc</artifactId>
<version>0.5.1</version>
<type>pom</type>
</dependency>
ddth-queue-kafka: includes ddth-queue-core and ddth-kafka dependencies:
<dependency>
<groupId>com.github.ddth</groupId>
<artifactId>ddth-queue-kafka</artifactId>
<version>0.5.1</version>
<type>pom</type>
</dependency>
ddth-queue-redis: includes ddth-queue-core and Jedis dependencies:
<dependency>
<groupId>com.github.ddth</groupId>
<artifactId>ddth-queue-redis</artifactId>
<version>0.5.1</version>
<type>pom</type>
</dependency>
ddth-queue-rocksdb: includes ddth-queue-core and RocksDB dependencies:
<dependency>
<groupId>com.github.ddth</groupId>
<artifactId>ddth-queue-rocksdb</artifactId>
<version>0.5.1</version>
<type>pom</type>
</dependency>
ddth-queue provides a simple, unified API to interact with various queue implementations:
- Put an item into the queue: queue or re-queue.
- Take an item from the queue.
- Retrieve the list of orphan items.
Typical usage flow:
- Call IQueue.take() to take a message from the queue.
- Do something with the message.
  - When done, call IQueue.finish(msg).
  - If not done and the message needs to be re-queued, call either IQueue.requeue(msg) or IQueue.requeueSilent(msg) to put the message back into the queue.
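The take / finish / re-queue flow can be sketched as follows. This is a minimal illustration, not the library's code: MiniQueue is a hypothetical stand-in that exposes only the three IQueue calls named above, the assumption that take() returns null on an empty queue is mine, and the Deque-backed implementation exists only so the example runs on its own.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Predicate;

/** Hypothetical stand-in for the three IQueue calls used in the flow. */
interface MiniQueue<M> {
    M take();               // assumed to return null when the queue is empty
    void finish(M msg);
    boolean requeue(M msg);
}

class ConsumerFlowSketch {
    /** Takes one message, then finishes or re-queues it; returns false if the queue was empty. */
    static <M> boolean consumeOne(MiniQueue<M> queue, Predicate<M> handler) {
        M msg = queue.take();
        if (msg == null) {
            return false;
        }
        boolean done;
        try {
            done = handler.test(msg);   // "do something with the message"
        } catch (RuntimeException e) {
            done = false;               // treat a failed handler as "not done"
        }
        if (done) {
            queue.finish(msg);
        } else {
            queue.requeue(msg);
        }
        return true;
    }

    /** Deque-backed queue with no ephemeral storage, only to make the sketch runnable. */
    static MiniQueue<String> dequeBacked(Deque<String> storage) {
        return new MiniQueue<String>() {
            public String take() { return storage.poll(); }
            public void finish(String msg) { /* nothing to clean up here */ }
            public boolean requeue(String msg) { return storage.offer(msg); }
        };
    }

    public static void main(String[] args) {
        Deque<String> storage = new ArrayDeque<>();
        storage.offer("ok");
        storage.offer("retry-me");
        MiniQueue<String> queue = dequeBacked(storage);
        consumeOne(queue, m -> m.equals("ok"));   // handler succeeds: finished
        consumeOne(queue, m -> m.equals("ok"));   // handler fails: re-queued
        System.out.println(storage);              // prints [retry-me]
    }
}
```

Whether a failed handler should call requeue(msg) or requeueSilent(msg) depends on whether the retry should count against the message; this sketch always uses requeue for brevity.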
A queue implementation has 2 message storages:
- Queue storage: (required) main storage where messages are put into and taken from. Queue storage is FIFO.
- Ephemeral storage: (optional) messages taken from queue storage are temporarily stored in ephemeral storage until finished or re-queued.
(A queue implementation is required to provide queue storage; ephemeral storage is optional.)
When IQueue.take() is called, the message is put into ephemeral storage.
When IQueue.finish(msg), IQueue.requeue(msg) or IQueue.requeueSilent(msg) is called, the message is removed from ephemeral storage.
The idea of ephemeral storage is to make sure messages are not lost if the application crashes between IQueue.take() and IQueue.finish(msg) (or IQueue.requeue(msg), or IQueue.requeueSilent(msg)).
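To make the interplay of the two storages concrete, here is a small in-memory sketch. It is illustrative only and is not the library's InmemQueue: the class name is hypothetical, messages are plain strings that double as their own ids, and the value kept in ephemeral storage (the take time) is an assumption.

```java
import java.util.ArrayDeque;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative two-storage queue; names and layout are assumptions. */
class TwoStorageQueueSketch {
    // Queue storage: FIFO, holds messages waiting to be taken.
    private final Queue<String> queueStorage = new ArrayDeque<>();
    // Ephemeral storage: messages taken but not yet finished or re-queued.
    private final Map<String, Long> ephemeralStorage = new ConcurrentHashMap<>();

    synchronized boolean queue(String msg) {
        return queueStorage.offer(msg);
    }

    /** Moves the head of queue storage into ephemeral storage and returns it (null if empty). */
    synchronized String take() {
        String msg = queueStorage.poll();
        if (msg != null) {
            ephemeralStorage.put(msg, System.currentTimeMillis());
        }
        return msg;
    }

    /** Processing is done: drop the message from ephemeral storage. */
    void finish(String msg) {
        ephemeralStorage.remove(msg);
    }

    /** Processing is not done: remove from ephemeral storage and put back into queue storage. */
    synchronized boolean requeue(String msg) {
        ephemeralStorage.remove(msg);
        return queueStorage.offer(msg);
    }

    synchronized int queueSize() { return queueStorage.size(); }

    int ephemeralSize() { return ephemeralStorage.size(); }

    public static void main(String[] args) {
        TwoStorageQueueSketch q = new TwoStorageQueueSketch();
        q.queue("a");
        q.queue("b");
        String msg = q.take();   // "a" now lives only in ephemeral storage
        System.out.println(q.queueSize() + " " + q.ephemeralSize());   // prints 1 1
        q.finish(msg);           // "a" is gone from both storages
        System.out.println(q.queueSize() + " " + q.ephemeralSize());   // prints 1 0
    }
}
```

If the process died between take() and finish(msg), a persistent ephemeral storage would still hold "a", which is what makes later recovery possible. The in-memory map here would of course be lost with the JVM.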
If the application crashes between IQueue.take() and IQueue.finish(msg) (or IQueue.requeue(msg), or IQueue.requeueSilent(msg)), orphan messages could be left in ephemeral storage. To deal with orphan messages:
- Call Collection<IQueueMessage> getOrphanMessages(long thresholdTimestampMs) to get all orphan messages that were queued before thresholdTimestampMs.
- Call IQueue.finish(msg) to completely clear the orphan message from ephemeral storage, or
- Call IQueue.requeue(msg) or IQueue.requeueSilent(msg) to move the message back to the queue.
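The orphan-handling steps can be sketched like this. The sketch is self-contained and does not use the library: the class name, the requeueOrphans helper, and the idea of keeping "message id -> queued timestamp" in a map are all assumptions made for illustration. Only the threshold rule (a message is an orphan if it was queued before thresholdTimestampMs) comes from the description above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative only; the storage layout is an assumption, not the library's. */
class OrphanRecoverySketch {
    final Queue<String> queueStorage = new ArrayDeque<>();
    /** Message id -> timestamp (ms) at which the message was queued. */
    final Map<String, Long> ephemeralStorage = new ConcurrentHashMap<>();

    /** Returns ids of messages in ephemeral storage queued before the threshold. */
    Collection<String> getOrphanMessages(long thresholdTimestampMs) {
        List<String> orphans = new ArrayList<>();
        for (Map.Entry<String, Long> entry : ephemeralStorage.entrySet()) {
            if (entry.getValue() < thresholdTimestampMs) {
                orphans.add(entry.getKey());
            }
        }
        return orphans;
    }

    /** Moves every orphan back to queue storage; returns how many were moved. */
    int requeueOrphans(long thresholdTimestampMs) {
        int moved = 0;
        for (String id : getOrphanMessages(thresholdTimestampMs)) {
            // Remove from ephemeral storage first so a message is never moved twice.
            if (ephemeralStorage.remove(id) != null && queueStorage.offer(id)) {
                moved++;
            }
        }
        return moved;
    }
}
```

A periodic job might call requeueOrphans(System.currentTimeMillis() - 10 * 60 * 1000L) to recover messages older than ten minutes; the ten-minute window is arbitrary and should exceed the longest expected processing time.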
- boolean IQueue.queue(IQueueMessage): Put a message into queue storage.
- boolean requeue(IQueueMessage): Re-queue a taken message. The queue implementation must remove the message instance from ephemeral storage (if any). Once re-queued, the message's timestamp and number of re-queue times are updated.
- boolean requeueSilent(IQueueMessage): Similar to requeue, but the message's timestamp and number of re-queue times are not updated.
- IQueueMessage take(): Take a message from the queue.
- Collection<IQueueMessage> getOrphanMessages(long): Get all orphan messages (messages that were left in ephemeral storage for a long time).
- finish(IQueueMessage): Called to clean up the message from ephemeral storage.
- int queueSize(): Get the number of items in the queue.
- int ephemeralSize(): Get the number of items in ephemeral storage.
Implementation | Bounded Size | Persistent | Ephemeral Storage | Multi-Clients |
---|---|---|---|---|
In-memory | Optional | No | Yes | No |
JDBC | No | Yes | Yes | Yes |
Kafka | No | Yes | No | Yes |
Redis | No | Yes (1) | Yes | Yes |
RocksDB | No | Yes | Yes | No |
- Bounded Size: queue's size is bounded.
  - Currently only in-memory queues support bounded queue size.
  - Database (JDBC), Kafka, Redis and RocksDB queues are, in practice, limited only by hardware capacity.
- Persistent: queue items persist across JVM restarts.
  - (1) Redis backend: persistence depends on the Redis server's configuration.
- Ephemeral Storage: supports retrieval of orphan messages.
- Multi-Clients: multiple clients can share the same queue backend storage.
There are 2 in-memory implementations:
- InmemQueue.java: uses a java.util.Queue as Queue storage and a java.util.concurrent.ConcurrentMap as Ephemeral storage.
- DisruptorQueue.java: uses LMAX Disruptor as Queue storage and a java.util.concurrent.ConcurrentMap as Ephemeral storage.
Messages in in-memory queues are not persistent between JVM restarts!
Queue messages are stored in RDBMS tables.
See JdbcQueue.java.
Queue messages are persistent.
This queue implementation utilizes Apache Kafka as queue storage.
Ephemeral storage is currently not supported!
Queue messages are persistent.
See KafkaQueue.java.
Queue storage and Ephemeral storage are implemented as a Redis hash and sorted set respectively. Also, a Redis list is used as a queue of message-ids.
Queue messages are persistent (depends on Redis server's configurations).
See RedisQueue.java.
Queue messages are stored in RocksDB.
Ephemeral storage is currently not supported!
Queue messages are persistent.
See RocksDbQueue.java.
- com.github.ddth.queue.impl.universal.*: universal queue implementations, where the queue message's id is a 64-bit long.
- com.github.ddth.queue.impl.universal2.*: universal queue implementations, where the queue message's id is a String.
Universal queue message implementation, with the following fields:
- queue_id: message's unique id
  - com.github.ddth.queue.impl.universal.UniversalQueueMessage: queue-id is a 64-bit long
  - com.github.ddth.queue.impl.universal2.UniversalQueueMessage: queue-id is a String
- org_timestamp (java.util.Date): timestamp when the message was first queued
- timestamp (java.util.Date): message's last-queued timestamp
- num_requeues (int): number of times the message has been re-queued
- content (byte[]): message's content
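As a rough illustration of how these fields behave over a message's lifetime, the sketch below models them in a plain class. It is not UniversalQueueMessage: the class name and the onRequeue / onRequeueSilent helpers are hypothetical, and only the field meanings and the documented difference between requeue and requeueSilent are taken from this document.

```java
import java.util.Date;

/** Hypothetical message holder; field names mirror the list above. */
class MessageFieldsSketch {
    final long queueId;        // queue_id (64-bit long variant)
    final Date orgTimestamp;   // org_timestamp: set once, when first queued
    Date timestamp;            // timestamp: last-queued time
    int numRequeues;           // num_requeues: count of non-silent re-queues
    final byte[] content;      // content: opaque payload

    MessageFieldsSketch(long queueId, byte[] content, Date now) {
        this.queueId = queueId;
        this.content = content;
        this.orgTimestamp = now;
        this.timestamp = now;
    }

    /** Metadata change described for requeue(msg): refresh timestamp, bump the counter. */
    void onRequeue(Date now) {
        timestamp = now;
        numRequeues++;
    }

    /** Metadata change described for requeueSilent(msg): none. */
    void onRequeueSilent() {
        // intentionally empty: timestamp and numRequeues stay as they were
    }
}
```

Because requeue refreshes the timestamp, a message that keeps failing looks "recent" after every retry, while its num_requeues grows. A consumer can use that counter to give up after some number of attempts.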
Universal in-memory queue implementation that uses java.util.Queue as Queue storage, and java.util.concurrent.ConcurrentMap as Ephemeral storage.
- Use universal.UniversalInmemQueue to work with universal.UniversalQueueMessage, and universal2.UniversalInmemQueue to work with universal2.UniversalQueueMessage.
Universal in-memory queue implementation that uses LMAX Disruptor as Queue storage, and java.util.concurrent.ConcurrentMap as Ephemeral storage.
- Use universal.UniversalDisruptorQueue to work with universal.UniversalQueueMessage, and universal2.UniversalDisruptorQueue to work with universal2.UniversalQueueMessage.
Universal JDBC queue implementation:
- 2 db tables for queue and ephemeral storages
- Use universal.UniversalJdbcQueue to work with universal.UniversalQueueMessage, and universal2.UniversalJdbcQueue to work with universal2.UniversalQueueMessage
- Property ephemeralDisabled (default false): when set to true, ephemeral storage is disabled
- Property fifo (default true): when set to true, messages are taken in FIFO manner
Sample table schema for MySQL: see sample_schema.mysql.sql.
Sample table schema for PgSQL: see sample_schema.pgsql.sql.
Similar to UniversalJdbcQueue, but uses a less-locking algorithm specific to MySQL, and needs only a single db table for both queue and ephemeral storages.
- Optimized for MySQL (EXPERIMENTAL!)
- 1 single db table for both queue and ephemeral storages
- Works with UniversalQueueMessage
- Property fifo (default true): when set to true, messages are taken in FIFO manner
Sample table schema for MySQL: see sample_schema-less-locking.mysql.sql.
Similar to UniversalJdbcQueue, but uses a less-locking algorithm specific to PostgreSQL, and needs only a single db table for both queue and ephemeral storages.
- Optimized for PostgreSQL (EXPERIMENTAL!)
- 1 single db table for both queue and ephemeral storages
- Works with UniversalQueueMessage
- Property fifo (default true): when set to true, messages are taken in FIFO manner
Sample table schema for PgSQL: see sample_schema-less-locking.pgsql.sql.
Universal queue implementation that uses Apache Kafka as queue backend.
- Ephemeral storage is currently not supported.
- Use universal.UniversalKafkaQueue to work with universal.UniversalQueueMessage, and universal2.UniversalKafkaQueue to work with universal2.UniversalQueueMessage.
Universal Redis queue implementation.
- Use universal.UniversalRedisQueue to work with universal.UniversalQueueMessage, and universal2.UniversalRedisQueue to work with universal2.UniversalQueueMessage.
Universal queue implementation that uses RocksDB to store queue messages.
- Ephemeral storage is currently not supported.
- Use universal.UniversalRocksDbQueue to work with universal.UniversalQueueMessage, and universal2.UniversalRocksDbQueue to work with universal2.UniversalQueueMessage.
See LICENSE.txt for details. Copyright (c) 2015-2016 Thanh Ba Nguyen.
Third party libraries are distributed under their own licenses.