memiiso/debezium-server-iceberg

Not able to get records from the table.

Opened this issue · 11 comments

hi @ismailsimsek ,
We are using Debezium as part of our architecture, with an Oracle database as the source and Dremio as the destination. We are trying to capture both the initial and the incremental load of a table from the database through CDC events. The pipeline we are following is:
Oracle -> AWS Iceberg tables -> AWS S3 bucket (where the data and metadata are stored) -> Dremio (configured with the Iceberg tables and the S3 bucket). I have attached our application.properties file for better understanding.
The problem we are facing is that we are not able to get any of the table data. We only get its metadata in Dremio, and in Dremio I have configured both the Iceberg tables and the S3 bucket.
Still, I'm not able to get data from either of them.
application_properties.txt
image

Are you able to read the table using AWS Glue? Could you compare the Glue table path and the S3 path? Recently there was an issue about that: #320

hi @ismailsimsek ,
Thanks for the quick response. We are getting data as in the image above, but we don't want it that way; we expect the data as in the attachment below.
Thank you,
Dhruv
image

@mevadadhruv please check the following configs. Make sure the include/exclude list settings contain the schema and table you want to replicate:

debezium.source.table.include.list=schemaname.tablename

The table you are querying is just an additional metadata table created by the following config. Since you are able to query it, that means everything is working as expected:

debezium.source.include.schema.changes=true
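To illustrate how these two settings interact (the table names below are a sketch based on the sink's `table-prefix` convention, not taken from this thread; verify the exact names in your catalog):

```properties
# Hedged sketch of where events end up, assuming the default naming scheme:
# row-change events for each included table are written to an Iceberg table
# named after the topic, e.g. debeziumcdc_tutorial_schemaname_tablename,
# while include.schema.changes=true writes DDL events to a separate
# schema-change table. Querying only the latter shows metadata, not rows.
debezium.sink.iceberg.table-prefix=debeziumcdc_
debezium.source.include.schema.changes=true
```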

hi @ismailsimsek ,
If you check our application.properties, you will see that we have already provided those parameters.
Thank you,
Dhruv Mevada

@mevadadhruv please check the value you are using for `schemaname.tablename`. I recommend trying with a regexp (`.*`).
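For example (`MYSCHEMA` and `MYTABLE` are placeholders, to illustrate the form):

```properties
# MYSCHEMA / MYTABLE are placeholders. Debezium matches these values as
# Java regular expressions against SCHEMA.TABLE identifiers; Oracle stores
# unquoted identifiers in uppercase, so the pattern usually has to be
# uppercase too.
debezium.source.table.include.list=MYSCHEMA.MYTABLE
# or, to capture every table in the schema:
#debezium.source.table.include.list=MYSCHEMA.*
```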

hi @ismailsimsek,
Yes, we actually tried this before, but that's not the problem. Our actual problem is that we are not getting any records from the table once we start Debezium Server. We do get the data and metadata folders in the S3 bucket, but they contain the schemas and logs, not the records of the table (please find the attachments for better understanding).

image
Thank you,
Dhruv Mevada

@mevadadhruv could you attach the Debezium logs too?

Hi @ismailsimsek , I'm working with @mevadadhruv on this. The issue we are facing is that the Oracle database contains a large number of tables, and we are trying to get the data for one particular table. We are getting the schema for the table, but not the column and row values; it is giving us the entire schema.

In Postgres, whenever we make an update, delete, or insert, we get the data for that table, like in
#341 (comment)
But in Oracle we are getting the entire schema, not the rows and columns of the table:
[
{
"id": "e1aa0a60-7e65-4aa8-b647-c1eafd40efed",
"history_data": "{"source":{"server":"tutorial"},"position":{"snapshot_scn":"551361732","snapshot":true,"scn":"551361732","snapshot_completed":false},"ts_ms":1718010053332,"databaseName":"AMSGCPYPDB1","schemaName":"AMOS_PRIV_PRD_1","ddl":"\n CREATE TABLE \"AMOS_PRIV_PRD_1\".\"PART\" \n (\t\"EXTENDED_STATENO_I\" NUMBER(12,0), \n\t\"PARTNO\" VARCHAR2(32 CHAR) DEFAULT ' ' NOT NULL ENABLE, \n\t\"PARTMATCH\" VARCHAR2(32 CHAR) DEFAULT ' ' NOT NULL ENABLE, \n\t\"PARTSEQNO_I\" NUMBER(12,0) DEFAULT 0 NOT NULL ENABLE, \n\t\"DESCRIPTION\" VARCHAR2(36 CHAR) DEFAULT ' ' NOT NULL ENABLE, \n\t\"REMARKS\" VARCHAR2(36 CHAR) DEFAULT ' ' NOT NULL ENABLE, \n\t\"SPECIFICATION\" VARCHAR2(50 CHAR), \n\t\"ATA_CHAPTER\" VARCHAR2(12 CHAR) DEFAULT ' ' NOT NULL ENABLE, \n\t\"VENDOR\" VARCHAR2(12 CHAR) DEFAULT ' ' NOT NULL ENABLE, \n\t\"WEIGHT\" BINARY_DOUBLE DEFAULT 0 NOT NULL ENABLE, \n\t\"STORETIME\" NUMBER(12,0) DEFAULT 0 NOT NULL ENABLE, \n\t\"ALERT_QTY\" BINARY_DOUBLE DEFAULT 0 NOT NULL ENABLE, \n\t\"MEASURE_UNIT\" VARCHAR2(4 CHAR) DEFAULT ' ' NOT NULL ENABLE, \n\t\"WASTE_CODE\" VARCHAR2(8 CHAR) DEFAULT ' ', \n\t\"REORD_LEVEL\" BINARY_DOUBLE DEFAULT 0 NOT NULL ENABLE, \n\t\"SAFETY_STOCK\" BINARY_DOUBLE, \n\t\"MAX_PURCH\" BINARY_DOUBLE DEFAULT 0 NOT NULL ENABLE, \n\t\"AC_TYP\" VARCHAR2(6 CHAR) DEFAULT ' ' NOT NULL ENABLE, \n\t\"MAT_CLASS\" VARCHAR2(4 CHAR) DEFAULT ' ' NOT NULL ENABLE, \n\t\"MAT_TYPE\" VARCHAR2(4 CHAR) DEFAULT ' ' NOT NULL ENABLE, \n\t\"COUNTRY_ORIGIN\" VARCHAR2(4 CHAR) DEFAULT ' ' NOT NULL ENABLE, \n\t\"REORDER\" VARCHAR2(2 CHAR) DEFAULT ' ' NOT NULL ENABLE, \n\t\"TOOL\" VARCHAR2(1 CHAR) DEFAULT ' ' NOT NULL ENABLE, \n\t\"REPAIRABLE\" VARCHAR2(1 CHAR) DEFAULT ' ' NOT NULL ENABLE, \n\t\"AVG_TA_TIME\" NUMBER(12,0) DEFAULT 0 NOT NULL ENABLE, \n\t\"DEFAULT_SUPPLIER\" VARCHAR2(12 CHAR) DEFAULT ' ' NOT NULL ENABLE, \n\t\"DEFAULT_REPAIR\" VARCHAR2(12 CHAR) DEFAULT ' ' NOT NULL ENABLE, \n\t\"SPECIAL_CONTRACT\" VARCHAR2(2 CHAR) DEFAULT ' ' NOT NULL ENABLE, \n\t\"FIXED_ASSET\" 
VARCHAR2(1 CHAR) DEFAULT 'N' NOT NULL ENABLE, \n\t\"REORDER_LAST_MUTATOR\" VARCHAR2(8 CHAR) DEFAULT ' ' NOT NULL ENABLE, \n\t\"REORDER_LAST_MUTATION\" NUMBER(12,0) DEFAULT 0 NOT NULL ENABLE, \n\t\"MAX_SHOP_VISIT\" NUMBER(12,0) DEFAULT 0 NOT NULL ENABLE, \n\t\"SHOP_VISIT_RESET_CONDITION\" VARCHAR2(8 CHAR) DEFAULT ' ' NOT NULL ENABLE, \n\t\"SPECIAL_MEASURE_UNIT\" VARCHAR2(4 CHAR) DEFAULT ' ', \n\t\"MANUFACTURER\" NUMBER(12,0), \n\t\"PMA\" VARCHAR2(1 CHAR) DEFAULT ' ' NOT NULL ENABLE, \n\t\"RESOURCE_TYPE_ID\" VARCHAR2(20 CHAR), \n\t\"COUNTER_TEMPLATE_GROUPNO_I\" NUMBER(12,0), \n\t\"MUTATION\" NUMBER(12,0) DEFAULT 0 NOT NULL ENABLE, \n\t\"MUTATOR\" VARCHAR2(8 CHAR) DEFAULT ' ' NOT NULL ENABLE, \n\t\"STATUS\" NUMBER(12,0) DEFAULT 0 NOT NULL ENABLE, \n\t\"MUTATION_TIME\" NUMBER(12,0), \n\t\"CREATED_BY\" VARCHAR2(8 CHAR), \n\t\"CREATED_DATE\" NUMBER(12,0), \n\t\"MANUAL_SAFETY_STOCK\" VARCHAR2(10 CHAR) DEFAULT NULL, \n\t\"MIN_SAFETY_STOCK\" BINARY_DOUBLE DEFAULT NULL, \n\t CONSTRAINT \"U_16020\" UNIQUE (\"PARTSEQNO_I\")\n USING INDEX ENABLE, \n\t CONSTRAINT \"FK_16026\" FOREIGN KEY (\"AC_TYP\")\n\t REFERENCES \"AMOS_PRIV_PRD_1\".\"AC_TYP\" (\"AC_TYP\") ENABLE NOVALIDATE\n ) ;\n CREATE INDEX \"AMOS_PRIV_PRD_1\".\"I_102365\" ON \"AMOS_PRIV_PRD_1\".\"PART\" (\"PARTNO\", \"MUTATION\", \"MUTATION_TIME\") \n ;\nALTER TABLE \"AMOS_PRIV_PRD_1\".\"PART\" ADD CONSTRAINT \"PK_16019\" PRIMARY KEY (\"PARTNO\")\n USING INDEX \"AMOS_PRIV_PRD_1\".\"I_102365\" 
ENABLE;","tableChanges":[{"type":"CREATE","id":"\"AMSGCPYPDB1\".\"AMOS_PRIV_PRD_1\".\"PART\"","table":{"defaultCharsetName":null,"primaryKeyColumnNames":["PARTNO"],"columns":[{"name":"EXTENDED_STATENO_I","jdbcType":2,"typeName":"NUMBER","typeExpression":"NUMBER","charsetName":null,"length":12,"scale":0,"position":1,"optional":true,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"enumValues":[]},{"name":"PARTNO","jdbcType":12,"typeName":"VARCHAR2","typeExpression":"VARCHAR2","charsetName":null,"length":32,"position":2,"optional":false,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"defaultValueExpression":"' ' ","enumValues":[]},{"name":"PARTMATCH","jdbcType":12,"typeName":"VARCHAR2","typeExpression":"VARCHAR2","charsetName":null,"length":32,"position":3,"optional":false,"autoIncremented":false,"generated":false,"comment":null,"hasDefaultValue":true,"defaultValueExpression":"' ' ","enumValues":[]},
This is a sample of the data we are getting.

@GOVINDARAMTEKKAR97 I recommend checking the debezium-server logs; then you can see what is actually happening.
Having schema changes doesn't mean Debezium is replicating the table.
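One setting worth ruling out here (a suggestion on my part, not something confirmed in this thread): the snapshot mode controls whether existing rows are captured at startup.

```properties
# Hedged sketch: with snapshot.mode=schema_only Debezium captures only the
# table structure at startup and streams changes made afterwards; to also
# snapshot the existing rows, use the (default) initial mode.
debezium.source.snapshot.mode=initial
```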

Hi @ismailsimsek , OK. When I run ./run.sh, which parameters do I need to set in application.properties so that I can see detailed logs? Could you provide them, please?

# Oracle connector configuration

debezium.source.connector.class=io.debezium.connector.oracle.OracleConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=localhost
debezium.source.database.port=1521
debezium.source.database.user=user
debezium.source.database.password=password$1234
debezium.source.database.dbname=db
debezium.source.database.pdb.name=pdb
debezium.source.table.include.list=schemaname.tablename
snapshot.include.collection.list=schemaname.tablename
#debezium.source.schema.include.list=schemaname
#debezium.source.database.connection.adapter=logminer
#debezium.source.snapshot.mode=schema_only
#debezium.source.database.schema=
#################################################################################

debezium.source.include.schema.changes=true

# Uncomment if using a PDB

debezium.source.topic.prefix=tutorial

# If you have a specific start SCN, set it here. If not, let it auto-generate.

#debezium.source.start.scn=default

# Uncomment if you have a specific end SCN, otherwise omit it

#debezium.source.end.scn=null

# Optional table filters

#debezium.source.table.include.list=your-schema.your-table
#debezium.source.include.schema.changes=true
#debezium.source.replica.identity.autoset.values=lot:FULL

##########################################################################################

# Sink type configuration

debezium.sink.type=iceberg
debezium.sink.iceberg.table-prefix=debeziumcdc_
debezium.sink.iceberg.upsert=false
debezium.sink.iceberg.upsert-keep-deletes=false
debezium.sink.iceberg.write.format.default=parquet
debezium.sink.iceberg.catalog-name=iceberg
#debezium.sink.iceberg.table-name=tablename
############################################################################################

# Enable event schemas

debezium.format.value.schemas.enable=true
debezium.format.key.schemas.enable=true
debezium.format.value=json
debezium.format.key=json

###############################################################################################

# Unwrap messages

debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.transforms.unwrap.add.fields=op,table,source.ts_ms,db
debezium.transforms.unwrap.delete.handling.mode=rewrite
debezium.transforms.unwrap.drop.tombstones=true

############################################################################################

# Logging configuration

quarkus.log.level=INFO
quarkus.log.console.json=false
quarkus.log.category."org.apache.hadoop".level=WARN
quarkus.log.category."org.apache.parquet".level=WARN
quarkus.log.category."org.eclipse.jetty".level=WARN

##############################################################################################

debezium.source.log.mining.scn.gap.detection.gap.size.min=10000
debezium.source.log.mining.scn.gap.detection.time.interval.max.ms=300000

# Advanced consuming configuration

#debezium.source.offset.storage=io.debezium.server.iceberg.offset.IcebergOffsetBackingStore
#debezium.source.offset.storage.iceberg.table-name=debezium_offset_storage_custom_table
debezium.source.schema.history.internal=io.debezium.server.iceberg.history.IcebergSchemaHistory
debezium.source.schema.history.internal.iceberg.table-name=debezium_database_history_storage_test

##################################################################################################

# Batch configuration

debezium.sink.batch.batch-size-wait=MaxBatchSizeWait
debezium.sink.batch.batch-size-wait.max-wait-ms=180000
debezium.sink.batch.batch-size-wait.wait-interval-ms=120000
debezium.sink.batch.metrics.snapshot-mbean=debezium.oracle:type=connector-metrics,context=snapshot,server=testc
debezium.sink.batch.metrics.streaming-mbean=debezium.oracle:type=connector-metrics,context=streaming,server=testc
debezium.source.max.batch.size=15
debezium.source.max.queue.size=45

# S3 File IO configuration

debezium.sink.iceberg.io-impl=org.apache.iceberg.aws.s3.S3FileIO
debezium.sink.iceberg.s3.access-key-id=
debezium.sink.iceberg.s3.secret-access-key=
debezium.sink.iceberg.warehouse=s3://path
debezium.sink.iceberg.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
AWS_REGION=eu-west-1

logging.level.io.debezium=DEBUG
logging.level.org.apache.iceberg=DEBUG

These are the settings we are using in our application.properties file.
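On the detailed-log question: Debezium Server is Quarkus-based, so log levels are set through Quarkus log categories (a sketch; as far as I know, the Spring-style `logging.level.*` keys above are not picked up):

```properties
# Hedged sketch: enable DEBUG logging for the connector and the Iceberg
# library via Quarkus log categories in application.properties.
quarkus.log.level=INFO
quarkus.log.category."io.debezium".level=DEBUG
quarkus.log.category."org.apache.iceberg".level=DEBUG
```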

Hello @ismailsimsek , good morning. Also, if possible, can we connect on this through Zulip or another bridge, so you can get a better understanding of it?