En este tutorial, iniciará los servicios de Debezium, ejecutará un servidor de base de datos MySQL con una base de datos de ejemplo simple y utilizará Debezium para monitorear la base de datos en busca de cambios.
Debezium es una plataforma distribuida que convierte sus bases de datos existentes en flujos de eventos, para que las aplicaciones puedan ver y responder de inmediato a cada cambio de nivel de fila en las bases de datos.
Epicurio Registry es una implementacion de Esquema de Registro la cual actua como una API de Codigo Abierto, ademas de proporcionar bibliotecas cliente y una excelente integracion con Apache Kafka y Kafka Connect en forma de serializadores y convertidores. Apicurio
Puede visualizar los logs de cada contenedor con el siguiente comando: docker logs --follow --name container-name
-
Clonar ejemplos de Debezium de su repositorio oficial. Debezium
-
Los servicios necesarios a utilizar en este tutorial son:
- Zookeper
- Kafka
- Base de Datos Mysql
- Cliente de linea de comandos Mysql
- Kafka Connect
-
Tener instalado Docker en su maquina
-
Establecer la variable de entorno DEBEZIUM_VERSION en su version actual que en nuestro caso es la 1.5
- export DEBEZIUM_VERSION=1.5
-
Inicializar los contenedores de Mysql, Kafka y Zookeper
- docker-compose -f ./docker-compose/docker-compose-mysql.yaml up
Validamos que los servcios se encuentren inicializados
- Inicializamos el MySQL Connector
Después de iniciar los servicios Debezium y MySQL, está listo para implementar el conector Debezium MySQL para que pueda comenzar a monitorear la base de datos MySQL de muestra
Al registrar el conector Debezium MySQL, el conector comenzará a monitorear los BinLogs del servidor de base de datos MySQL. La binlog registra todas las transacciones de la base de datos (como cambios en filas individuales y cambios en los esquemas). Cuando cambia una fila en la base de datos, Debezium genera un evento de cambio.
* *curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register/register-mysql.json*
- Consumimos mensajes desde un Topico Kafka
docker-compose -f ./docker-compose/docker-compose-mysql.yaml exec kafka /kafka/bin/kafka-console-consumer.sh
--bootstrap-server kafka:9092
--from-beginning
--property print.key=true
--topic dbserver1.inventory.customers
Ejecutamos
docker logs --follow container_id
- Procedemos a modificar registros de la Base de Datos Inventory que tenemos en nuestro motor de Base de Datos MySQL
docker-compose -f ./docker-compose/docker-compose-mysql.yaml exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory'
Ejecutar los siguiente comandos para validar:
- use inventory;
- show tables;
- SELECT * FROM customers;
- Continuando el paso 5. Actualizamos un registro de la tabla Customer
UPDATE customers SET first_name='Anne Marie' WHERE id=1004;
- Validamos la Captura del Cambio llevado a cabo el cual se ve reflejado en el Topico descrito en el paso 4. Observar los agregados before , after y op el cual para actualizacion toma el valor u
- Eliminamos un registro y observamos el cambio
DELETE FROM addresses WHERE customer_id=1004; DELETE FROM customers WHERE id=1004;
Observar que el tag op es igual a d
- Detener los servicios y destruir los recursos creados
docker-compose -f ./docker-compose/docker-compose-mysql.yaml down
Apicurio Registry es una API de código abierto y un registro de esquemas que, entre otras cosas, se puede utilizar para almacenar esquemas de registros de Kafka. Proporciona:
- Su propio convertidor Avro nativo y serializador Protobuf
- Un convertidor JSON que exporta su esquema al registro
- Una capa de compatibilidad con otros registros de esquemas como IBM o Confluent; se puede utilizar con el convertidor Confluent Avro.
-
Establecer la variable de entorno DEBEZIUM_VERSION en su version actual que en nuestro caso es la 1.5
- export DEBEZIUM_VERSION=1.5
-
Inicializar los contenedores de Mysql, Kafka, Zookeper y Apicurio
- docker-compose -f ./docker-compose/docker-compose-mysql-apicurio.yaml up
-
Inicializamos el MySQL Connector y especificamos la configuracion de Esquema Registry
- curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register/register-mysql-apicurio-converter-json.json
- Consultamos el Schema Registry de la entidad Customers
curl -X GET http://localhost:8080/api/artifacts/dbserver1.inventory.customers-value | jq .
- Consumimos mensajes desde un Topico Kafka
docker-compose -f ./docker-compose/docker-compose-mysql-apicurio.yaml exec kafka /kafka/bin/kafka-console-consumer.sh
--bootstrap-server kafka:9092
--from-beginning
--property print.key=true
--topic dbserver1.inventory.customers
Llevamos a cabo la actualizacion de un registro de la tabla Customer
UPDATE customers SET first_name='Anne Marie' WHERE id=1004;
Como se observa el mensaje solo obtiene un Payload y una referencia al Id del esquema de registro que tiene asociado ese mensaje. Ya el schema no se infiera como la primera vez sino que esta asociado a un esquema de registro en Apicurio
Ahora si deseamos el esquema por el Id retornado para ver el Payload contra cual Schema Registry esta mapeado bastara con ejecutar:
curl -X GET http://localhost:8080/api/ids/106 | jq .
Mismos pasos que en Formato JSON pero en el paso inicializamos el MySQL Connector y especificamos la configuracion de Esquema Registry
- curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register/register-mysql-apicurio-converter-avro.json
Mismos pasos que en Formato JSON pero en el paso inicializamos el MySQL Connector y especificamos la configuracion de Esquema Registry
- curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register/register-mysql-apicurio.json