debezium-postgresql-elasticsearch
Target Architecture
Tech Stack
- Docker
- PostgreSQL
- ZooKeeper
- Schema Registry (Avro)
- Kafka
- Debezium
- Elasticsearch
How to Run
# Load environment variables
source .env
# Start all Docker containers
docker-compose up
# Check if Debezium is up
curl -H "Accept:application/json" localhost:8083/
# Check if Elasticsearch is up
curl http://localhost:9200
# Deploy all configurations once Elasticsearch and Debezium are both up
sh deploy-configurations.sh
# Check the installed Debezium connector plugins
curl -H "Accept:application/json" http://localhost:8083/connector-plugins
# List the deployed Debezium connectors
curl -H "Accept:application/json" http://localhost:8083/connectors
# Check installed source connector status
curl -H "Accept:application/json" http://localhost:8083/connectors/postgres-employee-source/status
# Check installed sink connector status
curl -H "Accept:application/json" http://localhost:8083/connectors/es-employee-sink/status
# Check the configuration of the Elasticsearch index
curl -H "Accept:application/json" http://localhost:9200/cdc.employee_db.public.employee
# Check if the Debezium topic has been created
docker-compose exec kafka /kafka/bin/kafka-topics.sh --bootstrap-server kafka:9092 --list
# Check if elasticsearch already has content
curl -H "Accept:application/json" "http://localhost:9200/cdc.employee_db.public.employee/_search?pretty"
# Open a new terminal tab and start psql to manipulate the table
docker-compose exec postgres env PGOPTIONS="--search_path=public" bash -c 'psql -U $POSTGRES_USER postgres'
# Check the result of an insert (search for the document with id 6)
curl -X GET "localhost:9200/_search?pretty" -H 'Content-Type: application/json' -d'
{
"query": {
"match": {
"id": {
"query": 6
}
}
}
}'
# Check the result of an update (search for the document with id 5)
curl -X GET "localhost:9200/_search?pretty" -H 'Content-Type: application/json' -d'
{
"query": {
"match": {
"id": {
"query": 5
}
}
}
}'
# Check the result of a delete (search for the document with id 5)
curl -X GET "localhost:9200/_search?pretty" -H 'Content-Type: application/json' -d'
{
"query": {
"match": {
"id": {
"query": 5
}
}
}
}'
# Watch messages from the Debezium topic in raw binary form
docker-compose exec kafka /kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--topic cdc.employee_db.public.employee
# Watch messages from the Debezium topic, converted from Avro to JSON
docker run -it --rm --name avro-consumer --network=debezium-postgresql-elasticsearch_default \
--link cdc_zookeeper \
--link cdc_kafka \
--link cdc_postgres \
--link cdc_schema_registry \
debezium/connect:0.10 \
/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--property print.key=true \
--formatter io.confluent.kafka.formatter.AvroMessageFormatter \
--property schema.registry.url=http://schema-registry:8081 \
--topic cdc.employee_db.public.employee
# Stop and remove all Docker containers
docker-compose down
References