- Oracle database (tested with Oracle EE 12 and 19)
- Kubernetes cluster for deploying the Strimzi operator, Kafka, and Kafka Connect
- Kafka cluster (provided by Strimzi)
- Basic knowledge of Kubernetes, Kafka, and Oracle databases
- The Debezium documentation, for reference
To stream data from the DISK_SPACE table in Oracle to Postgres using the Debezium Oracle source Kafka connector and the Debezium Postgres sink connector, take the following steps:
- Set up a Kafka cluster using the Strimzi operator on an Amazon Elastic Kubernetes Service (EKS) cluster.
- Ensure the required database permissions are granted and archive logging is enabled.
- Install and configure the Debezium Oracle Source Kafka Connector and the Debezium Postgres Sink Connector on the Kafka cluster.
- Configure the Oracle database for LogMiner (archive log mode and supplemental logging) so that changes made to the DISK_SPACE table are captured in the redo and archive logs.
- Configure the Oracle Source Kafka Connector to read the redo log and publish the changes to a Kafka topic.
- Configure the Postgres Sink Kafka Connector to consume the Kafka topic and write the changes to the Postgres database.
- Verify that the changes are being streamed correctly from the DISK_SPACE table in Oracle to the Postgres database by checking the Postgres database for the changes.
- Monitor the Kafka cluster and connectors for any errors or issues and troubleshoot as necessary.
By following these steps, it is possible to set up a data streaming pipeline that can continuously replicate data from an Oracle database to a Postgres database using Kafka and the Debezium connectors. This can be useful for a variety of use cases, including data warehousing, data synchronization, and data migration.
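The sink side is not shown in detail in this guide, so here is a minimal sketch of what the Postgres side of the pipeline can look like, assuming the Debezium JDBC sink connector (io.debezium.connector.jdbc.JdbcSinkConnector) is used and the Kafka Connect cluster is named my-connect-cluster; the hostnames, credentials, and topic name are placeholders you would adapt to your environment:

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: debezium-postgres-sink
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: io.debezium.connector.jdbc.JdbcSinkConnector
  tasksMax: 1
  config:
    # Kafka topic produced by the Oracle source connector (placeholder name)
    topics: 'my-oracle-db.INVENTORY.DISK_SPACE'
    connection.url: 'jdbc:postgresql://postgres-hostname:5432/targetdb'
    connection.username: 'postgres-username'
    connection.password: 'postgres-password'
    insert.mode: 'upsert'
    primary.key.mode: 'record_key'
    schema.evolution: 'basic'

Once data is flowing, a quick check on the Postgres side, for example SELECT COUNT(*) FROM disk_space; (the target table name is an assumption), confirms that changes from Oracle are arriving.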
1.1. Install the Strimzi operator on your Kubernetes/Openshift cluster by applying the installation YAML file:
kubectl apply -f 'https://strimzi.io/install/latest?namespace=kafka-operator' -n kafka-operator
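Before continuing, you can confirm that the operator is running; the standard install manifests create a deployment named strimzi-cluster-operator:

kubectl get deployment strimzi-cluster-operator -n kafka-operator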
2.1. Create a Kafka custom resource (CR) YAML file, e.g., kafka-cluster.yaml, defining your Kafka cluster configuration:
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-kafka-cluster
spec:
  kafka:
    replicas: 3
    listeners:
      # v1beta2 expects listeners as a list
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      log.message.format.version: '2.8'
    storage:
      type: persistent-claim
      size: 5Gi
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 5Gi
  entityOperator:
    topicOperator: {}
    userOperator: {}
2.2. Deploy the Kafka cluster:
kubectl apply -f kafka-cluster.yaml
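It can take a few minutes for the brokers to become ready; one way to wait on this is the Ready condition of the Kafka resource (assuming the cluster was created in the kafka namespace):

kubectl wait kafka/my-kafka-cluster --for=condition=Ready --timeout=300s -n kafka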
Configuration needed for Oracle LogMiner
ORACLE_SID=ORACLCDB dbz_oracle sqlplus /nolog
CONNECT sys/top_secret AS SYSDBA
alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate
startup mount
alter database archivelog;
alter database open;
-- Should now show "Database log mode: Archive Mode"
archive log list
exit;
To confirm that archive logging is enabled, first check LOG_MODE; it should report ARCHIVELOG. If it does not, repeat the steps above (on an Oracle AWS RDS instance you may need to reboot the instance).

SELECT LOG_MODE FROM V$DATABASE;

LOG_MODE
------------
ARCHIVELOG

Once LOG_MODE is set to ARCHIVELOG, execute the remaining commands to complete the LogMiner configuration and enable supplemental logging.
To enable Debezium to capture the before state of changed database rows, you must also enable supplemental logging for the captured tables or for the entire database. The following example illustrates how to configure supplemental logging for all columns of a single inventory.customers table:

ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
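Minimal supplemental logging must also be enabled at the database level for LogMiner to capture changes; per the Debezium documentation:

ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

The scripts below create a dedicated LogMiner tablespace and a dbzuser account with the privileges the connector needs.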
#!/bin/bash
sqlplus / as sysdba <<EOF
CREATE TABLESPACE logminer_tbs DATAFILE '/oradata/ORA_DM/logminer_tbs.dbf'
SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
exit;
EOF
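If you run a multitenant (CDB/PDB) database, the Debezium documentation creates the same tablespace in the pluggable database as well. A sketch, assuming the PDB is named ORCLPDB1 and using a placeholder datafile path:

sqlplus sys/top_secret@//localhost:1521/ORCLPDB1 as sysdba <<EOF
CREATE TABLESPACE logminer_tbs DATAFILE '/oradata/ORA_DM/pdb/logminer_tbs.dbf'
SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
exit;
EOF

The next script creates the dbzuser account and grants it the privileges Debezium requires: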
sqlplus / as sysdba <<EOF
CREATE USER dbzuser IDENTIFIED BY dbz
DEFAULT TABLESPACE logminer_tbs
QUOTA UNLIMITED ON logminer_tbs
;
GRANT CREATE SESSION TO dbzuser ;
GRANT SET CONTAINER TO dbzuser ;
GRANT SELECT ON V_\$DATABASE to dbzuser ;
GRANT FLASHBACK ANY TABLE TO dbzuser ;
GRANT SELECT ANY TABLE TO dbzuser ;
GRANT SELECT_CATALOG_ROLE TO dbzuser ;
GRANT EXECUTE_CATALOG_ROLE TO dbzuser ;
GRANT SELECT ANY TRANSACTION TO dbzuser ;
GRANT LOGMINING TO dbzuser ;
GRANT CREATE TABLE TO dbzuser ;
GRANT LOCK ANY TABLE TO dbzuser ;
GRANT CREATE SEQUENCE TO dbzuser ;
GRANT EXECUTE ON DBMS_LOGMNR TO dbzuser ;
GRANT EXECUTE ON DBMS_LOGMNR_D TO dbzuser ;
GRANT SELECT ON V_\$LOG TO dbzuser ;
GRANT SELECT ON V_\$LOG_HISTORY TO dbzuser ;
GRANT SELECT ON V_\$LOGMNR_LOGS TO dbzuser ;
GRANT SELECT ON V_\$LOGMNR_CONTENTS TO dbzuser ;
GRANT SELECT ON V_\$LOGMNR_PARAMETERS TO dbzuser ;
GRANT SELECT ON V_\$LOGFILE TO dbzuser ;
GRANT SELECT ON V_\$ARCHIVED_LOG TO dbzuser ;
GRANT SELECT ON V_\$ARCHIVE_DEST_STATUS TO dbzuser ;
GRANT SELECT ON V_\$TRANSACTION TO dbzuser ;
EOF
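The KafkaConnector resource defined in the next step must be picked up by a Kafka Connect cluster that has the Debezium Oracle connector plugin available. A minimal sketch of such a KafkaConnect resource, assuming a custom image (my-registry/debezium-connect-oracle:latest is a placeholder) that bundles the Debezium Oracle connector and the Oracle JDBC driver:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    # Required so the operator manages connectors defined as KafkaConnector resources
    strimzi.io/use-connector-resources: "true"
spec:
  # Placeholder image; build it from the Dockerfile referenced later in this guide
  image: my-registry/debezium-connect-oracle:latest
  replicas: 1
  bootstrapServers: my-kafka-cluster-kafka-bootstrap:9092
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-offsets
    config.storage.topic: connect-configs
    status.storage.topic: connect-status
    config.storage.replication.factor: 3
    offset.storage.replication.factor: 3
    status.storage.replication.factor: 3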
5.1. Create a KafkaConnector custom resource YAML file, e.g., debezium-connector.yaml, to define the Debezium Oracle connector:
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: debezium-connector
  labels:
    # Must match the name of the KafkaConnect cluster that will run this connector
    strimzi.io/cluster: my-connect-cluster
spec:
  class: io.debezium.connector.oracle.OracleConnector
  tasksMax: 1
  config:
    database.server.name: 'my-oracle-db'
    database.hostname: 'oracle-hostname'
    database.port: 1521
    database.user: 'oracle-username'
    database.password: 'oracle-password'
    database.dbname: 'oracle-sid'
    database.history.kafka.bootstrap.servers: 'my-kafka-cluster-kafka-bootstrap:9092'
    database.history.kafka.topic: 'schema-changes.inventory'
    database.connection.adapter: 'logminer'
    snapshot.mode: 'when_needed'
    table.include.list: 'inventory.customers'
5.2. Deploy the Debezium Oracle connector:
kubectl apply -f debezium-connector.yaml
6.1. Monitor the logs of the Kafka Connect pod running the Debezium Oracle connector to ensure it is capturing and streaming CDC events:
kubectl logs -f <debezium-connector-pod> -n kafka
6.2. Verify the Kafka topics containing the CDC events in your Kafka cluster.
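One way to list the topics is to run the topic tool inside a broker pod (the pod name below assumes a Kafka cluster named my-kafka-cluster in the kafka namespace):

kubectl exec -n kafka my-kafka-cluster-kafka-0 -- /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

You should see the connector's change topics (named after database.server.name, the schema, and the table) as well as the schema history topic configured above.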
Build the oracle-numb-to-int module. First, generate the Maven project:

mvn archetype:generate -DgroupId=com.kinnate -DartifactId=oracle-numb-to-int -DarchetypeArtifactId=maven-archetype-quickstart -DarchetypeVersion=1.4 -DinteractiveMode=false

- Add the proper dependencies (already present in the pom.xml file) and build with mvn package.
- Copy the resulting .jar file to the directory containing the Dockerfile.
- Use kubectl to apply (1) the Kafka Connect cluster, (2) the Debezium Oracle connector, and (3) the Debezium Postgres connector.
- Check the deployment by running the commands below.
# "k" is shorthand for kubectl
k get kafka -n kafka
k get kctr -n kafka
k logs ${DEBEZIUM_POD_NAME} -n kafka
k describe kctr ${DEBEZIUM_CONNECTOR_NAME} -n kafka
k describe kafkaconnect ${CLUSTER_NAME} -n kafka

Congratulations! You have successfully set up Strimzi Kafka, deployed Kafka Connect, and configured the Debezium Oracle connector for Change Data Capture.