Ignore null headers and keys
aljoshare opened this issue · 2 comments
aljoshare commented
Hello everyone,
is there any possibility to ignore headers/keys which are null? For record values there is `behavior.on.null.values`, but I can't find anything similar for headers/keys. The use case: in the past, messages without keys/headers were published to Kafka, and at some point keys and headers were introduced that I would like to stream to S3. When the connector reaches those old records, it fails because of their null headers/keys:
```
org.apache.kafka.connect.errors.ConnectException: org.apache.kafka.connect.errors.DataException: Key cannot be null for SinkRecord: SinkRecord{kafkaOffset=0, topic='********', kafkaPartition=0}
    at io.confluent.connect.s3.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:578)
    at io.confluent.connect.s3.TopicPartitionWriter.checkRotationOrAppend(TopicPartitionWriter.java:311)
    at io.confluent.connect.s3.TopicPartitionWriter.executeState(TopicPartitionWriter.java:254)
    at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:205)
    at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:234)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: Key cannot be null for SinkRecord: SinkRecord{kafkaOffset=0, topic='********', kafkaPartition=0}
    at io.confluent.connect.s3.format.KeyValueHeaderRecordWriterProvider$1.write(KeyValueHeaderRecordWriterProvider.java:93)
    at io.confluent.connect.s3.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:562)
    ... 15 more
2022-12-01 07:13:09,188 ERROR WorkerSinkTask{id=s3-sink-connector-8} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-s3-sink-connector-8]
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:609)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: org.apache.kafka.connect.errors.DataException: Key cannot be null for SinkRecord: SinkRecord{kafkaOffset=0, topic='********', kafkaPartition=0}
    at io.confluent.connect.s3.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:578)
    at io.confluent.connect.s3.TopicPartitionWriter.checkRotationOrAppend(TopicPartitionWriter.java:311)
    at io.confluent.connect.s3.TopicPartitionWriter.executeState(TopicPartitionWriter.java:254)
    at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:205)
    at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:234)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
    ... 10 more
Caused by: org.apache.kafka.connect.errors.DataException: Key cannot be null for SinkRecord: SinkRecord{kafkaOffset=0, topic='********', kafkaPartition=0}
    at io.confluent.connect.s3.format.KeyValueHeaderRecordWriterProvider$1.write(KeyValueHeaderRecordWriterProvider.java:93)
    at io.confluent.connect.s3.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:562)
    ... 15 more
```
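For reference, my connector configuration looks roughly like the sketch below (the connector name, topic, and bucket are placeholders). As far as I can see, enabling key/header storage is what routes every record through `KeyValueHeaderRecordWriterProvider`, and `behavior.on.null.values` only covers record values:

```properties
# Placeholders: name, topic, and bucket are examples only.
name=s3-sink-connector
connector.class=io.confluent.connect.s3.S3SinkConnector
topics=my-topic
s3.bucket.name=my-bucket
storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.json.JsonFormat
flush.size=1000

# Writing keys and headers next to the values is what sends every record
# through KeyValueHeaderRecordWriterProvider, which throws the
# "Key cannot be null" DataException for the old records.
store.kafka.keys=true
keys.format.class=io.confluent.connect.s3.format.json.JsonFormat
store.kafka.headers=true
headers.format.class=io.confluent.connect.s3.format.json.JsonFormat

# Only applies to null record values (tombstones); as far as I can tell
# there is no behavior.on.null.keys / behavior.on.null.headers counterpart.
behavior.on.null.values=ignore
```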
Thank you very much!
raphaelauv commented
markallanson commented
This only works if you want to filter these out. What if you want to store the null header value?
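(For context, the filtering approach in question can presumably be sketched with Kafka Connect's built-in Filter SMT and the HasHeaderKey predicate; the header name `my-header` below is a placeholder for whatever header the newer messages carry:

```properties
# Drop every record that does NOT carry the expected header.
transforms=dropRecordsWithoutHeader
transforms.dropRecordsWithoutHeader.type=org.apache.kafka.connect.transforms.Filter
transforms.dropRecordsWithoutHeader.predicate=hasExpectedHeader
transforms.dropRecordsWithoutHeader.negate=true

predicates=hasExpectedHeader
predicates.hasExpectedHeader.type=org.apache.kafka.connect.transforms.predicates.HasHeaderKey
# "my-header" is a placeholder header name.
predicates.hasExpectedHeader.name=my-header
```

With `negate=true` the Filter applies to records where the predicate does not match, so records missing the header are dropped before they reach the connector, and everything else passes through untouched. That is exactly the limitation above: fine for discarding the old records, no help if you want to keep them and store the null key/header.)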