Debezium is a distributed platform used to implement Change Data Capture (a.k.a CDC) from SQL and NoSQL databases into event streams, enabling applications to detect and immediately respond to row-level changes in the databases. Debezium is built on top of Apache Kafka and provides a set of Kafka Connect compatible connectors. Each of the connectors works with a specific database.
In this project, I implement the Debezium service programmatically, and run via Docker a MySQL database server with an example table in order to monitor all events about data insertion or change.
- Java
- Docker
- Boot up a MySQL instance with Docker:
docker run --name mysql -e MYSQL_ROOT_PASSWORD=root -e MYSQL_USER=user -e MYSQL_PASSWORD=password -e MYSQL_DATABASE=customerdb -p 3306:3306 --rm -it mysql --binlog-format=ROW --binlog-row-image=FULL --binlog-rows-query-log-events=ON --performance-schema=ON
Property | Description |
---|---|
server-id | The value for the server-id must be unique for each server and replication client in the MySQL cluster. During MySQL connector set up, Debezium assigns a unique server ID to the connector. |
log_bin | The value of log_bin is the base name of the sequence of binlog files. |
binlog_format | The binlog-format must be set to ROW or row. |
binlog_row_image | The binlog_row_image must be set to FULL or full. |
expire_logs_days | This is the number of days for automatic binlog file removal. The default is 0, which means no automatic removal. Set the value to match the needs of your environment. See MySQL purges binlog files. |
For more information visit this link.
- Connect to MySQL docker instance:
docker exec -it mysql bash
- Once inside, login into MySQL server:
mysql --user=user --password=password
- Once logged in, create the database and table to run the demo application:
CREATE DATABASE customerdb;
USE customerdb;
CREATE TABLE customer ( id bigint NOT NULL AUTO_INCREMENT, email varchar(255) DEFAULT NULL, fullname varchar(255) DEFAULT NULL, PRIMARY KEY (id) );
- From another terminal, run the application:
./mvnw spring-boot:run
- Back to the Docker terminal, insert some data into the
customerdb
table:
INSERT INTO customerdb.customer (email, fullname) VALUES ('john.doe@acme.com', 'John Doe');
- From the application's terminal, you should see a data insertion event log similar to the one below:
2022-06-28 10:46:59.493 INFO 2009800 --- [pool-1-thread-1] i.d.connector.common.BaseSourceTask : 1 records sent during previous 00:00:24.117, last recorded offset: {transaction_id=null, ts_sec=1656424019, file=binlog.000002, pos=7258, row=1, server_id=1, event=3}
2022-06-28 10:46:59.493 INFO 2009800 --- [pool-1-thread-1] i.d.listener.DebeziumListener : Key = Struct{id=3}, Value = Struct{after=Struct{id=3,email=2022-06-28 13:46:59,fullname=2022-06-28 13:46:59},source=Struct{version=1.9.4.Final,connector=mysql,name=customer-mysql-db-server,ts_ms=1656424019000,db=customerdb,table=customer,server_id=1,file=binlog.000002,pos=7604,row=0,thread=23},op=c,ts_ms=1656424019483}
2022-06-28 10:46:59.493 INFO 2009800 --- [pool-1-thread-1] i.d.listener.DebeziumListener : SourceRecordChangeValue = 'Struct{after=Struct{id=3,email=2022-06-28 13:46:59,fullname=2022-06-28 13:46:59},source=Struct{version=1.9.4.Final,connector=mysql,name=customer-mysql-db-server,ts_ms=1656424019000,db=customerdb,table=customer,server_id=1,file=binlog.000002,pos=7604,row=0,thread=23},op=c,ts_ms=1656424019483}'