confluentinc/kafka-connect-datagen

Implement dockerized Kafka sink connector to MongoDB

mostafaghadimi opened this issue · 1 comment

Hi @rhauch and @ybyzek,

I am trying to connect Kafka to MongoDB and MySQL using Docker.

What I want is shown in the following figure:

[figure: Kafka Connect as a sink to MongoDB and MySQL]

Kafka Connect MongoDB:

I have seen the docker-compose file of the official MongoDB repository. It has two problems:

  1. It is too complicated for my purpose: it runs multiple MongoDB containers and uses many images that consume a lot of resources.

  2. It has some unresolved issues that result in a malfunctioning Kafka-to-MongoDB connection. Here you can see my issue.

What I have implemented in docker-compose.yml, using Debezium for the connection, is the following:

version: '3.2'
services:
  kafka:
    image: wurstmeister/kafka:latest
    ports:
      - target: 9094
        published: 9094
        protocol: tcp
        mode: host
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: INSIDE://:9092
      KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9094
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_LOG_DIRS: /kafka/logs
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
      - kafka:/kafka
  
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
    volumes:
      - zookeeper:/opt/zookeeper-3.4.13
  
  mongo:
    image: mongo
    container_name: mongo
    ports:
      - 27017:27017

  connect:
    image: debezium/connect
    container_name: connect
    ports:
      - 8083:8083
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets

volumes:
  kafka:
  zookeeper:
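
Once the connect container is up, I register a MongoDB sink connector through the Kafka Connect REST API on port 8083. This is only a sketch: the connector class and config keys are those of the MongoDB Kafka sink connector, which still has to be installed in the image, and the topic, database, and collection names below are placeholders of mine:

# placeholder names: topic "my-topic", database "mydb", collection "mycollection"
curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
  "name": "mongo-sink",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "topics": "my-topic",
    "connection.uri": "mongodb://mongo:27017",
    "database": "mydb",
    "collection": "mycollection"
  }
}'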

As @cricket_007 says, I should not use Debezium for my purpose, so I have used the confluentinc/kafka-connect-datagen image instead. I have replaced the Debezium service in docker-compose.yml with the following:

connect:
    image: confluentinc/kafka-connect-datagen
    build:
      context: .
      dockerfile: Dockerfile
    hostname: connect
    container_name: connect
    depends_on: 
      - zookeeper
    ports: 
      - 8083:8083
    environment: 
      CONNECT_BOOTSTRAP_SERVERS: 'kafka:9092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      CONNECT_PLUGIN_PATH: /usr/share/confluent-hub-components
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      # Assumes image is based on confluentinc/kafka-connect-datagen:latest which is pulling 5.2.2 Connect image
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.2.2.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
    command: "bash -c 'if [ ! -d /usr/share/confluent-hub-components/confluentinc-kafka-connect-datagen ]; then echo \"WARNING: Did not find directory for kafka-connect-datagen (did you remember to run: docker-compose up -d --build ?)\"; fi ; /etc/confluent/docker/run'"
    volumes:
      - ../build/confluent/kafka-connect-mongodb:/usr/share/confluent-hub-components/kafka-connect-mongodb
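
To check whether Connect actually picked up the plugins from that path, I list them through the standard Connect REST API once the container is up:

curl http://localhost:8083/connector-plugins

If the volume mount is correct, the MongoDB connector class should appear in that list.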

Dockerfile:

FROM confluentinc/cp-kafka-connect
ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"
RUN  confluent-hub install --no-prompt confluentinc/kafka-connect-datagen

Problem:

  1. The kafka-connect-datagen image generates fake data and, as mentioned in the repository, it is not suitable for production. What I want is just to connect Kafka to MongoDB, nothing more and nothing less.

  2. I am facing the "CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL is required." error. What should I do to resolve this problem and test my connection?

I'm looking forward to hearing from you.

Thanks

Hello @mostafaghadimi

Since this "issue" seems not to be with this repo, as you're asking for a tutorial, I'll answer what I can here.

Problem:

The kafka-connect-datagen image generates fake data and, as mentioned in the repository, it is not suitable for production. What I want is just to connect Kafka to MongoDB, nothing more and nothing less.

If you don't want fake data, then you will want to write your own Kafka producer code or use a different Kafka Connect source (such as Debezium).
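
And if the only thing you need is the Kafka-to-MongoDB sink side, you don't need the datagen plugin at all. Here is a minimal Dockerfile sketch that installs just the MongoDB connector from Confluent Hub instead (I'm assuming the mongodb/kafka-connect-mongodb plugin id; check Confluent Hub for the exact name, and pin a version rather than latest):

FROM confluentinc/cp-kafka-connect
ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"
# assumed Confluent Hub plugin id; pin a concrete version for reproducible builds
RUN confluent-hub install --no-prompt mongodb/kafka-connect-mongodb:latest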

I am facing the "CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL is required." error

You set CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter, but the AvroConverter requires you to set a Schema Registry URL.

You don't have a Schema Registry container, so I assume you aren't actually using Avro and need to change your converters to not use it. For example, if you want all connectors to deserialize plain JSON data:

  CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
  CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
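
If your JSON messages don't carry embedded schemas, you'll likely also want to disable the schema envelope, because otherwise the JsonConverter expects every record to be wrapped in a schema/payload structure (a sketch, assuming plain JSON payloads):

  # assumes records are plain JSON without a schema/payload wrapper
  CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
  CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"

Alternatively, if you do want Avro, add a Schema Registry service to the compose file and point both converters at it. A sketch using the usual Confluent image and default port (adjust the version to match your stack):

  schema-registry:
    image: confluentinc/cp-schema-registry:5.2.2
    depends_on:
      - zookeeper
      - kafka
    ports:
      - 8081:8081
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:2181

and on the connect service:

  CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
  CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081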

As far as Mongo is concerned, that is not an issue for this repo. They have their own support forums, and I've tried to help as much as I can on StackOverflow.

Cheers!