Table of Contents
CDC Demo using Kafka and Debezium
Tested with:
-
AMQ Streams 1.6.2
-
OpenShift 4.7.2
Change to yaml view
apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
name: my-cluster
namespace: debezium-demo
spec:
kafka:
config:
offsets.topic.replication.factor: 3
transaction.state.log.replication.factor: 3
transaction.state.log.min.isr: 2
log.message.format.version: '2.6'
inter.broker.protocol.version: '2.6'
version: 2.6.0
storage:
type: ephemeral
replicas: 3
listeners:
- name: plain
port: 9092
type: internal
tls: false
- name: tls
port: 9093
type: internal
tls: true
entityOperator:
topicOperator: {}
userOperator: {}
zookeeper:
storage:
type: ephemeral
replicas: 1
create pull secret using the web console. Use the name: debezium-mysql-pull-secret
-
Secret Name: debezium-mysql-pull-secret
-
Authentication Type: Image Registry Credentials
-
Registry Server Address: docker.io
-
Username: luszczynski
-
Password: <pass>
-
Email: Leave it empty
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnect
metadata:
name: my-connect-cluster
namespace: debezium-demo
annotations:
strimzi.io/use-connector-resources: "true"
spec:
bootstrapServers: 'my-cluster-kafka-bootstrap:9093'
version: 2.6.0
template:
pod:
imagePullSecrets:
- name: debezium-mysql-pull-secret
image: docker.io/luszczynski/debezium-mysql
tls:
trustedCertificates:
- secretName: my-cluster-cluster-ca-cert
certificate: ca.crt
replicas: 1
jvmOptions:
gcLoggingEnabled: false
oc new-app --name=mysql quay.io/debezium/example-mysql:latest -n $DEMO_NAMESPACE
oc set env deploy/mysql MYSQL_ROOT_PASSWORD=debezium MYSQL_USER=mysqluser MYSQL_PASSWORD=mysqlpw -n $DEMO_NAMESPACE
export POD_MYSQL=$(oc -n $DEMO_NAMESPACE get pods -o name -l app=mysql)
oc -n $DEMO_NAMESPACE exec $POD_MYSQL -- mysql -u mysqluser -pmysqlpw inventory
oc -n $DEMO_NAMESPACE exec $POD_MYSQL -- mysql -u mysqluser -pmysqlpw inventory -Bse 'show tables;'
oc -n $DEMO_NAMESPACE exec $POD_MYSQL -- mysql -u mysqluser -pmysqlpw inventory -Bse 'select * from customers;'
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
name: inventory-connector
labels:
strimzi.io/cluster: my-connect-cluster
namespace: debezium-demo
spec:
class: io.debezium.connector.mysql.MySqlConnector
tasksMax: 1
config:
database.hostname: mysql
database.port: 3306
database.user: debezium
database.password: dbz
database.server.id: 184054
database.server.name: dbserver1
database.whitelist: inventory
database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092
database.history.kafka.topic: schema-changes.inventory
oc logs -n $DEMO_NAMESPACE $(oc -n $DEMO_NAMESPACE get pods -o name -l strimzi.io/name=my-connect-cluster-connect)
oc -n $DEMO_NAMESPACE exec -it my-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--from-beginning \
--property print.key=true \
--topic dbserver1.inventory.customers
oc -n $DEMO_NAMESPACE exec $POD_MYSQL -- mysql -u mysqluser -pmysqlpw inventory -Bse 'INSERT INTO customers VALUES (default, "Sarah", "Thompson", "kitty@acme.com");'
oc -n $DEMO_NAMESPACE exec $POD_MYSQL -- mysql -u mysqluser -pmysqlpw inventory -Bse 'INSERT INTO customers VALUES (default, "Gustavo", "Luszczynski", "gustavo.duarte@redhat.com");'
oc -n $DEMO_NAMESPACE exec $POD_MYSQL -- mysql -u mysqluser -pmysqlpw inventory -Bse 'UPDATE customers SET first_name="Anne Marie" WHERE id=1004;'