/kafka-poc

a poc for kafka

Primary LanguageDockerfile

Debezium Unwrap SMT Demo

This setup is going to demonstrate how to receive events from MySQL database and stream them down to a PostgreSQL database and/or an Elasticsearch server using the Debezium Event Flattening SMT.

Table of Contents

JDBC Sink

Topology

                   +-------------+
                   |             |
                   |    MySQL    |
                   |             |
                   +------+------+
                          |
                          |
                          |
          +---------------v------------------+
          |                                  |
          |           Kafka Connect          |
          |  (Debezium, JDBC connectors)     |
          |                                  |
          +---------------+------------------+
                          |
                          |
                          |
                          |
                  +-------v--------+
                  |                |
                  |   PostgreSQL   |
                  |                |
                  +----------------+


We are using Docker Compose to deploy following components

  • MySQL
  • Kafka
    • ZooKeeper
    • Kafka Broker
    • Kafka Connect with Debezium and JDBC Connectors
  • PostgreSQL

Usage

How to run:

# Start the application
export DEBEZIUM_VERSION=0.8
docker-compose -f docker-compose-jdbc.yaml up

# Start PostgreSQL connector
curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @jdbc-sink.json

# Start MySQL connector
curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @source.json

Check contents of the MySQL database:

docker-compose -f docker-compose-jdbc.yaml exec mysql bash -c 'mysql -u $MYSQL_USER  -p$MYSQL_PASSWORD inventory -e "select * from customers"'
+------+------------+-----------+-----------------------+
| id   | first_name | last_name | email                 |
+------+------------+-----------+-----------------------+
| 1001 | Sally      | Thomas    | sally.thomas@acme.com |
| 1002 | George     | Bailey    | gbailey@foobar.com    |
| 1003 | Edward     | Walker    | ed@walker.com         |
| 1004 | Anne       | Kretchmar | annek@noanswer.org    |
+------+------------+-----------+-----------------------+

Verify that the PostgreSQL database has the same content:

docker-compose -f docker-compose-jdbc.yaml exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from customers"'
 last_name |  id  | first_name |         email         
-----------+------+------------+-----------------------
 Thomas    | 1001 | Sally      | sally.thomas@acme.com
 Bailey    | 1002 | George     | gbailey@foobar.com
 Walker    | 1003 | Edward     | ed@walker.com
 Kretchmar | 1004 | Anne       | annek@noanswer.org
(4 rows)

New record

Insert a new record into MySQL;

docker-compose -f docker-compose-jdbc.yaml exec mysql bash -c 'mysql -u $MYSQL_USER  -p$MYSQL_PASSWORD inventory'
mysql> insert into customers values(default, 'John', 'Doe', 'john.doe@example.com');
Query OK, 1 row affected (0.02 sec)

Verify that PostgreSQL contains the new record:

docker-compose -f docker-compose-jdbc.yaml exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from customers"'
 last_name |  id  | first_name |         email         
-----------+------+------------+-----------------------
...
Doe        | 1005 | John       | john.doe@example.com
(5 rows)

Record update

Update a record in MySQL:

mysql> update customers set first_name='Jane', last_name='Roe' where last_name='Doe';
Query OK, 1 row affected (0.02 sec)
Rows matched: 1  Changed: 1  Warnings: 0

Verify that record in PostgreSQL is updated:

docker-compose-f docker-compose-jdbc.yaml  exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from customers"'
 last_name |  id  | first_name |         email         
-----------+------+------------+-----------------------
...
Roe        | 1005 | Jane       | john.doe@example.com
(5 rows)

End application:

# Shut down the cluster
docker-compose -f docker-compose-jdbc.yaml down

Elasticsearch Sink

Topology

                   +-------------+
                   |             |
                   |    MySQL    |
                   |             |
                   +------+------+
                          |
                          |
                          |
          +---------------v------------------+
          |                                  |
          |           Kafka Connect          |
          |    (Debezium, ES connectors)     |
          |                                  |
          +---------------+------------------+
                          |
                          |
                          |
                          |
                  +-------v--------+
                  |                |
                  | Elasticsearch  |
                  |                |
                  +----------------+


We are using Docker Compose to deploy the following components:

  • MySQL
  • Kafka
  • Elasticsearch

Usage

How to run:

# Start the application

export DEBEZIUM_VERSION=0.8
docker-compose -f docker-compose-es.yaml up

# Start Elasticsearch connector

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @es-sink.json

# Start MySQL connector

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @source.json

Check contents of the MySQL database:

docker-compose -f docker-compose-es.yaml exec mysql bash -c 'mysql -u $MYSQL_USER  -p$MYSQL_PASSWORD inventory -e "select * from customers"'
+------+------------+-----------+-----------------------+
| id   | first_name | last_name | email                 |
+------+------------+-----------+-----------------------+
| 1001 | Sally      | Thomas    | sally.thomas@acme.com |
| 1002 | George     | Bailey    | gbailey@foobar.com    |
| 1003 | Edward     | Walker    | ed@walker.com         |
| 1004 | Anne       | Kretchmar | annek@noanswer.org    |
+------+------------+-----------+-----------------------+

Verify that Elasticsearch has the same content:

curl 'http://localhost:9200/customers/_search?pretty'
{
  "took" : 42,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 4,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "customers",
        "_type" : "customer",
        "_id" : "1001",
        "_score" : 1.0,
        "_source" : {
          "id" : 1001,
          "first_name" : "Sally",
          "last_name" : "Thomas",
          "email" : "sally.thomas@acme.com"
        }
      },
      {
        "_index" : "customers",
        "_type" : "customer",
        "_id" : "1004",
        "_score" : 1.0,
        "_source" : {
          "id" : 1004,
          "first_name" : "Anne",
          "last_name" : "Kretchmar",
          "email" : "annek@noanswer.org"
        }
      },
      {
        "_index" : "customers",
        "_type" : "customer",
        "_id" : "1002",
        "_score" : 1.0,
        "_source" : {
          "id" : 1002,
          "first_name" : "George",
          "last_name" : "Bailey",
          "email" : "gbailey@foobar.com"
        }
      },
      {
        "_index" : "customers",
        "_type" : "customer",
        "_id" : "1003",
        "_score" : 1.0,
        "_source" : {
          "id" : 1003,
          "first_name" : "Edward",
          "last_name" : "Walker",
          "email" : "ed@walker.com"
        }
      }
    ]
  }
}

New record

Insert a new record into MySQL:

docker-compose -f docker-compose-es.yaml exec mysql bash -c 'mysql -u $MYSQL_USER  -p$MYSQL_PASSWORD inventory'
mysql> insert into customers values(default, 'John', 'Doe', 'john.doe@example.com');
Query OK, 1 row affected (0.02 sec)

Check that Elasticsearch contains the new record:

curl 'http://localhost:9200/customers/_search?pretty'
...
      {
        "_index" : "customers",
        "_type" : "customer",
        "_id" : "1005",
        "_score" : 1.0,
        "_source" : {
          "id" : 1005,
          "first_name" : "John",
          "last_name" : "Doe",
          "email" : "john.doe@example.com"
        }
      }
...

Record update

Update a record in MySQL:

mysql> update customers set first_name='Jane', last_name='Roe' where last_name='Doe';
Query OK, 1 row affected (0.02 sec)
Rows matched: 1  Changed: 1  Warnings: 0

Verify that record in Elasticsearch is updated:

curl 'http://localhost:9200/customers/_search?pretty'
...
      {
        "_index" : "customers",
        "_type" : "customer",
        "_id" : "1005",
        "_score" : 1.0,
        "_source" : {
          "id" : 1005,
          "first_name" : "Jane",
          "last_name" : "Roe",
          "email" : "john.doe@example.com"
        }
      }
...

End the application:

# Shut down the cluster
docker-compose -f docker-compose-es.yaml down

Two Parallel Sinks

Topology

                   +-------------+
                   |             |
                   |    MySQL    |
                   |             |
                   +------+------+
                          |
                          |
                          |
          +---------------v------------------+
          |                                  |
          |           Kafka Connect          |
          |  (Debezium, JDBC, ES connectors) |
          |                                  |
          +---+-----------------------+------+
              |                       |
              |                       |
              |                       |
              |                       |
+-------------v--+                +---v---------------+
|                |                |                   |
|   PostgreSQL   |                |   ElasticSearch   |
|                |                |                   |
+----------------+                +-------------------+

We are using Docker Compose to deploy the following components:

Usage

How to run:

# Start the application
export DEBEZIUM_VERSION=0.8
export DEBEZIUM_VERSION=1.0
docker-compose up

# Start Elasticsearch connector
curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @es-sink.json

# Start MySQL connector
curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @source.json

Now you can execute commands as defined in the sections for JDBC and Elasticsearch sinks and you can verify that inserts and updates are present in both sinks.

End the application:

# Shut down the cluster
docker-compose down
sp_changedbowner 'sa'
GO
EXEC sys.sp_cdc_enable_db
GO
USE AdventureWorks2017 
GO 
EXEC sys.sp_cdc_enable_table 
@source_schema = N'HumanResources', 
@source_name   = N'EmployeePayHistory', 
@role_name     = NULL 
GO
docker exec -it <confluentinc/cp-ksql-cli-hash> bash
ksql http://ksql-server:8088
CREATE STREAM employee_from_adventureworks_1 (BusinessEntityID integer, JobTitle string, SickLeaveHours integer, VacationHours integer) WITH (KAFKA_TOPIC='adventrueworks.HumanResources.Employee',VALUE_FORMAT='json');
LIST TOPICS;
ksql> LIST STREAMS;

 Stream Name                    | Kafka Topic                            | Format 
----------------------------------------------------------------------------------
 EMPLOYEE_FROM_ADVENTUREWORKS_1 | adventrueworks.HumanResources.Employee | JSON   
----------------------------------------------------------------------------------

SELECT * FROM EMPLOYEE_FROM_ADVENTUREWORKS_1;
SELECT * FROM EMPLOYEE_FROM_ADVENTUREWORKS_1 WHERE JobTitle = 'Chief Executive Officer'

Then modify the table for SickDays from 69 to 68

you should see the following stream result

1573527391053 | {"BusinessEntityID":1} | 1 | Chief Executive Officer | 69 | 99
1573539035402 | {"BusinessEntityID":1} | 1 | Chief Executive Officer | 68 | 99