confluentinc/kafka-connect-datagen

Connector fails when used with optional reference objects

Hi!

There is a bug in the connector that causes it to fail with a NullPointerException when the schema uses optional reference objects.
The problem appears when I reference an object that is defined earlier in the schema by its full name.
This works fine in my Kafka Streams application, and it appears to be a perfectly valid Avro schema.

The error below is caused by the default/optional part of the greatGrandParents field in the Avro schema.
If I remove the default and make that field non-optional, datagen works fine.
Sadly, this is not an option for me, as all my fields have to be optional.

Is there a reason this is not supported in the datagen connector, or is it a bug that can be fixed?

How to reproduce the problem

Try to produce Avro data to a topic with the following Avro schema:

{
  "name": "SimplePersonAvro",
  "type": "record",
  "namespace": "simple.avro",
  "fields": [
    {
      "name": "person",
      "type": {
        "type": "array",
        "items": {
          "name": "Person",
          "type": "record",
          "fields": [
            {
              "name": "name",
              "type": [
                "null",
                "string"
              ],
              "default": null
            },
            {
              "name": "age",
              "type": [
                "null",
                "string"
              ],
              "default": null
            }
          ]
        }
      }
    },
    {
      "name": "father",
      "default": null,
      "type": [
        "null",
        {
          "name": "Parent",
          "type": "record",
          "fields": [
            {
              "name": "greatGrandParents",
              "default": null,
              "type": [
                "null",
                {
                  "type": "array",
                  "items": "simple.avro.Person"
                }
              ]
            }
          ]
        }
      ]
    }
  ]
}
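
As a sanity check on the claim that the full-name reference is valid, here is a minimal sketch of Avro's name-resolution rule applied to a condensed copy of the schema above. It is a simplified model written for this issue, not the Avro library's parser and not the connector's code; the names `walk` and `find_references` are hypothetical.

```python
import json

PRIMITIVES = {"null", "boolean", "int", "long", "float", "double", "bytes", "string"}
NAMED_TYPES = {"record", "enum", "fixed"}

# Condensed copy of the schema from this issue (same structure, fewer line breaks).
SCHEMA_JSON = """
{"name": "SimplePersonAvro", "type": "record", "namespace": "simple.avro",
 "fields": [
  {"name": "person",
   "type": {"type": "array", "items": {"name": "Person", "type": "record", "fields": [
     {"name": "name", "type": ["null", "string"], "default": null},
     {"name": "age", "type": ["null", "string"], "default": null}]}}},
  {"name": "father", "default": null,
   "type": ["null", {"name": "Parent", "type": "record", "fields": [
     {"name": "greatGrandParents", "default": null,
      "type": ["null", {"type": "array", "items": "simple.avro.Person"}]}]}]}]}
"""


def walk(node, namespace, known, refs):
    """Visit the schema in document order, tracking defined names and references."""
    if isinstance(node, str):
        if node in PRIMITIVES:
            return
        # A bare name is qualified with the enclosing namespace; a dotted name is already full.
        fullname = node if "." in node or not namespace else f"{namespace}.{node}"
        if fullname not in known:
            raise ValueError(f"undefined type reference: {fullname}")
        refs.append(fullname)
    elif isinstance(node, list):
        # A union: resolve every branch.
        for branch in node:
            walk(branch, namespace, known, refs)
    elif isinstance(node, dict):
        kind = node["type"]
        if kind in NAMED_TYPES:
            name = node["name"]
            if "." in name:
                fullname, namespace = name, name.rsplit(".", 1)[0]
            else:
                namespace = node.get("namespace", namespace)
                fullname = f"{namespace}.{name}" if namespace else name
            known.add(fullname)
            for field in node.get("fields", []):
                walk(field["type"], namespace, known, refs)
        elif kind == "array":
            walk(node["items"], namespace, known, refs)
        elif kind == "map":
            walk(node["values"], namespace, known, refs)
        else:
            walk(kind, namespace, known, refs)


def find_references(schema):
    """Return (sorted defined full names, references resolved in document order)."""
    known, refs = set(), []
    walk(schema, None, known, refs)
    return sorted(known), refs


if __name__ == "__main__":
    defined, references = find_references(json.loads(SCHEMA_JSON))
    print("defined:", defined)
    print("references:", references)
```

In this model, Person inherits the namespace simple.avro from the enclosing record and is defined before the greatGrandParents field refers to it, so the reference resolves without error. That is consistent with the schema being accepted elsewhere and suggests the failure lies in how the connector handles the optional field rather than in the schema itself.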

Example connector configuration to use with the Avro schema:

{
  "name": "SimpleAvroConnector",
  "config": {
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "tasks.max": 1,
    "kafka.topic": "input_topic",
    "max.interval": 10000,
    "iterations": 100,
    "schema.filename": "simple_avro.avsc",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "value.converter.enhanced.avro.schema.support": true
  }
}

When a connector with the above configuration is deployed, it crashes with the following error:

[2021-12-09 16:10:00,614] ERROR WorkerSourceTask{id=SimpleAvroConnector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:184)
java.lang.NullPointerException
        at io.confluent.kafka.connect.datagen.DatagenTask.getOptionalValue(DatagenTask.java:300)
        at io.confluent.kafka.connect.datagen.DatagenTask.getOptionalValue(DatagenTask.java:316)
        at io.confluent.kafka.connect.datagen.DatagenTask.poll(DatagenTask.java:185)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:268)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:241)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)