Java library for Redis Streams providing a simplified developer experience and increased scalability

Redis Streams Java

Redis Streams Java is a Java library that provides a topic abstraction over Redis streams. Each topic is backed by one or more Redis streams. As you publish messages to a topic, a new stream is automatically created as soon as the current stream grows beyond a predefined size. This allows streams to scale linearly across the shards of your Redis cluster.
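The rollover behavior described above can be pictured with a small in-memory model. This is only a sketch of the idea, not the library's implementation: the class TopicRolloverSketch, the maxStreamLength threshold, and the "topic:index" stream naming are all made up for illustration, and no Redis connection is involved.

```java
import java.util.ArrayList;
import java.util.List;

/** In-memory model of a topic that starts a new stream once the current one is full. */
class TopicRolloverSketch {
    private final String topicName;
    private final int maxStreamLength;                 // hypothetical size threshold
    private final List<List<String>> streams = new ArrayList<>();

    TopicRolloverSketch(String topicName, int maxStreamLength) {
        this.topicName = topicName;
        this.maxStreamLength = maxStreamLength;
        streams.add(new ArrayList<>());                // the topic starts with one stream
    }

    /** Appends a message and returns the (made-up) name of the stream it landed in. */
    String publish(String message) {
        List<String> current = streams.get(streams.size() - 1);
        if (current.size() >= maxStreamLength) {       // current stream is full: roll over
            current = new ArrayList<>();
            streams.add(current);
        }
        current.add(message);
        return topicName + ":" + (streams.size() - 1);
    }

    int streamCount() {
        return streams.size();
    }

    public static void main(String[] args) {
        TopicRolloverSketch topic = new TopicRolloverSketch("my-topic", 2);
        for (int i = 0; i < 5; i++) {
            System.out.println("message " + i + " -> " + topic.publish("m" + i));
        }
        System.out.println("streams: " + topic.streamCount());
    }
}
```

With a threshold of 2, the five messages in main end up spread over three streams (indexes 0, 0, 1, 1, 2). In a real cluster, separate streams are separate keys, which is what lets them live on different shards.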

The library provides core functionality for writing to and reading from a distributed Redis stream. It works in all of the following Redis deployment scenarios:

  • A pair of Redis Enterprise or Redis Cloud Active-Active clusters

  • A Redis Enterprise or Redis Cloud instance

  • A single instance of Redis Stack

In fact, the library takes special care to ensure that reading from a consumer group of a single topic across two Active-Active clusters consistently honors the ordering and the pending entries list (PEL) of the consumer group.

Requirements

Redis Streams Java requires a JedisPooled connection object. You may want to tune your JedisPooled instance for your own needs.

Quick start

Installation

To run Redis Streams Java in production, you’ll need one of the following deployments:

Redis Streams Java Connector

Next, you’ll need to install the Redis Streams Java plugin and configure it.

Redis installation

For a self-managed deployment, or for testing locally, install Redis Stack or spin up a free Redis Cloud instance. If you need a fully-managed, cloud-based deployment of Redis on AWS, GCP, or Azure, see all the Redis Cloud offerings. For deployment in your own private cloud or data center, consider Redis Enterprise.

Documentation

Add the library to your project

To install the library simply add the following:

Maven

Add the following to your pom.xml file:

pom.xml
<dependency>
    <groupId>com.redis</groupId>
    <artifactId>redis-streams-java</artifactId>
    <version>0.3.5</version>
</dependency>

Gradle

Add the following to your build.gradle file:

build.gradle
dependencies {
    implementation 'com.redis:redis-streams-java:0.3.5'
}

Usage

Components

The streams library has three primary components:

  1. TopicManager - Oversees the topics and tracks the status of consumers within the library.

  2. ConsumerGroup - Manages and coordinates consumption from topics.

  3. TopicProducer - Produces messages for the topic.

Jedis Connection

To connect any of these key components you will need a JedisPooled connection to Redis.

TopicManager

To initialize the TopicManager, pass your Jedis connection along with a topic configuration to TopicManager.createTopic:

JedisPooled jedis = new JedisPooled("redis://localhost:6379");
SerialTopicConfig config = new SerialTopicConfig("my-topic");
TopicManager topicManager = TopicManager.createTopic(jedis, config);

This creates the topic manager, along with the items in Redis required to support the topic.

TopicProducer

The next step is to create a TopicProducer. To create one, pass your Jedis connection along with the topic name to the TopicProducer constructor:

Producer producer = new TopicProducer(jedis, "my-topic");

ConsumerGroup

The final item to create is a ConsumerGroup, which is responsible for coordinating consumption of the topic. To create a ConsumerGroup, pass your Jedis connection along with the topic name and your consumer group name to the ConsumerGroup constructor:

ConsumerGroup consumerGroup = new ConsumerGroup(jedis, "my-topic", "test-group");

Producing messages

To add a message to the topic, pass a Map<String,String> to the TopicProducer's produce method:

Map<String, String> msg = Map.of("temp", "81",  "humidity", "0.92", "city", "Satellite Beach");
producer.produce(msg);

Consuming messages

To consume messages, call consume on your consumer group, passing in your consumer name:

TopicEntry entry = consumerGroup.consume("my-consumer");

The message contains:

  1. The message id (the monotonic id created by Redis when the message was produced)

  2. The stream the message was read from

  3. The message itself
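Redis stream entry ids have the form "<milliseconds>-<sequence>", and within a single stream they only ever increase. The sketch below shows how two such ids could be compared, assuming you have the id available as a string; StreamIdSketch is a hypothetical helper written for this example and is not part of the library.

```java
/** Sketch: parse Redis stream entry ids ("<milliseconds>-<sequence>") and compare them. */
class StreamIdSketch implements Comparable<StreamIdSketch> {
    final long millis;    // time part, treated as an unsigned 64-bit value
    final long sequence;  // sequence part, treated as an unsigned 64-bit value

    StreamIdSketch(long millis, long sequence) {
        this.millis = millis;
        this.sequence = sequence;
    }

    static StreamIdSketch parse(String id) {
        int dash = id.indexOf('-');
        if (dash <= 0 || dash == id.length() - 1) {
            throw new IllegalArgumentException("not a stream entry id: " + id);
        }
        return new StreamIdSketch(
                Long.parseUnsignedLong(id.substring(0, dash)),
                Long.parseUnsignedLong(id.substring(dash + 1)));
    }

    /** Orders by time first, then by sequence number. */
    @Override
    public int compareTo(StreamIdSketch other) {
        int byTime = Long.compareUnsigned(millis, other.millis);
        return byTime != 0 ? byTime : Long.compareUnsigned(sequence, other.sequence);
    }

    public static void main(String[] args) {
        StreamIdSketch first = parse("1526919030474-55");
        StreamIdSketch second = parse("1526919030474-56");
        System.out.println(first.compareTo(second) < 0); // true
    }
}
```

Because a topic can span several streams, a comparison like this is only meaningful as an ordering for ids read from the same stream.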

Acknowledge Messages

After you have consumed a message, you must acknowledge it. To do so, call acknowledge, passing in an AckMessage constructed from the TopicEntry you received when consuming the message.

TopicEntry entry = consumerGroup.consume("test-consumer");
// Some extra processing
// ...
consumerGroup.acknowledge(new AckMessage(entry));

Get Pending Messages

If your application is unable to acknowledge a message (for example, if the process died during processing), the message remains in a pending state. You can retrieve any pending messages using the TopicManager, then acknowledge them using the consumer group:

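PendingEntryQuery query = new PendingEntryQuery();
query.setCount(1); // limit the query to one entry
List<PendingEntry> pendingEntryList = topicManager.getPendingEntries("my-group", query);
consumerGroup.acknowledge(new AckMessage(pendingEntryList.get(0)));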
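To make the pending state concrete, here is a minimal in-memory sketch of a pending entries list: delivering an entry records it as pending, and only an acknowledgement removes it. PendingListSketch and its method names are hypothetical and only loosely mirror the calls above; the real bookkeeping is done by Redis and the library.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** In-memory model of a consumer group's pending entries list (PEL). */
class PendingListSketch {
    private final Deque<String> undelivered = new ArrayDeque<>();
    private final Map<String, String> pending = new LinkedHashMap<>(); // entry id -> consumer

    void add(String entryId) {
        undelivered.addLast(entryId);
    }

    /** Delivers the next entry and records it as pending; returns null if nothing is left. */
    String consume(String consumer) {
        String entryId = undelivered.pollFirst();
        if (entryId != null) {
            pending.put(entryId, consumer);
        }
        return entryId;
    }

    /** Removes the entry from the PEL; returns false if it was not pending. */
    boolean acknowledge(String entryId) {
        return pending.remove(entryId) != null;
    }

    List<String> pendingEntries() {
        return new ArrayList<>(pending.keySet());
    }

    public static void main(String[] args) {
        PendingListSketch group = new PendingListSketch();
        group.add("1-0");
        group.add("2-0");

        group.acknowledge(group.consume("my-consumer")); // processed and acknowledged
        group.consume("my-consumer");                    // consumer "dies" before acknowledging

        System.out.println(group.pendingEntries());      // [2-0]
        group.acknowledge(group.pendingEntries().get(0)); // recovery: acknowledge the leftover
        System.out.println(group.pendingEntries());      // []
    }
}
```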

Checking Consumer Stats

If you want to keep an eye on what is going on with your topic and the consumer groups within it, you can use the TopicManager’s getConsumerGroupStatus method:

ConsumerGroupStatus stats = topicManager.getConsumerGroupStatus("my-group");
System.out.printf("Consumer Group Name: %s%n", stats.getGroupName());
System.out.printf("Consumer Group Topic Size: %d%n", stats.getTopicEntryCount());
System.out.printf("Consumer Group Pending Entries: %d%n", stats.getPendingEntryCount());
System.out.printf("Consumer Group Lag: %d%n", stats.getConsumerLag());
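One way to read these three numbers is in terms of simple counters. The sketch below assumes "lag" is used in the same sense as for Redis consumer groups (entries not yet delivered to the group) and "pending" means delivered but not yet acknowledged; how the library aggregates these across the streams of a topic is not covered here, and GroupStatsSketch is a hypothetical class for illustration only.

```java
/** Counter-based model of topic size, pending entries, and consumer lag. */
class GroupStatsSketch {
    private long produced;      // entries added to the topic
    private long delivered;     // entries handed to consumers in the group
    private long acknowledged;  // delivered entries that were acknowledged

    void produce() {
        produced++;
    }

    /** Delivers one entry if any is waiting; returns false otherwise. */
    boolean deliver() {
        if (delivered >= produced) {
            return false;
        }
        delivered++;
        return true;
    }

    /** Acknowledges one delivered entry if any is pending; returns false otherwise. */
    boolean acknowledge() {
        if (acknowledged >= delivered) {
            return false;
        }
        acknowledged++;
        return true;
    }

    long topicEntryCount()   { return produced; }
    long pendingEntryCount() { return delivered - acknowledged; }
    long consumerLag()       { return produced - delivered; }

    public static void main(String[] args) {
        GroupStatsSketch stats = new GroupStatsSketch();
        for (int i = 0; i < 10; i++) stats.produce();
        for (int i = 0; i < 6; i++) stats.deliver();
        for (int i = 0; i < 4; i++) stats.acknowledge();
        // 10 produced, 6 delivered, 4 acknowledged
        System.out.printf("size=%d pending=%d lag=%d%n",
                stats.topicEntryCount(), stats.pendingEntryCount(), stats.consumerLag());
        // prints "size=10 pending=2 lag=4"
    }
}
```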

NOACK Consumer Group

A NoAckConsumerGroup implementation exists which allows you to read from a stream in the context of a consumer group without acknowledging any of the messages you retrieve. This is useful when you want "at most once" delivery semantics and are comfortable losing a message if something happens after the entry is delivered. To use it, initialize the NoAckConsumerGroup and consume as you would with the normal ConsumerGroup. The key difference is that there is no need to acknowledge any messages:

NoAckConsumerGroup noack = new NoAckConsumerGroup(jedis, "my-topic", "no-ack-group");
TopicEntry entry = noack.consume("my-consumer");
// your app's processing
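The trade-off can be seen in a small simulation. Without acknowledgements, an entry whose processing fails is simply gone (at-most-once delivery); with acknowledgements, it stays pending and can be retried. This is a sketch under simplified assumptions: DeliverySemanticsSketch, its method names, and the single retry pass are invented for illustration and do not reflect how the library or Redis is implemented.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

/** Contrasts reading without acknowledgements against reading with them. */
class DeliverySemanticsSketch {

    /** No acknowledgements: a failed entry is not tracked anywhere, so it is lost. */
    static List<String> consumeWithoutAck(List<String> entries, Predicate<String> handler) {
        List<String> processed = new ArrayList<>();
        for (String entry : entries) {
            if (handler.test(entry)) {
                processed.add(entry);
            }
        }
        return processed;
    }

    /** With acknowledgements: a failed entry stays pending and gets one retry pass. */
    static List<String> consumeWithAck(List<String> entries, Predicate<String> handler) {
        List<String> processed = new ArrayList<>();
        Deque<String> pending = new ArrayDeque<>();
        for (String entry : entries) {
            if (handler.test(entry)) {
                processed.add(entry);   // success: acknowledged
            } else {
                pending.add(entry);     // failure: left unacknowledged
            }
        }
        for (String entry : pending) {  // single retry pass over the pending entries
            if (handler.test(entry)) {
                processed.add(entry);
            }
        }
        return processed;
    }

    /** Handler that fails the first time it sees the target entry and succeeds afterwards. */
    static Predicate<String> failsFirstAttemptOf(String target) {
        Set<String> alreadyFailed = new HashSet<>();
        return entry -> !(entry.equals(target) && alreadyFailed.add(entry));
    }

    public static void main(String[] args) {
        List<String> entries = List.of("a", "b", "c");
        System.out.println(consumeWithoutAck(entries, failsFirstAttemptOf("b"))); // [a, c]
        System.out.println(consumeWithAck(entries, failsFirstAttemptOf("b")));    // [a, c, b]
    }
}
```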

Single Cluster PEL

There is also a "Single Cluster PEL" topic manager and consumer group. This implementation does not replicate the Pending Entries List (PEL) across clusters in an Active-Active configuration, making it more performant than its standard counterpart for Active-Active deployments. The caveat is that your consumer group PEL will not be synchronized across clusters, so you will not be able to claim any entries dropped outside of the original region of consumption.
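The caveat can be illustrated with a toy model in which each cluster keeps its own PEL and nothing is shared between them. The cluster names and the LocalPelSketch class are hypothetical; this is a picture of the consequence, not of the library's internals.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy model: every cluster tracks only the entries that were consumed locally. */
class LocalPelSketch {
    private final Map<String, Set<String>> pendingByCluster = new HashMap<>();

    /** Records an entry as pending in the cluster where it was consumed. */
    void consume(String cluster, String entryId) {
        pendingByCluster.computeIfAbsent(cluster, c -> new HashSet<>()).add(entryId);
    }

    /** An entry can only be claimed from the cluster whose PEL contains it. */
    boolean canClaim(String cluster, String entryId) {
        return pendingByCluster
                .getOrDefault(cluster, Collections.emptySet())
                .contains(entryId);
    }

    public static void main(String[] args) {
        LocalPelSketch pel = new LocalPelSketch();
        pel.consume("us-east", "1-0");                       // consumed, never acknowledged
        System.out.println(pel.canClaim("us-east", "1-0"));  // true
        System.out.println(pel.canClaim("eu-west", "1-0"));  // false: other cluster has no record
    }
}
```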

To read without replicating the PEL, initialize the SingleClusterPelConsumerGroup and use it as you would any other consumer group:

SingleClusterPelConsumerGroup singleClusterPel = new SingleClusterPelConsumerGroup(jedis, "my-topic", "pel-group");
TopicEntry entry = singleClusterPel.consume("my-consumer");
// your app's processing
singleClusterPel.acknowledge(new AckMessage(entry));

Consumer Group Stats and Pending Entries with Single Cluster PEL

The way consumer group stats and pending entries are gathered is naturally different with the Single Cluster PEL implementation. You must therefore use a specialized SingleClusterPelTopicManager to retrieve them, for example:

SingleClusterPelTopicManager singleClusterPelTopicManager = new SingleClusterPelTopicManager(jedis, config);
PendingEntryQuery query = new PendingEntryQuery();
query.setCount(1);
List<PendingEntry> pendingEntriesSingleCluster = singleClusterPelTopicManager.getPendingEntries("pel-group", query);
ConsumerGroupStatus consumerGroupStatsSingleCluster = singleClusterPelTopicManager.getConsumerGroupStatus("pel-group");

Support

Redis Streams Java is supported by Redis, Inc. for enterprise-tier customers as a 'Developer Tool' under the Redis Software Support Policy. For non-enterprise-tier customers, we provide support for Redis Streams Java on a good-faith basis. To report bugs, request features, or receive assistance, please file an issue.

License

Redis Streams Java is licensed under the Business Source License 1.1. Copyright © 2024 Redis, Inc. See LICENSE for details.