Learning Tech: DIY Change Data Capture On Localhost

Hey there! So, imagine your data is like a lively neighborhood, and every day, new things are happening: new people moving in, kids starting school, and maybe some folks leaving. Now, change data capture (CDC) is like having a super observant neighbor who keeps tabs on all these happenings and lets you know what's different every day.

In tech terms, CDC is a cool way to track and capture changes in your database, like when new data is added, existing data is updated, or when something is deleted. It's like having a snapshot of your data's daily adventures, making it easier for you to stay in the loop without having to sift through the entire neighborhood gossip.

Setting up CDC is like putting on your neighborhood watch hat—it helps you keep track of all the comings and goings in your data world. So, let's build this system for free on our local laptop to better understand how to monitor our data and the tools used! First, let's introduce the tools that we are going to use: Postgres, Kafka, Debezium!


We will host all of these tools with docker-compose. We will also build a very slim Python app that listens to a Kafka topic. Because of this, I will assume you have both docker-compose and Python installed.

Tools and Terminology


PostgreSQL

First, let's meet PostgreSQL, the unsung hero of the relational database world. If databases were superheroes, PostgreSQL would be the one quietly ensuring data integrity and reliability behind the scenes. This open-source, object-relational database system has been winning hearts for its extensibility, standards compliance, and robustness.

What sets PostgreSQL apart? Well, it goes beyond just storing data; it's your trustworthy companion for handling complex queries, transactions, and scaling with ease. Whether you're a small startup or a large enterprise, PostgreSQL's flexibility and adherence to SQL standards make it a go-to choice for developers and database administrators alike.

Apache Kafka

Kafka is one of the backbones of the real-time data revolution. Picture it as the bustling central hub where data streams converge and diverge, forming a bustling metropolis of information exchange. Kafka ensures that no byte of data is left unexplored, connecting your applications in a seamless dance of communication.

At its core, Kafka is an open source distributed event streaming platform. That's a lot of dense wording, but we can summarize it by saying that it's a robust, flexible, and efficient PubSub platform. Producers write messages to topics, and consumers listen to those topics and react to the messages they receive. It helps decouple and scale our applications with asynchronous data processing.
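
To make the PubSub idea concrete, here is a tiny in-memory sketch in Python. It is only a toy: ToyBroker and its methods are made-up names for illustration, not Kafka's API. Real Kafka also persists messages and lets consumers pull them at their own pace, which this sketch does not attempt.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, List

Handler = Callable[[dict], None]


class ToyBroker:
    """In-memory stand-in for a broker: each topic maps to subscriber callbacks."""

    def __init__(self) -> None:
        self._subscribers: DefaultDict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Handler) -> None:
        """Register a callback that receives every message published to topic."""
        self._subscribers[topic].append(handler)

    def publish(self, topic: str, message: dict) -> int:
        """Hand message to every subscriber of topic; return how many received it."""
        handlers = self._subscribers.get(topic, [])
        for handler in handlers:
            handler(message)
        return len(handlers)


if __name__ == "__main__":
    broker = ToyBroker()
    received: List[dict] = []
    # The publisher only knows the topic name, never the consumer itself.
    broker.subscribe("cdctest.public.customers", received.append)
    broker.publish("cdctest.public.customers", {"op": "c", "id": 1})
    print(received)
```

The point to notice is that the publisher and the subscriber never reference each other. The topic name is the only thing they share.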


Debezium

Now, let's introduce Debezium, the game-changer in the realm of change data capture. Imagine a world where you not only store data but also capture every subtle change in real time. That's the magic Debezium brings to the table.

Debezium acts as a connector between your databases and Apache Kafka, the distributed streaming platform. It taps into the database's transaction log, capturing every insert, update, or delete operation. This change data is then streamed to Kafka topics, creating a real-time pipeline of events that reflects the evolving state of your database.

Why is this important? Well, Debezium's Change Data Capture allows you to react instantly to database changes, opening doors to a plethora of use cases, from maintaining data replicas to triggering real-time analytics or feeding data into microservices architectures.

In simpler terms, Debezium is your data detective, keeping an eye on the ever-changing landscape of your database and broadcasting those changes to the rest of your data ecosystem.

Part I: Hosting our services with compose

First, let's just start all of our services up.

Our entire docker-compose.yml file is below:

version: '3.7'

services:
  database:
    image: postgres:14
    environment:
      - POSTGRES_DB=cdctest
      - POSTGRES_USER=cdctest
      - POSTGRES_PASSWORD=cdctest
    # Settings for debezium, at a minimum,
    # is to enable wal_level=logical
    command:
      - postgres
      - -c
      - wal_level=logical
      - -c
      - max_wal_senders=10
      - -c
      - max_replication_slots=10
    ports:
      - 55432:5432
    networks:
      - cdc-net
    volumes:
      - ./postgres/init.sql:/docker-entrypoint-initdb.d/01-init.sql

  zookeeper:
    image: wurstmeister/zookeeper:latest
    networks:
      - cdc-net

  kafka:
    image: wurstmeister/kafka:latest
    environment:
      KAFKA_CREATE_TOPICS: "cdctest.public.customers:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    networks:
      - cdc-net

  # We can create the connector with the REST API:
  # curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d @/opt/debezium/customers_conf.json
  connect:
    image: debezium/connect:2.4
    environment:
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=cdc_connect_configs
      - OFFSET_STORAGE_TOPIC=cdc_connect_offsets
      - STATUS_STORAGE_TOPIC=cdc_source_connect_statuses
      - BOOTSTRAP_SERVERS=kafka:9092
    volumes:
      - ./debezium/:/opt/debezium:ro
    networks:
      - cdc-net

  app:
    # kafka-console-consumer.sh --topic cdctest.public.customers --bootstrap-server kafka:9092
    image: py-consumer
    build:
      context: ./python
    environment:
      - KAFKA_CDC_TOPIC=cdctest.public.customers
    depends_on:
      - kafka
    networks:
      - cdc-net

networks:
  cdc-net:
    external: false
    name: cdc-net

We have five services:

First, we have the database service. This is our Postgres service, and we will use Postgres 14 here. Debezium reads changes through Postgres logical decoding, which requires a logical WAL level, so we enable that with -c wal_level=logical. We also increase max_wal_senders and max_replication_slots, but note these are not required settings for Debezium. We expose the container on port 55432 on our local computer, and we run a few initializing SQL commands from ./postgres/init.sql. The SQL simply creates a customers table, which is what we will monitor with our Debezium connector.

Next, we have our zookeeper and kafka services. We won't dive too much into ZooKeeper. For now, suffice it to say that Kafka uses it. Kafka, on the other hand, does have some configuration worth mentioning. Our kafka service has a few environment variables:


Now, if you've used Kafka in a docker container before, a lot of these should look familiar. The new one is the KAFKA_CREATE_TOPICS environment variable, which tells Kafka to create this topic when it starts up (the :1:1 suffix asks for one partition and one replica). The name, however, seems a bit weird. It looks like a database table's name, with the format <database_name>.<database_schema>.<database_table>. This is the topic that Debezium will use to publish inserts, updates, and deletes to Kafka.

Next, let's look at our connect (or Debezium) service. This is the service that will connect to Postgres, connect to Kafka, and publish changes to Kafka. It has a few environment variables:


These four environment variables are required:

  • GROUP_ID - Set this to an ID that uniquely identifies the Kafka Connect cluster the service and its workers belong to.

  • CONFIG_STORAGE_TOPIC - Set this to the name of the Kafka topic where the Kafka Connect services in the group store connector configurations.

  • OFFSET_STORAGE_TOPIC - Set this to the name of the Kafka topic where the Kafka Connect services in the group store connector offsets.

  • BOOTSTRAP_SERVERS - Set this to a list of host/port pairs to use for establishing the initial connection to the Kafka cluster.

All debezium docker environment configurations can be found here.

We also mount this debezium folder into our container. This folder will house our connector settings. Debezium allows you to configure your connectors via a REST API on localhost:8083. This REST API accepts JSON bodies to provision and create the connectors. Let's take a look at our one configuration:

{
    "name": "customer-cdc",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "database",
        "database.port": "5432",
        "database.user": "cdctest",
        "database.password": "cdctest",
        "database.dbname": "cdctest",
        "database.server.name": "cdctest",
        "topic.prefix": "cdctest",
        "table.include.list": "public.customers",
        "plugin.name": "pgoutput"
    }
}

So, if we submit this to our REST API, we will create a connector with the name customer-cdc. It will be of type PostgresConnector. It will connect to database:5432 with the username cdctest and the password cdctest. It will prefix the topics for all of the tables it monitors with cdctest, and it will monitor the table public.customers. So, here's where the Kafka topic name (described above) comes from! We will monitor public.customers and write changes to the cdctest.public.customers topic!
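
The naming rule above is small enough to write down as code. This is just a sketch of the default Debezium Postgres convention (topic prefix, then schema, then table). The helper names are mine, not part of Debezium.

```python
from typing import List


def debezium_topic(topic_prefix: str, schema: str, table: str) -> str:
    """Build the default topic name: <topic.prefix>.<schema>.<table>."""
    return f"{topic_prefix}.{schema}.{table}"


def topics_for_config(config: dict) -> List[str]:
    """Derive the expected topics from a connector config dict.

    Assumes table.include.list holds comma-separated schema.table entries,
    as in the configuration shown above.
    """
    prefix = config["topic.prefix"]
    topics = []
    for entry in config["table.include.list"].split(","):
        schema, table = entry.strip().split(".", 1)
        topics.append(debezium_topic(prefix, schema, table))
    return topics


if __name__ == "__main__":
    config = {"topic.prefix": "cdctest", "table.include.list": "public.customers"}
    print(topics_for_config(config))
```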

Our last service (app) is a simple Python Kafka app. Since this isn't really a Python blog, we won't go through it in much depth, but you can find the source code here if you are curious. For now, suffice it to say that it will:

  1. Connect to our kafka cluster
  2. Consume from the cdctest.public.customers topic
  3. Log the changes to the console
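
If you'd like a feel for what those three steps look like, here is a minimal sketch. It is not the actual app from the repo. It assumes the kafka-python package (pip install kafka-python), and KAFKA_BOOTSTRAP_SERVERS is a hypothetical environment variable I made up for this example; only KAFKA_CDC_TOPIC comes from our compose file.

```python
import json
import os
from typing import Any, Optional


def decode_value(raw: Optional[bytes]) -> Optional[Any]:
    """Decode a message value as JSON.

    Debezium can follow a delete with a "tombstone" message whose value is
    empty, so None is passed through instead of being parsed.
    """
    if raw is None:
        return None
    return json.loads(raw.decode("utf-8"))


def main() -> None:
    # Imported here so decode_value can be used without kafka-python installed.
    from kafka import KafkaConsumer  # assumption: the kafka-python package

    topic = os.environ.get("KAFKA_CDC_TOPIC", "cdctest.public.customers")
    # 1. Connect to our kafka cluster and 2. subscribe to the CDC topic.
    consumer = KafkaConsumer(
        topic,
        # Hypothetical variable; kafka:9092 matches the compose service name.
        bootstrap_servers=os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "kafka:9092"),
        auto_offset_reset="earliest",
        value_deserializer=decode_value,
    )
    # 3. Log each change to the console as it arrives.
    for message in consumer:
        print(f"consumed: value={message.value} timestamp={message.timestamp}")
```

Calling main() from the container's entrypoint script starts the loop, which blocks and prints one line per change event.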

We can now start everything up:

prompt> docker-compose up -d
[+] Running 6/0
 ✔ Network cdc-net               Created   0.0s
 ✔ Container ez-cdc-connect-1    Created   0.0s
prompt> docker-compose ps
NAME                 IMAGE                           COMMAND                  SERVICE     CREATED          STATUS          PORTS
ez-cdc-app-1         py-consumer                     "python main.py"         app         56 seconds ago   Up 55 seconds   
ez-cdc-connect-1     debezium/connect:2.4            "/docker-entrypoint.…"   connect     56 seconds ago   Up 55 seconds   8083/tcp, 8778/tcp, 9092/tcp
ez-cdc-database-1    postgres:14                     "docker-entrypoint.s…"   database    56 seconds ago   Up 55 seconds   0.0.0.0:55432->5432/tcp
ez-cdc-kafka-1       wurstmeister/kafka:latest       "start-kafka.sh"         kafka       56 seconds ago   Up 55 seconds   
ez-cdc-zookeeper-1   wurstmeister/zookeeper:latest   "/bin/sh -c '/usr/sb…"   zookeeper   56 seconds ago   Up 55 seconds   22/tcp, 2181/tcp, 2888/tcp, 3888/tcp

Setting Up Our Connector

Let's go and set up our connector. We will do that by exec'ing into the connect container and running a simple cURL command:

prompt> docker-compose exec connect bash
[kafka@eefc386d2cd3 ~]$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d @/opt/debezium/customers_conf.json
HTTP/1.1 201 Created
Date: Thu, 21 Dec 2023 13:44:21 GMT
Location: http://localhost:8083/connectors/customer-cdc
Content-Type: application/json
Content-Length: 415
Server: Jetty(9.4.51.v20230217)

[kafka@eefc386d2cd3 ~]$ 
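
If you would rather do this from Python than cURL, the same request can be sketched with only the standard library. A couple of assumptions: the base URL must be reachable from wherever you run this (our compose file does not publish port 8083 to the host, so you would either run it inside the cdc-net network or publish the port yourself), and the function names are just mine. The status endpoint is part of the standard Kafka Connect REST API.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # assumption: Connect REST API is reachable here


def build_create_request(config: dict, base_url: str = CONNECT_URL) -> urllib.request.Request:
    """Build (but do not send) the POST that registers a connector."""
    return urllib.request.Request(
        url=f"{base_url}/connectors/",
        data=json.dumps(config).encode("utf-8"),
        headers={"Accept": "application/json", "Content-Type": "application/json"},
        method="POST",
    )


def create_connector(config: dict, base_url: str = CONNECT_URL) -> int:
    """Send the request and return the HTTP status code (201 when created)."""
    with urllib.request.urlopen(build_create_request(config, base_url)) as resp:
        return resp.status


def connector_status(name: str, base_url: str = CONNECT_URL) -> dict:
    """Fetch GET /connectors/<name>/status and return the parsed JSON body."""
    with urllib.request.urlopen(f"{base_url}/connectors/{name}/status") as resp:
        return json.load(resp)
```

To use it, load customers_conf.json with json.load, pass the resulting dict to create_connector, and then call connector_status("customer-cdc") to check that the connector and its task are running.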

Now, let's go insert some data into our database:

prompt> export PGHOST=localhost
prompt> export PGPORT=55432
prompt> export PGUSER=cdctest
prompt> export PGPASSWORD=cdctest
prompt> psql
psql (14.7 (Homebrew), server 14.10 (Debian 14.10-1.pgdg120+1))
Type "help" for help.

cdctest=# insert into customers (first_name, last_name, email) VALUES ('alex', 'foley', 'alex.foley');
cdctest=# insert into customers (first_name, last_name, email) VALUES ('john', 'doe', 'john.doe');
cdctest=# select * from customers;
 id | first_name | last_name |   email    
----+------------+-----------+------------
  1 | alex       | foley     | alex.foley
  2 | john       | doe       | john.doe
(2 rows)


Now, let's look at the logs of our python app:

prompt> docker-compose logs app
app-1  | 2023-12-21 13:46:28.744 | INFO     | __main__:consume:47 - consumed: value={'schema': {'type': 'struct', 'fields': [{'type': 'struct', 'fields': [{'type': 'int32', 'optional': False, 'default': 0, 'field': 'id'}, {'type': 'string', 'optional': True, 'field': 'first_name'}, {'type': 'string', 'optional': True, 'field': 'last_name'}, {'type': 'string', 'optional': True, 'field': 'email'}], 'optional': True, 'name': 'cdctest.public.customers.Value', 'field': 'before'}, {'type': 'struct', 'fields': [{'type': 'int32', 'optional': False, 'default': 0, 'field': 'id'}, {'type': 'string', 'optional': True, 'field': 'first_name'}, {'type': 'string', 'optional': True, 'field': 'last_name'}, {'type': 'string', 'optional': True, 'field': 'email'}], 'optional': True, 'name': 'cdctest.public.customers.Value', 'field': 'after'}, {'type': 'struct', 'fields': [{'type': 'string', 'optional': False, 'field': 'version'}, {'type': 'string', 'optional': False, 'field': 'connector'}, {'type': 'string', 'optional': False, 'field': 'name'}, {'type': 'int64', 'optional': False, 'field': 'ts_ms'}, {'type': 'string', 'optional': True, 'name': 'io.debezium.data.Enum', 'version': 1, 'parameters': {'allowed': 'true,last,false,incremental'}, 'default': 'false', 'field': 'snapshot'}, {'type': 'string', 'optional': False, 'field': 'db'}, {'type': 'string', 'optional': True, 'field': 'sequence'}, {'type': 'string', 'optional': False, 'field': 'schema'}, {'type': 'string', 'optional': False, 'field': 'table'}, {'type': 'int64', 'optional': True, 'field': 'txId'}, {'type': 'int64', 'optional': True, 'field': 'lsn'}, {'type': 'int64', 'optional': True, 'field': 'xmin'}], 'optional': False, 'name': 'io.debezium.connector.postgresql.Source', 'field': 'source'}, {'type': 'string', 'optional': False, 'field': 'op'}, {'type': 'int64', 'optional': True, 'field': 'ts_ms'}, {'type': 'struct', 'fields': [{'type': 'string', 'optional': False, 'field': 'id'}, {'type': 'int64', 'optional': False, 'field': 
'total_order'}, {'type': 'int64', 'optional': False, 'field': 'data_collection_order'}], 'optional': True, 'name': 'event.block', 'version': 1, 'field': 'transaction'}], 'optional': False, 'name': 'cdctest.public.customers.Envelope', 'version': 1}, 'payload': {'before': None, 'after': {'id': 1, 'first_name': 'alex', 'last_name': 'foley', 'email': 'alex.foley'}, 'source': {'version': '2.4.2.Final', 'connector': 'postgresql', 'name': 'cdctest', 'ts_ms': 1703166387948, 'snapshot': 'false', 'db': 'cdctest', 'sequence': '[null,"24284336"]', 'schema': 'public', 'table': 'customers', 'txId': 739, 'lsn': 24284336, 'xmin': None}, 'op': 'c', 'ts_ms': 1703166388457, 'transaction': None}} timestamp=1703166388741
app-1  | 2023-12-21 13:46:42.821 | INFO     | __main__:consume:47 - consumed: value={'schema': {'type': 'struct', 'fields': [{'type': 'struct', 'fields': [{'type': 'int32', 'optional': False, 'default': 0, 'field': 'id'}, {'type': 'string', 'optional': True, 'field': 'first_name'}, {'type': 'string', 'optional': True, 'field': 'last_name'}, {'type': 'string', 'optional': True, 'field': 'email'}], 'optional': True, 'name': 'cdctest.public.customers.Value', 'field': 'before'}, {'type': 'struct', 'fields': [{'type': 'int32', 'optional': False, 'default': 0, 'field': 'id'}, {'type': 'string', 'optional': True, 'field': 'first_name'}, {'type': 'string', 'optional': True, 'field': 'last_name'}, {'type': 'string', 'optional': True, 'field': 'email'}], 'optional': True, 'name': 'cdctest.public.customers.Value', 'field': 'after'}, {'type': 'struct', 'fields': [{'type': 'string', 'optional': False, 'field': 'version'}, {'type': 'string', 'optional': False, 'field': 'connector'}, {'type': 'string', 'optional': False, 'field': 'name'}, {'type': 'int64', 'optional': False, 'field': 'ts_ms'}, {'type': 'string', 'optional': True, 'name': 'io.debezium.data.Enum', 'version': 1, 'parameters': {'allowed': 'true,last,false,incremental'}, 'default': 'false', 'field': 'snapshot'}, {'type': 'string', 'optional': False, 'field': 'db'}, {'type': 'string', 'optional': True, 'field': 'sequence'}, {'type': 'string', 'optional': False, 'field': 'schema'}, {'type': 'string', 'optional': False, 'field': 'table'}, {'type': 'int64', 'optional': True, 'field': 'txId'}, {'type': 'int64', 'optional': True, 'field': 'lsn'}, {'type': 'int64', 'optional': True, 'field': 'xmin'}], 'optional': False, 'name': 'io.debezium.connector.postgresql.Source', 'field': 'source'}, {'type': 'string', 'optional': False, 'field': 'op'}, {'type': 'int64', 'optional': True, 'field': 'ts_ms'}, {'type': 'struct', 'fields': [{'type': 'string', 'optional': False, 'field': 'id'}, {'type': 'int64', 'optional': False, 'field': 
'total_order'}, {'type': 'int64', 'optional': False, 'field': 'data_collection_order'}], 'optional': True, 'name': 'event.block', 'version': 1, 'field': 'transaction'}], 'optional': False, 'name': 'cdctest.public.customers.Envelope', 'version': 1}, 'payload': {'before': None, 'after': {'id': 2, 'first_name': 'john', 'last_name': 'doe', 'email': 'john.doe'}, 'source': {'version': '2.4.2.Final', 'connector': 'postgresql', 'name': 'cdctest', 'ts_ms': 1703166402653, 'snapshot': 'false', 'db': 'cdctest', 'sequence': '["24284632","24284688"]', 'schema': 'public', 'table': 'customers', 'txId': 740, 'lsn': 24284688, 'xmin': None}, 'op': 'c', 'ts_ms': 1703166402765, 'transaction': None}} timestamp=1703166402803

Whoa! Look at that! Our two test users flowed into our python app in almost real time!
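
Those log lines are a lot to take in, because each message carries its full schema alongside the payload. The interesting part is the payload: op says what happened ('c' is a create, and Debezium also uses 'u' for update, 'd' for delete, and 'r' for snapshot reads), while before and after hold the row images. Here is a small sketch that boils an event down to those pieces. The function and dictionary names are my own.

```python
from typing import Any, Dict, Optional

# Debezium operation codes mapped to friendlier names.
OPERATIONS = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot read"}


def summarize_change(event: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
    """Reduce a Debezium change event to operation, table, and row images."""
    if event is None:  # tombstone message, nothing to summarize
        return None
    # With schemas enabled (as in the logs above) the envelope sits under
    # "payload"; otherwise the envelope is the top-level object.
    payload = event.get("payload", event)
    source = payload["source"]
    return {
        "operation": OPERATIONS.get(payload["op"], payload["op"]),
        "table": f'{source["schema"]}.{source["table"]}',
        "before": payload.get("before"),
        "after": payload.get("after"),
    }


if __name__ == "__main__":
    # Trimmed copy of the first event from the logs above.
    event = {
        "payload": {
            "before": None,
            "after": {"id": 1, "first_name": "alex", "last_name": "foley", "email": "alex.foley"},
            "source": {"schema": "public", "table": "customers"},
            "op": "c",
        }
    }
    print(summarize_change(event))
```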

Here's a video representation below so it's easier to see!