GCS files overwritten every minute by Kafka sink connector and some messages missing
Closed this issue · 14 comments
Some of the files written by the Kafka sink connector to the bucket have 0 bytes; I wonder if this is normal behaviour?
In another hour, all of the files have 0 bytes, and I notice they keep getting overwritten every minute until the next hour, then it happens again.
I also found this error in the connector logs; I'm not sure if it is related to the issue:
2024-02-09 04:06:01,665 ERROR [de-lsm-gcs-connector|task-1] WorkerSinkTask{id=de-lsm-gcs-connector-1} Offset commit failed, rewinding to last committed offsets (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-de-lsm-gcs-connector-1]
org.apache.kafka.connect.errors.ConnectException: java.lang.NullPointerException
at io.aiven.kafka.connect.gcs.GcsSinkTask.flushFile(GcsSinkTask.java:131)
at java.base/java.util.HashMap.forEach(HashMap.java:1421)
at java.base/java.util.Collections$UnmodifiableMap.forEach(Collections.java:1553)
at io.aiven.kafka.connect.gcs.GcsSinkTask.flush(GcsSinkTask.java:114)
at org.apache.kafka.connect.sink.SinkTask.preCommit(SinkTask.java:125)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:407)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:377)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:221)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.NullPointerException
...
2024-02-09 04:06:01,665 ERROR [de-lsm-gcs-connector|task-1] WorkerSinkTask{id=de-lsm-gcs-connector-1} Commit of offsets threw an unexpected exception for sequence number 124: null (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-de-lsm-gcs-connector-1]
org.apache.kafka.connect.errors.ConnectException: java.lang.NullPointerException
at io.aiven.kafka.connect.gcs.GcsSinkTask.flushFile(GcsSinkTask.java:131)
at java.base/java.util.HashMap.forEach(HashMap.java:1421)
at java.base/java.util.Collections$UnmodifiableMap.forEach(Collections.java:1553)
at io.aiven.kafka.connect.gcs.GcsSinkTask.flush(GcsSinkTask.java:114)
at org.apache.kafka.connect.sink.SinkTask.preCommit(SinkTask.java:125)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:407)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:377)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:221)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.NullPointerException
...
Aiven connector version: 0.13.0
Kafka version: 3.5.1
This is my connector configuration:
{
  "connector.class": "io.aiven.kafka.connect.gcs.GcsSinkConnector",
  "tasks.max": "4",
  "topics": "loan_logs",
  "gcs.credentials.path": "<credspath>",
  "gcs.bucket.name": "datalake-raw",
  "file.name.prefix": "lsm/",
  "file.name.timestamp.timezone": "Asia/Jakarta",
  "file.name.template": "{{topic}}/sink_date={{timestamp:unit=yyyy}}{{timestamp:unit=MM}}{{timestamp:unit=dd}}/sink_hour={{timestamp:unit=HH}}/p{{partition:padding=false}}-{{start_offset:padding=true}}.snappy.parquet",
  "file.compression.type": "snappy",
  "format.output.fields": "key,value,offset,timestamp,headers",
  "format.output.type": "parquet",
  "format.output.envelope": "true",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "key.converter.schemas.enable": "false",
  "value.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter.schemas.enable": "false",
  "behavior.on.null.values": "ignore"
}
I am using the same configuration on a different Kafka cluster with Kafka 3.3 and Aiven connector 0.9.0, and there has not been any issue similar to this. Is there any new configuration I am missing?
Topic config:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  annotations:
    kubectl.kubernetes.io/last-applied-configuration: |
      {"apiVersion":"kafka.strimzi.io/v1beta2","kind":"KafkaTopic","metadata":{"annotations":{},"labels":{"strimzi.io/cluster":"kafka01"},"name":"loan-logs---184077157bfc68094e73c105123319efa806b1f3","namespace":"kafka-prod"},"spec":{"config":{"cleanup.policy":"delete","max.message.bytes":"1048588","min.insync.replicas":"1","retention.bytes":"-1","retention.ms":"1209600000"},"partitions":10,"replicas":3,"topicName":"loan_logs"}}
  creationTimestamp: "2023-11-09T03:34:54Z"
  generation: 5
  labels:
    strimzi.io/cluster: kafka01
  name: loan-logs---184077157bfc68094e73c105123319efa806b1f3
  namespace: kafka-prod
  resourceVersion: "141694147"
  uid: 1936b625-2603-402c-be22-1d7333cfd029
spec:
  config:
    cleanup.policy: delete
    max.message.bytes: "1048588"
    message.format.version: 3.0-IV1
    min.insync.replicas: "1"
    retention.bytes: "-1"
    retention.ms: "1209600000"
  partitions: 10
  replicas: 3
  topicName: loan_logs
status:
  conditions:
  - lastTransitionTime: "2023-11-22T09:28:54.111074277Z"
    status: "True"
    type: Ready
  observedGeneration: 5
  topicName: loan_logs
Sink connector:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  labels:
    strimzi.io/cluster: gcs-connect-cluster
  name: de-lsm-gcs-connector
  namespace: kafka-operator
spec:
  class: io.aiven.kafka.connect.gcs.GcsSinkConnector
  config:
    topics: loan_logs
    file.name.prefix: lsm/
    gcs.credentials.path: /opt/kafka/external-configuration/gcs-auth-secret/google_credentials.json
    gcs.bucket.name: datalake-raw
    file.name.timestamp.timezone: Asia/Jakarta
    file.name.template: "{{topic}}/sink_date={{timestamp:unit=yyyy}}{{timestamp:unit=MM}}{{timestamp:unit=dd}}/sink_hour={{timestamp:unit=HH}}/p{{partition:padding=false}}-{{start_offset:padding=true}}.snappy.parquet"
    file.compression.type: snappy
    format.output.fields: key,value,offset,timestamp,headers
    format.output.type: parquet
    format.output.envelope: true
    behavior.on.null.values: ignore
    key.converter: org.apache.kafka.connect.storage.StringConverter
    key.converter.schemas.enable: false
    value.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter.schemas.enable: false
  tasksMax: 4
I forgot to mention earlier: we set up cron jobs to pause/resume the Kafka connector every 15 minutes to avoid the small-file problem in GCS. These are the cron jobs:
---
apiVersion: batch/v1
kind: CronJob
metadata:
  name: de-lsm-gcs-connector-pause
  namespace: kafka-prod
spec:
  schedule: "15,30,45,00 * * * *"
  jobTemplate:
    spec:
      template:
        spec:
          containers:
          - name: de-lsm-gcs-connector-pause
            image: quay.io/curl/curl:8.6.0
            imagePullPolicy: IfNotPresent
            command:
            - /usr/bin/curl
            - -s
            - -X
            - PUT
            - http://gcs-connect-cluster-connect-api:8083/connectors/de-lsm-gcs-connector/pause
          restartPolicy: OnFailure
---
apiVersion: batch/v1
kind: CronJob
metadata:
  name: de-lsm-gcs-connector-resume
  namespace: kafka-prod
spec:
  schedule: "14,29,44,57 * * * *"
  jobTemplate:
    spec:
      template:
        spec:
          containers:
          - name: de-lsm-gcs-connector-resume
            image: quay.io/curl/curl:8.6.0
            imagePullPolicy: IfNotPresent
            command:
            - /usr/bin/curl
            - -s
            - -X
            - PUT
            - http://gcs-connect-cluster-connect-api:8083/connectors/de-lsm-gcs-connector/resume
          restartPolicy: OnFailure
Even though there is a job to pause the connector, it did not seem to affect the overwriting. The connector would be in state PAUSED briefly, then start running again before the resume job started, and the overwriting continued.
I did say earlier that the overwritten files had 0-byte size, but that was the case in the dev environment. In production the files are not 0 bytes, but they still get overwritten every minute. Since the filename has the partition number as prefix and ends with the start offset, I noticed the same offset repeated in the same partition but in a different hour, and when I checked the content there were indeed duplicate keys with the same offset. The files in the GCS bucket are organized by sink date and sink hour, so a different hour means a different object prefix. I think if there were no prefix by sink date/hour, there would not be any duplicates.
(files in sink hour 9)
(files in sink hour 10)
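To make the duplication concrete, here is a small Python sketch of how the file.name.template above expands. The 20-digit offset padding and the fixed UTC+7 offset for Asia/Jakarta are my assumptions for illustration (and the lsm/ file.name.prefix is omitted); none of this is taken from the connector source. The point: the same partition and start offset, flushed in two different local hours, land under two different object prefixes, so GCS keeps both copies rather than overwriting one.

```python
from datetime import datetime, timedelta, timezone

# Hypothetical expansion of the configured file.name.template:
# {{topic}}/sink_date=yyyyMMdd/sink_hour=HH/p{{partition}}-{{start_offset}}.snappy.parquet
def object_path(topic, partition, start_offset, ts):
    jakarta = timezone(timedelta(hours=7))  # Asia/Jakarta is UTC+7 (fixed offset assumed)
    local = ts.astimezone(jakarta)
    # padding=true is assumed here to mean zero-padding the offset to 20 digits
    return (f"{topic}/sink_date={local:%Y%m%d}/sink_hour={local:%H}/"
            f"p{partition}-{start_offset:020d}.snappy.parquet")

t1 = datetime(2024, 2, 9, 2, 5, tzinfo=timezone.utc)  # 09:05 local time
t2 = datetime(2024, 2, 9, 3, 5, tzinfo=timezone.utc)  # 10:05 local time

# Same partition/offset flushed in two different hours -> two distinct objects:
p1 = object_path("loan_logs", 3, 21927, t1)  # ...sink_hour=09/p3-...21927.snappy.parquet
p2 = object_path("loan_logs", 3, 21927, t2)  # ...sink_hour=10/p3-...21927.snappy.parquet
```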
@jeqo is there anything else I can provide?
@suisenkotoba just trying to find some time to look into this :) let me see what's possible tomorrow
alright thanks @jeqo
@suisenkotoba for completeness, could you also share the connect worker configurations?
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  annotations:
    strimzi.io/use-connector-resources: "true"
  name: gcs-connect-cluster
  namespace: kafka-prod
spec:
  bootstrapServers: kafka01-kafka-bootstrap:9092
  config:
    config.storage.replication.factor: 3
    config.storage.topic: gcs-connect-cluster-configs
    group.id: gcs-connect-cluster
    max.retries: 100
    offset.storage.replication.factor: 3
    offset.storage.topic: gcs-connect-cluster-offsets
    retry.backoff.ms: 5000
    status.storage.replication.factor: 3
    status.storage.topic: gcs-connect-cluster-status
  jvmOptions:
    "-Xms": "8g"
    "-Xmx": "8g"
  externalConfiguration:
    volumes:
    - name: gcs-auth-secret
      secret:
        secretName: gcs-auth-secret
  image: asia.gcr.io/amf-prod/amf/kafka-connect:0.0.2
  metricsConfig:
    type: jmxPrometheusExporter
    valueFrom:
      configMapKeyRef:
        key: metrics-config.yml
        name: connect-metrics
  replicas: 1
  template:
    pod:
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
            - matchExpressions:
              - key: name
                operator: In
                values:
                - infra-workers
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
          - labelSelector:
              matchLabels:
                strimzi.io/cluster: gcs-connect-cluster
            topologyKey: kubernetes.io/hostname
      tolerations:
      - effect: NoSchedule
        key: dedicated
        operator: Equal
        value: infra
  version: 3.5.1
is this what you mean? @jeqo
@suisenkotoba thanks for sharing this info! Just be careful on not sharing too much info on a public issue :)
Let me start by trying to separate the issues, and check whether I understood your context:
The "empty files" issue
Seems to happen only on dev.
It's not expected behavior and seems to be related to the NPE shared -- though the stack trace is lacking details. Could you share what comes after:
Caused by: java.lang.NullPointerException
... <--
That should tell us what is causing the NPE.
Overwritten files
By "overwritten", I think you mean files from the same partition/offset are present in different hours (i.e. duplicated).
As you are sharing images of 2 different hours and highlighting files with the same partition/offset, that's the idea I'm getting.
Is this understanding correct?
I see why you may want to fake this "batching" behavior on connectors by using pause/resume, but I don't think it helps achieve what you are looking for.
The GCS connector does batching itself in what's internally called the "RecordGrouper". This record grouper is defined by the configured path pattern, and keeps a map of [target path, records] in memory until records are flushed to GCS.
The flush frequency is defined by the worker config offset.flush.interval.ms (defaults to 1 minute). This is why you see new files written every minute or so.
If I'm reading your pause/resume cron jobs right, the connector is paused at min 00, resumed at min 14, then paused again at min 15. Is this what you are observing as well?
If this is correct, then we could replace the cron jobs by setting offset.flush.interval.ms to 15 min, so all events from those 15 minutes are kept in memory until offsets are flushed and files are written to GCS. This way, pausing/resuming won't be needed.
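For example, in the Strimzi KafkaConnect resource, this would be one extra entry under spec.config (a sketch; 900000 ms = 15 min):

```yaml
spec:
  config:
    # Flush offsets (and write files to GCS) every 15 minutes
    # instead of the 1-minute default.
    offset.flush.interval.ms: 900000
```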
The only consideration may be having enough memory to hold 15 minutes of events; but as you are already doing something similar by resuming the connector every ~15 min, it should behave similarly.
Back to "the same files duplicated between hours": it may be caused by pausing tasks while files are being written but offsets are not committed; then, when resumed (in the next hour), the connector starts from the previous offsets and stores them under the new hour.
I'd suggest keeping the connector running, tuning the flushing, removing the pause/resume scripts, and seeing if that is enough to solve the issue.
@jeqo thanks for your feedback
Overwritten files
Ignoring the duplicates in different hours, the files get overwritten every minute in an infinite loop. Actually, the error is similar to the error in dev, which produces the 0-byte files.
I've tried turning off the cron jobs before, yet the files were still overwritten. I'll try creating a new connector with the worker config offset.flush.interval.ms as you suggested.
@jeqo I re-created the Kafka Connect cluster with the additional config offset.flush.interval.ms=900000, yet the sink worker still failed to commit offsets. However, the error log seems a bit different; this time the NPE is related to the key schema.
2024-02-16 10:31:50,545 INFO [de-gcs-connector|task-3] [Consumer clientId=connector-consumer-de-gcs-connector-3, groupId=connect-de-gcs-connector] Seeking to offset 21927 for partition loan_logs-9 (org.apache.kafka.clients.consumer.KafkaConsumer) [task-thread-de-gcs-connector-3]
2024-02-16 10:31:50,545 INFO [de-gcs-connector|task-3] [Consumer clientId=connector-consumer-de-gcs-connector-3, groupId=connect-de-gcs-connector] Seeking to offset 22487 for partition loan_logs-8 (org.apache.kafka.clients.consumer.KafkaConsumer) [task-thread-de-gcs-connector-3]
2024-02-16 10:31:50,545 ERROR [de-gcs-connector|task-3] WorkerSinkTask{id=de-gcs-connector-3} Commit of offsets threw an unexpected exception for sequence number 1: null (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-de-gcs-connector-3]
org.apache.kafka.connect.errors.ConnectException: java.lang.NullPointerException: Cannot invoke "org.apache.kafka.connect.data.Schema.type()" because the return value of "org.apache.kafka.connect.data.Schema.keySchema()" is null
at io.aiven.kafka.connect.gcs.GcsSinkTask.flushFile(GcsSinkTask.java:131)
at java.base/java.util.HashMap.forEach(HashMap.java:1421)
at java.base/java.util.Collections$UnmodifiableMap.forEach(Collections.java:1553)
at io.aiven.kafka.connect.gcs.GcsSinkTask.flush(GcsSinkTask.java:114)
at org.apache.kafka.connect.sink.SinkTask.preCommit(SinkTask.java:125)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:407)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:377)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:221)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.NullPointerException: Cannot invoke "org.apache.kafka.connect.data.Schema.type()" because the return value of "org.apache.kafka.connect.data.Schema.keySchema()" is null
at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:894)
at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:772)
at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:757)
at io.aiven.kafka.connect.common.output.SinkSchemaBuilder.headersSchema(SinkSchemaBuilder.java:153)
at io.aiven.kafka.connect.common.output.SinkSchemaBuilder.outputFieldSchema(SinkSchemaBuilder.java:166)
at io.aiven.kafka.connect.common.output.SinkSchemaBuilder.avroSchemaFor(SinkSchemaBuilder.java:97)
at io.aiven.kafka.connect.common.output.SinkSchemaBuilder.buildSchema(SinkSchemaBuilder.java:86)
at io.aiven.kafka.connect.common.output.parquet.ParquetOutputWriter.writeRecords(ParquetOutputWriter.java:62)
at io.aiven.kafka.connect.gcs.GcsSinkTask.flushFile(GcsSinkTask.java:129)
... 16 more
2024-02-16 10:31:50,545 ERROR [de-gcs-connector|task-0] WorkerSinkTask{id=de-gcs-connector-0} Offset commit failed, rewinding to last committed offsets (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-de-gcs-connector-0]
org.apache.kafka.connect.errors.ConnectException: java.lang.NullPointerException: Cannot invoke "org.apache.kafka.connect.data.Schema.type()" because the return value of "org.apache.kafka.connect.data.Schema.keySchema()" is null
at io.aiven.kafka.connect.gcs.GcsSinkTask.flushFile(GcsSinkTask.java:131)
at java.base/java.util.HashMap.forEach(HashMap.java:1421)
at java.base/java.util.Collections$UnmodifiableMap.forEach(Collections.java:1553)
at io.aiven.kafka.connect.gcs.GcsSinkTask.flush(GcsSinkTask.java:114)
at org.apache.kafka.connect.sink.SinkTask.preCommit(SinkTask.java:125)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:407)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:377)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:221)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.NullPointerException: Cannot invoke "org.apache.kafka.connect.data.Schema.type()" because the return value of "org.apache.kafka.connect.data.Schema.keySchema()" is null
at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:894)
at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:772)
at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:757)
at io.aiven.kafka.connect.common.output.SinkSchemaBuilder.headersSchema(SinkSchemaBuilder.java:153)
at io.aiven.kafka.connect.common.output.SinkSchemaBuilder.outputFieldSchema(SinkSchemaBuilder.java:166)
at io.aiven.kafka.connect.common.output.SinkSchemaBuilder.avroSchemaFor(SinkSchemaBuilder.java:97)
at io.aiven.kafka.connect.common.output.SinkSchemaBuilder.buildSchema(SinkSchemaBuilder.java:86)
at io.aiven.kafka.connect.common.output.parquet.ParquetOutputWriter.writeRecords(ParquetOutputWriter.java:62)
at io.aiven.kafka.connect.gcs.GcsSinkTask.flushFile(GcsSinkTask.java:129)
... 16 more
At the time we did not employ a schema registry; was that a problem?
Though we used org.apache.kafka.connect.storage.StringConverter for both key.converter and value.converter, is it necessary to have a schema inside the message as well?
Interesting. In this case it's not failing because of input value conversion, but while building the output struct based on:
"format.output.fields": "key,value,offset,timestamp,headers",
"format.output.type": "parquet",
"format.output.envelope": "true",
Could you try without headers? It's failing when building the schema for the headers.
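Concretely, that would mean dropping headers from the output fields; a sketch of the changed line only:

```json
"format.output.fields": "key,value,offset,timestamp"
```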
@jeqo I tried to set header.converter to org.apache.kafka.connect.storage.StringConverter. It seemed to work. The default header converter in Kafka Connect is org.apache.kafka.connect.storage.SimpleHeaderConverter, right? Does that mean the problem is with this converter?
According to this, the schema is inferred, so it is not necessary to provide a schema beforehand; do I get this right?
If, in one topic, one message has headers and another does not, would that be a problem with SimpleHeaderConverter?
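For reference, the change that seemed to work was just adding this line to the connector config:

```json
"header.converter": "org.apache.kafka.connect.storage.StringConverter"
```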
Not a problem with the converter per se, but with how the connector expects the headers to be typed.
The connector builds a schema per batch of records written to GCS. This schema is inferred from the first record.
For headers, the schema is either null or a map typed by the first header element of the first record. If another header element is typed differently, the connector will fail.
So, if you have headers typed differently, I can see how using the String converter helps your pipeline work, as it can build the schema properly by typing everything as string.
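A minimal Python model of this failure mode (illustrative only, not the connector's actual code; the inferred "schema" here is just the Python type name of the first header value):

```python
def infer_header_schema(headers):
    """Infer a single value type from the first header, mimicking how the
    connector infers the header map schema from the first record's headers."""
    if not headers:
        return None
    _, value = headers[0]
    return type(value).__name__

def validate_batch(records):
    """records: list of per-record header lists, each [(key, value), ...].
    Fails if any header value's type differs from the inferred schema."""
    schema = None
    for headers in records:
        if schema is None:
            schema = infer_header_schema(headers)
        for _, value in headers:
            if type(value).__name__ != schema:
                raise TypeError(
                    f"header value type {type(value).__name__!r} "
                    f"does not match inferred schema {schema!r}")
    return schema
```

With a StringConverter-style setup, every value is already a string, so the inferred schema holds for the whole batch; mixed types (e.g. a string header followed by an int header) would raise.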
Looking further into this: if the type changes between records, it may fail when building the Avro/Parquet output, or miss headers. This will require further testing.
An improvement path that could be explored is to base the map type not only on the first record in the batch but on all records, so that the schema either fails for the whole batch or is valid for all records. This would be a bit more expensive, but would improve safety and data quality.
Let me know if the current workaround is good enough for you. I can create another issue for header schema improvements.
Thanks @jeqo for your answer. Yes, the current workaround is good enough for now.
Will close this one then, and follow up the investigation here Aiven-Open/cloud-storage-connectors-for-apache-kafka#227