lensesio/stream-reactor

S3 Sink connector reports consumer lag of 1 despite processing all records

jamielwhite opened this issue · 2 comments

Issue Guidelines

What version of the Stream Reactor are you reporting this issue for?

6.3.0

Are you running the correct version of Kafka/Confluent for the Stream reactor release?

Yes (Kafka 3.6.0, Confluent 7.6.0)

Do you have a supported version of the data source/sink, i.e. Cassandra 3.0.9?

Yes (S3 sink 6.3.0)

Have you read the docs?

Yes

What is the expected behaviour?

I expect the Kafka consumer group to report a lag of 0 once it has processed all records in the topic.

What was observed?

The consumer group lag remained at 1 even after the connector caught up to the latest messages. When I read the files back from S3, the latest message on the topic had been written. So the connector appears to be processing all of the messages but not committing the offsets as I'd expect.

➜  ~ kafka-consumer-groups --bootstrap-server localhost:19092 --describe --group connect-backup-s3-sink

GROUP                  TOPIC               PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        
connect-backup-s3-sink test_backup_topic_2 0          131             132             1            
connect-backup-s3-sink test_backup_topic_1 0          199             200             1  
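For context on how that lag figure arises: lag is simply the log-end offset minus the committed offset, and Kafka's convention is to commit the offset of the *next* record to consume (last processed + 1). A sketch of the arithmetic (illustrative only, not the connector's actual code) shows how committing the offset of the last processed record instead would leave a permanent lag of 1, matching the numbers in the table above:

```python
def lag(log_end_offset: int, committed_offset: int) -> int:
    """Consumer lag as reported by kafka-consumer-groups."""
    return log_end_offset - committed_offset

last_processed = 131   # offset of the final record in test_backup_topic_2
log_end = 132          # next offset to be written to the partition

# Committing the NEXT offset (last processed + 1) gives lag 0, as expected:
print(lag(log_end, last_processed + 1))  # 0

# Committing the offset OF the last processed record leaves lag 1,
# which matches the output above even though every record was written to S3:
print(lag(log_end, last_processed))      # 1
```

If the sink is committing `131` rather than `132` here, the files in S3 would be complete while the group still reports lag 1, which is consistent with what I'm seeing.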

What is your Connect cluster configuration (connect-avro-distributed.properties)?

group.id=test-kafka-connect
status.storage.replication.factor=1
key.converter=io.confluent.connect.avro.AvroConverter
config.storage.topic=connect-config
offset.storage.replication.factor=1
plugin.path=/usr/share/java/plugins
offset.storage.topic=connect-offsets
bootstrap.servers=kafka:9092
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://schema-registry:8081
rest.advertised.host.name=localhost
rest.port=8083
status.storage.topic=connect-status
value.converter.schema.registry.url=http://schema-registry:8081
config.storage.replication.factor=1

What is your connector properties configuration (my-connector.properties)?

{
  "name": "backup-s3-sink",
  "config": {
    "connector.class": "io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector",
    "connect.s3.kcql": "INSERT INTO my-bucket-name SELECT * FROM `*` STOREAS `AVRO` WITH_FLUSH_INTERVAL = 30 PROPERTIES('store.envelope'=true)",
    "connect.s3.custom.endpoint": "http://localstack:4566",
    "connect.s3.vhost.bucket": true,
    "topics": "test_backup_topic_1,test_backup_topic_2"
  }
}

+1, I'm seeing this as well on 6.3.0.

Seeing the same behaviour too on version 6.3.0.