redis/riot

Replication errors from Elasticache to Redis

pamelayin opened this issue · 3 comments

Hi, I'm currently testing Riot as a migration tool from Redis to Elasticache, and also in the other direction (for fallback).

I'm running into 2 separate errors for the fallback. Setup and errors are described below.

Setup
On-prem redis: version 4.0.10 standalone
Elasticache: version 6.2.6 standalone
Riot: version v3.1.1 on AWS EC2 instance (OS: Amazon Linux 2)
Data generation: tried with both riot generate with --types=hash,list,set,sorted set, string and redis-benchmark -t zadd,hset,rpush,sadd,set

First, data was replicated from on-prem Redis to Elasticache via riot <onprem source> replicate <elasticache target> --mode live.
Then I ran the fallback using riot <elasticache source> replicate <onprem target> --mode live --type ds.
The errors occur during the fallback step.

Errors

  1. Encountered an error executing step scan-step in job replicate-job: io.lettuce.core.output.StatusOutput does not support set(long)

data generation:
redis-benchmark <onprem redis> -t zadd,hset,rpush,sadd,set -r 10000 -q -n 10000

stack trace:

riot --stacktrace <elasticache source> replicate <onprem target> --mode live --type ds --scan-type string
Listening  ? % [=                                                ] 0/? (0:00:00 / ?) ?/s
Scanning  63% [===============          ] 4050/6376 (0:00:06 / 0:00:03) 675.0/s
19:13:20.407 SEVERE org.springframework.batch.core.step.AbstractStep	: Encountered an error executing step scan in job replicate
org.springframework.retry.ExhaustedRetryException: Retry exhausted after last attempt in recovery path, but exception is not skippable.; nested exception is java.util.concurrent.ExecutionException: java.lang.UnsupportedOperationException: io.lettuce.core.output.StatusOutput does not support set(long)
	at org.springframework.batch.core.step.item.FaultTolerantChunkProcessor$5.recover(FaultTolerantChunkProcessor.java:429)
	at org.springframework.retry.support.RetryTemplate.handleRetryExhausted(RetryTemplate.java:539)
	at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:387)
	at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:255)
	at org.springframework.batch.core.step.item.BatchRetryTemplate.execute(BatchRetryTemplate.java:217)
	at org.springframework.batch.core.step.item.FaultTolerantChunkProcessor.write(FaultTolerantChunkProcessor.java:444)
	at org.springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:217)
	at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:77)
	at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407)
	at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331)
	at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140)
	at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:273)
	at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82)
	at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375)
	at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215)
	at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145)
	at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:258)
	at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:208)
	at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:152)
	at org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:68)
	at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:68)
	at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:169)
	at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:144)
	at org.springframework.batch.core.job.flow.support.state.SplitState$1.call(SplitState.java:94)
	at org.springframework.batch.core.job.flow.support.state.SplitState$1.call(SplitState.java:91)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.util.concurrent.ExecutionException: java.lang.UnsupportedOperationException: io.lettuce.core.output.StatusOutput does not support set(long)
	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2022)
	at com.redis.spring.batch.common.OperationItemProcessor.process(OperationItemProcessor.java:92)
	at com.redis.spring.batch.writer.AbstractRedisItemWriter.write(AbstractRedisItemWriter.java:71)
	at org.springframework.batch.core.step.item.SimpleChunkProcessor.writeItems(SimpleChunkProcessor.java:193)
	at org.springframework.batch.core.step.item.SimpleChunkProcessor.doWrite(SimpleChunkProcessor.java:159)
	at org.springframework.batch.core.step.item.FaultTolerantChunkProcessor$3.doWithRetry(FaultTolerantChunkProcessor.java:348)
	at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
	... 24 more
Caused by: java.lang.UnsupportedOperationException: io.lettuce.core.output.StatusOutput does not support set(long)
	at io.lettuce.core.output.CommandOutput.set(CommandOutput.java:107)
	at io.lettuce.core.protocol.RedisStateMachine.safeSet(RedisStateMachine.java:778)
	at io.lettuce.core.protocol.RedisStateMachine.handleInteger(RedisStateMachine.java:404)
	at io.lettuce.core.protocol.RedisStateMachine$State$Type.handle(RedisStateMachine.java:206)
	at io.lettuce.core.protocol.RedisStateMachine.doDecode(RedisStateMachine.java:334)
	at io.lettuce.core.protocol.RedisStateMachine.decode(RedisStateMachine.java:295)
	at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:842)
	at io.lettuce.core.protocol.CommandHandler.decode0(CommandHandler.java:793)
	at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:767)
	at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:659)
	at io.lettuce.core.protocol.CommandHandler.channelRead(CommandHandler.java:599)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1383)
	at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1246)
	at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1295)
	at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	... 1 more

Is there really a limitation in replicating set data types, or is this caused by another issue?

  2. Encountered an error executing step scan-step in job replicate-job: For input string: "OK"

data generation:
redis-benchmark <onprem redis> -t zadd,hset,rpush,sadd,set -r 1000 -q -n 10000 then
redis-benchmark <onprem redis> -t zadd,hset,rpush,sadd,set -r 10000 -q -n 10000

stack trace:

riot --stacktrace <elasticache source> replicate <onprem target> --mode live --type ds
Listening  ? % [=                                                ] 0/? (0:00:00 / ?) ?/s
Scanning  47% [===========              ] 3150/6659 (0:00:05 / 0:00:05) 630.0/s
19:06:17.243 SEVERE org.springframework.batch.core.step.AbstractStep	: Encountered an error executing step scan in job replicate
org.springframework.retry.ExhaustedRetryException: Retry exhausted after last attempt in recovery path, but exception is not skippable.; nested exception is java.util.concurrent.ExecutionException: java.lang.NumberFormatException: For input string: "OK"
	at org.springframework.batch.core.step.item.FaultTolerantChunkProcessor$5.recover(FaultTolerantChunkProcessor.java:429)
	at org.springframework.retry.support.RetryTemplate.handleRetryExhausted(RetryTemplate.java:539)
	at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:387)
	at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:255)
	at org.springframework.batch.core.step.item.BatchRetryTemplate.execute(BatchRetryTemplate.java:217)
	at org.springframework.batch.core.step.item.FaultTolerantChunkProcessor.write(FaultTolerantChunkProcessor.java:444)
	at org.springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:217)
	at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:77)
	at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407)
	at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331)
	at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140)
	at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:273)
	at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82)
	at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375)
	at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215)
	at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145)
	at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:258)
	at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:208)
	at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:152)
	at org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:68)
	at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:68)
	at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:169)
	at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:144)
	at org.springframework.batch.core.job.flow.support.state.SplitState$1.call(SplitState.java:94)
	at org.springframework.batch.core.job.flow.support.state.SplitState$1.call(SplitState.java:91)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.util.concurrent.ExecutionException: java.lang.NumberFormatException: For input string: "OK"
	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2022)
	at com.redis.spring.batch.common.OperationItemProcessor.process(OperationItemProcessor.java:92)
	at com.redis.spring.batch.writer.AbstractRedisItemWriter.write(AbstractRedisItemWriter.java:71)
	at org.springframework.batch.core.step.item.SimpleChunkProcessor.writeItems(SimpleChunkProcessor.java:193)
	at org.springframework.batch.core.step.item.SimpleChunkProcessor.doWrite(SimpleChunkProcessor.java:159)
	at org.springframework.batch.core.step.item.FaultTolerantChunkProcessor$3.doWithRetry(FaultTolerantChunkProcessor.java:348)
	at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
	... 24 more
Caused by: java.lang.NumberFormatException: For input string: "OK"
	at java.base/java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
	at java.base/java.lang.Long.parseLong(Long.java:692)
	at java.base/java.lang.Long.parseLong(Long.java:817)
	at io.lettuce.core.output.IntegerOutput.set(IntegerOutput.java:48)
	at io.lettuce.core.protocol.RedisStateMachine.safeSetSingle(RedisStateMachine.java:826)
	at io.lettuce.core.protocol.RedisStateMachine.handleSingle(RedisStateMachine.java:358)
	at io.lettuce.core.protocol.RedisStateMachine$State$Type.handle(RedisStateMachine.java:206)
	at io.lettuce.core.protocol.RedisStateMachine.doDecode(RedisStateMachine.java:334)
	at io.lettuce.core.protocol.RedisStateMachine.decode(RedisStateMachine.java:295)
	at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:842)
	at io.lettuce.core.protocol.CommandHandler.decode0(CommandHandler.java:793)
	at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:767)
	at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:659)
	at io.lettuce.core.protocol.CommandHandler.channelRead(CommandHandler.java:599)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1383)
	at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1246)
	at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1295)
	at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	... 1 more

Is this an actual bug, or a condition I can work around some other way?

jruaux commented

Hi, I tried to reproduce your issue with the same Redis versions (except with a redis 6.2.6 container instead of Elasticache) but didn't get any errors. Would you be able to share an RDB dump?

pamelayin commented

Hi @jruaux, I'm not sure I can share data directly due to company policy, but I was able to reproduce it on redis 6.2.6 using both redis-benchmark and riot generate. Both errors seem to be related to the zset data type specifically.

New setup:

node1: redis 4.0.10 container on-premise
node2: redis 6.2.6 container on-premise

Testing: same as before, but I was able to reproduce it with riot generate as well

Error 1 - set(long)

  1. data generation: riot <node1> generate --types=zset --count 10000
  2. replicate data from node1 to node2: riot <node1> replicate <node2> --mode live
  3. replicate back from node2 to node1: riot <node2> replicate <node1> --mode live --type ds

Error 2 - input string "OK"

  1. data generation: riot <node1> generate --types=zset --count 30000
  2. replicate data from node1 to node2: riot <node1> replicate <node2> --mode live
  3. replicate back from node2 to node1: riot <node2> replicate <node1> --mode live --type ds

This testing was done with the same riot version as before (v3.1.3). I hope this is enough info to reproduce the error on your end.

jruaux commented

This was due to a bug in Lettuce which has been fixed in their latest release. The latest RIOT release (3.1.5) has been upgraded to that Lettuce release.
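
For anyone reading the two stack traces above: both are consistent with a reply being handed to an output that expected a different reply type. In the first trace an integer reply reaches a StatusOutput (handleInteger -> CommandOutput.set), and in the second a status reply "OK" reaches an IntegerOutput (handleSingle -> Long.parseLong). The thread does not say what caused that inside Lettuce, so the following is only a minimal sketch of the symptom. The classes are hypothetical stand-ins that borrow the names from the traces; they are not Lettuce's actual implementation.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustration only: hypothetical stand-ins named after the classes in the
// stack traces. This is NOT Lettuce's real code.
class ReplyMismatchSketch {

    // Base decoder: every reply type is unsupported unless a subclass overrides it.
    static abstract class Output<T> {
        T value;

        // Invoked for an integer reply from the server.
        void set(long integer) {
            throw new UnsupportedOperationException(
                    getClass().getSimpleName() + " does not support set(long)");
        }

        // Invoked for a simple-string (status) reply from the server.
        void set(ByteBuffer bytes) {
            throw new UnsupportedOperationException(
                    getClass().getSimpleName() + " does not support set(ByteBuffer)");
        }
    }

    // Decoder for commands that answer with a status such as "OK".
    static class StatusOutput extends Output<String> {
        @Override
        void set(ByteBuffer bytes) {
            value = StandardCharsets.US_ASCII.decode(bytes).toString();
        }
    }

    // Decoder for commands that answer with an integer.
    static class IntegerOutput extends Output<Long> {
        @Override
        void set(long integer) {
            value = integer;
        }

        @Override
        void set(ByteBuffer bytes) {
            // Text is parsed as a number, so a stray "OK" fails here.
            value = Long.parseLong(StandardCharsets.US_ASCII.decode(bytes).toString());
        }
    }

    // Runs a decode step and reports the exception it raises, if any.
    static String describeFailure(Runnable decode) {
        try {
            decode.run();
            return "no error";
        } catch (RuntimeException e) {
            return e.getClass().getSimpleName() + ": " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        // Error 1: an integer reply delivered to a status decoder.
        System.out.println(describeFailure(() -> new StatusOutput().set(1L)));

        // Error 2: a status reply delivered to an integer decoder.
        System.out.println(describeFailure(
                () -> new IntegerOutput().set(StandardCharsets.US_ASCII.encode("OK"))));
    }
}
```

Read this way, neither error points to a limitation in replicating a particular data type. The writes themselves are ordinary, and the failure happens while decoding replies, which fits the fix arriving through a Lettuce upgrade.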