This setup demonstrates how to capture change events from a MySQL database and stream them to a PostgreSQL database and/or an Elasticsearch server using the Debezium event flattening SMT (Single Message Transform).
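To make the flattening step concrete, here is a minimal Python sketch of the idea. It is not Debezium's implementation. It assumes a change event is a dict carrying Debezium's `op`, `before` and `after` fields (the message payload without its schema), and that a sink upserts rows by primary key. The `flatten` and `upsert` helpers are hypothetical names.

```python
from typing import Optional

# Operation codes in a Debezium change event: "r" = snapshot read,
# "c" = create (insert), "u" = update, "d" = delete.
UPSERT_OPS = {"r", "c", "u"}


def flatten(event: dict) -> Optional[dict]:
    """Reduce a change-event envelope to the plain row it describes.

    Keeps only the "after" image for reads, inserts and updates. For any
    other operation (e.g. a delete) this sketch returns None; the real
    SMT's handling of deletes depends on its configuration.
    """
    if event.get("op") in UPSERT_OPS:
        return event["after"]
    return None


def upsert(table: dict, event: dict, key: str = "id") -> None:
    """Write the flattened row into `table`, keyed by primary key.

    Assumes the sink upserts by key, so an update overwrites the earlier row.
    """
    row = flatten(event)
    if row is not None:
        table[row[key]] = row
```

Feeding it the insert of John Doe followed by the update to Jane Roe leaves a single row with id 1005, which mirrors what the sink queries in the walkthroughs show after the update.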
JDBC sink (PostgreSQL):
         +-------------+
         |             |
         |    MySQL    |
         |             |
         +------+------+
                |
                |
                |
+---------------v------------------+
|                                  |
|           Kafka Connect          |
|   (Debezium, JDBC connectors)   |
|                                  |
+---------------+------------------+
                |
                |
                |
                |
        +-------v--------+
        |                |
        |   PostgreSQL   |
        |                |
        +----------------+
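We are using Docker Compose to deploy the following components:
- MySQL
- Kafka
  - ZooKeeper
  - Kafka Broker
- Kafka Connect with Debezium and JDBC Connectors
- PostgreSQL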
How to run:
# Start the application
export DEBEZIUM_VERSION=0.8
docker-compose -f docker-compose-jdbc.yaml up
# Start PostgreSQL connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @jdbc-sink.json
# Start MySQL connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @source.json
Check contents of the MySQL database:
docker-compose -f docker-compose-jdbc.yaml exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory -e "select * from customers"'
+------+------------+-----------+-----------------------+
| id   | first_name | last_name | email                 |
+------+------------+-----------+-----------------------+
| 1001 | Sally      | Thomas    | sally.thomas@acme.com |
| 1002 | George     | Bailey    | gbailey@foobar.com    |
| 1003 | Edward     | Walker    | ed@walker.com         |
| 1004 | Anne       | Kretchmar | annek@noanswer.org    |
+------+------------+-----------+-----------------------+
Verify that the PostgreSQL database has the same content:
docker-compose -f docker-compose-jdbc.yaml exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from customers"'
 last_name |  id  | first_name |         email
-----------+------+------------+-----------------------
 Thomas    | 1001 | Sally      | sally.thomas@acme.com
 Bailey    | 1002 | George     | gbailey@foobar.com
 Walker    | 1003 | Edward     | ed@walker.com
 Kretchmar | 1004 | Anne       | annek@noanswer.org
(4 rows)
Insert a new record into MySQL:
docker-compose -f docker-compose-jdbc.yaml exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory'
mysql> insert into customers values(default, 'John', 'Doe', 'john.doe@example.com');
Query OK, 1 row affected (0.02 sec)
Verify that PostgreSQL contains the new record:
docker-compose -f docker-compose-jdbc.yaml exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from customers"'
 last_name |  id  | first_name |         email
-----------+------+------------+-----------------------
...
 Doe       | 1005 | John       | john.doe@example.com
(5 rows)
Update a record in MySQL:
mysql> update customers set first_name='Jane', last_name='Roe' where last_name='Doe';
Query OK, 1 row affected (0.02 sec)
Rows matched: 1  Changed: 1  Warnings: 0
Verify that the record in PostgreSQL is updated:
docker-compose -f docker-compose-jdbc.yaml exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from customers"'
 last_name |  id  | first_name |         email
-----------+------+------------+-----------------------
...
 Roe       | 1005 | Jane       | john.doe@example.com
(5 rows)
End the application:
# Shut down the cluster
docker-compose -f docker-compose-jdbc.yaml down
Elasticsearch sink:
         +-------------+
         |             |
         |    MySQL    |
         |             |
         +------+------+
                |
                |
                |
+---------------v------------------+
|                                  |
|           Kafka Connect          |
|    (Debezium, ES connectors)    |
|                                  |
+---------------+------------------+
                |
                |
                |
                |
        +-------v--------+
        |                |
        |  Elasticsearch |
        |                |
        +----------------+
We are using Docker Compose to deploy the following components:
- MySQL
- Kafka
  - ZooKeeper
  - Kafka Broker
- Kafka Connect with Debezium and Elasticsearch Connectors
- Elasticsearch
How to run:
# Start the application
export DEBEZIUM_VERSION=0.8
docker-compose -f docker-compose-es.yaml up
# Start Elasticsearch connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @es-sink.json
# Start MySQL connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @source.json
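The `curl` calls above use the Kafka Connect REST API: each one POSTs a connector's JSON definition to `/connectors/`. A minimal Python equivalent using only the standard library could look like this. `connector_request` and `register` are hypothetical helper names, and the sketch assumes Kafka Connect is reachable at `localhost:8083`, as in the commands above.

```python
import json
import urllib.request

# Kafka Connect REST endpoint used by the curl commands above.
CONNECT_URL = "http://localhost:8083/connectors/"


def connector_request(body: bytes, url: str = CONNECT_URL) -> urllib.request.Request:
    """Build the POST request that registers one connector definition."""
    json.loads(body)  # fail early if the definition is not valid JSON
    return urllib.request.Request(
        url,
        data=body,
        headers={"Accept": "application/json", "Content-Type": "application/json"},
        method="POST",
    )


def register(config_path: str) -> int:
    """Register the connector defined in a file such as es-sink.json or source.json.

    Returns the HTTP status; urlopen raises HTTPError for error responses.
    """
    with open(config_path, "rb") as f:
        request = connector_request(f.read())
    with urllib.request.urlopen(request) as response:
        return response.status
```

Calling `register("es-sink.json")` and then `register("source.json")` follows the same order as the `curl` commands, so the sink is in place before the source starts emitting events.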
Check contents of the MySQL database:
docker-compose -f docker-compose-es.yaml exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory -e "select * from customers"'
+------+------------+-----------+-----------------------+
| id   | first_name | last_name | email                 |
+------+------------+-----------+-----------------------+
| 1001 | Sally      | Thomas    | sally.thomas@acme.com |
| 1002 | George     | Bailey    | gbailey@foobar.com    |
| 1003 | Edward     | Walker    | ed@walker.com         |
| 1004 | Anne       | Kretchmar | annek@noanswer.org    |
+------+------------+-----------+-----------------------+
Verify that Elasticsearch has the same content:
curl 'http://localhost:9200/customers/_search?pretty'
{
  "took" : 42,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 4,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "customers",
        "_type" : "customer",
        "_id" : "1001",
        "_score" : 1.0,
        "_source" : {
          "id" : 1001,
          "first_name" : "Sally",
          "last_name" : "Thomas",
          "email" : "sally.thomas@acme.com"
        }
      },
      {
        "_index" : "customers",
        "_type" : "customer",
        "_id" : "1004",
        "_score" : 1.0,
        "_source" : {
          "id" : 1004,
          "first_name" : "Anne",
          "last_name" : "Kretchmar",
          "email" : "annek@noanswer.org"
        }
      },
      {
        "_index" : "customers",
        "_type" : "customer",
        "_id" : "1002",
        "_score" : 1.0,
        "_source" : {
          "id" : 1002,
          "first_name" : "George",
          "last_name" : "Bailey",
          "email" : "gbailey@foobar.com"
        }
      },
      {
        "_index" : "customers",
        "_type" : "customer",
        "_id" : "1003",
        "_score" : 1.0,
        "_source" : {
          "id" : 1003,
          "first_name" : "Edward",
          "last_name" : "Walker",
          "email" : "ed@walker.com"
        }
      }
    ]
  }
}
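Both sinks return the same rows in a different shape: PostgreSQL lists the columns in another order, and Elasticsearch returns hits in no particular order. A minimal Python sketch for checking that two row sets match, assuming every row carries an `id` primary key, compares them as dicts keyed by that id. `es_documents` and `same_rows` are hypothetical helpers.

```python
import json
from typing import Dict, Iterable, List


def es_documents(search_response: str) -> List[dict]:
    """Return the _source of every hit in an Elasticsearch _search response."""
    return [hit["_source"] for hit in json.loads(search_response)["hits"]["hits"]]


def same_rows(expected: Iterable[dict], actual: Iterable[dict], key: str = "id") -> bool:
    """Compare two row sets by primary key, ignoring row order and column order."""
    def by_key(rows: Iterable[dict]) -> Dict[object, dict]:
        return {row[key]: dict(row) for row in rows}
    return by_key(expected) == by_key(actual)
```

Elasticsearch returns only 10 hits by default, so for a larger table the search would need a bigger `size` parameter before a comparison like this is meaningful.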
Insert a new record into MySQL:
docker-compose -f docker-compose-es.yaml exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory'
mysql> insert into customers values(default, 'John', 'Doe', 'john.doe@example.com');
Query OK, 1 row affected (0.02 sec)
Check that Elasticsearch contains the new record:
curl 'http://localhost:9200/customers/_search?pretty'
...
      {
        "_index" : "customers",
        "_type" : "customer",
        "_id" : "1005",
        "_score" : 1.0,
        "_source" : {
          "id" : 1005,
          "first_name" : "John",
          "last_name" : "Doe",
          "email" : "john.doe@example.com"
        }
      }
...
Update a record in MySQL:
mysql> update customers set first_name='Jane', last_name='Roe' where last_name='Doe';
Query OK, 1 row affected (0.02 sec)
Rows matched: 1  Changed: 1  Warnings: 0
Verify that the record in Elasticsearch is updated:
curl 'http://localhost:9200/customers/_search?pretty'
...
      {
        "_index" : "customers",
        "_type" : "customer",
        "_id" : "1005",
        "_score" : 1.0,
        "_source" : {
          "id" : 1005,
          "first_name" : "Jane",
          "last_name" : "Roe",
          "email" : "john.doe@example.com"
        }
      }
...
End the application:
# Shut down the cluster
docker-compose -f docker-compose-es.yaml down
Both sinks (PostgreSQL and Elasticsearch):
                   +-------------+
                   |             |
                   |    MySQL    |
                   |             |
                   +------+------+
                          |
                          |
                          |
          +---------------v------------------+
          |                                  |
          |           Kafka Connect          |
          | (Debezium, JDBC, ES connectors)  |
          |                                  |
          +---+-----------------------+------+
              |                       |
              |                       |
              |                       |
              |                       |
+-------------v--+                +---v---------------+
|                |                |                   |
|   PostgreSQL   |                |   Elasticsearch   |
|                |                |                   |
+----------------+                +-------------------+
We are using Docker Compose to deploy the following components:
- MySQL
- Kafka
  - ZooKeeper
  - Kafka Broker
- Kafka Connect with Debezium, JDBC and Elasticsearch Connectors
- PostgreSQL
- Elasticsearch
How to run:
# Start the application
export DEBEZIUM_VERSION=1.0
docker-compose up
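# Start Elasticsearch connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @es-sink.json
# Start PostgreSQL connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @jdbc-sink.json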
# Start MySQL connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @source.json
You can now run the commands from the JDBC and Elasticsearch walkthroughs above (using `docker-compose` without the `-f` option, since this setup uses the default compose file) and verify that inserts and updates appear in both sinks.
End the application:
# Shut down the cluster
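docker-compose down
Enable change data capture (CDC) on the SQL Server AdventureWorks2017 database and on the table to be captured. Switch to the database first, because sp_changedbowner and sys.sp_cdc_enable_db act on the current database:
USE AdventureWorks2017
GO
EXEC sp_changedbowner 'sa'
GO
EXEC sys.sp_cdc_enable_db
GO
EXEC sys.sp_cdc_enable_table
    @source_schema = N'HumanResources',
    @source_name = N'Employee',
    @role_name = NULL
GO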
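Debezium's SQL Server connector writes the changes of each captured table to a Kafka topic named `<server name>.<schema>.<table>`, where the server name is the connector's logical name (`database.server.name` in this generation of Debezium). The hypothetical helper below only illustrates that rule. It assumes the logical server name is `adventrueworks`, the prefix used in the `KAFKA_TOPIC` of the stream definition that follows.

```python
def cdc_topic(server_name: str, schema: str, table: str) -> str:
    """Kafka topic that Debezium's SQL Server connector uses for one captured table."""
    return f"{server_name}.{schema}.{table}"


# The KSQL stream must point at exactly this topic name.
TOPIC = cdc_topic("adventrueworks", "HumanResources", "Employee")
```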
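Open a shell in the KSQL CLI container (replace the placeholder with the container's ID) and connect to the KSQL server:
docker exec -it <confluentinc/cp-ksql-cli-hash> bash
ksql http://ksql-server:8088
Create a stream over the CDC topic and check that it is registered:
CREATE STREAM employee_from_adventureworks_1 (BusinessEntityID integer, JobTitle string, SickLeaveHours integer, VacationHours integer) WITH (KAFKA_TOPIC='adventrueworks.HumanResources.Employee',VALUE_FORMAT='json');
LIST TOPICS;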
ksql> LIST STREAMS;
 Stream Name                    | Kafka Topic                            | Format
----------------------------------------------------------------------------------
 EMPLOYEE_FROM_ADVENTUREWORKS_1 | adventrueworks.HumanResources.Employee | JSON
----------------------------------------------------------------------------------
SELECT * FROM EMPLOYEE_FROM_ADVENTUREWORKS_1;
SELECT * FROM EMPLOYEE_FROM_ADVENTUREWORKS_1 WHERE JobTitle = 'Chief Executive Officer';
Then, in SQL Server, change SickLeaveHours for this employee (BusinessEntityID 1) from 69 to 68. The running query should print a second row with the updated value:
1573527391053 | {"BusinessEntityID":1} | 1 | Chief Executive Officer | 69 | 99
1573539035402 | {"BusinessEntityID":1} | 1 | Chief Executive Officer | 68 | 99
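The stream rows above are pipe-separated. Assuming the columns are KSQL's `ROWTIME` and `ROWKEY` followed by the four columns declared in `CREATE STREAM`, a small Python sketch can parse two successive rows and report which field changed. `EmployeeRow`, `parse_row` and `changed_fields` are hypothetical names.

```python
from typing import NamedTuple


class EmployeeRow(NamedTuple):
    rowtime: int             # ROWTIME: message timestamp in epoch milliseconds
    rowkey: str              # ROWKEY: the Kafka message key
    business_entity_id: int
    job_title: str
    sick_leave_hours: int
    vacation_hours: int


def parse_row(line: str) -> EmployeeRow:
    """Parse one pipe-separated row printed by the KSQL SELECT."""
    rowtime, rowkey, entity_id, title, sick, vacation = (p.strip() for p in line.split("|"))
    return EmployeeRow(int(rowtime), rowkey, int(entity_id), title, int(sick), int(vacation))


def changed_fields(old: EmployeeRow, new: EmployeeRow) -> dict:
    """Map each data column whose value differs to its (old, new) pair."""
    return {
        name: (getattr(old, name), getattr(new, name))
        for name in EmployeeRow._fields
        if name not in ("rowtime", "rowkey") and getattr(old, name) != getattr(new, name)
    }
```

Applied to the two rows above, `changed_fields` reports only `sick_leave_hours` going from 69 to 68.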