ReplicaMap is a replicated, fault-tolerant, multi-master, eventually consistent, embeddable in-memory key-value database written in Java and backed by persistent Apache Kafka storage.
Think of it as a Java `ConcurrentMap<K,V>` that replicates all the updates over Apache Kafka.
- ReplicaMap is multi-master: all the replicas are equal and writable, which makes it an easy way to share mutable state between multiple processes.
- All the atomic operations like `putIfAbsent` or `replace` work as expected.
- Asynchronous operations `asyncPut`, `asyncPutIfAbsent`, `asyncReplace` and more are supported and return `CompletableFuture` (see the example after the quick start below).
- Optimized Compute: instead of heavy retry loops, just send one-shot serializable closures using the `compute` or `merge` methods. This is also useful when you apply a small update to a huge value, to avoid sending the whole value over the network.
- Sharding: configure the list of allowed Kafka partitions for each instance.
- Kafka Log Compaction provides a persistent backup of the replicated map contents.
- It is possible to wrap any thread-safe implementation of `Map<K,V>` with ReplicaMap. The most obvious choices are `ConcurrentHashMap<K,V>` and `ConcurrentSkipListMap<K,V>`, but any custom off-heap, on-disk, or other implementation can be used as well.
- When a `NavigableMap<K,V>` is needed, there is `ReplicaNavigableMap<K,V>`.
- Multiple maps per single data topic are supported.
- If you already have a Kafka cluster, there is no need to deploy any new infrastructure: just add the ReplicaMap library to the application and you have a distributed database.
- If you already have a compacted Kafka topic, you can start using it with ReplicaMap.
- The Kafka client is the only dependency of ReplicaMap.
- Supports Kafka versions 1.x.x and 2.x.x.
The project artifacts are hosted on jitpack.io, so you need to add the repository to your project:
<repositories>
    <repository>
        <id>jitpack.io</id>
        <url>https://jitpack.io</url>
    </repository>
</repositories>
And then add the dependency:
<dependency>
    <groupId>com.vladykin</groupId>
    <artifactId>replicamap</artifactId>
    <version>...</version>
</dependency>
// Set up the configuration like in Kafka (see all the options in KReplicaMapManagerConfig).
Map<String,Object> cfg = new HashMap<>();
cfg.put(KReplicaMapManagerConfig.BOOTSTRAP_SERVERS, BOOTSTRAP_SERVER);
cfg.put(KReplicaMapManagerConfig.DATA_TOPIC, "flowers");
cfg.put(KReplicaMapManagerConfig.OPS_TOPIC, "flowers_ops");
cfg.put(KReplicaMapManagerConfig.FLUSH_TOPIC, "flowers_flush");
// Set up the serialization for the map keys and values; by default, String serialization is used.
cfg.put(KReplicaMapManagerConfig.KEY_SERIALIZER_CLASS, LongSerializer.class);
cfg.put(KReplicaMapManagerConfig.KEY_DESERIALIZER_CLASS, LongDeserializer.class);
// Create and start the KReplicaMapManager instance.
ReplicaMapManager replicaMapManager = new KReplicaMapManager(cfg);
// Here we use the same DELETE_DELAY_MS that we used for the topics configuration,
// reduced by 3 minutes to mitigate possible clock skew.
replicaMapManager.start(DELETE_DELAY_MS - 180_000, TimeUnit.MILLISECONDS);
// Get the ReplicaMap instance and use it as if it was a typical ConcurrentMap,
// but all the updates will be visible to all the other clients even after restarts.
ReplicaMap<Long, String> flowers = replicaMapManager.getMap();
flowers.putIfAbsent(1L, "Rose");
flowers.putIfAbsent(2L, "Magnolia");
flowers.putIfAbsent(3L, "Cactus");
System.out.println(new TreeMap<>(flowers));
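The asynchronous operations are used the same way. A minimal sketch, assuming `asyncPutIfAbsent` completes its `CompletableFuture` with the previous value (or `null`) once the update round trip is done; check the Javadoc for the exact semantics:

// Hypothetical usage: print whether the insert won or which value was already there.
flowers.asyncPutIfAbsent(4L, "Tulip")
    .thenAccept(prev -> System.out.println(
        prev == null ? "Tulip inserted" : "Key 4 already maps to " + prev));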
All the keys and values must correctly implement `hashCode` and `equals`, because ReplicaMap heavily relies on them. The same applies to the map identifiers returned by `MapsHolder.getMapId(K key)`.

All the equal keys (where `equals` returns `true`) must always get to the same Kafka partition. For example, by default Kafka partitions keys by hashing the bytes produced by the `Serializer`; if you have extra fields in the key that do not participate in `equals` but do participate in serialization, then equal keys may be placed into different partitions. In that case the sequence of updates for the key loses linearizability and the state may become inconsistent across the clients.
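As an illustration of this pitfall, here is a sketch of a hypothetical key class and a matching Kafka Serializer: the cachedLabel field is excluded from both equals/hashCode and the produced bytes, so equal keys always serialize to the same bytes and land in the same partition (the class and field names are made up for this example):

import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;

public class FlowerKey {
    final long id;            // identity: participates in equals/hashCode
    final String cachedLabel; // derived data: must not affect equality or the serialized bytes

    public FlowerKey(long id, String cachedLabel) {
        this.id = id;
        this.cachedLabel = cachedLabel;
    }

    @Override public boolean equals(Object o) {
        return o instanceof FlowerKey && ((FlowerKey) o).id == id;
    }

    @Override public int hashCode() {
        return Long.hashCode(id);
    }

    // Serialize only the fields that participate in equals, so that equal keys
    // always produce the same bytes and therefore hash to the same partition.
    public static class FlowerKeySerializer implements Serializer<FlowerKey> {
        @Override public byte[] serialize(String topic, FlowerKey key) {
            return key == null ? null : ByteBuffer.allocate(Long.BYTES).putLong(key.id).array();
        }

        // Explicit no-op overrides keep this compatible with 1.x client versions as well.
        @Override public void configure(Map<String, ?> configs, boolean isKey) {}
        @Override public void close() {}
    }
}

Such a serializer would then be registered through the KEY_SERIALIZER_CLASS option, the same way as in the quick start example above.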
ReplicaMap is eventually consistent: beware of possible lags, since it is hard to predict when all the replicas will receive a given update.

In a typical setup you have more than one Kafka partition, so your updates may be reordered if they belong to different partitions. For example, if you synchronously update key A and then key B, and these keys are not in the same partition, other clients may see the update for key B earlier than the one for key A. This behavior is counter-intuitive and inconsistent with the program order. If you need the program order to be preserved, make sure you either have only 1 partition per topic or have collocated the keys that need this property in the same partition.
When using Optimized Compute you must never update values in place; always create a modified copy of the value instead. Otherwise an incorrect state may be flushed and the data will be broken. For the same reason you must never modify keys and values in a listener.
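For example, a compute call that appends to a list value should build a new list rather than mutating the stored one. A minimal sketch, assuming a ReplicaMap<Long, List<String>> named notes (both the map and the key are made up for this example):

// WRONG: mutating the stored list in place may get an incorrect state flushed.
// notes.compute(1L, (k, list) -> { list.add("new note"); return list; });

// RIGHT: build a modified copy and return it, leaving the stored value untouched.
notes.compute(1L, (k, list) -> {
    List<String> copy = list == null ? new ArrayList<>() : new ArrayList<>(list);
    copy.add("new note");
    return copy;
});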
Three Kafka topics participate in the protocol (see their configuration below):

- `data` topic: a compacted topic where the plain key-value records are stored (an existing topic can be used)
- `ops` topic: an operations log topic where every update attempt is sent (the analog of a WAL in conventional databases)
- `flush` topic: where flush requests are sent

All these topics must be partitioned the same way.

Each modification attempt requires sending only a single message to the `ops` topic and reading it back. Thus, the latency of any modification attempt is the latency of this round trip.

Background `OpsWorker` threads poll incoming operations from the `ops` topic and apply them to the underlying map. Since all the updates for the same key (for the same partition, to be exact) arrive in the same order, we get an eventually consistent state across all the replicas.
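The following is a highly simplified conceptual sketch of that flow using plain Kafka clients. It is not ReplicaMap's actual implementation, and the topic name, types and op encoding are placeholders; it only illustrates the single-round-trip idea:

import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.ConcurrentMap;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class OpsRoundTripSketch {
    // An update attempt is a single record sent to the ops topic...
    static void sendPutOp(KafkaProducer<String, String> producer, String key, String value) {
        producer.send(new ProducerRecord<>("flowers_ops", key, value));
    }

    // ...and a background poll loop (the analog of the OpsWorker threads) reads the ops
    // back and applies them to the local in-memory map in per-partition order.
    static void applyOps(KafkaConsumer<String, String> consumer, ConcurrentMap<String, String> map) {
        consumer.subscribe(Collections.singletonList("flowers_ops"));
        while (true) {
            for (ConsumerRecord<String, String> op : consumer.poll(Duration.ofMillis(500)))
                map.put(op.key(), op.value());
        }
    }
}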
The Optimized Compute feature relies on this per-partition update linearizability. Instead of a loop that retries calling `ConcurrentMap.replace(K,V,V)` and sends `ops` messages over and over again for each attempt, we can serialize the function passed to the `compute` method: the function is sent to all the replicas and executed on each replica only once. This may be much more efficient for operations like incrementing heavily contended counters or applying small updates to huge values.

You only need to implement the `ComputeSerializer` and `ComputeDeserializer` interfaces and set them in the config. All the methods accepting a `BiFunction` support Optimized Compute.
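A sketch of such a one-shot closure: a named class implementing BiFunction that increments a counter value. The class name is made up, and whether it can be shipped to the other replicas depends on the ComputeSerializer/ComputeDeserializer you configure:

import java.util.function.BiFunction;

// A one-shot update function: it is applied once on every replica, so heavily
// contended counters need no retry loops and no full-value transfer.
public class IncrementBy implements BiFunction<Long, Long, Long> {
    final long delta;

    public IncrementBy(long delta) {
        this.delta = delta;
    }

    @Override public Long apply(Long key, Long current) {
        return current == null ? delta : current + delta;
    }
}

// Usage, assuming a ReplicaMap<Long, Long> named counters:
//     counters.compute(42L, new IncrementBy(1));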
Once in a while, a lucky client whose update lands on an offset that is a multiple of `flush.period.ops` sends a flush request to the `flush` topic to initiate a data flush. Of course, the update issuer may die before sending the flush request; in that case the next such client will flush more updates in its turn.
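For example, assuming `flush.period.ops` is the literal configuration property name (check KReplicaMapManagerConfig for the corresponding constant), the flush period could be tuned in the configuration map from the quick start:

// Request a data flush roughly every 5000 operations per partition.
// The property name is taken from the description above; verify the exact
// constant in KReplicaMapManagerConfig before relying on it.
cfg.put("flush.period.ops", 5000);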
Background `FlushWorker` threads poll the `flush` topic and flush the updated key-value pairs to the `data` topic in a single Kafka transaction. The consumer group for the `flush` consumers is configured to be the same for all the participants, so that flush requests are distributed across all the clients. The transactional id of the `data` producer is one per partition, to fence the previous flushers in case of partition rebalancing. This is needed to get "exactly once" semantics for the data flush (to avoid mixing together racy flushes).

To avoid flush reordering (when a slow client requests a flush for data that was already flushed some time ago by another client, so that newer updates could be overwritten by older ones), `FlushWorker` ignores incoming out-of-order flush requests.
- All the topics (`data`, `ops` and `flush`) must be set up in a fault-tolerant way:
      min.insync.replicas=2
      unclean.leader.election.enable=false
- The `data` topic must be compacted; with any other cleanup policy clients may end up in an inconsistent state:
      cleanup.policy=compact
- The `ops` and `flush` topics can be cleaned:
      cleanup.policy=delete
- The `ops` and `flush` topics must have time-based retention disabled, otherwise the logs may become corrupted if the updates were not frequent enough or if the application was down for some time:
      retention.ms=-1
- For the `flush` topic a retention of 1 GiB per partition must always be enough:
      retention.bytes=1073741824
- For the `ops` topic the safe retention size depends on many factors; the simplest way to estimate it for a partition is:
      (maxKeySize + 2 * maxValueSize + recordOverhead) * (flushPeriodOps + maxFlushLagOps) * flushFailures
  Let's take recordOverhead = 70B (it may be lower, depending on how many records on average are batched by the producer), maxKeySize = 16B, maxValueSize = 1KiB, maxFlushLagOps = flushPeriodOps = 5000 and flushFailures = 100; then we need to set the retention size to (16B + 2 * 1KiB + 70B) * (5000 + 5000) * 100 ≈ 2 GiB (see the arithmetic check after this list):
      retention.bytes=2147483648
- Another important factor is not covered in the formula above: while a new client is loading data from the `data` topic, the respective flush notification record in the `ops` topic must not be evicted. Thus, we have to configure the `ops` topic with delayed file deletion:
      file.delete.delay.ms=7200000
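A quick arithmetic check of the ops retention estimate above (plain Java, just reproducing the numbers):

public class OpsRetentionEstimate {
    public static void main(String[] args) {
        long maxKeySize = 16, maxValueSize = 1024, recordOverhead = 70; // bytes
        long flushPeriodOps = 5000, maxFlushLagOps = 5000, flushFailures = 100;

        long bytes = (maxKeySize + 2 * maxValueSize + recordOverhead)
            * (flushPeriodOps + maxFlushLagOps) * flushFailures;

        // Prints 2134000000, which is just under 2 GiB (2147483648 bytes),
        // so retention.bytes=2147483648 leaves a bit of headroom.
        System.out.println(bytes);
    }
}

The topics can then be created, for example, like this: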
BOOTSTRAP_SERVER="localhost:9092"
DATA_TOPIC="flowers"
OPS_TOPIC="${DATA_TOPIC}_ops"
FLUSH_TOPIC="${DATA_TOPIC}_flush"
REPLICATION=3
PARTITIONS=12
MIN_INSYNC_REPLICAS=2
DELETE_DELAY_MS=7200000
OPS_RETENTION_BYTES=2147483648
FLUSH_RETENTION_BYTES=1073741824
bin/kafka-topics.sh --create --topic $DATA_TOPIC --bootstrap-server $BOOTSTRAP_SERVER \
--replication-factor $REPLICATION --partitions $PARTITIONS \
--config min.insync.replicas=$MIN_INSYNC_REPLICAS \
--config unclean.leader.election.enable=false \
--config cleanup.policy=compact \
--config min.compaction.lag.ms=0
bin/kafka-topics.sh --create --topic $OPS_TOPIC --bootstrap-server $BOOTSTRAP_SERVER \
--replication-factor $REPLICATION --partitions $PARTITIONS \
--config min.insync.replicas=$MIN_INSYNC_REPLICAS \
--config unclean.leader.election.enable=false \
--config cleanup.policy=delete \
--config retention.ms=-1 \
--config retention.bytes=$OPS_RETENTION_BYTES \
--config file.delete.delay.ms=$DELETE_DELAY_MS
bin/kafka-topics.sh --create --topic $FLUSH_TOPIC --bootstrap-server $BOOTSTRAP_SERVER \
--replication-factor $REPLICATION --partitions $PARTITIONS \
--config min.insync.replicas=$MIN_INSYNC_REPLICAS \
--config unclean.leader.election.enable=false \
--config cleanup.policy=delete \
--config retention.ms=-1 \
--config retention.bytes=$FLUSH_RETENTION_BYTES \
--config file.delete.delay.ms=$DELETE_DELAY_MS
bin/kafka-topics.sh --list --bootstrap-server $BOOTSTRAP_SERVER | grep $DATA_TOPIC
- Make sure the other consumers of this `data` topic (if any) support the `read_committed` isolation level.
- Create empty `ops` and `flush` topics according to the instructions above.
- Stop all the updates to your existing `data` topic. This step is very important, otherwise your data may be corrupted or lost, or ReplicaMap may end up with inconsistent state across the clients!!!
- Alter the `data` topic configuration to meet the requirements above:
      ZOOKEEPER="localhost:2181"
      bin/kafka-configs.sh --zookeeper $ZOOKEEPER --entity-type topics --entity-name $DATA_TOPIC --alter --add-config \
          min.insync.replicas=$MIN_INSYNC_REPLICAS,unclean.leader.election.enable=false,cleanup.policy=compact,min.compaction.lag.ms=0
- Run the following command to initialize the `ops` topic with the last `data` topic offsets:
      java -cp slf4j-api-1.7.26.jar:kafka-clients-2.3.1.jar:replicamap-0.4.jar com.vladykin.replicamap.kafka.KReplicaMapTools \
          initExisting $BOOTSTRAP_SERVER $DATA_TOPIC $OPS_TOPIC
  It must print `OK: ...`
- PROFIT!! Start using ReplicaMap in your application; all the existing data will be loaded.