debezium-postgres-kafka-connector

Prerequisites

  • Oracle database (Oracle EE 12 or 19 tested)
  • Kubernetes cluster for deploying Kafka Strimzi and Kafka Connect
  • Kafka cluster (provided by Kafka Strimzi)
  • Basic knowledge of Kubernetes, Kafka, and Oracle databases
  • Familiarity with the Debezium documentation (for reference)

Summary

To stream data from the DISK_SPACE table in Oracle to Postgres using the Debezium Oracle Source Kafka Connector and the Debezium Postgres Sink Connector, take the following steps:

  • Set up a Kafka cluster using the Strimzi operator on an Amazon Elastic Kubernetes Service (EKS) cluster.
  • Ensure the required database permissions are granted and logging (archive log mode and supplemental logging) is enabled.
  • Install and configure the Debezium Oracle Source Kafka Connector and the Debezium Postgres Sink Connector on the Kafka cluster.
  • Configure the Oracle database to use LogMiner to capture changes made to the DISK_SPACE table and write them to the redo log.
  • Configure the Oracle Source Kafka Connector to read the redo log and publish the changes to a Kafka topic.
  • Configure the Postgres Sink Kafka Connector to consume the Kafka topic and write the changes to the Postgres database.
  • Verify that the changes are being streamed correctly from the DISK_SPACE table in Oracle to the Postgres database by checking the Postgres database for the changes.
  • Monitor the Kafka cluster and connectors for any errors or issues and troubleshoot as necessary.

By following these steps, it is possible to set up a data streaming pipeline that can continuously replicate data from an Oracle database to a Postgres database using Kafka and the Debezium connectors. This can be useful for a variety of use cases, including data warehousing, data synchronization, and data migration.

diagram

1. Install Kafka Strimzi Operator

1.1. Install the Strimzi operator on your Kubernetes/OpenShift cluster by applying the installation YAML file:

kubectl apply -f 'https://strimzi.io/install/latest?namespace=kafka-operator' -n kafka-operator
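
You can confirm that the operator is running before proceeding (strimzi-cluster-operator is the deployment name used by the default install manifest):

kubectl get deployment strimzi-cluster-operator -n kafka-operator
kubectl get pods -n kafka-operator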

2. Deploy Kafka Cluster

2.1. Create a Kafka custom resource (CR) YAML file, e.g., kafka-cluster.yaml, defining your Kafka cluster configuration.

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-kafka-cluster
  namespace: kafka
spec:
  kafka:
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      log.message.format.version: '2.8'
    storage:
      type: persistent-claim
      size: 10Gi
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 5Gi
  entityOperator:
    topicOperator: {}
    userOperator: {}

2.2. Deploy the Kafka cluster:

kubectl apply -f kafka-cluster.yaml
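
It can take a few minutes for the brokers to start. Assuming the cluster is created in the kafka namespace (as in the verification commands at the end of this document), you can wait for it to become ready with:

kubectl wait kafka/my-kafka-cluster --for=condition=Ready --timeout=300s -n kafka
kubectl get pods -n kafka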

3. Prepare the Oracle Database

3.1. Apply the configuration needed for Oracle LogMiner:

ORACLE_SID=ORACLCDB dbz_oracle sqlplus /nolog
CONNECT sys/top_secret AS SYSDBA
alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate
startup mount
alter database archivelog;
alter database open;
-- Should now show "Database log mode: Archive Mode"
archive log list

exit;

To confirm that archive logging is enabled, run the query below. LOG_MODE should return ARCHIVELOG; if it does not, you may need to reboot your Oracle AWS RDS instance.

SELECT LOG_MODE FROM V$DATABASE;

LOG_MODE
------------
ARCHIVELOG

Once LOG_MODE is set to ARCHIVELOG, complete the LogMiner configuration by adding supplemental logging. To enable Debezium to capture the before state of changed database rows, you must enable supplemental logging for the captured tables or for the entire database. The following example configures supplemental logging for all columns of a single table (for example, inventory.customers):

ALTER TABLE schema.table_name ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
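
The Debezium Oracle connector documentation also calls for minimal supplemental logging at the database level; if it is not already enabled, run:

ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

Next, create a LogMiner tablespace and a dedicated Debezium user with the privileges the connector needs, as in the script below.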

#!/bin/bash

sqlplus / as sysdba <<EOF
  CREATE TABLESPACE logminer_tbs DATAFILE '/oradata/ORA_DM/logminer_tbs.dbf'
    SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
  exit;
EOF

sqlplus / as sysdba <<EOF
  CREATE USER dbzuser IDENTIFIED BY dbz
    DEFAULT TABLESPACE logminer_tbs
    QUOTA UNLIMITED ON logminer_tbs
    ;

  GRANT CREATE SESSION TO dbzuser ;
  GRANT SET CONTAINER TO dbzuser ;
  GRANT SELECT ON V_\$DATABASE to dbzuser ;
  GRANT FLASHBACK ANY TABLE TO dbzuser ;
  GRANT SELECT ANY TABLE TO dbzuser ;
  GRANT SELECT_CATALOG_ROLE TO dbzuser ;
  GRANT EXECUTE_CATALOG_ROLE TO dbzuser ;
  GRANT SELECT ANY TRANSACTION TO dbzuser ;
  GRANT LOGMINING TO dbzuser ;

  GRANT CREATE TABLE TO dbzuser ;
  GRANT LOCK ANY TABLE TO dbzuser ;
  GRANT CREATE SEQUENCE TO dbzuser ;

  GRANT EXECUTE ON DBMS_LOGMNR TO dbzuser ;
  GRANT EXECUTE ON DBMS_LOGMNR_D TO dbzuser ;

  GRANT SELECT ON V_\$LOG TO dbzuser ;
  GRANT SELECT ON V_\$LOG_HISTORY TO dbzuser ;
  GRANT SELECT ON V_\$LOGMNR_LOGS TO dbzuser ;
  GRANT SELECT ON V_\$LOGMNR_CONTENTS TO dbzuser ;
  GRANT SELECT ON V_\$LOGMNR_PARAMETERS TO dbzuser ;
  GRANT SELECT ON V_\$LOGFILE TO dbzuser ;
  GRANT SELECT ON V_\$ARCHIVED_LOG TO dbzuser ;
  GRANT SELECT ON V_\$ARCHIVE_DEST_STATUS TO dbzuser ;
  GRANT SELECT ON V_\$TRANSACTION TO dbzuser ;

EOF
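
4. Deploy Kafka Connect

4.1. The Debezium connectors in the next step run inside a Strimzi-managed Kafka Connect cluster (the "cluster connector" applied in step 7). The KafkaConnect resource below is a minimal sketch: the name my-connect-cluster, the kafka namespace, and the container image are assumptions, and in practice the image is a custom build that bundles the Debezium Oracle and Postgres connector plugins plus the converter JAR from step 7.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  namespace: kafka
  annotations:
    # Let the operator manage connectors through KafkaConnector resources.
    strimzi.io/use-connector-resources: "true"
spec:
  replicas: 1
  bootstrapServers: my-kafka-cluster-kafka-bootstrap:9092
  # Assumed custom image containing the Debezium connector plugins and the converter JAR from step 7.
  image: <your-registry>/debezium-connect:latest
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    config.storage.replication.factor: 3
    offset.storage.replication.factor: 3
    status.storage.replication.factor: 3

4.2. Deploy the Kafka Connect cluster:

kubectl apply -f kafka-connect.yaml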

5. Deploy Debezium Oracle Connector

5.1. Create a Kafka Connector custom resource YAML file, e.g., debezium-connector.yaml, to define the Debezium Oracle connector:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: debezium-connector
  namespace: kafka
  labels:
    # Must match the name of the KafkaConnect cluster that will run this connector.
    strimzi.io/cluster: my-connect-cluster
spec:
  class: io.debezium.connector.oracle.OracleConnector
  tasksMax: 1
  config:
    database.server.name: 'my-oracle-db'
    database.hostname: 'oracle-hostname'
    database.port: 1521
    database.user: 'oracle-username'
    database.password: 'oracle-password'
    database.dbname: 'oracle-sid'
    database.history.kafka.bootstrap.servers: 'my-kafka-cluster-kafka-bootstrap:9092'
    database.history.kafka.topic: 'schema-changes.inventory'
    database.connection.adapter: 'logminer'
    snapshot.mode: 'when_needed'
    table.include.list: 'inventory.customers'

5.2. Deploy the Debezium Oracle connector:

kubectl apply -f debezium-connector.yaml
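
5.3. Define the Postgres sink side of the pipeline. This document refers to a Debezium Postgres Sink Connector; one way to implement it is with the Debezium JDBC sink connector (io.debezium.connector.jdbc.JdbcSinkConnector), sketched below. The hostnames, credentials, topic name, and the my-connect-cluster label are placeholders and assumptions that must match your environment.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: debezium-postgres-sink
  namespace: kafka
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: io.debezium.connector.jdbc.JdbcSinkConnector
  tasksMax: 1
  config:
    # Topic produced by the Oracle source connector; adjust to your server name and captured table.
    topics: 'my-oracle-db.INVENTORY.CUSTOMERS'
    connection.url: 'jdbc:postgresql://postgres-hostname:5432/postgres-db'
    connection.username: 'postgres-username'
    connection.password: 'postgres-password'
    insert.mode: 'upsert'
    primary.key.mode: 'record_key'
    schema.evolution: 'basic'

5.4. Deploy the Postgres sink connector:

kubectl apply -f debezium-postgres-sink.yaml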

6. Verify CDC Process

6.1. Monitor the Kafka Connect pod logs (the Debezium Oracle connector tasks run inside the Kafka Connect workers) to ensure CDC events are being captured and streamed:

kubectl logs -f <kafka-connect-pod> -n kafka

6.2. Verify the Kafka topics containing the CDC events in your Kafka cluster.
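
For example, you can list the topics and inspect a few CDC events directly from one of the broker pods. The pod name and namespace below assume the cluster from step 2 running in the kafka namespace, and the topic name depends on your database.server.name and captured table:

kubectl exec -n kafka my-kafka-cluster-kafka-0 -- \
  /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

kubectl exec -n kafka my-kafka-cluster-kafka-0 -- \
  /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic my-oracle-db.INVENTORY.CUSTOMERS --from-beginning --max-messages 5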

7. Add the Java custom data converter for CLOBs

  • mvn archetype:generate -DgroupId=com.kinnate -DartifactId=oracle-numb-to-int -DarchetypeArtifactId=maven-archetype-quickstart -DarchetypeVersion=1.4 -DinteractiveMode=false
  • Add the required dependencies (already present in the repository's pom.xml) and implement the converter; a minimal sketch follows this list.
  • Run mvn package to build the converter JAR.
  • Copy the built .jar file into the directory containing the Dockerfile.
  • Use kubectl to apply (1) the Kafka Connect cluster, (2) the Debezium Oracle source connector, and (3) the Debezium Postgres sink connector.
  • Check the deployment by running the commands listed under "Additional terminal commands" at the end of this document.
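
A minimal sketch of such a converter, implementing Debezium's CustomConverter SPI, is shown below. The class name, package, and the NUMBER-to-long mapping are illustrative assumptions based on the oracle-numb-to-int artifact name; adapt the matching logic and target schema to the columns you actually need to convert.

package com.kinnate;

import java.util.Properties;

import org.apache.kafka.connect.data.SchemaBuilder;

import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;

// Hypothetical converter: maps Oracle NUMBER columns to 64-bit integers.
public class NumberToIntConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {

    @Override
    public void configure(Properties props) {
        // This sketch takes no configuration options.
    }

    @Override
    public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {
        // Register the conversion only for NUMBER columns; all other columns keep the default mapping.
        if ("NUMBER".equalsIgnoreCase(column.typeName())) {
            registration.register(SchemaBuilder.int64().optional(), value -> {
                if (value == null) {
                    return null;
                }
                if (value instanceof Number) {
                    return ((Number) value).longValue();
                }
                return Long.parseLong(value.toString());
            });
        }
    }
}

The built JAR is what gets copied next to the Dockerfile and baked into the Kafka Connect image; the connector then enables it through Debezium's converters and <name>.type configuration properties.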

Congratulations! You have successfully set up Kafka Strimzi, deployed Kafka Connect, and configured the Debezium Oracle connector for Change Data Capture processes.

Additional terminal commands to check the deployment

kubectl get kafka -n kafka
kubectl get kctr -n kafka
kubectl logs ${DEBEZIUM_POD_NAME} -n kafka
kubectl describe kctr ${DEBEZIUM_CONNECTOR_NAME} -n kafka
kubectl describe kafkaconnect ${CLUSTER_NAME} -n kafka