Many connectors are developed by third parties. This demo uses Debezium, a MySQL connector, as an example. The image we use already ships with the connector plug-in; otherwise, we would need to download the connector JAR and place it in the designated plugin folder.
- The data set is the MySQL sample employees database (https://github.com/datacharmer/test_db)
When you start your Connect workers, each worker discovers all connectors, transforms, and converter plugins found inside the directories on the plugin path. When you use a connector, transform, or converter, the Connect worker loads the classes from the respective plugin first, followed by the Kafka Connect runtime and Java libraries. Connect explicitly avoids all of the libraries in other plugins. This prevents conflicts and makes it very easy to add and use connectors and transforms developed by different providers. (https://docs.confluent.io/platform/current/connect/userguide.html)
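The plugin directories are configured with the plugin.path property in the Connect worker configuration. A minimal sketch, assuming a distributed worker; the exact directories are an assumption and depend on the image or installation (the debezium/connect image, for instance, keeps its connectors under /kafka/connect):
# connect-distributed.properties
plugin.path=/usr/share/java,/kafka/connect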
Start the Docker container for the MySQL server, mounting the necessary files into it.
docker run --rm --name mysql -p 3306:3306 \
-v $PWD/mysql.cnf:/etc/mysql/conf.d/mysql.cnf \
-v $PWD/test.sql:/test.sql \
-e MYSQL_ROOT_PASSWORD=password mysql
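The mounted mysql.cnf is not reproduced here; Debezium needs the binary log enabled in row format, so a typical configuration (values are illustrative) looks like:
[mysqld]
server-id        = 1
log_bin          = mysql-bin
binlog_format    = ROW
binlog_row_image = FULL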
Import the sample data that will be used for the Kafka Connect demo.
mysql -u root -p < {sql file}
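Because test.sql was mounted into the container at /test.sql, the import can also be run from the host, assuming the file is a self-contained dump:
docker exec -i mysql sh -c 'mysql -u root -ppassword < /test.sql'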
The remaining services are started simply by running docker compose up with the provided compose file.
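The compose file itself is not shown in this demo. Below is a minimal sketch of what it could contain, assuming a single broker and the debezium/connect image (image tags, topic names, and the listener setup are assumptions and should be adapted to the real file):
version: "3"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:7.3.0
    depends_on: [zookeeper]
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Advertised as kafka:9092 so the connector config below can use it;
      # reaching the broker from the host may need an extra advertised listener.
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  connect:
    image: debezium/connect:1.9
    depends_on: [kafka]
    ports:
      - "8083:8083"
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: connect-demo
      CONFIG_STORAGE_TOPIC: connect-configs
      OFFSET_STORAGE_TOPIC: connect-offsets
      STATUS_STORAGE_TOPIC: connect-statuses
Note that the standalone mysql container must be reachable from the connect container by the hostname mysql (for example by attaching it to the same Docker network), because the connector config below uses database.hostname: mysql.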
- List Topic
kafka-topics --bootstrap-server localhost:9092 --list
- Describe Topic
kafka-topics --topic {someTopic} --bootstrap-server localhost:9092 --describe
- Check Partition Offset
kafka-consumer-groups --bootstrap-server localhost:9092 --group {group} --describe
kafka-consumer-groups --bootstrap-server localhost:9092 --all-groups --describe
- Create Topic
kafka-topics --bootstrap-server localhost:9092 --topic {topic} --create --replication-factor 1 --partitions 6
- Produce Record
kafka-console-producer --bootstrap-server localhost:9092 --topic {topic} --property key.separator=: --property parse.key=true
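With parse.key=true and key.separator=:, each line typed into the producer is key, separator, then value, for example (hypothetical records):
1001:{"first_name": "Georgi"}
1002:{"first_name": "Bezalel"}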
- Consume Topic
kafka-console-consumer --bootstrap-server localhost:9092 --topic {topic} --from-beginning
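Debezium writes change events to topics named {server name}.{database}.{table}, so with the connector configured below the employees table can be consumed with, for example:
kafka-console-consumer --bootstrap-server localhost:9092 --topic mysql.employees.employees --from-beginning --property print.key=true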
- Add a New Connector (specific to Debezium)
curl -X POST 'http://localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data-raw '{
"name": "mysql-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "root",
"database.password": "password",
"database.server.id": "123456",
"database.server.name": "mysql",
"database.include.list": "employees",
"table.include.list": "employees.employees",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes",
"database.allowPublicKeyRetrieval":"true"
}
}'
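To confirm the connector was registered, list the connectors known to the worker:
curl 'http://localhost:8083/connectors'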
- Update the Existing Config (specific to Debezium)
curl -X PUT 'http://localhost:8083/connectors/{name of the connector}/config' \
--header 'Content-Type: application/json' \
--data-raw '{
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "root",
"database.password": "password",
"database.server.id": "123456",
"database.server.name": "mysql",
"database.include.list": "employees",
"table.include.list": "employees.departments, employees.employees",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes",
"database.allowPublicKeyRetrieval": "true"
}'
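The active configuration can be read back to verify the update:
curl 'http://localhost:8083/connectors/{name of the connector}/config'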
- Get Status of the Connector
curl -X GET 'http://localhost:8083/connectors/{name of the connector}/status'
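If the status reports a FAILED connector or task, it can be restarted through the same REST API (the task id is 0 for a single-task connector):
curl -X POST 'http://localhost:8083/connectors/{name of the connector}/restart'
curl -X POST 'http://localhost:8083/connectors/{name of the connector}/tasks/0/restart'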