Keys are not delete from redis when redis.command is SET
Opened this issue · 1 comments
We have noticed that in v0.9.0
(currently we are using redis/redis-enterprise-kafka:6.7.4
and don't have this issue) keys are not deleted from redis if message value is null when redis.command
is SET
, instead of deletion value is set to empty string.
Here https://redis-field-engineering.github.io/redis-kafka-connect/#_sink_string it says:
String or bytes. If value is null the key is deleted.
So we assume that redis item deletion is expected behaviour.
Providing example reproduced using jupyter notebooks:
I'm experiencing this issue too. In fact, it kills the whole connector.
My connector configuration (using terraform to define):
resource "kafka-connect_connector" "redis-kafka-connect" {
name = "redis-kafka-connect"
config = {
"name": "redis-kafka-connect",
"connector.class": "com.redis.kafka.connect.RedisSinkConnector",
"tasks.max": "1",
"topics": "${var.topic}",
"redis.uri": "redis://redis:6379",
"redis.command": "SET",
# The key is serialized using Avro, but for visibility in tooling we want to convert it to just the entity UUID using a Single-Message Transform
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
# The value is also serialized in Avro, but we don't want to make any changes - leave it serialized in Redis
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
# Take the `id` field from the key and replace the entire key with just the UUID value. Picture below showing that this transform works as expected for create/update events
"transforms": "ExtractField",
"transforms.ExtractField.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.ExtractField.field": "id"
}
}
This setup works for creates/updates
But it kills the connector when the value is a null value.
Error: org.apache.kafka.connect.errors.DataException: The value for the record must be a string or byte array. Consider using the StringConverter or ByteArrayConverter if the data is stored in Kafka in the format needed in Redis. (org.apache.kafka.connect.runtime.WorkerSinkTask)
connect | 2024-07-09T05:05:18.013549922Z com.redis.kafka.connect.shaded.io.lettuce.core.RedisException: org.apache.kafka.connect.errors.DataException: The value for the record must be a string or byte array. Consider using the StringConverter or ByteArrayConverter if the data is stored in Kafka in the format needed in Redis.
connect | 2024-07-09T05:05:18.013552003Z at com.redis.kafka.connect.shaded.io.lettuce.core.internal.Exceptions.fromSynchronization(Exceptions.java:106)
connect | 2024-07-09T05:05:18.013553622Z at com.redis.kafka.connect.shaded.com.redis.spring.batch.common.AbstractOperationExecutor.process(AbstractOperationExecutor.java:124)
connect | 2024-07-09T05:05:18.013555046Z at com.redis.kafka.connect.shaded.com.redis.spring.batch.writer.AbstractOperationItemWriter.write(AbstractOperationItemWriter.java:47)
connect | 2024-07-09T05:05:18.013556414Z at com.redis.kafka.connect.sink.RedisSinkTask.put(RedisSinkTask.java:328)
connect | 2024-07-09T05:05:18.013557686Z at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
connect | 2024-07-09T05:05:18.013558962Z at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)
connect | 2024-07-09T05:05:18.013560234Z at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
connect | 2024-07-09T05:05:18.013561502Z at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
connect | 2024-07-09T05:05:18.013562770Z at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
connect | 2024-07-09T05:05:18.013564125Z at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
connect | 2024-07-09T05:05:18.013565430Z at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
connect | 2024-07-09T05:05:18.013566721Z at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
connect | 2024-07-09T05:05:18.013568017Z at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
connect | 2024-07-09T05:05:18.013569932Z at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
connect | 2024-07-09T05:05:18.013571252Z at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
connect | 2024-07-09T05:05:18.013572582Z at java.base/java.lang.Thread.run(Thread.java:829)