lensesio/stream-reactor

Sink the __consumer_offsets topic into an S3 bucket using the Lenses S3 plugin

cchidhu opened this issue · 2 comments

Issue Guidelines

We're attempting to use the S3 connector to back up our Kafka cluster for disaster recovery. However, we're encountering an issue when sinking the __consumer_offsets topic into the S3 bucket. We've tried several S3 connector configurations for this topic, but the task keeps failing because the data is in a binary format, and we're unsure which value converter to use. We've included the connector configuration and a stack trace from the SinkTask for reference. Could you advise how to resolve this?
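For context on the binary format: records in `__consumer_offsets` use Kafka's internal binary serialization rather than text or JSON, which is likely why a converter that passes the bytes through untouched suits a backup. The sketch below decodes record keys, assuming the classic group-coordinator layout (a big-endian int16 schema version, then int16-length-prefixed UTF-8 strings): versions 0 and 1 identify a committed offset (group, topic, partition) and version 2 identifies group metadata. Record types added by newer group coordinators are not handled, and the class and function names are hypothetical.

```python
import struct
from typing import NamedTuple, Union


class OffsetCommitKey(NamedTuple):
    """Key of a committed-offset record (key versions 0 and 1)."""
    group: str
    topic: str
    partition: int


class GroupMetadataKey(NamedTuple):
    """Key of a group-metadata record (key version 2)."""
    group: str


def _read_string(buf: bytes, pos: int) -> tuple[str, int]:
    # Classic Kafka string encoding: big-endian int16 length, then UTF-8 bytes.
    (length,) = struct.unpack_from(">h", buf, pos)
    pos += 2
    return buf[pos:pos + length].decode("utf-8"), pos + length


def decode_offsets_key(key: bytes) -> Union[OffsetCommitKey, GroupMetadataKey]:
    """Decode a raw __consumer_offsets record key (hypothetical helper)."""
    (version,) = struct.unpack_from(">h", key, 0)  # schema version prefix
    if version not in (0, 1, 2):
        # Newer group-coordinator record types use other versions; not handled here.
        raise ValueError(f"unsupported __consumer_offsets key version: {version}")
    group, pos = _read_string(key, 2)
    if version == 2:
        return GroupMetadataKey(group)
    topic, pos = _read_string(key, pos)
    (partition,) = struct.unpack_from(">i", key, pos)
    return OffsetCommitKey(group, topic, partition)


if __name__ == "__main__":
    def encode_string(text: str) -> bytes:
        data = text.encode("utf-8")
        return struct.pack(">h", len(data)) + data

    raw_key = (struct.pack(">h", 1) + encode_string("my-app")
               + encode_string("orders") + struct.pack(">i", 3))
    print(decode_offsets_key(raw_key))
    # OffsetCommitKey(group='my-app', topic='orders', partition=3)
```

Bytes in this layout are neither valid JSON nor meaningful text, so a JSON or string converter cannot represent them faithfully.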

Please review these questions before submitting an issue.

What version of the Stream Reactor are you reporting this issue for?

2-8-4.0.0

Are you running the correct version of Kafka/Confluent for the Stream reactor release?

Yes

Do you have a supported version of the data source/sink, e.g. Cassandra 3.0.9?

Yes, S3 Sink Connector

Have you read the docs? Yes

What is the expected behaviour? The __consumer_offsets topic data should be written to the S3 bucket.

What was observed? The SinkTask throws an exception while sinking the binary data from the __consumer_offsets topic.

What is your Connect cluster configuration (connect-avro-distributed.properties)?

What is your connector properties configuration (my-connector.properties)?

(Connector configuration provided as a screenshot; not reproduced here.)

Please provide full log files (redact any sensitive information)

(SinkTask log provided as a screenshot; not reproduced here.)

Providing a solution for this is on our roadmap.

I managed to back up __consumer_offsets with the following connector configuration:

connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
connect.s3.errors.log.include.messages=true
tasks.max=2
topics=__consumer_offsets
connect.s3.aws.auth.mode=Credentials
connect.s3.aws.access.key=...
flush.count=50000
connect.s3.kcql=INSERT INTO msk-s3-xxxxxx-consumer:my_workload SELECT * FROM __consumer_offsets STOREAS AVRO PROPERTIES('store.envelope'=true, 'store.envelope.key'= true, 'store.envelope.headers'=true, 'store.envelope.value'=true, 'store.envelope.metadata'= true)
connect.s3.aws.region=eu-central-1
connect.s3.aws.secret.key=...
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
flush.interval=200
errors.log.enable=true
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
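If the connector runs on a self-managed Kafka Connect cluster, settings like these can be submitted through the Connect REST API's `PUT /connectors/{name}/config` endpoint, which takes a flat JSON object of string values. The helper below is a minimal, hypothetical sketch that turns `key=value` lines into that JSON body. It splits each line on the first `=` only, so the `=` signs inside the KCQL `PROPERTIES(...)` clause are preserved, and it deliberately ignores Java properties features such as line continuations and escapes. The host and connector name in the comment are placeholders.

```python
import json


def properties_to_connect_config(text: str) -> dict[str, str]:
    """Turn simple key=value lines into a Kafka Connect config map.

    Hypothetical helper covering only one property per line plus '#'/'!'
    comment lines. Line continuations and escape sequences from the full
    Java properties format are not supported.
    """
    config: dict[str, str] = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith(("#", "!")):
            continue
        # Split on the first '=' only, so '=' inside KCQL PROPERTIES(...) survives.
        key, sep, value = line.partition("=")
        if not sep:
            raise ValueError(f"expected key=value, got: {raw_line!r}")
        config[key.strip()] = value.strip()
    return config


if __name__ == "__main__":
    props = """
connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
tasks.max=2
topics=__consumer_offsets
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
connect.s3.kcql=INSERT INTO msk-s3-xxxxxx-consumer:my_workload SELECT * FROM __consumer_offsets STOREAS AVRO PROPERTIES('store.envelope'=true, 'store.envelope.key'= true, 'store.envelope.headers'=true, 'store.envelope.value'=true, 'store.envelope.metadata'= true)
"""
    body = json.dumps(properties_to_connect_config(props), indent=2)
    # Placeholder target: PUT this body to http://<connect-host>:8083/connectors/<connector-name>/config
    print(body)
```

Keeping the values as strings matches what the Connect REST API expects, since connector configs are string-to-string maps.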

However, __consumer_offsets is an internal Kafka topic, and the Source Connector, when writing the data back from S3 to Kafka, throws org.apache.kafka.common.errors.InvalidTopicException: The request attempted to perform an operation on an invalid topic.
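Since the broker rejects writes to the internal topic, one possible alternative (an assumption, not something the connector does) is to restore consumer positions rather than raw records: decode each backed-up offset-commit value and re-apply the offset through the consumer-group admin API, for example `Admin#alterConsumerGroupOffsets` in the Java client. The sketch below assumes the raw value bytes have already been extracted from the stored envelope, that the record's key is an offset-commit key (key version 0 or 1), and that the value uses versions 0 to 3 of the classic layout. The names are hypothetical. Note that offsets of restored topics may differ from the originals, so decoded offsets may need translating before use.

```python
import struct
from typing import NamedTuple, Optional


class OffsetCommitValue(NamedTuple):
    offset: int
    leader_epoch: Optional[int]      # present only in value version 3 (-1 = unknown)
    metadata: str
    commit_timestamp: int            # milliseconds since the epoch
    expire_timestamp: Optional[int]  # present only in value version 1


def _read_string(buf: bytes, pos: int) -> tuple[str, int]:
    # Big-endian int16 length, then UTF-8 bytes; -1 (null) is returned as "".
    (length,) = struct.unpack_from(">h", buf, pos)
    pos += 2
    if length < 0:
        return "", pos
    return buf[pos:pos + length].decode("utf-8"), pos + length


def decode_offset_commit_value(value: Optional[bytes]) -> Optional[OffsetCommitValue]:
    """Decode a raw offset-commit value (hypothetical helper, versions 0-3)."""
    if value is None:
        return None  # tombstone: the committed offset was deleted
    (version,) = struct.unpack_from(">h", value, 0)
    if version not in (0, 1, 2, 3):
        raise ValueError(f"unsupported offset-commit value version: {version}")
    (offset,) = struct.unpack_from(">q", value, 2)
    pos = 10
    leader_epoch = None
    if version == 3:
        (leader_epoch,) = struct.unpack_from(">i", value, pos)
        pos += 4
    metadata, pos = _read_string(value, pos)
    (commit_timestamp,) = struct.unpack_from(">q", value, pos)
    pos += 8
    expire_timestamp = None
    if version == 1:
        (expire_timestamp,) = struct.unpack_from(">q", value, pos)
    return OffsetCommitValue(offset, leader_epoch, metadata, commit_timestamp, expire_timestamp)


if __name__ == "__main__":
    # Version 3 value: offset 7, leader epoch 5, metadata "ok", commit timestamp 99.
    raw_value = struct.pack(">hqi", 3, 7, 5) + struct.pack(">h", 2) + b"ok" + struct.pack(">q", 99)
    print(decode_offset_commit_value(raw_value))
    # OffsetCommitValue(offset=7, leader_epoch=5, metadata='ok', commit_timestamp=99, expire_timestamp=None)
```

Group-metadata records (key version 2) have a different value layout and would need to be skipped before calling this helper.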