ExtractTopic, key schema can't be null
arnitolog opened this issue · 2 comments
Hello,
I'm trying to use the ExtractTopic transformation. It works fine when I produce messages and set the key explicitly.
But if I try to set the key from a field of the message value, the connector fails with the error:
key schema can't be null
Here is a working config where I set the key explicitly.
Command to produce messages:
kafkactl produce test-topic --file elasticsearch/data.json --key test12
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: "my-elasticsearch.sink"
  labels:
    strimzi.io/cluster: connect-cluster
spec:
  autoRestart:
    enabled: true
  class: "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector"
  tasksMax: 2
  config:
    client.id: "connect-my-elasticsearch-sink"
    consumer.override.client.rack: "${env::STRIMZI_RACK_ID}"
    consumer.override.auto.offset.reset: "earliest"
    topics.regex: "test-topic"
    connection.url: "http://elasticsearch.svc.cluster.local:9200"
    connection.username: "xxxx"
    connection.password: "xxxx"
    write.method: "UPSERT"
    key.ignore: "false"
    key.converter: "org.apache.kafka.connect.storage.StringConverter"
    transforms: "ExtractTopicFromKey,IndexRouter"
    transforms.ExtractTopicFromKey.type: "io.aiven.kafka.connect.transforms.ExtractTopic$Key"
    transforms.IndexRouter.type: "org.apache.kafka.connect.transforms.RegexRouter"
    transforms.IndexRouter.regex: "(.*)"
    transforms.IndexRouter.replacement: "test_index_$1"
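With this chain, a record produced with key test12 gets its topic rewritten to test12 by ExtractTopic$Key and then to test_index_test12 by the RegexRouter, so documents land in that index. A quick sanity check (assuming curl can reach the connection.url above):

curl "http://elasticsearch.svc.cluster.local:9200/test_index_test12/_count"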
And here is the non-working config, where I try to retrieve ClientId from the message, use it as the key, and then as the topic name.
kafkactl produce test-topic --file elasticsearch/data.json --key test12
data.json content:
{
  "Action": 2,
  "ClientId": "testclient",
  "RawId": 179645,
  "Message": "A message from testclient"
}
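Note that this JSON is schemaless. If I understand correctly, wrapping it in JsonConverter's schema envelope (which requires value.converter to be org.apache.kafka.connect.json.JsonConverter with value.converter.schemas.enable=true, neither of which is set in the config below) would let ValueToKey and ExtractField carry a real schema through to the key. A sketch of that envelope:

{
  "schema": {
    "type": "struct",
    "fields": [
      { "field": "Action", "type": "int32" },
      { "field": "ClientId", "type": "string" },
      { "field": "RawId", "type": "int64" },
      { "field": "Message", "type": "string" }
    ]
  },
  "payload": {
    "Action": 2,
    "ClientId": "testclient",
    "RawId": 179645,
    "Message": "A message from testclient"
  }
}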
Connector config:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: "my-elasticsearch.sink"
  labels:
    strimzi.io/cluster: connect-cluster
spec:
  autoRestart:
    enabled: true
  class: "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector"
  tasksMax: 2
  config:
    client.id: "connect-my-elasticsearch-sink"
    consumer.override.client.rack: "${env::STRIMZI_RACK_ID}"
    consumer.override.auto.offset.reset: "earliest"
    topics.regex: "test-topic"
    connection.url: "http://elasticsearch.svc.cluster.local:9200"
    connection.username: "xxxx"
    connection.password: "xxxx"
    write.method: "UPSERT"
    key.ignore: "false"
    key.converter: "org.apache.kafka.connect.storage.StringConverter"
    transforms: "CreateTopicID,ExtractTopicID,ExtractTopicFromKey,IndexRouter"
    transforms.CreateTopicID.type: "org.apache.kafka.connect.transforms.ValueToKey"
    transforms.CreateTopicID.fields: "ClientId"
    transforms.ExtractTopicID.type: "org.apache.kafka.connect.transforms.ExtractField$Key"
    transforms.ExtractTopicID.field: "ClientId"
    transforms.ExtractTopicFromKey.type: "io.aiven.kafka.connect.transforms.ExtractTopic$Key"
    transforms.IndexRouter.type: "org.apache.kafka.connect.transforms.RegexRouter"
    transforms.IndexRouter.regex: "(.*)"
    transforms.IndexRouter.replacement: "test_index_$1"
Here is the error:
org.apache.kafka.connect.errors.DataException: key schema can't be null: SinkRecord{kafkaOffset=48, timestampType=CreateTime} ConnectRecord{topic='test-topic', kafkaPartition=0, key=testclient, keySchema=null, value={Action=2, RawId=179645, Message=A message from testclient, ClientId=testclient}, valueSchema=null, timestamp=1683821808209, headers=ConnectHeaders(headers=)}
	at io.aiven.kafka.connect.transforms.ExtractTopic.apply(ExtractTopic.java:66)
	at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:173)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:207)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:149)
	at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:540)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:493)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:332)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
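As far as I can tell from the record dump (keySchema=null), the value is schemaless JSON, so ValueToKey builds a Map key and ExtractField$Key then yields a plain String key, both without a schema, and the transform rejects the record at ExtractTopic.java:66. A minimal sketch of the kind of guard that trips (illustrative names, not the actual Aiven source):

import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.errors.DataException;

// Illustrative only: with schemaless values, record.keySchema() is null
// by the time ExtractTopic$Key runs, so a guard like this fails before
// the topic name can be extracted from the key.
final class SchemaGuardSketch {
    static <R extends ConnectRecord<R>> void requireKeySchema(final R record) {
        if (record.keySchema() == null) {
            throw new DataException("key schema can't be null: " + record);
        }
    }
}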
--
@arnitolog thanks for reporting this.
At the moment the transform is scoped to schema-based records only. I tried to see whether the Cast transform could help here, but it doesn't set a schema, even when the whole value is cast.
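For reference, a whole-value key cast would be configured like this (the CastKey alias is illustrative), placed ahead of ExtractTopicFromKey in the chain; even then, on schemaless records the key schema stays null:

transforms.CastKey.type: "org.apache.kafka.connect.transforms.Cast$Key"
transforms.CastKey.spec: "string"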
I would consider this a valid improvement, and will check if there's any bandwidth to implement it in the near future. Also, I'd encourage you to create a PR; I'd be happy to assist if you find any blockers.
@arnitolog, I actually gave this a try so it can be included in the next release: #98