redis-replicator

Redis replication tool. Supports SYNC, PSYNC, and PSYNC2. Parses RDB, AOF, and mixed RDB/AOF files. Supports Redis 7.0.

Primary language: Java. License: Apache License 2.0 (Apache-2.0).

Table of Contents (Chinese documentation)

1. Redis-replicator

1.1. Brief introduction

Redis Replicator is an implementation of the Redis replication protocol written in Java. It can parse, filter, and broadcast RDB and AOF events in real time. It can also synchronize Redis data to a local cache or a database. Throughout this document, "command" means a writable command (e.g. set, hmset) and excludes read-only commands (e.g. get, hmget). Redis 7.0 and earlier versions are supported.

1.2. Chat with author

Join the chat at https://gitter.im/leonchen83/redis-replicator

1.3. Contact the author

chen.bao.yi@gmail.com

2. Install

2.1. Requirements

Compile: JDK 9 or later
Runtime: JDK 8 or later
Maven: 3.3.1 or later
Redis: 2.6 to 7.0

2.2. Maven dependency

    <dependency>
        <groupId>com.moilioncircle</groupId>
        <artifactId>redis-replicator</artifactId>
        <version>3.7.0</version>
    </dependency>

2.3. Install from source code

    step 1: install jdk-9.0.x for compile(or jdk-11.0.x)
    step 2: git clone https://github.com/leonchen83/redis-replicator.git
    step 3: $cd ./redis-replicator 
    step 4: $mvn clean install package -DskipTests

2.4. Select a version

redis version       redis-replicator version
[2.6, 7.0.x]        [3.6.4, ]
[2.6, 7.0.x-RC2]    [3.6.2, 3.6.3]
[2.6, 7.0.0-RC1]    [3.6.0, 3.6.1]
[2.6, 6.2.x]        [3.5.2, 3.5.5]
[2.6, 6.2.0-RC1]    [3.5.0, 3.5.1]
[2.6, 6.0.x]        [3.4.0, 3.4.4]
[2.6, 5.0.x]        [2.6.1, 3.3.3]
[2.6, 4.0.x]        [2.3.0, 2.5.0]
[2.6, 4.0-RC3]      [2.1.0, 2.2.0]
[2.6, 3.2.x]        [1.0.18] (not supported)

3. Simple usage

3.1. Usage

        Replicator replicator = new RedisReplicator("redis://127.0.0.1:6379");
        replicator.addEventListener(new EventListener() {
            @Override
            public void onEvent(Replicator replicator, Event event) {
                if (event instanceof KeyStringValueString) {
                    KeyStringValueString kv = (KeyStringValueString) event;
                    System.out.println(new String(kv.getKey()));
                    System.out.println(new String(kv.getValue()));
                } else {
                    // handle other event types here
                }
            }
        });
        replicator.open();

3.2. Backup remote rdb snapshot

See RdbBackupExample.java

3.3. Backup remote commands

See CommandBackupExample.java

3.4. Convert rdb to dump format

We can use DumpRdbVisitor to convert rdb to redis DUMP format.

        Replicator r = new RedisReplicator("redis:///path/to/dump.rdb");
        r.setRdbVisitor(new DumpRdbVisitor(r));
        r.addEventListener(new EventListener() {
            @Override
            public void onEvent(Replicator replicator, Event event) {
                if (!(event instanceof DumpKeyValuePair)) return;
                DumpKeyValuePair dkv = (DumpKeyValuePair) event;
                byte[] serialized = dkv.getValue();
                // we can use redis RESTORE command to migrate this serialized value to another redis.
            }
        });
        r.open();
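
The comment in the listener above mentions RESTORE. As a rough illustration of what ends up on the wire, the sketch below encodes `RESTORE key ttl serialized-value [REPLACE]` as a RESP array of bulk strings using only the JDK. The class `RestoreCommandSketch` and its methods are hypothetical and not part of redis-replicator; in practice you would normally hand the serialized bytes to a Redis client library instead of building frames by hand.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only; not part of redis-replicator.
final class RestoreCommandSketch {

    // Encodes the arguments as a RESP array of bulk strings.
    static byte[] encode(byte[]... args) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeAscii(out, "*" + args.length + "\r\n");
        for (byte[] arg : args) {
            writeAscii(out, "$" + arg.length + "\r\n");
            out.write(arg, 0, arg.length);
            writeAscii(out, "\r\n");
        }
        return out.toByteArray();
    }

    // Builds RESTORE key ttl serialized-value [REPLACE].
    // A ttl of 0 creates the key without an expiry.
    static byte[] restore(byte[] key, long ttlMillis, byte[] serialized, boolean replace) {
        byte[] name = ascii("RESTORE");
        byte[] ttl = ascii(Long.toString(ttlMillis));
        return replace
                ? encode(name, key, ttl, serialized, ascii("REPLACE"))
                : encode(name, key, ttl, serialized);
    }

    private static byte[] ascii(String s) {
        return s.getBytes(StandardCharsets.US_ASCII);
    }

    private static void writeAscii(ByteArrayOutputStream out, String s) {
        byte[] bytes = ascii(s);
        out.write(bytes, 0, bytes.length);
    }

    public static void main(String[] args) {
        // The payload here is a placeholder, not a real DUMP value.
        byte[] frame = restore(ascii("k"), 0L, new byte[] {1, 2, 3}, true);
        System.out.println(frame.length + " bytes");
    }
}
```

The serialized value is written as raw bytes and never converted to a String, since a DUMP payload is binary and would be corrupted by a charset conversion.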

3.5. Rdb check

We can use SkipRdbVisitor to check rdb's correctness.

        Replicator r = new RedisReplicator("redis:///path/to/dump.rdb");
        r.setRdbVisitor(new SkipRdbVisitor(r));
        r.open();

3.6. Scan and PSYNC

By default, redis-replicator uses PSYNC and acts as a slave to receive commands, as in the following example:

        Replicator r = new RedisReplicator("redis://127.0.0.1:6379");
        r.addEventListener(new EventListener() {
            @Override
            public void onEvent(Replicator replicator, Event event) {
                System.out.println(event);
            }
        });
        
        r.open();

However, some cloud services prohibit the PSYNC command. In that case, use the SCAN command instead of PSYNC:

        Replicator r = new RedisReplicator("redis://127.0.0.1:6379?enableScan=yes&scanStep=256");
        r.addEventListener(new EventListener() {
            @Override
            public void onEvent(Replicator replicator, Event event) {
                System.out.println(event);
            }
        });
        
        r.open();

3.7. Other examples

See examples

4. Advanced topics

4.1. Command extension

4.1.1. Write a command

    @CommandSpec(command = "APPEND")
    public static class YourAppendCommand extends AbstractCommand {
        private final String key;
        private final String value;
    
        public YourAppendCommand(String key, String value) {
            this.key = key;
            this.value = value;
        }
        
        public String getKey() {
            return key;
        }
        
        public String getValue() {
            return value;
        }
    }

4.1.2. Write a command parser

    public class YourAppendParser implements CommandParser<YourAppendCommand> {

        @Override
        public YourAppendCommand parse(Object[] command) {
            return new YourAppendCommand(new String((byte[]) command[1], UTF_8), new String((byte[]) command[2], UTF_8));
        }
    }

4.1.3. Register this parser

    Replicator replicator = new RedisReplicator("redis://127.0.0.1:6379");
    replicator.addCommandParser(CommandName.name("APPEND"),new YourAppendParser());

4.1.4. Handle command event

    replicator.addEventListener(new EventListener() {
        @Override
        public void onEvent(Replicator replicator, Event event) {
            if(event instanceof YourAppendCommand){
                YourAppendCommand appendCommand = (YourAppendCommand)event;
                // your code goes here
            }
        }
    });

4.1.5. Put them together

See CommandExtensionExample.java

4.2. Module extension

4.2.1. Compile redis test modules

    $cd /path/to/redis-4.0-rc2/src/modules
    $make

4.2.2. Uncomment the loadmodule line in redis.conf

    loadmodule /path/to/redis-4.0-rc2/src/modules/hellotype.so

4.2.3. Write a module parser

    public class HelloTypeModuleParser implements ModuleParser<HelloTypeModule> {

        @Override
        public HelloTypeModule parse(RedisInputStream in, int version) throws IOException {
            DefaultRdbModuleParser parser = new DefaultRdbModuleParser(in);
            int elements = parser.loadUnsigned(version).intValue();
            long[] ary = new long[elements];
            int i = 0;
            while (elements-- > 0) {
                ary[i++] = parser.loadSigned(version);
            }
            return new HelloTypeModule(ary);
        }
    }

    public class HelloTypeModule implements Module {
        private final long[] value;

        public HelloTypeModule(long[] value) {
            this.value = value;
        }

        public long[] getValue() {
            return value;
        }
    }

4.2.4. Write a command parser

    public class HelloTypeParser implements CommandParser<HelloTypeCommand> {
        @Override
        public HelloTypeCommand parse(Object[] command) {
            String key = new String((byte[]) command[1], Constants.UTF_8);
            long value = Long.parseLong(new String((byte[]) command[2], Constants.UTF_8));
            return new HelloTypeCommand(key, value);
        }
    }
    
    @CommandSpec(command = "hellotype.insert")
    public class HelloTypeCommand extends AbstractCommand {
        private final String key;
        private final long value;

        public long getValue() {
            return value;
        }

        public String getKey() {
            return key;
        }

        public HelloTypeCommand(String key, long value) {
            this.key = key;
            this.value = value;
        }
    }

4.2.5. Register this module parser and command parser and handle event

    public static void main(String[] args) throws IOException {
        Replicator replicator = new RedisReplicator("redis://127.0.0.1:6379");
        replicator.addCommandParser(CommandName.name("hellotype.insert"), new HelloTypeParser());
        replicator.addModuleParser("hellotype", 0, new HelloTypeModuleParser());
        replicator.addEventListener(new EventListener() {
            @Override
            public void onEvent(Replicator replicator, Event event) {
                if (event instanceof KeyStringValueModule) {
                    System.out.println(event);
                }
                
                if (event instanceof HelloTypeCommand) {
                    System.out.println(event);
                }
            }
        });
        replicator.open();
    }

4.2.6. Put them together

See ModuleExtensionExample.java

4.3. Stream

Redis 5.0 added a new data structure, STREAM. Redis-replicator parses a STREAM as follows:

        Replicator r = new RedisReplicator("redis://127.0.0.1:6379");
        r.addEventListener(new EventListener() {
            @Override
            public void onEvent(Replicator replicator, Event event) {
                if (event instanceof KeyStringValueStream) {
                    KeyStringValueStream kv = (KeyStringValueStream)event;
                    // key
                    String key = kv.getKey();
                    
                    // stream
                    Stream stream = kv.getValueAsStream();
                    // last stream id
                    stream.getLastId();
                    
                    // entries
                    NavigableMap<Stream.ID, Stream.Entry> entries = stream.getEntries();
                    
                    // optional : group
                    for (Stream.Group group : stream.getGroups()) {
                        // group PEL(pending entries list)
                        NavigableMap<Stream.ID, Stream.Nack> gpel = group.getPendingEntries();
                        
                        // consumer
                        for (Stream.Consumer consumer : group.getConsumers()) {
                            // consumer PEL(pending entries list)
                            NavigableMap<Stream.ID, Stream.Nack> cpel = consumer.getPendingEntries();
                        }
                    }
                }
            }
        });
        r.open();

4.4. Write your own rdb parser

  • Write a class YourRdbVisitor that extends RdbVisitor.
  • Register your RdbVisitor with the Replicator using the setRdbVisitor method.

4.5. Redis URI

Before redis-replicator-2.4.0, a RedisReplicator was constructed like this:

Replicator replicator = new RedisReplicator("127.0.0.1", 6379, Configuration.defaultSetting());
Replicator replicator = new RedisReplicator(new File("/path/to/dump.rdb"), FileType.RDB, Configuration.defaultSetting());
Replicator replicator = new RedisReplicator(new File("/path/to/appendonly.aof"), FileType.AOF, Configuration.defaultSetting());
Replicator replicator = new RedisReplicator(new File("/path/to/appendonly.aof"), FileType.MIXED, Configuration.defaultSetting());

Since redis-replicator-2.4.0, a new concept, the Redis URI, simplifies the construction of a RedisReplicator.

Replicator replicator = new RedisReplicator("redis://127.0.0.1:6379");
Replicator replicator = new RedisReplicator("redis:///path/to/dump.rdb");
Replicator replicator = new RedisReplicator("redis:///path/to/appendonly.aof");

// configuration setting example
Replicator replicator = new RedisReplicator("redis://127.0.0.1:6379?authPassword=foobared&readTimeout=10000&ssl=yes");
Replicator replicator = new RedisReplicator("redis:///path/to/dump.rdb?rateLimit=1000000");
Replicator replicator = new RedisReplicator("rediss://user:pass@127.0.0.1:6379?rateLimit=1000000");
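
To make the structure of these URIs easier to see, here is a minimal sketch that splits them into scheme, target (host and port, or file path), and query parameters using the JDK's `java.net.URI`. This is not the parser redis-replicator uses internally; `RedisUriSketch` and its methods are hypothetical names chosen for illustration.

```java
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not the library's own URI parser.
final class RedisUriSketch {

    // Splits the query string into name/value pairs, keeping their order.
    static Map<String, String> queryParams(URI uri) {
        Map<String, String> params = new LinkedHashMap<>();
        String query = uri.getQuery();
        if (query == null || query.isEmpty()) {
            return params;
        }
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            if (eq < 0) {
                params.put(pair, "");
            } else {
                params.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return params;
    }

    // A URI with a host points at a server; one without a host points at a file.
    static String describe(String text) {
        URI uri = URI.create(text);
        String target = uri.getHost() != null
                ? uri.getHost() + ":" + uri.getPort()
                : uri.getPath();
        return uri.getScheme() + " -> " + target + " " + queryParams(uri);
    }

    public static void main(String[] args) {
        System.out.println(describe("redis://127.0.0.1:6379?authPassword=foobared&readTimeout=10000&ssl=yes"));
        System.out.println(describe("redis:///path/to/dump.rdb?rateLimit=1000000"));
        System.out.println(describe("rediss://user:pass@127.0.0.1:6379?rateLimit=1000000"));
    }
}
```

The sketch splits the already-decoded query on `&`, so it would mis-split a value that contains an encoded `&`. That is acceptable for illustration but not for production use.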

5. Other topics

5.1. Built-in command parser

The following commands have built-in parsers:

PING     APPEND      SET             SETEX     MSET       DEL
SADD     HMSET       HSET            LSET      EXPIRE     EXPIREAT
GETSET   HSETNX      MSETNX          PSETEX    SETNX      SETRANGE
HDEL     UNLINK      SREM            LPOP      LPUSH      LPUSHX
LREM     RPOP        RPUSH           RPUSHX    ZREM       ZINTERSTORE
INCR     DECR        INCRBY          PERSIST   SELECT     FLUSHALL
FLUSHDB  HINCRBY     ZINCRBY         MOVE      SMOVE      BRPOPLPUSH
PFCOUNT  PFMERGE     SDIFFSTORE      RENAMENX  PEXPIREAT  SINTERSTORE
ZADD     BITFIELD    SUNIONSTORE     RESTORE   LINSERT    ZREMRANGEBYLEX
GEOADD   PEXPIRE     ZUNIONSTORE     EVAL      SCRIPT     ZREMRANGEBYRANK
PUBLISH  BITOP       SETBIT          SWAPDB    PFADD      ZREMRANGEBYSCORE
RENAME   MULTI       EXEC            LTRIM     RPOPLPUSH  SORT
EVALSHA  ZPOPMAX     ZPOPMIN         XACK      XADD       XCLAIM
XDEL     XGROUP      XTRIM           XSETID    COPY       LMOVE
BLMOVE   ZDIFFSTORE  GEOSEARCHSTORE  FUNCTION  SPUBLISH

5.2. EOFException

  • Adjust the Redis server setting as follows. For more details, refer to redis.conf.
    client-output-buffer-limit slave 0 0 0

WARNING: this setting may cause the Redis server to run out of memory in some cases.

5.3. Trace event log

  • Set log level to debug
  • If you are using log4j2, add logger like the following:
    <Logger name="com.moilioncircle" level="debug">
        <AppenderRef ref="YourAppender"/>
    </Logger>
    Configuration.defaultSetting().setVerbose(true);
    // redis uri
    "redis://127.0.0.1:6379?verbose=yes"

5.4. SSL connection

    System.setProperty("javax.net.ssl.keyStore", "/path/to/keystore");
    System.setProperty("javax.net.ssl.keyStorePassword", "password");
    System.setProperty("javax.net.ssl.keyStoreType", "your_type");

    System.setProperty("javax.net.ssl.trustStore", "/path/to/truststore");
    System.setProperty("javax.net.ssl.trustStorePassword", "password");
    System.setProperty("javax.net.ssl.trustStoreType", "your_type");

    Configuration.defaultSetting().setSsl(true);

    // optional setting
    Configuration.defaultSetting().setSslSocketFactory(sslSocketFactory);
    Configuration.defaultSetting().setSslParameters(sslParameters);
    Configuration.defaultSetting().setHostnameVerifier(hostnameVerifier);
    // redis uri
    "redis://127.0.0.1:6379?ssl=yes"
    "rediss://127.0.0.1:6379"

5.5. Auth

    Configuration.defaultSetting().setAuthUser("default");
    Configuration.defaultSetting().setAuthPassword("foobared");
    // redis uri
    "redis://127.0.0.1:6379?authPassword=foobared&authUser=default"
    "redis://default:foobared@127.0.0.1:6379"

5.6. Avoid full sync

  • Adjust the following Redis server settings:
    repl-backlog-size
    repl-backlog-ttl
    repl-ping-slave-period

repl-ping-slave-period MUST be less than Configuration.getReadTimeout(). The default Configuration.getReadTimeout() is 60 seconds.
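
The constraint above can be expressed as a small startup check. This is only a sketch: `ReplTimeoutCheck` is a hypothetical class, and it assumes the ping period is given in seconds (as in redis.conf) and the read timeout in milliseconds (so the 60-second default would be 60000). Verify both units against your own configuration.

```java
// Hypothetical helper for illustration only; not part of redis-replicator.
final class ReplTimeoutCheck {

    // Returns true when the master pings more often than the client's read timeout,
    // so an idle connection is not mistaken for a dead one.
    // Assumes seconds for the ping period and milliseconds for the read timeout.
    static boolean pingPeriodIsSafe(int pingPeriodSeconds, int readTimeoutMillis) {
        return pingPeriodSeconds * 1000L < readTimeoutMillis;
    }

    public static void main(String[] args) {
        int pingPeriodSeconds = 10;    // value read from your redis.conf
        int readTimeoutMillis = 60000; // assumed default read timeout
        if (!pingPeriodIsSafe(pingPeriodSeconds, readTimeoutMillis)) {
            throw new IllegalStateException("repl-ping-slave-period must be less than the read timeout");
        }
        System.out.println("ping period is compatible with the read timeout");
    }
}
```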

5.7. Lifecycle event

        Replicator replicator = new RedisReplicator("redis://127.0.0.1:6379");
        final long start = System.currentTimeMillis();
        final AtomicInteger acc = new AtomicInteger(0);
        replicator.addEventListener(new EventListener() {
            @Override
            public void onEvent(Replicator replicator, Event event) {
                if(event instanceof PreRdbSyncEvent) {
                    System.out.println("pre rdb sync");
                } else if(event instanceof PostRdbSyncEvent) {
                    long end = System.currentTimeMillis();
                    System.out.println("time elapsed:" + (end - start));
                    System.out.println("rdb event count:" + acc.get());
                } else {
                    acc.incrementAndGet();
                }
            }
        });
        replicator.open();

5.8. Handle huge key value pair

Building on 4.4. Write your own rdb parser, this tool has a built-in iterable RDB parser for handling huge key-value pairs.
For more details, see:
[1] HugeKVFileExample.java
[2] HugeKVSocketExample.java
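
The general idea behind an iterable parser is that a huge value is consumed in bounded batches instead of being loaded into memory all at once. The sketch below shows that idea with plain JDK types only. `BatchingSketch` and `forEachBatch` are hypothetical names, not the library's API; the linked examples show the actual classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper for illustration only; not part of redis-replicator.
final class BatchingSketch {

    // Drains the iterator in batches of at most batchSize elements and
    // returns the number of batches delivered to the sink.
    static <T> int forEachBatch(Iterator<T> source, int batchSize, Consumer<List<T>> sink) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        int batches = 0;
        List<T> batch = new ArrayList<>(batchSize);
        while (source.hasNext()) {
            batch.add(source.next());
            if (batch.size() == batchSize) {
                sink.accept(batch);
                batch = new ArrayList<>(batchSize);
                batches++;
            }
        }
        if (!batch.isEmpty()) {
            // Deliver the final, possibly smaller, batch.
            sink.accept(batch);
            batches++;
        }
        return batches;
    }

    public static void main(String[] args) {
        Iterator<String> members = Arrays.asList("a", "b", "c", "d", "e").iterator();
        forEachBatch(members, 2, batch -> System.out.println(batch));
    }
}
```

With this shape, memory use is bounded by the batch size rather than by the size of the key's value.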

5.9. Redis6 support

5.9.1. SSL support

    $cd /path/to/redis
    $./utils/gen-test-certs.sh
    $cd tests/tls
    $openssl pkcs12 -export -CAfile ca.crt -in redis.crt -inkey redis.key -out redis.p12
    $cd /path/to/redis
    $./src/redis-server --tls-port 6379 --port 0 --tls-cert-file ./tests/tls/redis.crt \
         --tls-key-file ./tests/tls/redis.key --tls-ca-cert-file ./tests/tls/ca.crt \
         --tls-replication yes --bind 0.0.0.0 --protected-mode no

    System.setProperty("javax.net.ssl.keyStore", "/path/to/redis/tests/tls/redis.p12");
    System.setProperty("javax.net.ssl.keyStorePassword", "password");
    System.setProperty("javax.net.ssl.keyStoreType", "pkcs12");

    System.setProperty("javax.net.ssl.trustStore", "/path/to/redis/tests/tls/redis.p12");
    System.setProperty("javax.net.ssl.trustStorePassword", "password");
    System.setProperty("javax.net.ssl.trustStoreType", "pkcs12");

    Replicator replicator = new RedisReplicator("rediss://127.0.0.1:6379");

If you don't want to use System.setProperty, you can configure SSL programmatically as follows:

    RedisSslContextFactory factory = new RedisSslContextFactory();
    factory.setKeyStorePath("/path/to/redis/tests/tls/redis.p12");
    factory.setKeyStoreType("pkcs12");
    factory.setKeyStorePassword("password");

    factory.setTrustStorePath("/path/to/redis/tests/tls/redis.p12");
    factory.setTrustStoreType("pkcs12");
    factory.setTrustStorePassword("password");

    SslConfiguration ssl = SslConfiguration.defaultSetting().setSslContextFactory(factory);
    Replicator replicator = new RedisReplicator("rediss://127.0.0.1:6379", ssl);

5.9.2. ACL support

    Replicator replicator = new RedisReplicator("redis://user:pass@127.0.0.1:6379");

5.10. Redis7 support

5.10.1. Function

Redis 7.0 added function support, and functions are stored in the RDB file. They can be parsed as follows:

    Replicator replicator = new RedisReplicator("redis://127.0.0.1:6379");
    replicator.addEventListener(new EventListener() {
        @Override
        public void onEvent(Replicator replicator, Event event) {
            if (event instanceof Function) {
                Function function = (Function) event;
                function.getCode();
                    
                // your code goes here
            }
        }
    });
    replicator.open();

You can also parse a function into its serialized form and then use FUNCTION RESTORE to restore it on a target Redis:

    Replicator replicator = new RedisReplicator("redis://127.0.0.1:6379");
    replicator.setRdbVisitor(new DumpRdbVisitor(replicator));
    replicator.addEventListener(new EventListener() {
        @Override
        public void onEvent(Replicator replicator, Event event) {
            if (event instanceof DumpFunction) {
                DumpFunction function = (DumpFunction) event;
                byte[] serialized = function.getSerialized();
                // your code goes here
                // you can use FUNCTION RESTORE to restore above serialized data to target redis
            }
        }
    });
    replicator.open();

6. Contributors

7. References

8. Supported by

8.1. 宁文君

27 January 2023 was a sad day: I lost my mother, 宁文君. She always encouraged and supported me in developing this tool. Every time a company used this tool, she got as excited as a child and encouraged me to keep going. Without her, I could not have maintained this tool for so many years. Even though I didn't achieve much, she was still proud of me. R.I.P., and may God bless her.

8.2. YourKit

YourKit is kindly supporting this open source project with its full-featured Java Profiler.
YourKit, LLC is the creator of innovative and intelligent tools for profiling
Java and .NET applications. Take a look at YourKit's leading software products:
YourKit Java Profiler and YourKit .NET Profiler.

8.3. IntelliJ IDEA

IntelliJ IDEA is a Java integrated development environment (IDE) for developing computer software.
It is developed by JetBrains (formerly known as IntelliJ), and is available as an Apache 2 Licensed community edition,
and in a proprietary commercial edition. Both can be used for commercial development.

8.4. Redisson

Redisson is a Redis-based In-Memory Data Grid for Java that offers distributed objects and services (BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) backed by a Redis server. Redisson provides a more convenient and easier way to work with Redis. Redisson objects provide a separation of concerns, which lets you keep your focus on data modeling and application logic.