Debezium
Practice Repo with Debezium which spins up MySQL, Postgres, Kafka, Kafka Connect, Kafka UI (http://localhost:8080
), and an optional Python Producer to test CDC Implementations with Debezium as well as other Data Engineering use cases.
This Repo is setup to test Debezium for both MySQL & Postgres. Debezium Connector Configs + S3 Sinks can be found in the connectors/
directory. The Python Producer can be used to write out SQL Records to the second_movies
table in Postgres every 5 seconds to test a real-time CDC use case. You can adapt the connectors to include more tables as needed.
Steps
- Run
make up
- Submit one of the Debezium Connectors for either Postgres or MySQL found in the
connectors/
directory - Submit one of the S3 Sinks (make sure the
topics
parameter matches your Kafka Topic names) found in theconnectors/s3_sink.sh
file- You'll need to setup an
aws_credentials
file at the root of the directory in order to pass credentials for any S3 Sink to work
- You'll need to setup an
- Spin up the Kafka UI at
http://localhost:8080
to view the Topics, Messages in the Topics, and the status of the Connectors - Spin up your DB tool of choice and make record changes in the tables connected to your connectors to see the changes be captured by Debezium & sent off to the Kafka Topics
- When finished, run
make down
Workflow Diagram
Each MySQL/Postgres Database you have would need its own Debezium Connector, but they can all write to the same Kafka Cluster.
Articles
S3 Sink - IAM User
To use the S3 Sink, create an IAM User in your AWS Account and attach policy that looks like below, then create access/secret credentials and paste into a aws_credentials
file at the root of the directory
aws_credentials
[default]
aws_access_key_id = xxx
aws_secret_access_key = yyy
Example S3 Policy to write to an S3 Bucket called jyablonski-kafka-s3-sink
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "ListObjectsInBucket",
"Effect": "Allow",
"Action": [
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::jyablonski-kafka-s3-sink"
]
},
{
"Sid": "AllObjectActions",
"Effect": "Allow",
"Action": "s3:*Object*",
"Resource": [
"arn:aws:s3:::jyablonski-kafka-s3-sink/*"
]
}
]
}
Example S3 Sink Connector
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/jyablonski-kafka-s3-sink/config \
-d '
{
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"tasks.max": "1",
"topics": "movies",
"s3.region": "us-east-1",
"s3.bucket.name": "jyablonski-kafka-s3-sink",
"rotate.schedule.interval.ms": "60000",
"timezone": "UTC",
"flush.size": "65536",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"schema.compatibility": "NONE",
"partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
"transforms": "AddMetadata",
"transforms.AddMetadata.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.AddMetadata.offset.field": "_offset",
"transforms.AddMetadata.partition.field": "_partition"
}
'
Storage Class Formats:
io.confluent.connect.s3.format.parquet.ParquetFormat
io.confluent.connect.s3.format.avro.AvroFormat
io.confluent.connect.s3.format.json.JsonFormat
- JSON is the only one that will work out of the box; Parquet + Avro require a Schema Registry Container + additional Config Parameters to be setup to point at the Schema Registry in order for them to work.
Debezium Config Options Explanations
connector.class
- The Installed Connector to use
tasks.max
- Max number of tasks for this connector. Defaults to 1, only use 1 for MySQL + Postgres
database.server.id
(MySQL only) - Must be a UNIQUE value if multiple Debezium Connectors are hooked up to that specific Database. If they're the same database server ID then you will encounter errors.
database.hostname
- IP Address or Hostname for the Postgres Database.
database.dbname
- The Database to connect to on the Server.
database.server.name
- A separate, internal name for Debezium to identify the Database.
schema.include.list
- Comma separated list of schemas you want to capture. By default it captures every schema.
table.include.list
- If specified, it will only capture data for these tables. Format is schemaName.tableName
.
table.exclude.list
- Can alternatively exclude tables instead. table.include.list
and table.exclude.list
cant both be set at the same time.
plugin.name
(Postgres only) - Basically just put it to pgoutput
at all times.
include.schema.changes
- Boolean which dictates whether to send DDL changes and send them to the a Kafka Topic w/ the same name as the database server ID. Probably fine to set to false unless you actually plan to have a consumer read from this topic.
snapshot.mode
- Whether to do a select *
from all records currently in the table. It will incrementally grab these in chunks if there's a lot.
- NOTE Values for this parameter mean different things in MySQL + Postgres. If you don't want to re-capture 100% of data when the Connectors boots up, for Postgres it can be set to
never
, while in MySQL it has to be set toschema_only
.
io.debezium.transforms.ExtractNewRecordState
- Flattens the message to only include the record after the NEW changes. By default, Debezium will capture both the Old + New record state, but we only want the new one for CDC + Data Engineering use cases.
transforms.unwrap.drop.tombstones
- When a record is deleted, 2 changes are captured by Debezium: 1 for the record as it was before the delete, and 1 that sets all the columns to null. If set to true, this parameter drops this record that has everything set to null before it sends it to Kafka which is what we want for DE use cases.
transforms.unwrap.delete.handling.mode: rewrite
- Creates a new column called __deleted
. When a record is deleted, Debezium will capture the record as it was before the delete & set the __deleted
field to true
.
transforms.dropTopicPrefix.type: org.apache.kafka.connect.transforms.RegexRouter
- Used to update a Topic's name.
transforms.dropTopicPrefix.regex
- Used to specify the structure for a Topic's name that you want to change.
asgard_postgres.dbz_schema.(.*)
means it will removeasgard_postgres.dbz_schema.
in the topic name and only keep the actual topic name (.*
)
transforms.dropTopicPrefix.replacement
- Used to specify what to replace the filtered Regex with.
$1
means it captures the first part.$2
would capture the second part, if you were filtering 2 different parts of a regex out or something.
Schema Registry
Used to capture schema information from connectors.
If you want to use a Parquet or Avro Sink Format for the Sink files, then you must use a Schema Registry. The following key + value converter parameters must be set on the Debezium Connector so the records are initially written with the appropriate Schema Metadata in the system.
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.enhanced.avro.schema.support": "true",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.enhanced.avro.schema.support": "true",
"value.converter.schema.registry.url": "http://schema-registry:8081"
And the S3 Sink would subsequently need these parameters to write out to Parquet
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"schema.compatibility": "NONE",
"partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner"
Mutliple Debezium Connectors on 1 Database
These 2 database properties have nothing to do with the MySQL Database apparently. They're supposed to have something to do with schema changes + the Kafka Topic. There's also some additional properties, which seems like they're writing stuff as asgard.{table_name} and the transform are to drop that prefix and the asgard.demo.(.*) is to drop the ACTUAL MySQL DB name (demo) as well.
If you have 2+ Debezium Connectors, you CANNOT use the same database.server.id
or database.server.name
for each table to do CDC on or it'll yell at you and the worker will die.
"database.server.id": "42",
"database.server.name": "asgard",
"include.schema.changes": "true",
"transforms": "unwrap,dropTopicPrefix",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.dropTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropTopicPrefix.regex":"asgard.demo.(.*)",
"transforms.dropTopicPrefix.replacement":"$1",
What are Tombstones
- A tombstone record is created after a record is deleted, and it keeps the primary key and sets all other columns to null.
- You can set
delete.handling.mode = rewrite
which adds a__deleted
column to the tables, and when that record gets deleted an "update" event happens which sets this _deleted column to true so you can filter it out downstream later on. - If you set
"transforms.unwrap.drop.tombstones": "false",
it breaks shit because it leaves the deleted records in there with all columns set to NULL, but certain columns aren't supposed to be null or maybe it's the schema registry that sees them and freaks out and causes errors, i dont know. This parameter should always be set totrue
Adding the following properties allows you to track deletes - it will add a __deleted
column to every record which is set to true or false depending on whether the event represents a delete operation or not.
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.drop.tombstones": "true"