lensesio/stream-reactor

Support gzip compression for S3 source connector

What version of the Stream Reactor are you reporting this issue for?

7.4.1

Are you running the correct version of Kafka/Confluent for the Stream reactor release?

Yes. We are running Kafka version 3.6.0

Do you have a supported version of the data source/sink, i.e. Cassandra 3.0.9?

Yes. We are using S3 as source.

Have you read the docs?

Yes.

What is the expected behaviour?

If files in S3 are stored compressed (e.g. gzip, snappy, etc.), the connector should decompress the data before sending it to Kafka. If the data is already uncompressed, it should be sent as is. A sketch of the expected behaviour follows.
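For illustration only, a minimal sketch of that behaviour in plain Java (the class and method names are hypothetical, not the connector's internals): sniff the gzip magic bytes and wrap the S3 object stream in a GZIPInputStream before any JSON parsing, otherwise pass the stream through untouched.

import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;
import java.util.zip.GZIPInputStream;

public final class MaybeGunzip {

    // Wraps the raw S3 object stream in a GZIPInputStream when the gzip
    // magic bytes (0x1f 0x8b) are present; otherwise returns the stream
    // unchanged so uncompressed objects are read as-is.
    public static InputStream maybeDecompress(InputStream raw) throws IOException {
        PushbackInputStream pushback = new PushbackInputStream(raw, 2);
        byte[] header = new byte[2];
        int read = pushback.read(header);
        if (read > 0) {
            pushback.unread(header, 0, read);
        }
        boolean gzipped = read == 2
                && (header[0] & 0xFF) == 0x1F
                && (header[1] & 0xFF) == 0x8B;
        return gzipped ? new GZIPInputStream(pushback) : pushback;
    }
}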

What was observed?

If the data is gzip-compressed, for example gzipped JSON, the connector is unable to send the data correctly. It applies JSON parsing directly to the compressed blob it reads from S3 and fails.
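A minimal standalone sketch of the failure mode, assuming Jackson for the JSON parsing step (the class name and code path are illustrative, not the connector's actual implementation): the payload begins with the gzip header rather than JSON text, so the parse fails.

import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

public final class GzipJsonRepro {
    public static void main(String[] args) throws Exception {
        // Gzip a small JSON document, as it would be stored in S3.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write("{\"id\":1}".getBytes(StandardCharsets.UTF_8));
        }
        byte[] compressed = bos.toByteArray();

        // Parsing the compressed bytes as JSON fails: the blob starts with
        // the gzip magic bytes 0x1f 0x8b, not a JSON token.
        new ObjectMapper().readTree(compressed); // throws JsonParseException
    }
}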

What is your Connect cluster configuration (connect-avro-distributed.properties)?

We use MSK Connect. Our corresponding worker config is:

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
consumer.auto.offset.reset=latest
consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor

What is your connector properties configuration (my-connector.properties)?

connector.class=io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector
topics=sinkConnectorParams.topicRegex
tasks.max=8
connect.s3.aws.region=us-east-2
connect.s3.compression.codec=GZIP
connect.s3.kcql=insert into ${topicName} select * from ${bucketName}:${prefix} STOREAS `JSON` COMPRESSION `GZIP`
value.converter=org.apache.kafka.connect.storage.StringConverter
value.converter.schemas.enable=false
errors.log.enable=true
connect.s3.aws.auth.mode=Default