S3 Sink connector reports consumer lag of 1 despite processing all records
jamielwhite opened this issue · 2 comments
jamielwhite commented
What version of the Stream Reactor are you reporting this issue for?
6.3.0
Are you running the correct version of Kafka/Confluent for the Stream Reactor release?
Yes (Kafka 3.6.0, Confluent 7.6.0)
Do you have a supported version of the data source/sink, i.e. Cassandra 3.0.9?
Yes (S3 sink 6.3.0)
Have you read the docs?
Yes
What is the expected behaviour?
I expect the Kafka consumer group to report a lag of 0 once it has processed all records in the topic.
What was observed?
The consumer group lag remained at 1 even after the connector had caught up with new messages. When I read the files back from S3, the latest message on each topic had been written. So the connector appears to be processing all of the messages but not committing the offsets as I'd expect.
➜ ~ kafka-consumer-groups --bootstrap-server localhost:19092 --describe --group connect-backup-s3-sink

GROUP                   TOPIC                PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
connect-backup-s3-sink  test_backup_topic_2  0          131             132             1
connect-backup-s3-sink  test_backup_topic_1  0          199             200             1
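For what it's worth: Kafka computes lag as the log-end offset minus the committed offset, and a committed offset is defined as the offset of the next record to consume, not of the last record consumed. If the sink committed the offset of the last record it wrote rather than that offset plus one, the group would show a permanent lag of exactly 1, which matches the output above. A minimal sketch of the distinction (a hypothetical helper, not the connector's actual code):

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;

// Hypothetical helper illustrating the off-by-one: Kafka expects the
// committed offset to be the offset of the *next* record to consume,
// so after writing a record at offset N the sink must commit N + 1.
public class OffsetCommitSketch {
    static Map<TopicPartition, OffsetAndMetadata> commitFor(SinkRecord last) {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(
            new TopicPartition(last.topic(), last.kafkaPartition()),
            // Committing last.kafkaOffset() instead of + 1 would reproduce
            // the permanent lag of 1 described above.
            new OffsetAndMetadata(last.kafkaOffset() + 1));
        return offsets;
    }
}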
What is your Connect cluster configuration (connect-avro-distributed.properties)?
group.id=test-kafka-connect
status.storage.replication.factor=1
key.converter=io.confluent.connect.avro.AvroConverter
config.storage.topic=connect-config
offset.storage.replication.factor=1
plugin.path=/usr/share/java/plugins
offset.storage.topic=connect-offsets
bootstrap.servers=kafka:9092
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://schema-registry:8081
rest.advertised.host.name=localhost
rest.port=8083
status.storage.topic=connect-status
value.converter.schema.registry.url=http://schema-registry:8081
config.storage.replication.factor=1
What is your connector properties configuration (my-connector.properties)?
{
  "name": "backup-s3-sink",
  "config": {
    "connector.class": "io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector",
    "connect.s3.kcql": "INSERT INTO my-bucket-name SELECT * FROM `*` STOREAS `AVRO` WITH_FLUSH_INTERVAL = 30 PROPERTIES('store.envelope'=true)",
    "connect.s3.custom.endpoint": "http://localstack:4566",
    "connect.s3.vhost.bucket": true,
    "topics": "test_backup_topic_1,test_backup_topic_2"
  }
}
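In case it helps with reproduction, here is a minimal verification snippet (my own sketch, assuming the same localhost:19092 broker and group name; not connector code) that dumps the group's committed offsets directly via the Kafka AdminClient, bypassing the kafka-consumer-groups CLI:

import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommittedOffsetCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
        try (Admin admin = Admin.create(props)) {
            // Fetch the offsets Connect committed for this sink's group.
            Map<TopicPartition, OffsetAndMetadata> committed =
                admin.listConsumerGroupOffsets("connect-backup-s3-sink")
                     .partitionsToOffsetAndMetadata()
                     .get();
            // With the behaviour above, this prints 131 and 199 while the
            // logs end at 132 and 200, i.e. one short of the log end.
            committed.forEach((tp, om) ->
                System.out.printf("%s -> %d%n", tp, om.offset()));
        }
    }
}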
brandon-powers commented
+1, I'm seeing this as well on 6.3.0.
JKCai commented
Seeing the same behaviour on 6.3.0 as well.