Sink __consumer_offsets topic into the S3 bucket using S3 Lenses Plugins
cchidhu opened this issue · 2 comments
Issue Guidelines
We're attempting to use the S3 Connector to back up our Kafka Cluster for Disaster Recovery purposes. However, we're encountering an issue when trying to sink the __consumer_offsets topic into the S3 bucket. We've tried different configurations for the S3 Connector for this topic, but it continues to fail due to the binary format of the data. We're unsure which value converter to use for this topic. We've included the connector configuration and a trace from the SinkTask for reference. Can you please provide some advice on how to resolve this issue?
Please review these questions before submitting an issue.
What version of the Stream Reactor are you reporting this issue for?
2-8-4.0.0
Are you running the correct version of Kafka/Confluent for the Stream reactor release?
Yes
Do you have a supported version of the data source/sink, i.e. Cassandra 3.0.9?
Yes, S3 Sink Connector
Have you read the docs? Yes
What is the expected behaviour? Data from the __consumer_offsets topic should be sunk into the S3 bucket.
What was observed? The SinkTask throws an exception while sinking the binary data from the __consumer_offsets topic.
What is your Connect cluster configuration (connect-avro-distributed.properties)?
What is your connector properties configuration (my-connector.properties)?
Please provide full log files (redact any sensitive information)
Providing a solution for this is on our roadmap.
I managed to back up __consumer_offsets with the following connector configuration:
connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
tasks.max=2
topics=__consumer_offsets
connect.s3.aws.auth.mode=Credentials
connect.s3.aws.access.key=...
connect.s3.aws.secret.key=...
connect.s3.aws.region=eu-central-1
connect.s3.kcql=INSERT INTO msk-s3-xxxxxx-consumer:my_workload SELECT * FROM __consumer_offsets STOREAS AVRO PROPERTIES('store.envelope'=true, 'store.envelope.key'=true, 'store.envelope.headers'=true, 'store.envelope.value'=true, 'store.envelope.metadata'=true)
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
flush.count=50000
flush.interval=200
errors.log.enable=true
connect.s3.errors.log.include.messages=true
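For reference, here is a minimal sketch of submitting the same configuration through the Kafka Connect REST API. The worker URL and the connector name are placeholders, and the access/secret keys remain redacted as in the properties above:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RegisterS3SinkConnector {
    public static void main(String[] args) throws Exception {
        // Placeholder Connect worker URL and connector name.
        String connectUrl = "http://connect-worker:8083/connectors/consumer-offsets-backup/config";

        // The same settings as the properties above, expressed as Connect REST JSON.
        String config = """
            {
              "connector.class": "io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector",
              "tasks.max": "2",
              "topics": "__consumer_offsets",
              "connect.s3.aws.auth.mode": "Credentials",
              "connect.s3.aws.access.key": "...",
              "connect.s3.aws.secret.key": "...",
              "connect.s3.aws.region": "eu-central-1",
              "connect.s3.kcql": "INSERT INTO msk-s3-xxxxxx-consumer:my_workload SELECT * FROM __consumer_offsets STOREAS AVRO PROPERTIES('store.envelope'=true, 'store.envelope.key'=true, 'store.envelope.headers'=true, 'store.envelope.value'=true, 'store.envelope.metadata'=true)",
              "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
              "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
              "flush.count": "50000",
              "flush.interval": "200",
              "errors.log.enable": "true",
              "connect.s3.errors.log.include.messages": "true"
            }
            """;

        // PUT /connectors/{name}/config creates the connector or updates its configuration.
        HttpRequest request = HttpRequest.newBuilder(URI.create(connectUrl))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(config))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

The ByteArrayConverter passes the raw record bytes through without trying to deserialize them, and the envelope options keep key, value, headers, and metadata together in the Avro output so the original records can be reconstructed later.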
However, __consumer_offsets is a special internal topic, and a Source Connector writing the data back from S3 to Kafka throws org.apache.kafka.common.errors.InvalidTopicException: The request attempted to perform an operation on an invalid topic.
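Since brokers reject direct writes to the internal __consumer_offsets topic, one possible way to restore the backed-up positions is to commit them for the target consumer group instead, for example with the Kafka AdminClient's alterConsumerGroupOffsets (the group must have no active members for this to succeed). A minimal sketch, assuming the broker address, group name, topic names, and offsets below are placeholders that would in practice be parsed from the S3 backup:

```java
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class RestoreGroupOffsets {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder

        try (Admin admin = Admin.create(props)) {
            // Offsets recovered from the backup; the values here are purely illustrative.
            Map<TopicPartition, OffsetAndMetadata> offsets = Map.of(
                new TopicPartition("orders", 0), new OffsetAndMetadata(12345L),
                new TopicPartition("orders", 1), new OffsetAndMetadata(67890L));

            // Commits the offsets for the group via the group coordinator,
            // avoiding any direct produce to __consumer_offsets.
            admin.alterConsumerGroupOffsets("my-consumer-group", offsets).all().get();
        }
    }
}
```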