jcustenborder/kafka-connect-redis

Not able to connect to AWS ElastiCache Redis

Closed this issue · 12 comments

Getting this error from the connector but I am able to connect using Redis CLI:

Unable to connect
	at io.lettuce.core.RedisConnectionException.create(RedisConnectionException.java:94)
	at io.lettuce.core.AbstractRedisClient.getConnection(AbstractRedisClient.java:261)
	at io.lettuce.core.cluster.RedisClusterClient.connect(RedisClusterClient.java:348)
	at com.github.jcustenborder.kafka.connect.redis.RedisSessionFactoryImpl$RedisSessionImpl.create(RedisSessionFactoryImpl.java:154)
	at com.github.jcustenborder.kafka.connect.redis.RedisSessionFactoryImpl.create(RedisSessionFactoryImpl.java:52)
	at com.github.jcustenborder.kafka.connect.redis.RedisSinkTask.start(RedisSinkTask.java:72)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:309)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:240)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.UnsupportedOperationException
	at java.base/java.util.AbstractList.add(AbstractList.java:153)
	at java.base/java.util.AbstractList.add(AbstractList.java:111)
	at io.lettuce.core.output.ArrayOutput.set(ArrayOutput.java:54)
	at io.lettuce.core.protocol.RedisStateMachine.safeSet(RedisStateMachine.java:358)
	at io.lettuce.core.protocol.RedisStateMachine.decode(RedisStateMachine.java:139)
	at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:716)
	at io.lettuce.core.protocol.CommandHandler.decode0(CommandHandler.java:680)
	at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:675)
	at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:596)
	at io.lettuce.core.protocol.CommandHandler.channelRead(CommandHandler.java:565)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800)
	at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:487)
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:385)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	... 1 more

I am using the following setup:

cp-kafka-connect - cp-kafka-connect:6.1.9
redis-sink - jcustenborder-kafka-connect-redis-0.0.2.17
Redis - AWS ElastiCache

Connect config:

{
  "connector.class": "com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector",
  "topics": "TOPIC_NAME",
  "input.data.format": "BYTES",
  "tasks.max": "1",
  "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
  "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
  "redis.hosts": "TARGET_REDIS_URL_WITHOUT_PORT_AND_HTTP",
  "redis.client.mode": "Cluster"
}
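For completeness, here is one way to sanity-check a config like the one above and register it with the Connect REST API. This is a sketch: the connector name redis-sink and the worker address localhost:8083 are assumptions, and the placeholder values are kept exactly as in the thread.

```shell
# Save the connector config (placeholders kept from the thread).
cat > redis-sink.json <<'EOF'
{
  "connector.class": "com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector",
  "topics": "TOPIC_NAME",
  "input.data.format": "BYTES",
  "tasks.max": "1",
  "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
  "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
  "redis.hosts": "TARGET_REDIS_URL_WITHOUT_PORT_AND_HTTP",
  "redis.client.mode": "Cluster"
}
EOF

# Catch JSON typos before they turn into confusing REST errors.
python3 -m json.tool redis-sink.json > /dev/null && echo "config OK"

# Register (or update) the connector on a Connect worker, which
# typically listens on port 8083 -- adjust host and name as needed:
# curl -s -X PUT -H "Content-Type: application/json" \
#   --data @redis-sink.json http://localhost:8083/connectors/redis-sink/config
```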

I need help with this urgently. Thanks!

Hi @jasbirDream11! Can you give this version a shot? https://github.com/jcustenborder/kafka-connect-redis/tree/v2

This is the next release that has support for Redis Cluster.

Hey @jcustenborder, thanks for your quick response. I just tried version 0.0.4, but it still gives me the same error. Is it because AWS ElastiCache is not supported, or is this just a cluster issue?

Hey @jasbirDream11, did you clone that branch and build it? Try this package; it should be something like 0.2-SNAPSHOT.

jcustenborder-kafka-connect-redis-0.2-SNAPSHOT.zip

I actually downloaded it from Confluent Hub directly, and it contained all the jars. For local testing I ran it in standalone mode with redis.hosts set to localhost:6379, and it worked fine on the VM. But that was just a plain Redis instance, not AWS ElastiCache.

The branch I am pointing to does not exist on the Confluent Hub yet.

Gotcha, my bad. I'll try to build this branch and test it with that jar.


jcustenborder-kafka-connect-redis-0.2-SNAPSHOT.zip

Here is a build with updated dependencies. Make sure you use mvn clean package if you build it yourself. Take the zip and all of its contents from target/components/...
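For anyone building the branch themselves, the steps are roughly as follows. This is a sketch: the branch name v2 comes from the link earlier in the thread, and since the exact output path under target/components/ is not spelled out here, the find command is just a way to locate the zip.

```shell
# Build the v2 branch from source (requires git, a JDK, and Maven).
git clone -b v2 https://github.com/jcustenborder/kafka-connect-redis.git
cd kafka-connect-redis

# Per the comment above: use "clean package", not just "package".
mvn clean package

# The deployable zip lands somewhere under target/components/
# (exact subdirectory not given in the thread) -- locate it with find.
find target/components -name '*.zip'
```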

Thanks, @jcustenborder! It worked like a charm. I can't thank you enough for your quick help.

@jasbirDream11 did this work for you?

Yeah, thanks for your quick help; it worked without any issues. I have one question about the time it takes to update records in Redis: currently it only updates about once a minute, even though I have multiple tasks running. Is there any way to reduce that further? @jcustenborder

I think this is something I'll need to make a slight code change for. Right now I'm writing changes to Redis in the put() method using Lettuce pipelining, and the commands are flushed to Redis during the flush() call. I didn't account for the fact that flushing of offsets to Kafka can be delayed. I think I need to refactor this.

@jasbirDream11 As a workaround, can you try setting offset.flush.interval.ms=5000? I'm not sure if you can do this at the connector level; you might need to do it at the worker level.
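For reference, this property belongs in the worker configuration (e.g. connect-distributed.properties for a distributed worker); a sketch, since the thread does not show the worker config:

```properties
# Worker-level setting: how often Connect commits consumer offsets,
# which in this connector also bounds how often buffered writes are
# flushed to Redis. The default is 60000 ms, which would explain the
# roughly one-minute update latency described above.
offset.flush.interval.ms=5000
```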

@jcustenborder I tried the property at the connector level, but it did not change anything. I'll dig into the code to test this out. Thanks for the help!