confluentinc/kafka-connect-storage-cloud

Ignore null headers and keys

aljoshare opened this issue · 2 comments

Hello everyone,

is there any possibility to ignore headers/keys which are null. For values there is behavior.on.null.values but for headers/keys I can't find something similar. The use case is that if in the past messages without keys/headers are published to Kafka and at some point in time keys and headers are introduced that I would like to stream to s3, the connector fails because of old null headers/keys:

│ org.apache.kafka.connect.errors.ConnectException: org.apache.kafka.connect.errors.DataException: Key cannot be null for SinkRecord: SinkRecord{kafkaOffset=0, topic='********', kafkaPartition=0}                                                                                                                                                              │
│     at io.confluent.connect.s3.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:578)                                                                                                                                                                                                                                                                            │
│     at io.confluent.connect.s3.TopicPartitionWriter.checkRotationOrAppend(TopicPartitionWriter.java:311)                                                                                                                                                                                                                                                                  │
│     at io.confluent.connect.s3.TopicPartitionWriter.executeState(TopicPartitionWriter.java:254)                                                                                                                                                                                                                                                                           │
│     at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:205)                                                                                                                                                                                                                                                                                  │
│     at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:234)                                                                                                                                                                                                                                                                                                        │
│     at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)                                                                                                                                                                                                                                                                           │
│     at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)                                                                                                                                                                                                                                                                                      │
│     at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)                                                                                                                                                                                                                                                                                 │
│     at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)                                                                                                                                                                                                                                                                                   │
│     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)                                                                                                                                                                                                                                                                                             │
│     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)                                                                                                                                                                                                                                                                                               │
│     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)                                                                                                                                                                                                                                                                                  │
│     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)                                                                                                                                                                                                                                                                                                 │
│     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)                                                                                                                                                                                                                                                                          │
│     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)                                                                                                                                                                                                                                                                          │
│     at java.base/java.lang.Thread.run(Thread.java:829)                                                                                                                                                                                                                                                                                                                    │
│ Caused by: org.apache.kafka.connect.errors.DataException: Key cannot be null for SinkRecord: SinkRecord{kafkaOffset=0, topic='********', kafkaPartition=0}                                                                                                                                                                                                     │
│     at io.confluent.connect.s3.format.KeyValueHeaderRecordWriterProvider$1.write(KeyValueHeaderRecordWriterProvider.java:93)                                                                                                                                                                                                                                              │
│     at io.confluent.connect.s3.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:562)                                                                                                                                                                                                                                                                            │
│     ... 15 more                                                                                                                                                                                                                                                                                                                                                           │
│ 2022-12-01 07:13:09,188 ERROR WorkerSinkTask{id=s3-sink-connector-8} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-s3-sink-connector-8]                                                                                               │
│ org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.                                                                                                                                                                                                                                                                  │
│     at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:609)                                                                                                                                                                                                                                                                           │
│     at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)                                                                                                                                                                                                                                                                                      │
│     at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)                                                                                                                                                                                                                                                                                 │
│     at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)                                                                                                                                                                                                                                                                                   │
│     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)                                                                                                                                                                                                                                                                                             │
│     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)                                                                                                                                                                                                                                                                                               │
│     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)                                                                                                                                                                                                                                                                                  │
│     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)                                                                                                                                                                                                                                                                                                 │
│     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)                                                                                                                                                                                                                                                                          │
│     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)                                                                                                                                                                                                                                                                          │
│     at java.base/java.lang.Thread.run(Thread.java:829)                                                                                                                                                                                                                                                                                                                    │
│ Caused by: org.apache.kafka.connect.errors.ConnectException: org.apache.kafka.connect.errors.DataException: Key cannot be null for SinkRecord: SinkRecord{kafkaOffset=0, topic='********', kafkaPartition=0}                                                                                                                                                   │
│     at io.confluent.connect.s3.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:578)                                                                                                                                                                                                                                                                            │
│     at io.confluent.connect.s3.TopicPartitionWriter.checkRotationOrAppend(TopicPartitionWriter.java:311)                                                                                                                                                                                                                                                                  │
│     at io.confluent.connect.s3.TopicPartitionWriter.executeState(TopicPartitionWriter.java:254)                                                                                                                                                                                                                                                                           │
│     at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:205)                                                                                                                                                                                                                                                                                  │
│     at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:234)                                                                                                                                                                                                                                                                                                        │
│     at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)                                                                                                                                                                                                                                                                           │
│     ... 10 more                                                                                                                                                                                                                                                                                                                                                           │
│ Caused by: org.apache.kafka.connect.errors.DataException: Key cannot be null for SinkRecord: SinkRecord{kafkaOffset=0, topic='********', kafkaPartition=0}                                                                                                                                                                                                     │
│     at io.confluent.connect.s3.format.KeyValueHeaderRecordWriterProvider$1.write(KeyValueHeaderRecordWriterProvider.java:93)                                                                                                                                                                                                                                              │
│     at io.confluent.connect.s3.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:562)                                                                                                                                                                                                                                                                            │
│     ... 15 more 

Thank you very much!

This only works if you want to filter these out. What if you want to store the null header value?