Aiven-Open/transforms-for-apache-kafka-connect

ExtractTopic, key schema can't be null

arnitolog opened this issue · 2 comments

Hello,

I'm trying to use the ExtractTopic transformation. It works fine when I produce messages and set the key explicitly.
But if I try to build the key from a field of the message value, the connector fails with the error:
key schema can't be null

Here is a working config where I set the key explicitly.
Command to produce messages:
kafkactl produce test-topic --file elasticsearch/data.json --key test12

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: "my-elasticsearch.sink"
  labels:
    strimzi.io/cluster: connect-cluster
spec:
  autoRestart:
    enabled: true
  class: "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector"
  tasksMax: 2
  config:
    client.id: "connect-my-elasticsearch-sink"
    consumer.override.client.rack: "${env::STRIMZI_RACK_ID}"
    consumer.override.auto.offset.reset: "earliest"
    topics.regex: "test-topic"
    connection.url: "http://elasticsearch.svc.cluster.local:9200"
    connection.username: "xxxx"
    connection.password: "xxxx"
    write.method: "UPSERT"
    key.ignore: "false"
    key.converter: "org.apache.kafka.connect.storage.StringConverter"
    transforms: "ExtractTopicFromKey,IndexRouter"
    transforms.ExtractTopicFromKey.type: "io.aiven.kafka.connect.transforms.ExtractTopic$Key"
    transforms.IndexRouter.type: "org.apache.kafka.connect.transforms.RegexRouter"
    transforms.IndexRouter.regex: "(.*)"
    transforms.IndexRouter.replacement: "test_index_$1"
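
With this chain the routing is straightforward: ExtractTopic$Key rewrites the record's topic to its string key (test12), and IndexRouter then turns that into the index name test_index_test12.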

And here is the non-working config, where I try to take ClientId from the message value, use it as the key, and then use that key as the topic name:
kafkactl produce test-topic --file elasticsearch/data.json --key test12
data.json content:

{
  "Action": 2,
  "ClientId": "testclient",
  "RawId": 179645,
  "Message": "A message from testclient"
}

The connector's config:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: "my-elasticsearch.sink"
  labels:
    strimzi.io/cluster: connect-cluster
spec:
  autoRestart:
    enabled: true
  class: "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector"
  tasksMax: 2
  config:
    client.id: "connect-my-elasticsearch-sink"
    consumer.override.client.rack: "${env::STRIMZI_RACK_ID}"
    consumer.override.auto.offset.reset: "earliest"
    topics.regex: "test-topic"
    connection.url: "http://elasticsearch.svc.cluster.local:9200"
    connection.username: "xxxx"
    connection.password: "xxxx"
    write.method: "UPSERT"
    key.ignore: "false"
    key.converter: "org.apache.kafka.connect.storage.StringConverter"
    transforms: "CreateTopicID,ExtractTopicID,ExtractTopicFromKey,IndexRouter"
    transforms.CreateTopicID.type: "org.apache.kafka.connect.transforms.ValueToKey"
    transforms.CreateTopicID.fields: "ClientId"
    transforms.ExtractTopicID.type: "org.apache.kafka.connect.transforms.ExtractField$Key"
    transforms.ExtractTopicID.field: "ClientId"
    transforms.ExtractTopicFromKey.type: "io.aiven.kafka.connect.transforms.ExtractTopic$Key"
    transforms.IndexRouter.type: "org.apache.kafka.connect.transforms.RegexRouter"
    transforms.IndexRouter.regex: "(.*)"
    transforms.IndexRouter.replacement: "test_index_$1"
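
As far as I can tell, the chain does this: CreateTopicID (ValueToKey) builds a new key {ClientId=testclient} from the value, and ExtractTopicID (ExtractField$Key) reduces it to the plain string testclient. But since the JSON value is schemaless, neither step attaches a key schema, and ExtractTopicFromKey then rejects the record before IndexRouter ever runs.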

Here is the error:

org.apache.kafka.connect.errors.DataException: key schema can't be null: SinkRecord{kafkaOffset=48, timestampType=CreateTime} ConnectRecord{topic='test-topic', kafkaPartition=0, key=testclient, keySchema=null, value={Action=2, RawId=179645, Message=A message from testclient, ClientId=testclient}, valueSchema=null, timestamp=1683821808209, headers=ConnectHeaders(headers=)}
	at io.aiven.kafka.connect.transforms.ExtractTopic.apply(ExtractTopic.java:66)
	at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:173)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:207)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:149)
	at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:540)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:493)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:332)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
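
For reference, the failure also reproduces outside the connector by applying the first three transforms of the chain by hand. A minimal sketch, assuming the standard SMT APIs (the Map.of value stands in for data.json deserialized without a schema, and the kafkactl key test12 is the initial key):

import java.util.Map;

import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.transforms.ExtractField;
import org.apache.kafka.connect.transforms.ValueToKey;

import io.aiven.kafka.connect.transforms.ExtractTopic;

public class KeySchemaRepro {
    public static void main(String[] args) {
        // Schemaless record, as the converters hand it to the transform chain:
        // keySchema and valueSchema are both null.
        SinkRecord record = new SinkRecord("test-topic", 0,
                null, "test12",
                null, Map.of("Action", 2, "ClientId", "testclient",
                        "RawId", 179645, "Message", "A message from testclient"),
                48);

        final ValueToKey<SinkRecord> createTopicId = new ValueToKey<>();
        createTopicId.configure(Map.of("fields", "ClientId"));

        final ExtractField.Key<SinkRecord> extractTopicId = new ExtractField.Key<>();
        extractTopicId.configure(Map.of("field", "ClientId"));

        final ExtractTopic.Key<SinkRecord> extractTopicFromKey = new ExtractTopic.Key<>();
        extractTopicFromKey.configure(Map.of());

        record = createTopicId.apply(record);       // key = {ClientId=testclient}, keySchema still null
        record = extractTopicId.apply(record);      // key = "testclient", keySchema still null
        record = extractTopicFromKey.apply(record); // throws DataException: key schema can't be null
    }
}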
--
jeqo commented

@arnitolog thanks for reporting this.

At the moment the transform only supports schema-based keys and values. I was trying to see if the Cast transform could help here, but it doesn't set a schema, even when the whole type is cast.

I would consider this a valid improvement and will check if there's any bandwidth to implement it in the near future. I'd also encourage you to create a PR; I'd be happy to assist if you hit any blocker.
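In the meantime, a self-contained workaround SMT is fairly small. Something like this untested sketch (class and package names hypothetical, not part of this repo) would take a plain String key as the new topic without requiring a schema:

package example.transforms; // hypothetical package

import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.transforms.Transformation;

// Hypothetical workaround: route a record to the topic named by its
// schemaless String key, leaving everything else unchanged.
public class StringKeyToTopic<R extends ConnectRecord<R>> implements Transformation<R> {

    @Override
    public R apply(final R record) {
        if (!(record.key() instanceof String) || ((String) record.key()).isEmpty()) {
            throw new DataException("key must be a non-empty String: " + record);
        }
        // Copy the record, replacing only the topic name.
        return record.newRecord((String) record.key(), record.kafkaPartition(),
                record.keySchema(), record.key(),
                record.valueSchema(), record.value(),
                record.timestamp());
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void configure(final Map<String, ?> configs) {
    }

    @Override
    public void close() {
    }
}

Dropped on the plugin path, it could stand in for ExtractTopicFromKey in your chain until a proper fix lands.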

jeqo commented

@arnitolog, I actually gave this a try so it can be included in the next release: #98