dursunkoc/kafka_connect_sample

Schema data types aren't getting translated to Postgres properly


Hi, thanks for your repo. I want to do almost exactly this, except without the Oracle target DB; I just want to stream Oracle -> Postgres. I tried to run my own setup, which is a git clone of yours with slight modifications, and I am running into an issue:

source:

```json
{
  "name": "oracle-source-connector",
  "config": {
    "connector.class": "io.debezium.connector.oracle.OracleConnector",
    "tasks.max": "1",
    "database.server.name": "orcl_dm",
    "database.hostname": "HOST",
    "database.port": "1521",
    "database.user": "dbzuser",
    "database.password": "dbz",
    "database.dbname": "ora_dm",
    "database.sid": "orcl_dm",
    "database.oracle.version": "19",
    "database.out.server.name": "dbzxout",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "oracle.schema-changes",
    "database.connection.adapter": "logminer",
    "database.schema": "USERDATA",
    "table.include.list": "USERDATA.DISK_SPACE",
    "errors.log.enable": "true",
    "snapshot.lock.timeout.ms": "5000",
    "include.schema.changes": "true",
    "snapshot.mode": "initial",
    "decimal.handling.mode": "double"
  }
}
```

**I noticed that if I add `"database.history.store.only.captured.tables.ddl": true`, it doesn't work at all.**
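
From reading the Debezium Oracle docs, I suspect two of my source settings explain part of what I'm seeing: `decimal.handling.mode=double` makes Debezium emit NUMBER as a float64 (which the sink then creates as double precision), and CLOB columns apparently aren't captured at all unless `lob.enabled` is set (it defaults to false, which might be why the CLOB comes through as null). This is the fragment I'm planning to try on the source side; I haven't verified it yet:

```json
"decimal.handling.mode": "precise",
"lob.enabled": "true"
```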

and sink:

```json
{
  "name": "jdbc-sink-postgres",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "dialect.name": "PostgreSqlDatabaseDialect",
    "table.name.format": "DISK_SPACE",
    "topics": "orcl_dm.USERDATA.DISK_SPACE",
    "connection.url": "jdbc:postgresql://postgres:5432/postgres",
    "connection.user": "postgres",
    "connection.password": "postgres",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "auto.create": "true",
    "insert.mode": "upsert",
    "delete.enabled": "true",
    "pk.fields": "ID",
    "pk.mode": "record_key"
  }
}
```
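
For the date column, I'm considering chaining the stock TimestampConverter SMT after unwrap on the sink, since Debezium appears to ship Oracle DATE as epoch milliseconds in an int64 (hence the bigint below). A sketch of what I mean; `RECORD_DATE` is my guess at the field name Debezium emits for `record_date`:

```json
"transforms": "unwrap,ts",
"transforms.ts.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.ts.field": "RECORD_DATE",
"transforms.ts.target.type": "Timestamp"
```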

The SQL for the source Oracle table:

```sql
CREATE TABLE userdata.disk_space (
  id NUMBER GENERATED BY DEFAULT ON NULL AS IDENTITY (START WITH 1) NOT NULL PRIMARY KEY,
  record_date DATE NOT NULL,
  record_extracted CLOB NOT NULL
);
```

And here is one row:

```sql
INSERT INTO userdata.disk_space (record_date, record_extracted)
VALUES (SYSDATE, '/dev/xvda1     xfs           50G   47G  3.5G  94% /
/dev/xvdb      ext4          99G   84G  9.9G  90% /oradata
/dev/xvdc      ext4         296G   76G  207G  27% /orabkp
');
```

When I registered the source and sink connectors, everything seemed OK, and I made updates/deletes on the Oracle source, but on the Postgres side the data looks like this:

| RECORD_DATE [bigint] | RECORD_EXTRACTED [text] | ID [PK double precision] |
|----------------------|-------------------------|--------------------------|
| 1.676e+12            | null                    | 1                        |
| 1.677e+12            | null                    | 2                        |
| 1.677e+12            | null                    | 3                        |
| 1.678e+12            | null                    | 4                        |
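
The bigint values do look like epoch milliseconds; converting them back on the Postgres side seems to give sensible dates, so at least the date data itself survives the trip (column names are whatever `auto.create` produced for me):

```sql
-- Sanity check: interpret the bigint column as epoch millis.
SELECT to_timestamp("RECORD_DATE" / 1000.0) AS record_date, "ID"
FROM "DISK_SPACE";
```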

I was hoping to get some help on how to make the Postgres data types match Oracle's exactly: DATE, CLOB, and NUMBER. But in Postgres, as you can see, they come through as bigint, text, and double precision. Also, I'm not sure why the CLOB isn't copied properly; it shows up as null. I'm a complete newbie to this stuff and it is fascinating. Thanks for your time.
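
In case it helps to state the goal concretely: if I turn off `auto.create` on the sink, this is roughly the table I'd like to create by hand on the Postgres side (just a sketch; it assumes `lob.enabled` and the TimestampConverter idea above, and that the sink keeps Oracle's uppercase identifiers):

```sql
-- Hand-created target table (sketch), with "auto.create": "false" on the sink.
CREATE TABLE "DISK_SPACE" (
  "ID"               numeric   NOT NULL PRIMARY KEY,
  "RECORD_DATE"      timestamp NOT NULL,
  "RECORD_EXTRACTED" text      NOT NULL
);
```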