Description
A Scala app writes Avro records to Kafka using Schema Registry; Kafka Connect then reads the data from Kafka and sinks it into S3. We use the MinIO Docker image to emulate S3.
-
First, publish the Docker image of the Kafka producer locally:
sbt docker:publishLocal
and make sure the image was published
docker images | grep "producer-app"
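If you script these steps, a guard like the one below can stop early when the image is missing. It is a minimal sketch in POSIX sh (image_exists is a hypothetical helper name). It assumes the image repository is named exactly producer-app. The grep above matches a substring, so adjust the name if your sbt build adds a repository prefix. It relies on docker images -q REPOSITORY, which prints only the IDs of matching images.

```shell
# Hypothetical helper: succeeds if at least one local image exists for the
# given repository. `docker images -q REPO` prints only the IDs of matching
# images, so empty output means the image has not been built/published.
image_exists() {
  [ -n "$(docker images -q "$1")" ]
}

# Example: abort a setup script if the producer image is missing.
# image_exists producer-app || { echo "Run: sbt docker:publishLocal" >&2; exit 1; }
```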
-
Bring the Docker Compose stack up:
docker-compose up -d
-
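Check the Kafka producer / Schema Registry integration. The producer should have registered the value schema of the test topic under the subject test-value: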
curl -s "http://localhost:8081/subjects/test-value/versions/latest" | jq
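The registry returns the Avro schema as an escaped JSON string in the schema field, next to subject, version and id. As a result, the command above prints the schema as one long string. The subject name test-value follows the default Schema Registry naming of <topic>-value for record values. A small helper that unwraps and pretty-prints the schema could look like this. show_latest_schema is a hypothetical name, and the sketch assumes jq is installed, as the command above already does.

```shell
# Hypothetical helper: pretty-print the latest schema for a subject
# (default: test-value). The "schema" field is itself a JSON string, so
# `jq -r` unwraps it and a second `jq .` parses and pretty-prints it.
show_latest_schema() {
  curl -s "http://localhost:8081/subjects/${1:-test-value}/versions/latest" \
    | jq -r '.schema' | jq .
}
```

Usage: show_latest_schema test-value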
-
Make sure everything is up and running:
$ docker-compose ps
     Name                    Command                State                  Ports
---------------------------------------------------------------------------------------------
kafka             /etc/confluent/docker/run      Up             0.0.0.0:9092->9092/tcp
kafka-connect     bash -c #                      Up (healthy)   0.0.0.0:8083->8083/tcp, 9092/tcp
                  echo "Installing ...
kafka-producer    /opt/docker/bin/producer-app   Up             8080/tcp
schema-registry   /etc/confluent/docker/run      Up             0.0.0.0:8081->8081/tcp
zookeeper         /etc/confluent/docker/run      Up             2181/tcp, 2888/tcp, 3888/tcp
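A container can be listed as Up before its service is ready. The kafka-connect container also runs an installation step at startup (the echo "Installing ... in its command), so its REST API may not answer yet. A sketch of a wait loop follows. It assumes the Connect worker listens on localhost:8083, as in the ports above. wait_for_http and has_s3_plugin are hypothetical helper names. GET /connectors and GET /connector-plugins are standard Kafka Connect REST endpoints.

```shell
# Hypothetical helper: poll a URL until it answers with a 2xx status.
# Usage: wait_for_http URL [ATTEMPTS] [DELAY_SECONDS]
wait_for_http() {
  local url="$1" attempts="${2:-30}" delay="${3:-5}" i=1
  while :; do
    # -f: fail on HTTP errors, -s: no progress output, -o: discard the body.
    if curl -sf -o /dev/null "$url"; then
      return 0
    fi
    if [ "$i" -ge "$attempts" ]; then
      break
    fi
    sleep "$delay"
    i=$((i + 1))
  done
  echo "Timed out waiting for $url" >&2
  return 1
}

# Hypothetical helper: succeeds if the worker lists the S3 sink plugin.
has_s3_plugin() {
  curl -s "http://localhost:8083/connector-plugins" \
    | grep -q 'io.confluent.connect.s3.S3SinkConnector'
}
```

For example, run wait_for_http http://localhost:8083/connectors && has_s3_plugin before creating the connector. With the defaults, the loop gives up after 30 attempts spaced 5 seconds apart.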
-
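Create the S3 sink connector. PUT /connectors/<name>/config creates the connector if it does not exist and updates it otherwise. store.url points the connector at the MinIO container instead of AWS: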
curl -i -X PUT -H "Accept:application/json" \
  -H "Content-Type:application/json" \
  http://localhost:8083/connectors/sink-s3-test/config \
  -d '{
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "tasks.max": "1",
    "topics": "test",
    "store.url": "http://minio:9000",
    "s3.region": "eu-west-1",
    "s3.bucket.name": "test",
    "flush.size": "10",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "schema.compatibility": "NONE",
    "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner"
  }'
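Kafka Connect can accept the configuration even if the connector then fails at runtime, for example because the bucket is missing. It is therefore worth checking the connector's status afterwards. The sketch below assumes jq and the connector name sink-s3-test used above. connector_running is a hypothetical helper name built on the standard GET /connectors/<name>/status endpoint.

```shell
# Hypothetical helper: succeeds only if the connector and all of its tasks
# report RUNNING. `jq -e` exits non-zero when the expression yields false.
connector_running() {
  curl -s "http://localhost:8083/connectors/${1:-sink-s3-test}/status" \
    | jq -e '.connector.state == "RUNNING"
             and (.tasks | length > 0)
             and all(.tasks[]; .state == "RUNNING")' >/dev/null
}
```

When a task is FAILED, its stack trace is in the trace field of the same response. For example: curl -s http://localhost:8083/connectors/sink-s3-test/status | jq -r '.tasks[].trace // empty'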
-
You can check the output of Kafka Connect by browsing the MinIO UI at http://localhost:9000/ and logging in with
login: minioadmin
password: minioadmin
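With flush.size set to 10 and the DefaultPartitioner, the sink commits one object per 10 records of each Kafka partition. Each object is named <topic>+<partition>+<start offset> and stored under topics/<topic>/partition=<n>/ in the bucket. Because no rotate.interval.ms is set, records beyond the last full batch stay buffered until more arrive. The sketch below lists the keys to expect. expected_keys is a hypothetical helper, and it makes several assumptions: the default topics.dir, offsets starting at 0 without gaps, and a .snappy.parquet extension (assumed from the default Parquet codec). Compare its output with what MinIO actually shows.

```shell
# Hypothetical helper: print the object keys the S3 sink should have
# committed for one Kafka partition after RECORDS records.
# Assumes topics.dir="topics", DefaultPartitioner ("partition=N"),
# gap-free offsets from 0, and a 10-digit zero-padded start offset.
# Usage: expected_keys TOPIC PARTITION RECORDS [FLUSH_SIZE] [EXTENSION]
expected_keys() {
  local topic="$1" partition="$2" records="$3" flush_size="${4:-10}" ext="${5:-.snappy.parquet}"
  local start=0
  # Only full batches are committed; the remainder stays buffered.
  while [ $((start + flush_size)) -le "$records" ]; do
    printf 'topics/%s/partition=%d/%s+%d+%010d%s\n' \
      "$topic" "$partition" "$topic" "$partition" "$start" "$ext"
    start=$((start + flush_size))
  done
}
```

For instance, expected_keys test 0 25 prints two keys, for the batches starting at offsets 0 and 10. The last 5 records are not yet in S3.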
-
When you are done testing, tear down the stack. This removes the containers, their volumes, and all images used by the services:
docker-compose down -v --rmi all --remove-orphans
References