Publishing data to Kafka, then streaming it from Kafka to S3 with Kafka Connect

Description

A Scala app writes Avro records to Kafka, registering their schema with the Kafka Schema Registry; Kafka Connect then reads the data from Kafka and sinks it to S3. We use the MinIO Docker image to mock S3.
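
As a quick illustration of the same write path, once the stack below is up you can publish an Avro record by hand with kafka-avro-console-producer, which ships inside the Schema Registry image. The inline schema here is a hypothetical one-field record, not the schema the Scala app registers, and the broker is assumed to be reachable as kafka:9092 on the Compose network:

    # Opens an interactive prompt; type JSON records such as {"id": "1"}
    docker-compose exec schema-registry kafka-avro-console-producer \
        --broker-list kafka:9092 \
        --topic test \
        --property schema.registry.url=http://localhost:8081 \
        --property value.schema='{"type":"record","name":"Test","fields":[{"name":"id","type":"string"}]}'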

 

  1. First, publish the Docker image of the Kafka producer

    sbt docker:publishLocal

    and make sure the image was published

    docker images | grep "producer-app"
  2. Bring the Docker Compose stack up

    docker-compose up -d
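
    and, if you want to watch the producer publishing, follow its logs (the service name comes from the docker-compose ps output in step 4):

    docker-compose logs -f kafka-producer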
  3. Check the producer/Schema Registry integration by fetching the latest schema registered under the test-value subject

    curl -s "http://localhost:8081/subjects/test-value/versions/latest" | jq
  4. Make sure everything is up and running

    $ docker-compose ps
         Name                  Command               State                    Ports
    ---------------------------------------------------------------------------------------------
    kafka            /etc/confluent/docker/run     Up             0.0.0.0:9092->9092/tcp
    kafka-connect    bash -c #                     Up (healthy)   0.0.0.0:8083->8083/tcp, 9092/tcp
                     echo "Installing ...
    kafka-producer   /opt/docker/bin/producer-app  Up             8080/tcp
    schema-registry  /etc/confluent/docker/run     Up             0.0.0.0:8081->8081/tcp
    zookeeper        /etc/confluent/docker/run     Up             2181/tcp, 2888/tcp, 3888/tcp
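
    To confirm that Avro records are actually flowing, you can tail the topic with kafka-avro-console-consumer, which ships inside the Schema Registry image (again assuming the broker is reachable as kafka:9092 on the Compose network):

    docker-compose exec schema-registry kafka-avro-console-consumer \
        --bootstrap-server kafka:9092 \
        --topic test \
        --from-beginning \
        --property schema.registry.url=http://localhost:8081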
  5. Create the S3 Sink connector

    curl -i -X PUT -H "Accept:application/json" \
        -H "Content-Type:application/json" http://localhost:8083/connectors/sink-s3-test/config \
        -d '{
            "connector.class": "io.confluent.connect.s3.S3SinkConnector",
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "tasks.max": "1",
            "topics": "test",
            "store.url": "http://minio:9000",
            "s3.region": "eu-west-1",
            "s3.bucket.name": "test",
            "flush.size": "10",
            "storage.class": "io.confluent.connect.s3.storage.S3Storage",
            "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
            "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
            "schema.compatibility": "NONE",
            "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner"
        }'
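
    The PUT only submits the configuration; to verify that the connector and its task reached the RUNNING state, poll the standard Connect status endpoint:

    curl -s "http://localhost:8083/connectors/sink-s3-test/status" | jq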
  6. You can inspect the output of Kafka Connect in the MinIO web console (login: minioadmin, password: minioadmin)

    http://localhost:9000/
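
    Alternatively, if you have the MinIO client (mc) installed, you can list the bucket from the command line; with the DefaultPartitioner the Parquet files should land under topics/test/partition=<n>/:

    mc alias set local http://localhost:9000 minioadmin minioadmin
    mc ls --recursive local/test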
  7. When you are done testing, tear the stack down

    docker-compose down -v --rmi all --remove-orphans
