debezium-postgresql-elasticsearch

Target Architecture

Alt text

Tech Stack

  • Docker
  • PostgreSQL
  • Zookeeper
  • Schema Registry (Avro)
  • Kafka
  • Debezium
  • Elasticsearch

How to Run

# load environment variables
source .env

# Run all docker instances
docker-compose up

Alt text

# Check if debezium is UP
curl -H "Accept:application/json" localhost:8083/

Alt text

# Check if elasticsearch is UP
curl http://localhost:9200

Alt text

# Deploy all configurations once Elasticsearch and Debezium are UP
sh deploy-configurations.sh

Alt text

# Check installed debezium connector plugins
curl -H "Accept:application/json" http://localhost:8083/connector-plugins

Alt text

# Check deployed Debezium connectors
curl -H "Accept:application/json" http://localhost:8083/connectors

Alt text

# Check installed source connector status
curl -H "Accept:application/json" http://localhost:8083/connectors/postgres-employee-source/status

Alt text

# Check installed sink connector status
curl -H "Accept:application/json" http://localhost:8083/connectors/es-employee-sink/status

Alt text

# Check the Elasticsearch index (mappings and settings)
curl -H "Accept:application/json" http://localhost:9200/cdc.employee_db.public.employee

Alt text

# Check if the Debezium topic is created
docker-compose exec kafka /kafka/bin/kafka-topics.sh --bootstrap-server kafka:9092 --list

Alt text

# Check if elasticsearch already has content
curl -H "Accept:application/json" "http://localhost:9200/cdc.employee_db.public.employee/_search?pretty"

Alt text

# Open a new terminal tab to manipulate the table
docker-compose exec postgres env PGOPTIONS="--search_path=public" bash -c 'psql -U $POSTGRES_USER postgres'
# Check insert query
curl -X GET "localhost:9200/_search?pretty" -H 'Content-Type: application/json' -d'
{
  "query": {
    "match": {
      "id": {
        "query": 6
      }
    }
  }
}'

Alt text Alt text

# Check update query
curl -X GET "localhost:9200/_search?pretty" -H 'Content-Type: application/json' -d'
{
  "query": {
    "match": {
      "id": {
        "query": 5
      }
    }
  }
}'

Alt text Alt text

# Check delete query
curl -X GET "localhost:9200/_search?pretty" -H 'Content-Type: application/json' -d'
{
  "query": {
    "match": {
      "id": {
        "query": 5
      }
    }
  }
}'

Alt text Alt text

# Watch messages from the Debezium topic in raw binary form
docker-compose exec kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --from-beginning \
    --property print.key=true \
    --topic cdc.employee_db.public.employee

# Watch messages from the Debezium topic, converted from Avro to JSON
docker run -it --rm --name avro-consumer --network=debezium-postgresql-elasticsearch_default \
    --link cdc_zookeeper \
    --link cdc_kafka \
    --link cdc_postgres \
    --link cdc_schema_registry \
    debezium/connect:0.10 \
    /kafka/bin/kafka-console-consumer.sh \
      --bootstrap-server kafka:9092 \
      --property print.key=true \
      --formatter io.confluent.kafka.formatter.AvroMessageFormatter \
      --property schema.registry.url=http://schema-registry:8081 \
      --topic cdc.employee_db.public.employee

# Terminate all docker instances
docker-compose down

References